fix(registry): fail-closed status-persist guards on heartbeat degrade/recovery paths (#73) #2393
@@ -450,6 +450,98 @@ func TestHeartbeat_DegradedRecovery(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeartbeat_ErrorRateDegrade_Guarded verifies the error_rate degrade path
|
||||
// carries the `AND status = 'online'` guard, preventing a racing heartbeat
|
||||
// from flipping a concurrently-removed workspace back to degraded.
|
||||
func TestHeartbeat_ErrorRateDegrade_Guarded(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-degrade-guard").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-degrade-guard", 0.6, "", 1, 100, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Stale read: heartbeat started before CascadeDelete set status='removed'
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-degrade-guard").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
|
||||
|
||||
// Guarded UPDATE returns 0 rows because row is actually 'removed'
|
||||
mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'online'").
|
||||
WithArgs(models.StatusDegraded, "ws-degrade-guard").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
|
||||
// Broadcast still fires (existing behaviour) — mock it so sqlmock passes
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
body := `{"workspace_id":"ws-degrade-guard","error_rate":0.6,"sample_error":"","active_tasks":1,"uptime_seconds":100}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHeartbeat_DegradedRecovery_Guarded verifies the degraded→online recovery
|
||||
// path carries the `AND status = 'degraded'` guard, preventing a racing
|
||||
// heartbeat from flipping a concurrently-removed workspace back to online.
|
||||
func TestHeartbeat_DegradedRecovery_Guarded(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-recover-guard").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-recover-guard", 0.05, "", 1, 100, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// Stale read: heartbeat started before CascadeDelete set status='removed'
|
||||
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
|
||||
WithArgs("ws-recover-guard").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("degraded"))
|
||||
|
||||
// Guarded UPDATE returns 0 rows because row is actually 'removed'
|
||||
mock.ExpectExec("UPDATE workspaces SET status =.*AND status = 'degraded'").
|
||||
WithArgs(models.StatusOnline, "ws-recover-guard").
|
||||
WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
|
||||
// Broadcast still fires (existing behaviour) — mock it so sqlmock passes
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
body := `{"workspace_id":"ws-recover-guard","error_rate":0.05,"sample_error":"","active_tasks":1,"uptime_seconds":100}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- a2a_proxy.go: Workspace has no URL (503 with status) ----------
|
||||
|
||||
func TestProxyA2A_WorkspaceNoURL(t *testing.T) {
|
||||
|
||||
@@ -787,7 +787,8 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
nativeStatus := runtimeOverrides.HasCapability(payload.WorkspaceID, "status_mgmt")
|
||||
|
||||
if !nativeStatus && currentStatus == "online" && payload.ErrorRate >= 0.5 {
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusDegraded, payload.WorkspaceID); err != nil {
|
||||
// #73 guard: heartbeat degrade must not resurrect a removed workspace.
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'online'`, models.StatusDegraded, payload.WorkspaceID); err != nil {
|
||||
log.Printf("Heartbeat: failed to mark %s degraded: %v", payload.WorkspaceID, err)
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceDegraded), payload.WorkspaceID, map[string]interface{}{
|
||||
@@ -806,7 +807,8 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
// Skipped under native_status_mgmt for the same reason as the
|
||||
// degrade branch above: the adapter owns the transition.
|
||||
if !nativeStatus && currentStatus == "degraded" && payload.ErrorRate < 0.1 && payload.RuntimeState == "" {
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2`, models.StatusOnline, payload.WorkspaceID); err != nil {
|
||||
// #73 guard: heartbeat recovery must not resurrect a removed workspace.
|
||||
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'degraded'`, models.StatusOnline, payload.WorkspaceID); err != nil {
|
||||
log.Printf("Heartbeat: failed to recover %s to online: %v", payload.WorkspaceID, err)
|
||||
}
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOnline), payload.WorkspaceID, map[string]interface{}{})
|
||||
|
||||
Reference in New Issue
Block a user