diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 2470b979..3e4bca7d 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -882,10 +882,26 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { return // response already written } - // Read previous current_task to detect changes (before the UPDATE) + // Read previous current_task + status to detect changes (before the UPDATE). + // + // prevStatus is load-bearing for the RFC#2843 #32 plugin reconcile: the + // main heartbeat UPDATE below SELF-HEALS status provisioning→online INLINE + // (the `CASE WHEN status = 'provisioning' THEN 'online'` clause). That flip + // happens BEFORE evaluateStatus runs, so by the time evaluateStatus reads + // currentStatus it is already 'online' and its `currentStatus == + // "provisioning"` branch (which fires fireReconcileOnline) never matches on + // the normal fresh-boot path — the runtime only ever calls + // /registry/heartbeat, never /registry/register, so this IS the path every + // new workspace takes. The result: declared plugins never installed on a + // fresh seo-agent (the #32 regression). Capturing prevStatus here lets us + // fire the reconcile when THIS heartbeat performed the provisioning→online + // flip, independent of evaluateStatus. evaluateStatus still owns the OTHER + // recovery transitions (offline/degraded/awaiting_agent/failed→online), + // which the inline CASE does not touch. var prevTask string var prevSpend int64 - if err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(current_task, ''), COALESCE(monthly_spend, 0) FROM workspaces WHERE id = $1`, payload.WorkspaceID).Scan(&prevTask, &prevSpend); err != nil { + var prevStatus string + if err := db.DB.QueryRowContext(ctx, `SELECT COALESCE(current_task, ''), COALESCE(monthly_spend, 0), COALESCE(status, '') FROM workspaces WHERE id = $1`, payload.WorkspaceID).Scan(&prevTask, &prevSpend, &prevStatus); err != nil { log.Printf("registry heartbeat: prev_task query failed for workspace %s: %v", payload.WorkspaceID, err) } @@ -968,6 +984,21 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { return } + // RFC#2843 #32: fire the declared-plugin reconcile when THIS heartbeat just + // performed the provisioning→online self-heal (the inline CASE in the UPDATE + // above). This is the primary fresh-boot transition: a newly-provisioned + // workspace is created with status='provisioning', the runtime's first + // heartbeat flips it to 'online' via that CASE, and there is no + // /registry/register on the boot path. Without firing here, the reconcile + // hook in evaluateStatus never sees a provisioning→online transition (the + // CASE already moved the row to 'online' before evaluateStatus reads + // currentStatus), so declared plugins (e.g. seo-all) never install. Firing + // is idempotent — ReconcileWorkspacePlugins diffs declared-vs-installed and + // no-ops when everything is present — and nil-safe via fireReconcileOnline. + if prevStatus == string(models.StatusProvisioning) { + h.fireReconcileOnline(ctx, payload.WorkspaceID) + } + // #2421: backfill agent_card when the initial register failed and the // heartbeat carries it. Only writes when NULL — never overwrites a // reconciled or updated card. This is the recovery path for fast-cloud @@ -1213,12 +1244,18 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea h.fireReconcileOnline(ctx, payload.WorkspaceID) } - // Auto-recovery: if a workspace is marked "provisioning" but is actively sending - // heartbeats, it has successfully started up. Transition to "online" so the scheduler - // and A2A proxy can dispatch tasks to it. The provisioner does not call - // /registry/register on container start — only the heartbeat loop does, so this - // transition is the only mechanism that moves newly-started workspaces out of - // the phantom-idle state. (#1784) + // Auto-recovery: if a workspace is STILL marked "provisioning" by the time + // this branch runs, transition it to "online". Defense-in-depth only: the + // main heartbeat UPDATE above already self-heals provisioning→online via its + // inline CASE, so on the normal path currentStatus is 'online' here and this + // branch is a no-op. It still covers any future path that reaches + // evaluateStatus with a 'provisioning' row that the inline CASE missed. (#1784) + // + // NOTE (RFC#2843 #32): because the inline CASE pre-empts this branch on the + // real fresh-boot path, the declared-plugin reconcile is fired from the + // heartbeat handler itself (on prevStatus=='provisioning'), NOT only here — + // see the fireReconcileOnline call right after the main UPDATE. Do not rely + // on this branch as the reconcile trigger; it does not fire for new boxes. if currentStatus == "provisioning" { if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, updated_at = now() WHERE id = $2 AND status = 'provisioning'`, models.StatusOnline, payload.WorkspaceID); err != nil { log.Printf("Heartbeat: failed to transition %s from provisioning to online: %v", payload.WorkspaceID, err) diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 252333f0..5104d92b 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "context" "database/sql" "encoding/json" "log" @@ -209,7 +210,7 @@ func TestHeartbeatHandler_OfflineToOnline(t *testing.T) { // Expect prevTask SELECT mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-offline"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Expect heartbeat UPDATE mock.ExpectExec("UPDATE workspaces SET"). @@ -256,29 +257,34 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewRegistryHandler(broadcaster) - // Expect prevTask SELECT + // RFC#2843 #32 regression: the reconcile MUST fire when a fresh workspace's + // heartbeat performs the provisioning→online self-heal. The runtime never + // calls /registry/register on boot, so the heartbeat (whose UPDATE's inline + // CASE flips provisioning→online before evaluateStatus runs) is the ONLY + // fresh-boot transition. Pre-fix, fireReconcileOnline was only wired into + // evaluateStatus's provisioning branch, which the inline CASE makes + // unreachable — so declared plugins (e.g. seo-all) never installed. + reconcileFired := make(chan string, 4) + handler.SetReconcileFunc(func(_ context.Context, workspaceID string) { + reconcileFired <- workspaceID + }) + + // prevTask + prevStatus SELECT — prevStatus='provisioning' is the state a + // freshly-created workspace is in before its first heartbeat. mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-provisioning"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "provisioning")) - // Expect heartbeat UPDATE + // Heartbeat UPDATE — its inline CASE flips provisioning→online. mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-provisioning", 0.0, "", 1, 3000, ""). WillReturnResult(sqlmock.NewResult(0, 1)) - // Expect evaluateStatus SELECT — currently provisioning + // evaluateStatus SELECT — reads the post-CASE status ('online'), so its own + // provisioning→online branch does NOT fire (no duplicate transition exec). mock.ExpectQuery("SELECT status, last_register_failure_at FROM workspaces WHERE id ="). WithArgs("ws-provisioning"). - WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("provisioning", nil)) - - // Expect status transition to online (#1784) - mock.ExpectExec("UPDATE workspaces SET status ="). - WithArgs(models.StatusOnline, "ws-provisioning"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // Expect RecordAndBroadcast INSERT for WORKSPACE_ONLINE - mock.ExpectExec("INSERT INTO structure_events"). - WillReturnResult(sqlmock.NewResult(0, 1)) + WillReturnRows(sqlmock.NewRows([]string{"status", "last_register_failure_at"}).AddRow("online", nil)) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -293,6 +299,16 @@ func TestHeartbeatHandler_ProvisioningToOnline(t *testing.T) { t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) } + // The reconcile fires fire-and-forget via globalGoAsync; wait briefly. + select { + case got := <-reconcileFired: + if got != "ws-provisioning" { + t.Errorf("reconcile fired for wrong workspace: got %q", got) + } + case <-time.After(2 * time.Second): + t.Fatal("RFC#2843 #32 regression: reconcile did NOT fire on provisioning→online heartbeat") + } + if err := mock.ExpectationsWereMet(); err != nil { t.Errorf("unmet sqlmock expectations: %v", err) } @@ -311,7 +327,7 @@ func TestHeartbeatHandler_FailedToOnline(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-failed"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-failed", 0.0, "", 1, 3000, ""). @@ -361,7 +377,7 @@ func TestHeartbeatHandler_AwaitingAgentToOnline(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-external"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-external", 0.0, "", 0, 60, ""). @@ -446,7 +462,7 @@ func TestHeartbeatHandler_DBUpdateError(t *testing.T) { // Expect prevTask SELECT mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-dberr"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Heartbeat UPDATE fails mock.ExpectExec("UPDATE workspaces SET"). @@ -482,7 +498,7 @@ func TestHeartbeatHandler_OnlineStaysOnline(t *testing.T) { // Expect prevTask SELECT mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-stable"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Expect heartbeat UPDATE mock.ExpectExec("UPDATE workspaces SET"). @@ -530,7 +546,7 @@ func TestHeartbeatHandler_RuntimeWedged_FlipsOnlineToDegraded(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-wedged"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Heartbeat UPDATE — sample_error carries the wedge reason from the // workspace's _runtime_state_payload() helper. @@ -585,7 +601,7 @@ func TestHeartbeatHandler_DegradedRecoversOnlyAfterWedgeClears(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-still-wedged"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-still-wedged", 0.0, "still broken", 0, 800, ""). @@ -631,7 +647,7 @@ func TestHeartbeatHandler_DegradedToOnline_AfterWedgeClears(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-recovered"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-recovered", 0.0, "", 0, 30, ""). @@ -873,7 +889,7 @@ func TestHeartbeat_SkipsRemovedRows(t *testing.T) { // prevTask lookup mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-zombie"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // UPDATE must include `AND status != 'removed'`. 0 rows affected is fine — // this is the tombstoned case the fix protects against. @@ -912,7 +928,7 @@ func TestHeartbeatHandler_BackfillsAgentCard_WhenNull(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-nocard"). - WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0)) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-nocard", 0.0, "", 0, 0, ""). @@ -952,7 +968,7 @@ func TestHeartbeatHandler_SkipsAgentCardBackfill_WhenAlreadySet(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-hascard"). - WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0)) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-hascard", 0.0, "", 0, 0, ""). @@ -997,7 +1013,7 @@ func TestHeartbeatHandler_BackfillAgentCard_ClearsRegisterFailure(t *testing.T) mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-degraded-register-fail"). - WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend"}).AddRow("", 0)) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-degraded-register-fail", 0.0, "", 0, 0, ""). @@ -1651,7 +1667,7 @@ func TestHeartbeat_MonthlySpend_WithinBounds(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-spend-ok"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Expect the 7-argument UPDATE (with monthly_spend = $7). mock.ExpectExec("UPDATE workspaces SET"). @@ -1687,7 +1703,7 @@ func TestHeartbeat_MonthlySpend_NegativeClamped(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-spend-neg"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Clamped to 0 → no monthly_spend field → 6-argument UPDATE. mock.ExpectExec("UPDATE workspaces SET"). @@ -1723,7 +1739,7 @@ func TestHeartbeat_MonthlySpend_OverflowClamped(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-spend-overflow"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // Expect the 7-argument UPDATE with monthly_spend clamped to 1_000_000_000_000. mock.ExpectExec("UPDATE workspaces SET"). @@ -1759,7 +1775,7 @@ func TestHeartbeat_MonthlySpend_ExactCap(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-spend-cap"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-spend-cap", 0.0, "", 0, 0, "", int64(1_000_000_000_000)). @@ -1794,7 +1810,7 @@ func TestHeartbeat_MonthlySpend_Zero_NoUpdate(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-spend-zero"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // 6-argument UPDATE — monthly_spend NOT included. mock.ExpectExec("UPDATE workspaces SET"). @@ -2457,7 +2473,7 @@ func TestHeartbeatHandler_DeliversPlatformInboundSecret(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-with-secret"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-with-secret", 0.0, "", 0, 100, ""). @@ -2512,7 +2528,7 @@ func TestHeartbeatHandler_LazyHealsPlatformInboundSecret(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-needs-heal"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-needs-heal", 0.0, "", 0, 100, ""). @@ -2568,7 +2584,7 @@ func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) { mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-heal-fails"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) mock.ExpectExec("UPDATE workspaces SET"). WithArgs("ws-heal-fails", 0.0, "", 0, 100, ""). @@ -2933,7 +2949,7 @@ func TestHeartbeat_RecentRegisterFailure_DegradesWorkspace(t *testing.T) { // prevTask SELECT mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-degrade-reg"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // heartbeat UPDATE mock.ExpectExec("UPDATE workspaces SET"). @@ -2983,7 +2999,7 @@ func TestHeartbeat_RecentRegisterFailure_BlocksRecovery(t *testing.T) { // prevTask SELECT mock.ExpectQuery("SELECT COALESCE\\(current_task"). WithArgs("ws-no-recover"). - WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online")) // heartbeat UPDATE mock.ExpectExec("UPDATE workspaces SET").