fix(#33): break the RCA#2970 management-MCP re-provision deadlock for concierges #3228

Merged
agent-reviewer-cr2 merged 3 commits from fix/concierge-mgmt-mcp-reprovision-deadlock-33 into main 2026-06-24 12:08:32 +00:00
2 changed files with 276 additions and 0 deletions
@@ -111,8 +111,29 @@ type RegistryHandler struct {
// reconcile when unset (e.g. unit tests, CP/SaaS mode without a plugins
// handler). Wired by the router to PluginsHandler.ReconcileWorkspacePlugins.
reconcilePlugins ReconcileFunc
// mcpRecoveryLastFire rate-limits the RCA#2970 deadlock-break reconcile (#33).
// The gate fails on EVERY heartbeat until the management MCP lands, so without
// a throttle a concierge that cannot recover (e.g. a missing plugin-source
// token, and where deliver() restarts the container) would re-fire a
// clone+deliver every heartbeat interval — restart churn that never converges.
// Keyed by workspace ID → time.Time of the last fire; a new fire is allowed
// only after mcpRecoveryCooldown. The happy path leaves the mcp-missing state
// on the next heartbeat (MCP now present), so the cooldown only throttles the
// genuinely-stuck case. Stored-before-fire so concurrent heartbeats can't both
// fire (the second sees the just-stored timestamp); a rare double-fire is
// harmless — ReconcileWorkspacePlugins is idempotent. In-memory only: a CP
// redeploy / conductor tick resets it, so the once-per-cooldown guarantee
// holds within a process lifetime (acceptable — redeploys are not sub-minute,
// and one extra reconcile per redeploy on a stuck concierge is tolerable).
mcpRecoveryLastFire sync.Map
}
// mcpRecoveryCooldown bounds how often a single concierge's RCA#2970
// deadlock-break reconcile may fire (#33). Long enough to cover a
// clone+deliver+restart+boot cycle so a stuck concierge retries gently rather
// than hammering, short enough to self-heal a transient miss within minutes.
const mcpRecoveryCooldown = 5 * time.Minute
func NewRegistryHandler(b *events.Broadcaster) *RegistryHandler {
return &RegistryHandler{broadcaster: b}
}
@@ -146,6 +167,44 @@ func (h *RegistryHandler) fireReconcileOnline(ctx context.Context, workspaceID s
globalGoAsync(func() { h.reconcilePlugins(rctx, wsID) })
}
// fireReconcileMCPRecovery breaks the RCA#2970 management-MCP deadlock (#33).
//
// A kind=platform concierge whose runtime reports mcp_server_present=false is
// marked failed and the heartbeat returns BEFORE the recovery branches that
// fire fireReconcileOnline — so the declared-plugin reconcile (the ONLY SaaS
// path that installs the management MCP into the running container, reading
// workspace_declared_plugins) never runs, and mcp_server_present can never flip
// to true. A concierge that boots MCP-less (e.g. a boot-install miss on
// re-provision) is then permanently stuck failed. This fires that reconcile
// from the fail branch so the declared management MCP is delivered; the
// workspace stays failed for THIS heartbeat (fail-closed preserved), and once
// the runtime re-reads /configs/.claude/settings.json and reports
// mcp_server_present=true the existing failed→online recovery in evaluateStatus
// climbs it back. If the reconcile cannot deliver (missing token / fetch
// failure) it logs loudly and the concierge stays failed — the correct
// fail-closed outcome, now with a root cause surfaced in the logs.
//
// Rate-limited per workspace by mcpRecoveryLastFire/mcpRecoveryCooldown so a
// sustained-missing concierge retries gently (the gate fails on every heartbeat
// until the MCP lands) rather than re-firing a clone+deliver every beat.
// nil-safe via the reconcilePlugins check + the empty-id guard.
func (h *RegistryHandler) fireReconcileMCPRecovery(ctx context.Context, workspaceID string) {
if h.reconcilePlugins == nil || workspaceID == "" {
return
}
if last, ok := h.mcpRecoveryLastFire.Load(workspaceID); ok {
if t, _ := last.(time.Time); time.Since(t) < mcpRecoveryCooldown {
return // fired recently — let the in-flight reconcile converge
}
}
// Store BEFORE firing so a concurrent heartbeat sees the fresh timestamp and
// does not double-fire (a rare race double-fire is harmless — idempotent).
h.mcpRecoveryLastFire.Store(workspaceID, time.Now())
rctx := context.WithoutCancel(ctx)
wsID := workspaceID
globalGoAsync(func() { h.reconcilePlugins(rctx, wsID) })
}
// validateAgentURL rejects URLs that could be used as SSRF vectors against
// cloud metadata services or other internal infrastructure.
//
@@ -667,6 +726,13 @@ func (h *RegistryHandler) Register(c *gin.Context) {
msg = "platform agent registered without /opt/molecule-mcp-server; refusing online"
reason = "mcp_server_missing"
logCode = "platform_agent_mcp_server_missing"
// #33 deadlock-break (mirrors the heartbeat gate): this branch
// return()s before the register's provisioning→online reconcile
// fire, so without this a concierge registering MCP-less would
// never get the declared management MCP delivered. Fire it here;
// the in-flight guard dedupes against the heartbeat-path fire.
// Stays fail-closed (markWorkspaceFailed below + 400 response).
h.fireReconcileMCPRecovery(ctx, payload.ID)
}
log.Printf("Registry register: %s (workspace=%s)", msg, payload.ID)
h.markWorkspaceFailed(ctx, payload.ID, msg, reason)
@@ -1350,6 +1416,16 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
case !hasMCP:
msg = "platform agent heartbeat denied: /opt/molecule-mcp-server missing; refusing to mark online (RCA #2970 FAIL-CLOSED)"
reason = "mcp_server_missing"
// #33 deadlock-break: the management MCP is delivered on SaaS by
// the declared-plugin reconcile (workspace_declared_plugins), but
// this branch return()s before the recovery paths that fire it —
// so a concierge that boots MCP-less can never self-heal. Fire the
// reconcile here to deliver the declared MCP into the running
// container. Stays fail-closed for THIS heartbeat (markWorkspaceFailed
// below); the NEXT heartbeat — once the runtime re-reads settings.json
// and reports mcp_server_present=true — recovers failed→online via
// the existing recovery branch. Guarded against per-heartbeat storms.
h.fireReconcileMCPRecovery(ctx, payload.WorkspaceID)
}
log.Printf("Heartbeat: %s (workspace=%s)", msg, payload.WorkspaceID)
h.markWorkspaceFailed(ctx, payload.WorkspaceID, msg, reason)
@@ -0,0 +1,200 @@
package handlers
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// #33 (RCA#2970 deadlock-break): a kind=platform concierge whose runtime
// reports mcp_server_present=false is marked failed and the heartbeat returns
// BEFORE the recovery branches that fire the declared-plugin reconcile. On
// SaaS that reconcile is the ONLY path that installs the management MCP into
// the running container, so without firing it here the concierge is stuck
// failed forever (mcp_server_present can never become true). This asserts the
// fix: the mcp-missing heartbeat STILL fails closed (markWorkspaceFailed) AND
// fires the recovery reconcile so the MCP can be delivered and a later
// heartbeat can climb failed→online.
func TestHeartbeatHandler_PlatformMCPMissing_FiresRecoveryReconcile(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
reconcileFired := make(chan string, 4)
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
reconcileFired <- workspaceID
})
// prevTask/status read (status=online → not provisioning, so the
// prevStatus==provisioning reconcile fire does NOT match; only the
// deadlock-break fire can fire here).
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-mcp-fail").
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
// Main heartbeat UPDATE.
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-mcp-fail", 0.0, "", 0, 60, "").
WillReturnResult(sqlmock.NewResult(0, 1))
// evaluateStatus: currentStatus=online, kind=platform.
mock.ExpectQuery("SELECT status, kind, last_register_failure_at, mcp_unloaded_since FROM workspaces WHERE id =").
WithArgs("ws-mcp-fail").
WillReturnRows(evalStatusRows("online", "platform", nil, nil))
// RCA#2970 gate: model secret present (so we fall to the !hasMCP branch).
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-mcp-fail").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// markWorkspaceFailed: broadcast (structure_events) then the failed UPDATE.
mcpMissingMsg := "platform agent heartbeat denied: /opt/molecule-mcp-server missing; refusing to mark online (RCA #2970 FAIL-CLOSED)"
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs("ws-mcp-fail", mcpMissingMsg, models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-mcp-fail","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60,"mcp_server_present":false}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
// The deadlock-break reconcile is fire-and-forget via globalGoAsync.
select {
case got := <-reconcileFired:
if got != "ws-mcp-fail" {
t.Errorf("recovery reconcile fired for wrong workspace: got %q", got)
}
case <-time.After(2 * time.Second):
t.Fatal("#33 regression: mcp-missing heartbeat did NOT fire the recovery reconcile (concierge would stay failed forever)")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// A MODEL-missing platform agent must STILL fail closed but must NOT fire the
// plugin reconcile — a missing MODEL secret is not something a declared-plugin
// reconcile can fix, so the recovery fire is scoped to the !hasMCP branch only.
func TestHeartbeatHandler_PlatformModelMissing_DoesNotFireReconcile(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
reconcileFired := make(chan string, 4)
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
reconcileFired <- workspaceID
})
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-model-fail").
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-model-fail", 0.0, "", 0, 60, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status, kind, last_register_failure_at, mcp_unloaded_since FROM workspaces WHERE id =").
WithArgs("ws-model-fail").
WillReturnRows(evalStatusRows("online", "platform", nil, nil))
// Model secret ABSENT → the switch picks the !hasModel branch first.
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-model-fail").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
modelMissingMsg := "platform agent heartbeat denied: no seeded MODEL workspace_secret; refusing to mark online (RCA #2970 FAIL-CLOSED)"
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("UPDATE workspaces SET status =").
WithArgs("ws-model-fail", modelMissingMsg, models.StatusFailed).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
// mcp_server_present=true so hasMCP=true; only the model is missing.
body := `{"workspace_id":"ws-model-fail","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60,"mcp_server_present":true}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
select {
case got := <-reconcileFired:
t.Fatalf("model-missing must NOT fire the plugin reconcile, but it fired for %q", got)
case <-time.After(300 * time.Millisecond):
// good — no reconcile fired
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// The recovery reconcile is rate-limited per workspace: the gate fails on every
// heartbeat until the MCP lands, so without a throttle each blocked beat would
// spawn another clone+deliver (restart churn). A new fire is allowed only after
// mcpRecoveryCooldown.
func TestFireReconcileMCPRecovery_RateLimited(t *testing.T) {
handler := NewRegistryHandler(newTestBroadcaster())
fired := make(chan string, 8)
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
fired <- workspaceID
})
ctx := context.Background()
handler.fireReconcileMCPRecovery(ctx, "ws-x") // fires
handler.fireReconcileMCPRecovery(ctx, "ws-x") // within cooldown → must NOT fire
select {
case got := <-fired:
if got != "ws-x" {
t.Fatalf("reconcile fired for wrong workspace: %q", got)
}
case <-time.After(2 * time.Second):
t.Fatal("first recovery reconcile did not fire")
}
select {
case <-fired:
t.Fatal("rate-limit failed: a second reconcile fired within the cooldown window")
case <-time.After(200 * time.Millisecond):
// good — cooldown held
}
// Simulate the cooldown elapsing → a fresh fire is allowed again so a
// genuinely-stuck concierge keeps retrying (gently).
handler.mcpRecoveryLastFire.Store("ws-x", time.Now().Add(-2*mcpRecoveryCooldown))
handler.fireReconcileMCPRecovery(ctx, "ws-x")
select {
case <-fired:
// good — re-fired after cooldown
case <-time.After(2 * time.Second):
t.Fatal("recovery reconcile did not re-fire after the cooldown elapsed")
}
}
// nil reconcile func / empty id must be no-ops (never panic).
func TestFireReconcileMCPRecovery_NilSafe(t *testing.T) {
h := &RegistryHandler{} // reconcilePlugins is nil
h.fireReconcileMCPRecovery(context.Background(), "ws-x")
h2 := NewRegistryHandler(newTestBroadcaster())
h2.SetReconcileFunc(func(_ context.Context, _ string) { t.Fatal("must not fire for empty workspace id") })
h2.fireReconcileMCPRecovery(context.Background(), "")
}