RFC#2843 #32: fire declared-plugin reconcile on the heartbeat provisioning→online self-heal #3002

Merged
core-devops merged 3 commits from fix/rfc2843-32-reconcile-fires-on-heartbeat-provisioning-online into main 2026-06-16 23:03:51 +00:00
2 changed files with 97 additions and 44 deletions
+45 -8
View File
@@ -882,10 +882,26 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
return // response already written
}
// Read previous current_task to detect changes (before the UPDATE)
// Read previous current_task + status to detect changes (before the UPDATE).
//
// prevStatus is load-bearing for the RFC#2843 #32 plugin reconcile: the
// main heartbeat UPDATE below SELF-HEALS status provisioning→online INLINE
// (the `CASE WHEN status = 'provisioning' THEN 'online'` clause). That flip
// happens BEFORE evaluateStatus runs, so by the time evaluateStatus reads
// currentStatus it is already 'online' and its `currentStatus ==
// "provisioning"` branch (which fires fireReconcileOnline) never matches on
// the normal fresh-boot path — the runtime only ever calls
// /registry/heartbeat, never /registry/register, so this IS the path every
// new workspace takes. The result: declared plugins never installed on a
// fresh seo-agent (the #32 regression). Capturing prevStatus here lets us
// fire the reconcile when THIS heartbeat performed the provisioning→online
// flip, independent of evaluateStatus. evaluateStatus still owns the OTHER
// recovery transitions (offline/degraded/awaiting_agent/failed→online),
// which the inline CASE does not touch.
var prevTask string
var prevSpend int64
if err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(current_task, ''), COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`, payload.WorkspaceID).Scan(&prevTask, &prevSpend); err != nil {
var prevStatus string
if err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(current_task, ''), COALESCE(monthly_spend, 0), COALESCE(status, '') FROM workspaces WHERE id = $1`, payload.WorkspaceID).Scan(&prevTask, &prevSpend, &prevStatus); err != nil {
log.Printf("registry heartbeat: prev_task query failed for workspace %s: %v", payload.WorkspaceID, err)
}
@@ -968,6 +984,21 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
return
}
// RFC#2843 #32: fire the declared-plugin reconcile when THIS heartbeat just
// performed the provisioning→online self-heal (the inline CASE in the UPDATE
// above). This is the primary fresh-boot transition: a newly-provisioned
// workspace is created with status='provisioning', the runtime's first
// heartbeat flips it to 'online' via that CASE, and there is no
// /registry/register on the boot path. Without firing here, the reconcile
// hook in evaluateStatus never sees a provisioning→online transition (the
// CASE already moved the row to 'online' before evaluateStatus reads
// currentStatus), so declared plugins (e.g. seo-all) never install. Firing
// is idempotent — ReconcileWorkspacePlugins diffs declared-vs-installed and
// no-ops when everything is present — and nil-safe via fireReconcileOnline.
if prevStatus == string(models.StatusProvisioning) {
h.fireReconcileOnline(ctx, payload.WorkspaceID)
}
// #2421: backfill agent_card when the initial register failed and the
// heartbeat carries it. Only writes when NULL — never overwrites a
// reconciled or updated card. This is the recovery path for fast-cloud
@@ -1213,12 +1244,18 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
h.fireReconcileOnline(ctx, payload.WorkspaceID)
}
// Auto-recovery: if a workspace is marked "provisioning" but is actively sending
// heartbeats, it has successfully started up. Transition to "online" so the scheduler
// and A2A proxy can dispatch tasks to it. The provisioner does not call
// /registry/register on container start — only the heartbeat loop does, so this
// transition is the only mechanism that moves newly-started workspaces out of
// the phantom-idle state. (#1784)
// Auto-recovery: if a workspace is STILL marked "provisioning" by the time
// this branch runs, transition it to "online". Defense-in-depth only: the
// main heartbeat UPDATE above already self-heals provisioning→online via its
// inline CASE, so on the normal path currentStatus is 'online' here and this
// branch is a no-op. It still covers any future path that reaches
// evaluateStatus with a 'provisioning' row that the inline CASE missed. (#1784)
//
// NOTE (RFC#2843 #32): because the inline CASE pre-empts this branch on the
// real fresh-boot path, the declared-plugin reconcile is fired from the
// heartbeat handler itself (on prevStatus=='provisioning'), NOT only here —
// see the fireReconcileOnline call right after the main UPDATE. Do not rely
// on this branch as the reconcile trigger; it does not fire for new boxes.
if currentStatus == "provisioning" {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'provisioning'`, models.StatusOnline, payload.WorkspaceID); err != nil {
log.Printf("Heartbeat: failed to transition %s from provisioning to online: %v", payload.WorkspaceID, err)
@@ -2,6 +2,7 @@ package handlers
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"log"
@@ -209,7 +210,7 @@ func TestHeartbeatHandler_OfflineToOnline(t *testing.T) {
// Expect prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-offline").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Expect heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").
@@ -256,29 +257,34 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
// Expect prevTask SELECT
// RFC#2843 #32 regression: the reconcile MUST fire when a fresh workspace's
// heartbeat performs the provisioning→online self-heal. The runtime never
// calls /registry/register on boot, so the heartbeat (whose UPDATE's inline
// CASE flips provisioning→online before evaluateStatus runs) is the ONLY
// fresh-boot transition. Pre-fix, fireReconcileOnline was only wired into
// evaluateStatus's provisioning branch, which the inline CASE makes
// unreachable — so declared plugins (e.g. seo-all) never installed.
reconcileFired := make(chan string, 4)
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
reconcileFired <- workspaceID
})
// prevTask + prevStatus SELECT — prevStatus='provisioning' is the state a
// freshly-created workspace is in before its first heartbeat.
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-provisioning").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "provisioning"))
// Expect heartbeat UPDATE
// Heartbeat UPDATE — its inline CASE flips provisioning→online.
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-provisioning", 0.0, "", 1, 3000, "").
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect evaluateStatus SELECT — currently provisioning
// evaluateStatus SELECT — reads the post-CASE status ('online'), so its own
// provisioning→online branch does NOT fire (no duplicate transition exec).
mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id =").
WithArgs("ws-provisioning").
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("provisioning", nil))
// Expect status transition to online (#1784)
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs(models.StatusOnline, "ws-provisioning").
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect RecordAndBroadcast INSERT for WORKSPACE_ONLINE
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -293,6 +299,16 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
// The reconcile fires fire-and-forget via globalGoAsync; wait briefly.
select {
case got := <-reconcileFired:
if got != "ws-provisioning" {
t.Errorf("reconcile fired for wrong workspace: got %q", got)
}
case <-time.After(2 * time.Second):
t.Fatal("RFC#2843 #32 regression: reconcile did NOT fire on provisioning→online heartbeat")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
@@ -311,7 +327,7 @@ func TestHeartbeatHandler_FailedToOnline(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-failed").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-failed", 0.0, "", 1, 3000, "").
@@ -361,7 +377,7 @@ func TestHeartbeatHandler_AwaitingAgentToOnline(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-external").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-external", 0.0, "", 0, 60, "").
@@ -446,7 +462,7 @@ func TestHeartbeatHandler_DBUpdateError(t *testing.T) {
// Expect prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-dberr").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Heartbeat UPDATE fails
mock.ExpectExec("UPDATE workspaces SET").
@@ -482,7 +498,7 @@ func TestHeartbeatHandler_OnlineStaysOnline(t *testing.T) {
// Expect prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-stable").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Expect heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").
@@ -530,7 +546,7 @@ func TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-wedged").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Heartbeat UPDATE — sample_error carries the wedge reason from the
// workspace's _runtime_state_payload() helper.
@@ -585,7 +601,7 @@ func TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-still-wedged").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-still-wedged", 0.0, "still broken", 0, 800, "").
@@ -631,7 +647,7 @@ func TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-recovered").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-recovered", 0.0, "", 0, 30, "").
@@ -873,7 +889,7 @@ func TestHeartbeat_SkipsRemovedRows(t *testing.T) {
// prevTask lookup
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-zombie").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// UPDATE must include `AND status != 'removed'`. 0 rows affected is fine —
// this is the tombstoned case the fix protects against.
@@ -912,7 +928,7 @@ func TestHeartbeatHandler_BackfillsAgentCard_WhenNull(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-nocard").
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-nocard", 0.0, "", 0, 0, "").
@@ -952,7 +968,7 @@ func TestHeartbeatHandler_SkipsAgentCardBackfill_WhenAlreadySet(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-hascard").
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-hascard", 0.0, "", 0, 0, "").
@@ -997,7 +1013,7 @@ func TestHeartbeatHandler_BackfillAgentCard_ClearsRegisterFailure(t *testing.T)
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-degraded-register-fail").
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-degraded-register-fail", 0.0, "", 0, 0, "").
@@ -1651,7 +1667,7 @@ func TestHeartbeat_MonthlySpend_WithinBounds(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-spend-ok").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Expect the 7-argument UPDATE (with monthly_spend = $7).
mock.ExpectExec("UPDATE workspaces SET").
@@ -1687,7 +1703,7 @@ func TestHeartbeat_MonthlySpend_NegativeClamped(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-spend-neg").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Clamped to 0 → no monthly_spend field → 6-argument UPDATE.
mock.ExpectExec("UPDATE workspaces SET").
@@ -1723,7 +1739,7 @@ func TestHeartbeat_MonthlySpend_OverflowClamped(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-spend-overflow").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Expect the 7-argument UPDATE with monthly_spend clamped to 1_000_000_000_000.
mock.ExpectExec("UPDATE workspaces SET").
@@ -1759,7 +1775,7 @@ func TestHeartbeat_MonthlySpend_ExactCap(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-spend-cap").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-spend-cap", 0.0, "", 0, 0, "", int64(1_000_000_000_000)).
@@ -1794,7 +1810,7 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-spend-zero").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// 6-argument UPDATE — monthly_spend NOT included.
mock.ExpectExec("UPDATE workspaces SET").
@@ -2457,7 +2473,7 @@ func TestHeartbeatHandler_DeliversPlatformInboundSecret(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-with-secret").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-with-secret", 0.0, "", 0, 100, "").
@@ -2512,7 +2528,7 @@ func TestHeartbeatHandler_LazyHealsPlatformInboundSecret(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-needs-heal").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-needs-heal", 0.0, "", 0, 100, "").
@@ -2568,7 +2584,7 @@ func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) {
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-heal-fails").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-heal-fails", 0.0, "", 0, 100, "").
@@ -2933,7 +2949,7 @@ func TestHeartbeat_RecentRegisterFailure_DegradesWorkspace(t *testing.T) {
// prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-degrade-reg").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").
@@ -2983,7 +2999,7 @@ func TestHeartbeat_RecentRegisterFailure_BlocksRecovery(t *testing.T) {
// prevTask SELECT
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-no-recover").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// heartbeat UPDATE
mock.ExpectExec("UPDATE workspaces SET").