fix(#33): break the RCA#2970 management-MCP re-provision deadlock for concierges #3228
@@ -111,8 +111,29 @@ type RegistryHandler struct {
|
||||
// reconcile when unset (e.g. unit tests, CP/SaaS mode without a plugins
|
||||
// handler). Wired by the router to PluginsHandler.ReconcileWorkspacePlugins.
|
||||
reconcilePlugins ReconcileFunc
|
||||
// mcpRecoveryLastFire rate-limits the RCA#2970 deadlock-break reconcile (#33).
|
||||
// The gate fails on EVERY heartbeat until the management MCP lands, so without
|
||||
// a throttle a concierge that cannot recover (e.g. a missing plugin-source
|
||||
// token, and where deliver() restarts the container) would re-fire a
|
||||
// clone+deliver every heartbeat interval — restart churn that never converges.
|
||||
// Keyed by workspace ID → time.Time of the last fire; a new fire is allowed
|
||||
// only after mcpRecoveryCooldown. The happy path leaves the mcp-missing state
|
||||
// on the next heartbeat (MCP now present), so the cooldown only throttles the
|
||||
// genuinely-stuck case. Stored-before-fire so concurrent heartbeats can't both
|
||||
// fire (the second sees the just-stored timestamp); a rare double-fire is
|
||||
// harmless — ReconcileWorkspacePlugins is idempotent. In-memory only: a CP
|
||||
// redeploy / conductor tick resets it, so the once-per-cooldown guarantee
|
||||
// holds within a process lifetime (acceptable — redeploys are not sub-minute,
|
||||
// and one extra reconcile per redeploy on a stuck concierge is tolerable).
|
||||
mcpRecoveryLastFire sync.Map
|
||||
}
|
||||
|
||||
// mcpRecoveryCooldown is the minimum spacing between two RCA#2970
// deadlock-break reconciles for the same concierge (#33). It is meant to be
// long enough to cover one clone+deliver+restart+boot cycle, so a stuck
// concierge is retried gently instead of being hammered, yet short enough that
// a transient miss self-heals within minutes.
const mcpRecoveryCooldown = 5 * time.Minute
|
||||
|
||||
func NewRegistryHandler(b *events.Broadcaster) *RegistryHandler {
|
||||
return &RegistryHandler{broadcaster: b}
|
||||
}
|
||||
@@ -146,6 +167,44 @@ func (h *RegistryHandler) fireReconcileOnline(ctx context.Context, workspaceID s
|
||||
globalGoAsync(func() { h.reconcilePlugins(rctx, wsID) })
|
||||
}
|
||||
|
||||
// fireReconcileMCPRecovery breaks the RCA#2970 management-MCP deadlock (#33).
|
||||
//
|
||||
// A kind=platform concierge whose runtime reports mcp_server_present=false is
|
||||
// marked failed and the heartbeat returns BEFORE the recovery branches that
|
||||
// fire fireReconcileOnline — so the declared-plugin reconcile (the ONLY SaaS
|
||||
// path that installs the management MCP into the running container, reading
|
||||
// workspace_declared_plugins) never runs, and mcp_server_present can never flip
|
||||
// to true. A concierge that boots MCP-less (e.g. a boot-install miss on
|
||||
// re-provision) is then permanently stuck failed. This fires that reconcile
|
||||
// from the fail branch so the declared management MCP is delivered; the
|
||||
// workspace stays failed for THIS heartbeat (fail-closed preserved), and once
|
||||
// the runtime re-reads /configs/.claude/settings.json and reports
|
||||
// mcp_server_present=true the existing failed→online recovery in evaluateStatus
|
||||
// climbs it back. If the reconcile cannot deliver (missing token / fetch
|
||||
// failure) it logs loudly and the concierge stays failed — the correct
|
||||
// fail-closed outcome, now with a root cause surfaced in the logs.
|
||||
//
|
||||
// Rate-limited per workspace by mcpRecoveryLastFire/mcpRecoveryCooldown so a
|
||||
// sustained-missing concierge retries gently (the gate fails on every heartbeat
|
||||
// until the MCP lands) rather than re-firing a clone+deliver every beat.
|
||||
// nil-safe via the reconcilePlugins check + the empty-id guard.
|
||||
func (h *RegistryHandler) fireReconcileMCPRecovery(ctx context.Context, workspaceID string) {
|
||||
if h.reconcilePlugins == nil || workspaceID == "" {
|
||||
return
|
||||
}
|
||||
if last, ok := h.mcpRecoveryLastFire.Load(workspaceID); ok {
|
||||
if t, _ := last.(time.Time); time.Since(t) < mcpRecoveryCooldown {
|
||||
return // fired recently — let the in-flight reconcile converge
|
||||
}
|
||||
}
|
||||
// Store BEFORE firing so a concurrent heartbeat sees the fresh timestamp and
|
||||
// does not double-fire (a rare race double-fire is harmless — idempotent).
|
||||
h.mcpRecoveryLastFire.Store(workspaceID, time.Now())
|
||||
rctx := context.WithoutCancel(ctx)
|
||||
wsID := workspaceID
|
||||
globalGoAsync(func() { h.reconcilePlugins(rctx, wsID) })
|
||||
}
|
||||
|
||||
// validateAgentURL rejects URLs that could be used as SSRF vectors against
|
||||
// cloud metadata services or other internal infrastructure.
|
||||
//
|
||||
@@ -667,6 +726,13 @@ func (h *RegistryHandler) Register(c *gin.Context) {
|
||||
msg = "platform agent registered without /opt/molecule-mcp-server; refusing online"
|
||||
reason = "mcp_server_missing"
|
||||
logCode = "platform_agent_mcp_server_missing"
|
||||
// #33 deadlock-break (mirrors the heartbeat gate): this branch
|
||||
// return()s before the register's provisioning→online reconcile
|
||||
// fire, so without this a concierge registering MCP-less would
|
||||
// never get the declared management MCP delivered. Fire it here;
|
||||
// the per-workspace cooldown in fireReconcileMCPRecovery dedupes against the heartbeat-path fire.
|
||||
// Stays fail-closed (markWorkspaceFailed below + 400 response).
|
||||
h.fireReconcileMCPRecovery(ctx, payload.ID)
|
||||
}
|
||||
log.Printf("Registry register: %s (workspace=%s)", msg, payload.ID)
|
||||
h.markWorkspaceFailed(ctx, payload.ID, msg, reason)
|
||||
@@ -1350,6 +1416,16 @@ func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.Heartbea
|
||||
case !hasMCP:
|
||||
msg = "platform agent heartbeat denied: /opt/molecule-mcp-server missing; refusing to mark online (RCA #2970 FAIL-CLOSED)"
|
||||
reason = "mcp_server_missing"
|
||||
// #33 deadlock-break: the management MCP is delivered on SaaS by
|
||||
// the declared-plugin reconcile (workspace_declared_plugins), but
|
||||
// this branch return()s before the recovery paths that fire it —
|
||||
// so a concierge that boots MCP-less can never self-heal. Fire the
|
||||
// reconcile here to deliver the declared MCP into the running
|
||||
// container. Stays fail-closed for THIS heartbeat (markWorkspaceFailed
|
||||
// below); the NEXT heartbeat — once the runtime re-reads settings.json
|
||||
// and reports mcp_server_present=true — recovers failed→online via
|
||||
// the existing recovery branch. Guarded against per-heartbeat storms.
|
||||
h.fireReconcileMCPRecovery(ctx, payload.WorkspaceID)
|
||||
}
|
||||
log.Printf("Heartbeat: %s (workspace=%s)", msg, payload.WorkspaceID)
|
||||
h.markWorkspaceFailed(ctx, payload.WorkspaceID, msg, reason)
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// #33 (RCA#2970 deadlock-break): a kind=platform concierge whose runtime
|
||||
// reports mcp_server_present=false is marked failed and the heartbeat returns
|
||||
// BEFORE the recovery branches that fire the declared-plugin reconcile. On
|
||||
// SaaS that reconcile is the ONLY path that installs the management MCP into
|
||||
// the running container, so without firing it here the concierge is stuck
|
||||
// failed forever (mcp_server_present can never become true). This asserts the
|
||||
// fix: the mcp-missing heartbeat STILL fails closed (markWorkspaceFailed) AND
|
||||
// fires the recovery reconcile so the MCP can be delivered and a later
|
||||
// heartbeat can climb failed→online.
|
||||
func TestHeartbeatHandler_PlatformMCPMissing_FiresRecoveryReconcile(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
reconcileFired := make(chan string, 4)
|
||||
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
|
||||
reconcileFired <- workspaceID
|
||||
})
|
||||
|
||||
// prevTask/status read (status=online → not provisioning, so the
|
||||
// prevStatus==provisioning reconcile fire does NOT match; only the
|
||||
// deadlock-break fire can fire here).
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-mcp-fail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
|
||||
|
||||
// Main heartbeat UPDATE.
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-mcp-fail", 0.0, "", 0, 60, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
// evaluateStatus: currentStatus=online, kind=platform.
|
||||
mock.ExpectQuery("SELECT status, kind, last_register_failure_at, mcp_unloaded_since FROM workspaces WHERE id =").
|
||||
WithArgs("ws-mcp-fail").
|
||||
WillReturnRows(evalStatusRows("online", "platform", nil, nil))
|
||||
|
||||
// RCA#2970 gate: model secret present (so we fall to the !hasMCP branch).
|
||||
mock.ExpectQuery("SELECT EXISTS").
|
||||
WithArgs("ws-mcp-fail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
|
||||
|
||||
// markWorkspaceFailed: broadcast (structure_events) then the failed UPDATE.
|
||||
mcpMissingMsg := "platform agent heartbeat denied: /opt/molecule-mcp-server missing; refusing to mark online (RCA #2970 FAIL-CLOSED)"
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
WithArgs("ws-mcp-fail", mcpMissingMsg, models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
body := `{"workspace_id":"ws-mcp-fail","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60,"mcp_server_present":false}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// The deadlock-break reconcile is fire-and-forget via globalGoAsync.
|
||||
select {
|
||||
case got := <-reconcileFired:
|
||||
if got != "ws-mcp-fail" {
|
||||
t.Errorf("recovery reconcile fired for wrong workspace: got %q", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("#33 regression: mcp-missing heartbeat did NOT fire the recovery reconcile (concierge would stay failed forever)")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// A MODEL-missing platform agent must STILL fail closed but must NOT fire the
|
||||
// plugin reconcile — a missing MODEL secret is not something a declared-plugin
|
||||
// reconcile can fix, so the recovery fire is scoped to the !hasMCP branch only.
|
||||
func TestHeartbeatHandler_PlatformModelMissing_DoesNotFireReconcile(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewRegistryHandler(broadcaster)
|
||||
|
||||
reconcileFired := make(chan string, 4)
|
||||
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
|
||||
reconcileFired <- workspaceID
|
||||
})
|
||||
|
||||
mock.ExpectQuery("SELECT COALESCE\\(current_task").
|
||||
WithArgs("ws-model-fail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"current_task", "monthly_spend", "status"}).AddRow("", 0, "online"))
|
||||
mock.ExpectExec("UPDATE workspaces SET").
|
||||
WithArgs("ws-model-fail", 0.0, "", 0, 60, "").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectQuery("SELECT status, kind, last_register_failure_at, mcp_unloaded_since FROM workspaces WHERE id =").
|
||||
WithArgs("ws-model-fail").
|
||||
WillReturnRows(evalStatusRows("online", "platform", nil, nil))
|
||||
// Model secret ABSENT → the switch picks the !hasModel branch first.
|
||||
mock.ExpectQuery("SELECT EXISTS").
|
||||
WithArgs("ws-model-fail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
|
||||
modelMissingMsg := "platform agent heartbeat denied: no seeded MODEL workspace_secret; refusing to mark online (RCA #2970 FAIL-CLOSED)"
|
||||
mock.ExpectExec("INSERT INTO structure_events").
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec("UPDATE workspaces SET status =").
|
||||
WithArgs("ws-model-fail", modelMissingMsg, models.StatusFailed).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
// mcp_server_present=true so hasMCP=true; only the model is missing.
|
||||
body := `{"workspace_id":"ws-model-fail","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":60,"mcp_server_present":true}`
|
||||
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Heartbeat(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
select {
|
||||
case got := <-reconcileFired:
|
||||
t.Fatalf("model-missing must NOT fire the plugin reconcile, but it fired for %q", got)
|
||||
case <-time.After(300 * time.Millisecond):
|
||||
// good — no reconcile fired
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// The recovery reconcile is rate-limited per workspace: the gate fails on every
|
||||
// heartbeat until the MCP lands, so without a throttle each blocked beat would
|
||||
// spawn another clone+deliver (restart churn). A new fire is allowed only after
|
||||
// mcpRecoveryCooldown.
|
||||
func TestFireReconcileMCPRecovery_RateLimited(t *testing.T) {
|
||||
handler := NewRegistryHandler(newTestBroadcaster())
|
||||
fired := make(chan string, 8)
|
||||
handler.SetReconcileFunc(func(_ context.Context, workspaceID string) {
|
||||
fired <- workspaceID
|
||||
})
|
||||
ctx := context.Background()
|
||||
|
||||
handler.fireReconcileMCPRecovery(ctx, "ws-x") // fires
|
||||
handler.fireReconcileMCPRecovery(ctx, "ws-x") // within cooldown → must NOT fire
|
||||
|
||||
select {
|
||||
case got := <-fired:
|
||||
if got != "ws-x" {
|
||||
t.Fatalf("reconcile fired for wrong workspace: %q", got)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("first recovery reconcile did not fire")
|
||||
}
|
||||
select {
|
||||
case <-fired:
|
||||
t.Fatal("rate-limit failed: a second reconcile fired within the cooldown window")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
// good — cooldown held
|
||||
}
|
||||
|
||||
// Simulate the cooldown elapsing → a fresh fire is allowed again so a
|
||||
// genuinely-stuck concierge keeps retrying (gently).
|
||||
handler.mcpRecoveryLastFire.Store("ws-x", time.Now().Add(-2*mcpRecoveryCooldown))
|
||||
handler.fireReconcileMCPRecovery(ctx, "ws-x")
|
||||
select {
|
||||
case <-fired:
|
||||
// good — re-fired after cooldown
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("recovery reconcile did not re-fire after the cooldown elapsed")
|
||||
}
|
||||
}
|
||||
|
||||
// nil reconcile func / empty id must be no-ops (never panic).
|
||||
func TestFireReconcileMCPRecovery_NilSafe(t *testing.T) {
|
||||
h := &RegistryHandler{} // reconcilePlugins is nil
|
||||
h.fireReconcileMCPRecovery(context.Background(), "ws-x")
|
||||
|
||||
h2 := NewRegistryHandler(newTestBroadcaster())
|
||||
h2.SetReconcileFunc(func(_ context.Context, _ string) { t.Fatal("must not fire for empty workspace id") })
|
||||
h2.fireReconcileMCPRecovery(context.Background(), "")
|
||||
}
|
||||
Reference in New Issue
Block a user