From 6da3349cad882638e0f609cdd9915b5f16ae1efd Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 7 Jun 2026 08:15:21 +0000 Subject: [PATCH] fix(registry): fail-closed status-persist guards on heartbeat degrade/recovery paths (#73) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add SQL status guards to the two heartbeat evaluateStatus paths that lacked them, preventing a removed workspace from being resurrected to 'online' or 'degraded' by a racing heartbeat: 1. error_rate degrade path: AND status = 'online' 2. degraded→online recovery path: AND status = 'degraded' Both paths previously used WHERE id = $2 only, so a heartbeat that started before CascadeDelete's UPDATE could flip the just-removed row back to a live status. This is the status-persist trio false-online / inconsistent-teardown class identified in the Researcher cleanup audit. The other four evaluateStatus paths (wedged→degraded, offline→online, provisioning→online, awaiting_agent→online) already carried guards. Adds regression tests (TestHeartbeat_ErrorRateDegrade_Guarded and TestHeartbeat_DegradedRecovery_Guarded) that verify the guarded SQL is issued and that the handler still returns 200 when the guarded UPDATE matches 0 rows because the workspace has been concurrently removed. Note that the degraded/online event is still broadcast in that case (existing behaviour, asserted by the tests); only the row update is guarded. Closes ticket #2 from Researcher cleanup audit. 
--- .../handlers/handlers_additional_test.go | 92 +++++++++++++++++++ .../internal/handlers/registry.go | 6 +- 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go index 6135bdb06..c792743b9 100644 --- a/workspace-server/internal/handlers/handlers_additional_test.go +++ b/workspace-server/internal/handlers/handlers_additional_test.go @@ -450,6 +450,98 @@ func TestHeartbeat_DegradedRecovery(t *testing.T) { } } +// TestHeartbeat_ErrorRateDegrade_Guarded verifies the error_rate degrade path +// carries the `AND status = 'online'` guard, preventing a racing heartbeat +// from flipping a concurrently-removed workspace back to degraded. +func TestHeartbeat_ErrorRateDegrade_Guarded(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-degrade-guard"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-degrade-guard", 0.6, "", 1, 100, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Stale read: heartbeat started before CascadeDelete set status='removed' + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-degrade-guard"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + // Guarded UPDATE returns 0 rows because row is actually 'removed' + mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'online'"). + WithArgs(models.StatusDegraded, "ws-degrade-guard"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // Broadcast still fires (existing behaviour) — mock it so sqlmock passes + mock.ExpectExec("INSERT INTO structure_events"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-degrade-guard","error_rate":0.6,"sample_error":"","active_tasks":1,"uptime_seconds":100}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHeartbeat_DegradedRecovery_Guarded verifies the degraded→online recovery +// path carries the `AND status = 'degraded'` guard, preventing a racing +// heartbeat from flipping a concurrently-removed workspace back to online. +func TestHeartbeat_DegradedRecovery_Guarded(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-recover-guard"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-recover-guard", 0.05, "", 1, 100, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Stale read: heartbeat started before CascadeDelete set status='removed' + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-recover-guard"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded")) + + // Guarded UPDATE returns 0 rows because row is actually 'removed' + mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'degraded'"). + WithArgs(models.StatusOnline, "ws-recover-guard"). + WillReturnResult(sqlmock.NewResult(0, 0)) + + // Broadcast still fires (existing behaviour) — mock it so sqlmock passes + mock.ExpectExec("INSERT INTO structure_events"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-recover-guard","error_rate":0.05,"sample_error":"","active_tasks":1,"uptime_seconds":100}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + // ---------- a2a_proxy.go: Workspace has no URL (503 with status) ---------- func TestProxyA2A_WorkspaceNoURL(t *testing.T) { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index f37543db4..047b8ea1d 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -787,7 +787,8 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea nativeStatus := runtimeOverrides.HasCapability(payload.WorkspaceID, "status_mgmt") if !nativeStatus && currentStatus == "online" && payload.ErrorRate >= 0.5 { - if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusDegraded, payload.WorkspaceID); err != nil { + // #73 guard: heartbeat degrade must not resurrect a removed workspace. 
+ if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'online'`, models.StatusDegraded, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to mark %s degraded: %v", payload.WorkspaceID, err) } h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceDegraded), payload.WorkspaceID, map[string]interface{}{ @@ -806,7 +807,8 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea // Skipped under native_status_mgmt for the same reason as the // degrade branch above: the adapter owns the transition. if !nativeStatus && currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" { - if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusOnline, payload.WorkspaceID); err != nil { + // #73 guard: heartbeat recovery must not resurrect a removed workspace. + if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'degraded'`, models.StatusOnline, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err) } h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{}) -- 2.52.0