fix(workspace): deliver platform_inbound_secret on every heartbeat

Heartbeat now echoes the workspace's platform_inbound_secret on every
beat (mirroring /registry/register), and the molecule-mcp client
persists it to /configs/.platform_inbound_secret on receipt.

Symptom (2026-04-30, hongmingwang tenant): chat upload returned 503
"workspace will pick it up on its next heartbeat" and then 401 on
retry — permanent until workspace restart. The 503 message was a lie:
heartbeat used to discard the platform_inbound_secret entirely; only
register delivered it, and register fires once at startup.

Server (Go):
  - Heartbeat handler reuses readOrLazyHealInboundSecret (the same
    helper chat_files + register use), so heartbeat-time recovery
    covers the rotate / mid-life NULL-column case the existing
    register-time heal can't reach.
  - Failure is non-fatal: liveness contract trumps secret delivery,
    chat_files retries lazy-heal on its own next request.

Client (Python):
  - _persist_inbound_secret_from_heartbeat parses the heartbeat 200
    response and persists via platform_inbound_auth.save_inbound_secret.
  - All exceptions swallowed — heartbeat liveness > secret persistence;
    next tick (≤20s) retries.

Tests:
  - Server: pin secret-present, lazy-heal-mint-on-NULL, and heal-
    failure-omits-field branches.
  - Client: pin persist-on-200, skip-on-empty, skip-on-non-dict-body,
    skip-on-401, swallow-save-OSError.
This commit is contained in:
Hongming Wang 2026-04-30 17:36:33 -07:00
parent 665582b612
commit a5c5139e3a
4 changed files with 455 additions and 1 deletions

View File

@ -620,7 +620,35 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
resp := gin.H{"status": "ok"}
// Deliver the platform_inbound_secret on every heartbeat. Mirrors
// the same field on /registry/register, but heartbeats are the
// only periodic platform↔workspace channel — register fires once
// at workspace startup, so without this delivery path a lazy-heal
// (chat_files.go's "secret was just minted, retry in 30s" branch)
// could ONLY recover via a workspace restart.
//
// Symptom this fixes: 2026-04-30 user report on hongmingwang —
// chat upload returned 503 "workspace will pick it up on its
// next heartbeat", then 401 on retry. The 503 message was
// misleading because heartbeat used to discard the
// platform_inbound_secret entirely; only register delivered it.
//
// Lazy-heal here instead of a column read because:
// - register-time heal already covers cold-start workspaces
// - heartbeat-time heal covers the rotate / mid-life recover case
// - the helper short-circuits to the existing column read when
// the secret is already present (cheap, idempotent)
//
// Errors are non-fatal: heartbeat's primary job is liveness, and
// the chat-upload path will lazy-heal again if needed. Logging
// happens inside the helper.
if secret, _, healErr := readOrLazyHealInboundSecret(ctx, payload.WorkspaceID, "Heartbeat"); healErr == nil && secret != "" {
resp["platform_inbound_secret"] = secret
}
c.JSON(http.StatusOK, resp)
}
func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.HeartbeatPayload) {

View File

@ -1780,3 +1780,176 @@ func TestRegister_NonExternalRuntime_StillDefaultsToPush(t *testing.T) {
t.Errorf("unmet expectations: %v", err)
}
}
// ==================== Heartbeat — platform_inbound_secret delivery (2026-04-30) ====================
// Heartbeat must echo the workspace's platform_inbound_secret on every
// beat, mirroring /registry/register. Without this delivery path, a
// workspace whose secret was lazy-healed on the platform side (e.g. via
// chat_files Upload's "secret was just minted, retry in 30s" branch)
// could only pick up the freshly-minted value via a runtime restart —
// the chat_files retry would 401-forever. Caught 2026-04-30 on the
// hongmingwang tenant: 503 → 401 chain on chat upload.
func TestHeartbeatHandler_DeliversPlatformInboundSecret(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
const inboundSecret = "the-already-minted-secret"
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-with-secret").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-with-secret", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
WithArgs("ws-with-secret").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
// readOrLazyHealInboundSecret — short-circuit: secret already on file.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs("ws-with-secret").
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(inboundSecret))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-with-secret","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse response: %v", err)
}
got, ok := resp["platform_inbound_secret"].(string)
if !ok {
t.Fatalf("expected platform_inbound_secret in heartbeat response, got: %v", resp)
}
if got != inboundSecret {
t.Errorf("secret mismatch: got %q, want %q", got, inboundSecret)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestHeartbeatHandler_LazyHealsPlatformInboundSecret pins the
// recovery branch: a workspace with a NULL platform_inbound_secret
// (legacy / partially-bootstrapped row) gets the column minted inline
// AND receives the freshly-minted value in the response, so the next
// chat-upload tick makes the workspace work without a restart.
func TestHeartbeatHandler_LazyHealsPlatformInboundSecret(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-needs-heal").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-needs-heal", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
WithArgs("ws-needs-heal").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
// readOrLazyHealInboundSecret — NULL column triggers mint.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs("ws-needs-heal").
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
// Inline mint UPDATE — must land or legacy workspaces stay 401-forever.
mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`).
WithArgs(sqlmock.AnyArg(), "ws-needs-heal").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-needs-heal","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
secret, present := resp["platform_inbound_secret"]
if !present {
t.Fatalf("expected platform_inbound_secret PRESENT after lazy-heal, got: %v", resp)
}
if s, ok := secret.(string); !ok || s == "" {
t.Errorf("expected non-empty string secret, got %T %v", secret, secret)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations — heartbeat-time lazy-heal mint did NOT run: %v", err)
}
}
// TestHeartbeatHandler_OmitsSecretOnHealFailure pins the defensive
// branch: when both the read AND the mint fail, heartbeat MUST still
// respond 200 (liveness is the primary contract) but omit the field.
// The next tick retries.
func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewRegistryHandler(broadcaster)
mock.ExpectQuery("SELECT COALESCE\\(current_task").
WithArgs("ws-heal-fails").
WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow(""))
mock.ExpectExec("UPDATE workspaces SET").
WithArgs("ws-heal-fails", 0.0, "", 0, 100, "").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT status FROM workspaces WHERE id =").
WithArgs("ws-heal-fails").
WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online"))
// Read returns NULL → mint is attempted...
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs("ws-heal-fails").
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
// ...but the mint UPDATE fails (DB hiccup).
mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`).
WithArgs(sqlmock.AnyArg(), "ws-heal-fails").
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"workspace_id":"ws-heal-fails","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}`
c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Heartbeat(c)
// Liveness contract — heartbeat MUST stay 200 even when the
// secret-delivery side-channel fails. chat_files retries lazy-heal
// on the next request anyway.
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (liveness primary), got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if _, present := resp["platform_inbound_secret"]; present {
t.Errorf("expected platform_inbound_secret OMITTED on heal failure, got: %v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}

View File

@ -171,11 +171,51 @@ def _heartbeat_loop(
resp.status_code,
(resp.text or "")[:200],
)
else:
_persist_inbound_secret_from_heartbeat(resp)
except Exception as exc: # noqa: BLE001
logger.warning("molecule-mcp: heartbeat failed: %s", exc)
time.sleep(interval)
def _persist_inbound_secret_from_heartbeat(resp: object) -> None:
"""Persist ``platform_inbound_secret`` from a heartbeat response, if any.
The platform's heartbeat handler returns the secret on every beat
(mirroring /registry/register) so a workspace that lazy-healed the
secret on the platform side typical recovery path for a workspace
whose row had a NULL ``platform_inbound_secret`` after a partial
bootstrap picks it up within one heartbeat tick instead of
requiring a runtime restart.
Without this delivery path the chat-upload code path's "secret was
just minted, will pick up on next heartbeat" 503 message is a lie
and the workspace stays 401-forever until the operator restarts
the runtime. Caught 2026-04-30 on hongmingwang tenant.
Failure is non-fatal: if the body isn't JSON, doesn't carry the
field, or the disk write fails, the next heartbeat retries. This
matches the cold-start register flow in main.py:319-323.
"""
try:
body = resp.json()
except Exception: # noqa: BLE001
return
if not isinstance(body, dict):
return
secret = body.get("platform_inbound_secret")
if not secret:
return
try:
from platform_inbound_auth import save_inbound_secret
save_inbound_secret(secret)
except Exception as exc: # noqa: BLE001
logger.warning(
"molecule-mcp: persist inbound secret from heartbeat failed: %s", exc
)
def _start_heartbeat_thread(
platform_url: str,
workspace_id: str,

View File

@ -490,3 +490,216 @@ def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch):
assert captured["headers"]["Authorization"] == "Bearer tok"
assert captured["headers"]["Origin"] == "https://test.moleculesai.app"
assert sleep_calls == [20.0], "heartbeat must sleep the configured interval"
# ============== Heartbeat persists platform_inbound_secret (2026-04-30) ==============
# Heartbeat loop must persist the platform_inbound_secret returned by
# the platform. Without this, a workspace that lazy-healed the secret
# on the platform side recovers only on a runtime restart — chat upload
# 401-forever. Pairs with the server-side
# TestHeartbeatHandler_DeliversPlatformInboundSecret pin.
def test_heartbeat_persists_inbound_secret_from_response(monkeypatch, tmp_path):
"""Heartbeat 200 with platform_inbound_secret in body → save_inbound_secret called."""
class FakeResp:
status_code = 200
text = ""
def json(self):
return {"status": "ok", "platform_inbound_secret": "fresh-secret"}
saved: list[str] = []
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
assert saved == ["fresh-secret"], (
"expected save_inbound_secret called once with the platform's secret"
)
def test_heartbeat_persist_skips_when_secret_absent(monkeypatch):
"""Heartbeat 200 without platform_inbound_secret → no persist call."""
class FakeResp:
def json(self):
return {"status": "ok"}
saved: list[str] = []
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
assert saved == [], "no secret in body → must NOT call save_inbound_secret"
def test_heartbeat_persist_skips_on_empty_secret(monkeypatch):
"""Heartbeat 200 with empty-string platform_inbound_secret → no persist."""
class FakeResp:
def json(self):
return {"status": "ok", "platform_inbound_secret": ""}
saved: list[str] = []
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
assert saved == [], "empty secret string → must NOT call save_inbound_secret"
def test_heartbeat_persist_swallows_non_json_body(monkeypatch):
"""Heartbeat with unparseable body must not raise — logs + returns."""
class FakeResp:
def json(self):
raise ValueError("not json")
saved: list[str] = []
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
# Must not raise; non-JSON body is treated as "no secret to deliver".
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
assert saved == []
def test_heartbeat_persist_handles_non_dict_body(monkeypatch):
"""Heartbeat returning a list (not a dict) is silently ignored."""
class FakeResp:
def json(self):
return ["unexpected", "list"]
saved: list[str] = []
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append)
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
assert saved == []
def test_heartbeat_persist_swallows_save_exceptions(monkeypatch, caplog):
"""save_inbound_secret raising must not crash the heartbeat loop."""
class FakeResp:
def json(self):
return {"platform_inbound_secret": "x"}
def boom(_secret):
raise OSError("disk full")
import platform_inbound_auth
monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", boom)
# Must not raise — heartbeat liveness > secret persistence.
mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp())
def test_heartbeat_loop_calls_persist_on_success(monkeypatch):
"""End-to-end: heartbeat loop on 200 invokes the persist helper."""
saw: list[object] = []
def fake_persist(resp):
saw.append(resp)
monkeypatch.setattr(
mcp_cli, "_persist_inbound_secret_from_heartbeat", fake_persist
)
class FakeResp:
status_code = 200
text = ""
class FakeClient:
def __init__(self, **_kwargs):
pass
def __enter__(self):
return self
def __exit__(self, *_a):
return False
def post(self, *_a, **_k):
return FakeResp()
import types
fake_httpx = types.ModuleType("httpx")
fake_httpx.Client = FakeClient
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
def fake_sleep(_):
raise SystemExit
monkeypatch.setattr("time.sleep", fake_sleep)
with pytest.raises(SystemExit):
mcp_cli._heartbeat_loop(
"https://test.moleculesai.app",
"ws-abc",
"tok",
interval=20.0,
)
assert len(saw) == 1, "persist helper must be called once per successful heartbeat"
def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch):
"""Heartbeat 4xx error path must NOT invoke persist (no body to trust)."""
saw: list[object] = []
monkeypatch.setattr(
mcp_cli,
"_persist_inbound_secret_from_heartbeat",
lambda r: saw.append(r),
)
class FakeResp:
status_code = 401
text = "unauthorized"
class FakeClient:
def __init__(self, **_kwargs):
pass
def __enter__(self):
return self
def __exit__(self, *_a):
return False
def post(self, *_a, **_k):
return FakeResp()
import types
fake_httpx = types.ModuleType("httpx")
fake_httpx.Client = FakeClient
monkeypatch.setitem(sys.modules, "httpx", fake_httpx)
def fake_sleep(_):
raise SystemExit
monkeypatch.setattr("time.sleep", fake_sleep)
with pytest.raises(SystemExit):
mcp_cli._heartbeat_loop(
"https://test.moleculesai.app",
"ws-abc",
"tok",
interval=20.0,
)
assert saw == [], "4xx response must NOT trigger persist call"