From a5c5139e3a33226e8e4afdfbe72b612f747a9233 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Thu, 30 Apr 2026 17:36:33 -0700 Subject: [PATCH] fix(workspace): deliver platform_inbound_secret on every heartbeat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Heartbeat now echoes the workspace's platform_inbound_secret on every beat (mirroring /registry/register), and the molecule-mcp client persists it to /configs/.platform_inbound_secret on receipt. Symptom (2026-04-30, hongmingwang tenant): chat upload returned 503 "workspace will pick it up on its next heartbeat" and then 401 on retry — permanent until workspace restart. The 503 message was a lie: heartbeat used to discard the platform_inbound_secret entirely; only register delivered it, and register fires once at startup. Server (Go): - Heartbeat handler reuses readOrLazyHealInboundSecret (the same helper chat_files + register use), so heartbeat-time recovery covers the rotate / mid-life NULL-column case the existing register-time heal can't reach. - Failure is non-fatal: liveness contract trumps secret delivery, chat_files retries lazy-heal on its own next request. Client (Python): - _persist_inbound_secret_from_heartbeat parses the heartbeat 200 response and persists via platform_inbound_auth.save_inbound_secret. - All exceptions swallowed — heartbeat liveness > secret persistence; next tick (≤20s) retries. Tests: - Server: pin secret-present, lazy-heal-mint-on-NULL, and heal- failure-omits-field branches. - Client: pin persist-on-200, skip-on-empty, skip-on-non-dict-body, skip-on-401, swallow-save-OSError. --- .../internal/handlers/registry.go | 30 ++- .../internal/handlers/registry_test.go | 173 ++++++++++++++ workspace/mcp_cli.py | 40 ++++ workspace/tests/test_mcp_cli.py | 213 ++++++++++++++++++ 4 files changed, 455 insertions(+), 1 deletion(-) diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 4421d1a5..8960170c 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -620,7 +620,35 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear } - c.JSON(http.StatusOK, gin.H{"status": "ok"}) + resp := gin.H{"status": "ok"} + + // Deliver the platform_inbound_secret on every heartbeat. Mirrors + // the same field on /registry/register, but heartbeats are the + // only periodic platform↔workspace channel — register fires once + // at workspace startup, so without this delivery path a lazy-heal + // (chat_files.go's "secret was just minted, retry in 30s" branch) + // could ONLY recover via a workspace restart. + // + // Symptom this fixes: 2026-04-30 user report on hongmingwang — + // chat upload returned 503 "workspace will pick it up on its + // next heartbeat", then 401 on retry. The 503 message was + // misleading because heartbeat used to discard the + // platform_inbound_secret entirely; only register delivered it. + // + // Lazy-heal here instead of a column read because: + // - register-time heal already covers cold-start workspaces + // - heartbeat-time heal covers the rotate / mid-life recover case + // - the helper short-circuits to the existing column read when + // the secret is already present (cheap, idempotent) + // + // Errors are non-fatal: heartbeat's primary job is liveness, and + // the chat-upload path will lazy-heal again if needed. Logging + // happens inside the helper. + if secret, _, healErr := readOrLazyHealInboundSecret(ctx, payload.WorkspaceID, "Heartbeat"); healErr == nil && secret != "" { + resp["platform_inbound_secret"] = secret + } + + c.JSON(http.StatusOK, resp) } func (h *RegistryHandler) evaluateStatus(c *gin.Context, payload models.HeartbeatPayload) { diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 491d5144..bd8e65d8 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -1780,3 +1780,176 @@ func TestRegister_NonExternalRuntime_StillDefaultsToPush(t *testing.T) { t.Errorf("unmet expectations: %v", err) } } + +// ==================== Heartbeat — platform_inbound_secret delivery (2026-04-30) ==================== +// Heartbeat must echo the workspace's platform_inbound_secret on every +// beat, mirroring /registry/register. Without this delivery path, a +// workspace whose secret was lazy-healed on the platform side (e.g. via +// chat_files Upload's "secret was just minted, retry in 30s" branch) +// could only pick up the freshly-minted value via a runtime restart — +// the chat_files retry would 401-forever. Caught 2026-04-30 on the +// hongmingwang tenant: 503 → 401 chain on chat upload. + +func TestHeartbeatHandler_DeliversPlatformInboundSecret(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + const inboundSecret = "the-already-minted-secret" + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-with-secret"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-with-secret", 0.0, "", 0, 100, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-with-secret"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + // readOrLazyHealInboundSecret — short-circuit: secret already on file. + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs("ws-with-secret"). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(inboundSecret)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-with-secret","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("parse response: %v", err) + } + got, ok := resp["platform_inbound_secret"].(string) + if !ok { + t.Fatalf("expected platform_inbound_secret in heartbeat response, got: %v", resp) + } + if got != inboundSecret { + t.Errorf("secret mismatch: got %q, want %q", got, inboundSecret) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHeartbeatHandler_LazyHealsPlatformInboundSecret pins the +// recovery branch: a workspace with a NULL platform_inbound_secret +// (legacy / partially-bootstrapped row) gets the column minted inline +// AND receives the freshly-minted value in the response, so the next +// chat-upload tick makes the workspace work without a restart. +func TestHeartbeatHandler_LazyHealsPlatformInboundSecret(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-needs-heal"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-needs-heal", 0.0, "", 0, 100, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-needs-heal"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + // readOrLazyHealInboundSecret — NULL column triggers mint. + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs("ws-needs-heal"). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + // Inline mint UPDATE — must land or legacy workspaces stay 401-forever. + mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`). + WithArgs(sqlmock.AnyArg(), "ws-needs-heal"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-needs-heal","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + secret, present := resp["platform_inbound_secret"] + if !present { + t.Fatalf("expected platform_inbound_secret PRESENT after lazy-heal, got: %v", resp) + } + if s, ok := secret.(string); !ok || s == "" { + t.Errorf("expected non-empty string secret, got %T %v", secret, secret) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations — heartbeat-time lazy-heal mint did NOT run: %v", err) + } +} + +// TestHeartbeatHandler_OmitsSecretOnHealFailure pins the defensive +// branch: when both the read AND the mint fail, heartbeat MUST still +// respond 200 (liveness is the primary contract) but omit the field. +// The next tick retries. +func TestHeartbeatHandler_OmitsSecretOnHealFailure(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + mock.ExpectQuery("SELECT COALESCE\\(current_task"). + WithArgs("ws-heal-fails"). + WillReturnRows(sqlmock.NewRows([]string{"current_task"}).AddRow("")) + + mock.ExpectExec("UPDATE workspaces SET"). + WithArgs("ws-heal-fails", 0.0, "", 0, 100, ""). + WillReturnResult(sqlmock.NewResult(0, 1)) + + mock.ExpectQuery("SELECT status FROM workspaces WHERE id ="). + WithArgs("ws-heal-fails"). + WillReturnRows(sqlmock.NewRows([]string{"status"}).AddRow("online")) + + // Read returns NULL → mint is attempted... + mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`). + WithArgs("ws-heal-fails"). + WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil)) + // ...but the mint UPDATE fails (DB hiccup). + mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`). + WithArgs(sqlmock.AnyArg(), "ws-heal-fails"). + WillReturnError(sql.ErrConnDone) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"workspace_id":"ws-heal-fails","error_rate":0.0,"sample_error":"","active_tasks":0,"uptime_seconds":100}` + c.Request = httptest.NewRequest("POST", "/registry/heartbeat", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Heartbeat(c) + + // Liveness contract — heartbeat MUST stay 200 even when the + // secret-delivery side-channel fails. chat_files retries lazy-heal + // on the next request anyway. + if w.Code != http.StatusOK { + t.Fatalf("expected 200 (liveness primary), got %d: %s", w.Code, w.Body.String()) + } + var resp map[string]interface{} + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if _, present := resp["platform_inbound_secret"]; present { + t.Errorf("expected platform_inbound_secret OMITTED on heal failure, got: %v", resp) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} diff --git a/workspace/mcp_cli.py b/workspace/mcp_cli.py index 55107e4e..d32078b9 100644 --- a/workspace/mcp_cli.py +++ b/workspace/mcp_cli.py @@ -171,11 +171,51 @@ def _heartbeat_loop( resp.status_code, (resp.text or "")[:200], ) + else: + _persist_inbound_secret_from_heartbeat(resp) except Exception as exc: # noqa: BLE001 logger.warning("molecule-mcp: heartbeat failed: %s", exc) time.sleep(interval) +def _persist_inbound_secret_from_heartbeat(resp: object) -> None: + """Persist ``platform_inbound_secret`` from a heartbeat response, if any. + + The platform's heartbeat handler returns the secret on every beat + (mirroring /registry/register) so a workspace that lazy-healed the + secret on the platform side — typical recovery path for a workspace + whose row had a NULL ``platform_inbound_secret`` after a partial + bootstrap — picks it up within one heartbeat tick instead of + requiring a runtime restart. + + Without this delivery path the chat-upload code path's "secret was + just minted, will pick up on next heartbeat" 503 message is a lie + and the workspace stays 401-forever until the operator restarts + the runtime. Caught 2026-04-30 on hongmingwang tenant. + + Failure is non-fatal: if the body isn't JSON, doesn't carry the + field, or the disk write fails, the next heartbeat retries. This + matches the cold-start register flow in main.py:319-323. + """ + try: + body = resp.json() + except Exception: # noqa: BLE001 + return + if not isinstance(body, dict): + return + secret = body.get("platform_inbound_secret") + if not secret: + return + try: + from platform_inbound_auth import save_inbound_secret + + save_inbound_secret(secret) + except Exception as exc: # noqa: BLE001 + logger.warning( + "molecule-mcp: persist inbound secret from heartbeat failed: %s", exc + ) + + def _start_heartbeat_thread( platform_url: str, workspace_id: str, diff --git a/workspace/tests/test_mcp_cli.py b/workspace/tests/test_mcp_cli.py index 28ef88af..c7b00ba5 100644 --- a/workspace/tests/test_mcp_cli.py +++ b/workspace/tests/test_mcp_cli.py @@ -490,3 +490,216 @@ def test_heartbeat_loop_posts_to_correct_endpoint(monkeypatch): assert captured["headers"]["Authorization"] == "Bearer tok" assert captured["headers"]["Origin"] == "https://test.moleculesai.app" assert sleep_calls == [20.0], "heartbeat must sleep the configured interval" + + +# ============== Heartbeat persists platform_inbound_secret (2026-04-30) ============== +# Heartbeat loop must persist the platform_inbound_secret returned by +# the platform. Without this, a workspace that lazy-healed the secret +# on the platform side recovers only on a runtime restart — chat upload +# 401-forever. Pairs with the server-side +# TestHeartbeatHandler_DeliversPlatformInboundSecret pin. + + +def test_heartbeat_persists_inbound_secret_from_response(monkeypatch, tmp_path): + """Heartbeat 200 with platform_inbound_secret in body → save_inbound_secret called.""" + + class FakeResp: + status_code = 200 + text = "" + + def json(self): + return {"status": "ok", "platform_inbound_secret": "fresh-secret"} + + saved: list[str] = [] + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append) + + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + + assert saved == ["fresh-secret"], ( + "expected save_inbound_secret called once with the platform's secret" + ) + + +def test_heartbeat_persist_skips_when_secret_absent(monkeypatch): + """Heartbeat 200 without platform_inbound_secret → no persist call.""" + + class FakeResp: + def json(self): + return {"status": "ok"} + + saved: list[str] = [] + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append) + + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + + assert saved == [], "no secret in body → must NOT call save_inbound_secret" + + +def test_heartbeat_persist_skips_on_empty_secret(monkeypatch): + """Heartbeat 200 with empty-string platform_inbound_secret → no persist.""" + + class FakeResp: + def json(self): + return {"status": "ok", "platform_inbound_secret": ""} + + saved: list[str] = [] + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append) + + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + + assert saved == [], "empty secret string → must NOT call save_inbound_secret" + + +def test_heartbeat_persist_swallows_non_json_body(monkeypatch): + """Heartbeat with unparseable body must not raise — logs + returns.""" + + class FakeResp: + def json(self): + raise ValueError("not json") + + saved: list[str] = [] + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append) + + # Must not raise; non-JSON body is treated as "no secret to deliver". + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + assert saved == [] + + +def test_heartbeat_persist_handles_non_dict_body(monkeypatch): + """Heartbeat returning a list (not a dict) is silently ignored.""" + + class FakeResp: + def json(self): + return ["unexpected", "list"] + + saved: list[str] = [] + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", saved.append) + + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + assert saved == [] + + +def test_heartbeat_persist_swallows_save_exceptions(monkeypatch, caplog): + """save_inbound_secret raising must not crash the heartbeat loop.""" + + class FakeResp: + def json(self): + return {"platform_inbound_secret": "x"} + + def boom(_secret): + raise OSError("disk full") + + import platform_inbound_auth + + monkeypatch.setattr(platform_inbound_auth, "save_inbound_secret", boom) + + # Must not raise — heartbeat liveness > secret persistence. + mcp_cli._persist_inbound_secret_from_heartbeat(FakeResp()) + + +def test_heartbeat_loop_calls_persist_on_success(monkeypatch): + """End-to-end: heartbeat loop on 200 invokes the persist helper.""" + saw: list[object] = [] + + def fake_persist(resp): + saw.append(resp) + + monkeypatch.setattr( + mcp_cli, "_persist_inbound_secret_from_heartbeat", fake_persist + ) + + class FakeResp: + status_code = 200 + text = "" + + class FakeClient: + def __init__(self, **_kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *_a): + return False + + def post(self, *_a, **_k): + return FakeResp() + + import types + + fake_httpx = types.ModuleType("httpx") + fake_httpx.Client = FakeClient + monkeypatch.setitem(sys.modules, "httpx", fake_httpx) + + def fake_sleep(_): + raise SystemExit + + monkeypatch.setattr("time.sleep", fake_sleep) + + with pytest.raises(SystemExit): + mcp_cli._heartbeat_loop( + "https://test.moleculesai.app", + "ws-abc", + "tok", + interval=20.0, + ) + + assert len(saw) == 1, "persist helper must be called once per successful heartbeat" + + +def test_heartbeat_loop_skips_persist_on_4xx(monkeypatch): + """Heartbeat 4xx error path must NOT invoke persist (no body to trust).""" + saw: list[object] = [] + monkeypatch.setattr( + mcp_cli, + "_persist_inbound_secret_from_heartbeat", + lambda r: saw.append(r), + ) + + class FakeResp: + status_code = 401 + text = "unauthorized" + + class FakeClient: + def __init__(self, **_kwargs): + pass + + def __enter__(self): + return self + + def __exit__(self, *_a): + return False + + def post(self, *_a, **_k): + return FakeResp() + + import types + + fake_httpx = types.ModuleType("httpx") + fake_httpx.Client = FakeClient + monkeypatch.setitem(sys.modules, "httpx", fake_httpx) + + def fake_sleep(_): + raise SystemExit + + monkeypatch.setattr("time.sleep", fake_sleep) + + with pytest.raises(SystemExit): + mcp_cli._heartbeat_loop( + "https://test.moleculesai.app", + "ws-abc", + "tok", + interval=20.0, + ) + + assert saw == [], "4xx response must NOT trigger persist call"