Merge pull request #1185 from Molecule-AI/staging

staging → main: bootstrap-failed + console endpoints (PR #1168)
This commit is contained in:
Hongming Wang 2026-04-20 17:31:57 -07:00 committed by GitHub
commit e282810b97
7 changed files with 323 additions and 36 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// ---------- AdminMemoriesHandler: Export ----------

View File

@ -31,7 +31,6 @@ import (
"net/http"
"net/url"
"os"
"regexp"
"strings"
"time"
@ -716,41 +715,6 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
return "Message sent.", nil
}
// Credential-matching expressions used by redactSecrets. They are compiled
// once at package initialisation instead of on every call.
var (
	// A credential keyword, any run (possibly empty) of ':', '=' or
	// whitespace, then at least 8 token-like characters.
	redactGenericRe = regexp.MustCompile(`(?i)(key|secret|password|token|api_?key|auth|bearer|credential|passphrase)[:=\s]*([a-zA-Z0-9_\-+=/]{8,})`)
	// "Bearer <token>" / "Token <token>" with a token of at least 16 characters.
	redactBearerRe = regexp.MustCompile(`(?i)(bearer|token)\s+([a-zA-Z0-9_\-+=/]{16,})`)
	// ENV-style NAME=value where NAME contains a credential keyword after
	// its first character and value is at least 8 non-space characters.
	redactEnvRe = regexp.MustCompile(`(?i)([A-Z][A-Z0-9_]*(?:KEY|SECRET|PASSWORD|TOKEN|API|AUTH)[A-Z0-9_]*)=([^\s]{8,})`)
	// JSON-style "key": "value" (double quotes only) with a value of at
	// least 8 token-like characters.
	redactJSONRe = regexp.MustCompile(`(?i)"(key|secret|password|token|api_?key|auth|bearer)":\s*"([a-zA-Z0-9_\-+=/]{8,})"`)
)

// redactSecrets scans content for credential-like patterns and replaces the
// value portion with [REDACTED], so plain-text API keys, tokens, and
// passwords do not land in the agent_memories table (fixes #838).
//
// The workspaceID parameter is currently unused; it is reserved for a future
// structured audit trail.
//
// Four passes run in a fixed order, each over the output of the previous one
// (all case-insensitive):
//  1. generic keyword + optional separator + value  -> "<keyword>=[REDACTED]"
//  2. "Bearer <token>" / "Token <token>"            -> "<label> [REDACTED]"
//  3. ENV-style NAME=value                          -> "NAME=[REDACTED]"
//  4. JSON-style "key": "value"                     -> "key": "[REDACTED]"
//
// Only the matched keyword/value span is rewritten; surrounding text is kept
// so the memory stays readable for audit and debugging.
//
// NOTE(review): pass 1 accepts an empty separator, so an ordinary word that
// begins with a keyword and continues for 8+ token characters (for example
// "keyboard_layout") is also rewritten. Confirm whether that is intended.
func redactSecrets(workspaceID string, content string) string {
	content = redactGenericRe.ReplaceAllString(content, "$1=[REDACTED]")
	content = redactBearerRe.ReplaceAllString(content, "$1 [REDACTED]")
	content = redactEnvRe.ReplaceAllString(content, "$1=[REDACTED]")
	content = redactJSONRe.ReplaceAllString(content, `"$1": "[REDACTED]"`)
	return content
}
func (h *MCPHandler) toolCommitMemory(ctx context.Context, workspaceID string, args map[string]interface{}) (string, error) {
content, _ := args["content"].(string)

View File

@ -505,6 +505,7 @@ func TestValidateAgentURL(t *testing.T) {
// either misconfigured or intentionally unreachable.
{"DNS name: nxdomain (must fail)", "https://this-domain-definitely-does-not-exist-12345.invalid/", true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := validateAgentURL(tc.url)
if tc.wantErr && err == nil {

View File

@ -0,0 +1,131 @@
package handlers
import (
"log"
"net/http"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// BootstrapFailedRequest is the JSON body the control plane POSTs when a
// workspace EC2 instance crashes while running user-data, i.e. before the
// agent runtime has called /registry/register. Without this report the
// workspace would stay in `provisioning` until the 10-minute sweeper flips
// it; reporting the failure directly keeps the canvas state accurate.
type BootstrapFailedRequest struct {
	// Error is a short, single-line message. It is shown in the UI banner
	// and carried in the WORKSPACE_PROVISION_FAILED event payload.
	Error string `json:"error"`

	// LogTail holds the last ~2KB of /var/log/molecule-runtime.log or of the
	// cloud-init serial console. It is persisted in `last_sample_error` so
	// the canvas Details tab can show the real stack trace beside the failed
	// status without an additional fetch.
	LogTail string `json:"log_tail"`
}
// bootstrapLogSuffix returns at most the last n bytes of s. When the cut
// would land inside a multi-byte UTF-8 sequence it is moved forward to the
// next rune start, so the result never begins with a continuation byte.
func bootstrapLogSuffix(s string, n int) string {
	if n <= 0 {
		return ""
	}
	if len(s) <= n {
		return s
	}
	start := len(s) - n
	for start < len(s) && s[start]&0xC0 == 0x80 {
		start++
	}
	return s[start:]
}

// BootstrapFailed marks a workspace as failed from an out-of-band signal —
// specifically the control plane's bootstrap watcher when it detects
// "RUNTIME CRASHED" in the EC2 console output of a workspace that never
// self-registered.
//
// The UPDATE only touches rows whose status is still `provisioning`, so a
// workspace that already went online (raced with a late self-registration)
// or already failed (double report) is left alone and no event is emitted.
//
// Responses:
//   - 400 when the :id param is empty or the JSON body cannot be bound
//   - 500 when the database update fails
//   - 200 {"ok": true, "no_change": true} when no row was updated
//   - 200 {"ok": true} after the row was flipped and the
//     WORKSPACE_PROVISION_FAILED event was recorded and broadcast
func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
	const (
		// Upper bound for both the broadcast tail and the stored
		// last_sample_error value. 8KB is plenty for a Python traceback.
		maxStoredBytes  = 8192
		truncatedMarker = "...(truncated)...\n"
	)

	id := c.Param("id")
	if id == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"})
		return
	}

	var req BootstrapFailedRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid body: " + err.Error()})
		return
	}

	errMsg := strings.TrimSpace(req.Error)
	if errMsg == "" {
		errMsg = "bootstrap failed — see log_tail"
	}

	// Cap log_tail so a runaway heredoc from user-data doesn't bloat the
	// workspaces row or the event payload. Keep the END of the log (the
	// most recent output) and start on a rune boundary so the value stays
	// valid UTF-8.
	tail := req.LogTail
	if len(tail) > maxStoredBytes {
		tail = truncatedMarker + bootstrapLogSuffix(tail, maxStoredBytes)
	}

	// Build the last_sample_error value. If message + tail exceeds the cap,
	// shrink the tail from the front rather than chopping its final lines,
	// since those lines are where the actual failure is reported.
	stored := errMsg + "\n\n" + tail
	if len(stored) > maxStoredBytes {
		room := maxStoredBytes - len(errMsg) - len("\n\n") - len(truncatedMarker)
		if room > 0 {
			stored = errMsg + "\n\n" + truncatedMarker + bootstrapLogSuffix(req.LogTail, room)
		}
	}
	// Final guard: covers an oversized error message on its own.
	stored = truncateString(stored, maxStoredBytes)

	// Guard against overwriting a workspace that already reached
	// online/failed — only act on `provisioning`.
	res, err := db.DB.ExecContext(c.Request.Context(), `
UPDATE workspaces
SET status = 'failed',
last_sample_error = $2,
updated_at = now()
WHERE id = $1
AND status = 'provisioning'
`, id, stored)
	if err != nil {
		log.Printf("BootstrapFailed: db update %s: %v", id, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "db update failed"})
		return
	}

	affected, err := res.RowsAffected()
	if err != nil {
		// The update itself succeeded; we just cannot tell whether it
		// matched. Log it and fall through with affected == 0, which is
		// the conservative choice (no event is emitted).
		log.Printf("BootstrapFailed: rows affected %s: %v", id, err)
	}
	if affected == 0 {
		// Already transitioned out of provisioning — don't re-emit the
		// event (would lie to the canvas). Return 200 so CP doesn't retry.
		c.JSON(http.StatusOK, gin.H{"ok": true, "no_change": true})
		return
	}

	h.broadcaster.RecordAndBroadcast(c.Request.Context(), "WORKSPACE_PROVISION_FAILED", id, map[string]interface{}{
		"error":    errMsg,
		"log_tail": tail,
		"source":   "bootstrap_watcher",
	})
	log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
	c.JSON(http.StatusOK, gin.H{"ok": true})
}
// Console returns the EC2 serial console output of a workspace by asking
// the control plane for it. Only CP holds `ec2:GetConsoleOutput`; the tenant
// platform has no AWS credentials by design, so it forwards the request
// using the CP admin bearer it was provisioned with. The route is
// admin-gated because raw console output can expose user-data snippets that
// are treated as semi-sensitive.
//
// Endpoint shape: GET /workspaces/:id/console
// Response shape: {"output": "<serial console text>"}
//
// Status codes: 400 for an empty id, 501 when no control-plane provisioner
// is configured, 502 when the control-plane call fails, 200 otherwise.
func (h *WorkspaceHandler) Console(c *gin.Context) {
	workspaceID := c.Param("id")

	switch {
	case workspaceID == "":
		c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"})
		return
	case h.cpProv == nil:
		// Self-hosted / docker-compose deployments have no CP and hence no
		// serial console; their logs are in `docker logs`.
		c.JSON(http.StatusNotImplemented, gin.H{"error": "console output unavailable on this deployment (no control plane)"})
		return
	}

	consoleText, fetchErr := h.cpProv.GetConsoleOutput(c.Request.Context(), workspaceID)
	if fetchErr != nil {
		// Details go to the server log only; the client gets a generic message.
		log.Printf("Console: cp get for %s: %v", workspaceID, fetchErr)
		c.JSON(http.StatusBadGateway, gin.H{"error": "control plane returned an error fetching console output"})
		return
	}

	c.JSON(http.StatusOK, gin.H{"output": consoleText})
}
// truncateString caps s at n bytes. If byte n of s is a UTF-8 continuation
// byte, the cut point is moved backwards to the start of that sequence so
// the result does not end with a partial character. Strings of n bytes or
// fewer are returned unchanged.
func truncateString(s string, n int) string {
	if n >= len(s) {
		return s
	}
	cut := n
	for ; cut > 0; cut-- {
		// Stop at the first byte that begins a rune (not 10xxxxxx).
		if s[cut]&0xC0 != 0x80 {
			break
		}
	}
	return s[:cut]
}

View File

@ -0,0 +1,149 @@
package handlers
import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// setupBootstrapHandler returns a WorkspaceHandler backed by a sqlmock
// database and an in-process test broadcaster, together with the mock so
// callers can register expectations. setupTestRedis is invoked so that
// RecordAndBroadcast's pub/sub path doesn't panic on a nil Redis client.
func setupBootstrapHandler(t *testing.T) (*WorkspaceHandler, sqlmock.Sqlmock) {
	t.Helper()

	dbMock := setupTestDB(t)
	setupTestRedis(t)

	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
	return handler, dbMock
}
// TestBootstrapFailed_HappyPath covers a workspace still in `provisioning`:
// the UPDATE matches one row, the failure event is recorded, and the handler
// answers 200.
func TestBootstrapFailed_HappyPath(t *testing.T) {
	handler, dbMock := setupBootstrapHandler(t)

	const wsID = "ws-crashed"

	// The UPDATE is predicated on status = 'provisioning'; one row matches.
	dbMock.ExpectExec(`UPDATE workspaces`).
		WithArgs(wsID, sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	// RecordAndBroadcast persists the event in structure_events.
	dbMock.ExpectExec(`INSERT INTO structure_events`).
		WithArgs("WORKSPACE_PROVISION_FAILED", wsID, sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))

	payload := bytes.NewBufferString(`{"error":"module 'adapter' has no attribute 'Adapter'","log_tail":"Traceback...\n..."}`)
	httpReq := httptest.NewRequest("POST", "/admin/workspaces/"+wsID+"/bootstrap-failed", payload)
	httpReq.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: wsID}}
	ginCtx.Request = httpReq

	handler.BootstrapFailed(ginCtx)

	if rec.Code != http.StatusOK {
		t.Fatalf("want 200, got %d: %s", rec.Code, rec.Body.String())
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// TestBootstrapFailed_AlreadyTransitioned: when the workspace has already
// left `provisioning` (went online in a race, or was failed by the sweeper)
// the handler must not emit the event again. It still answers 200.
func TestBootstrapFailed_AlreadyTransitioned(t *testing.T) {
	handler, dbMock := setupBootstrapHandler(t)

	const wsID = "ws-online"

	// Zero rows affected: the `status = 'provisioning'` predicate did not
	// match. No structure_events INSERT is registered, so sqlmock would
	// reject one.
	dbMock.ExpectExec(`UPDATE workspaces`).
		WithArgs(wsID, sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 0))

	httpReq := httptest.NewRequest("POST", "/admin/workspaces/"+wsID+"/bootstrap-failed",
		bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
	httpReq.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: wsID}}
	ginCtx.Request = httpReq

	handler.BootstrapFailed(ginCtx)

	if rec.Code != http.StatusOK {
		t.Fatalf("want 200, got %d: %s", rec.Code, rec.Body.String())
	}
	if err := dbMock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// TestBootstrapFailed_EmptyIDIs400 checks that an empty :id path parameter
// is rejected with 400 Bad Request.
func TestBootstrapFailed_EmptyIDIs400(t *testing.T) {
	handler, _ := setupBootstrapHandler(t)

	httpReq := httptest.NewRequest("POST", "/admin/workspaces//bootstrap-failed",
		bytes.NewBufferString(`{"error":"x"}`))
	httpReq.Header.Set("Content-Type", "application/json")

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Params = gin.Params{{Key: "id", Value: ""}}
	ginCtx.Request = httpReq

	handler.BootstrapFailed(ginCtx)

	if got := rec.Code; got != http.StatusBadRequest {
		t.Errorf("want 400, got %d", got)
	}
}
// TestBootstrapFailed_TruncatesOversizedLogTail posts a 20KB log_tail and
// checks that the handler still succeeds and issues both the workspace
// UPDATE and the structure_events INSERT.
//
// The stored value is matched with sqlmock.AnyArg, so neither its exact
// length nor the presence of the truncation marker is asserted here.
func TestBootstrapFailed_TruncatesOversizedLogTail(t *testing.T) {
	h, mock := setupBootstrapHandler(t)

	longTail := bytes.Repeat([]byte("a"), 20000)

	mock.ExpectExec(`UPDATE workspaces`).
		WithArgs("ws-spammy", sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))
	mock.ExpectExec(`INSERT INTO structure_events`).
		WithArgs("WORKSPACE_PROVISION_FAILED", "ws-spammy", sqlmock.AnyArg()).
		WillReturnResult(sqlmock.NewResult(0, 1))

	body := `{"error":"huge","log_tail":"` + string(longTail) + `"}`
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-spammy"}}
	c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-spammy/bootstrap-failed",
		bytes.NewBufferString(body))
	c.Request.Header.Set("Content-Type", "application/json")

	h.BootstrapFailed(c)

	if w.Code != http.StatusOK {
		t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
	}
	// Same check as the sibling tests: both expected statements must run.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet: %v", err)
	}
}
// TestConsole_ReturnsNotImplementedWhenNoCP: a handler built without a
// CPProvisioner must answer 501 from Console. The real CP round-trip is
// covered end-to-end on the CP side (bootstrap_watcher tests in the
// controlplane repo).
func TestConsole_ReturnsNotImplementedWhenNoCP(t *testing.T) {
	handler, _ := setupBootstrapHandler(t)

	rec := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(rec)
	ginCtx.Request = httptest.NewRequest("GET", "/workspaces/ws-x/console", nil)
	ginCtx.Params = gin.Params{{Key: "id", Value: "ws-x"}}

	handler.Console(ginCtx)

	if got := rec.Code; got != http.StatusNotImplemented {
		t.Errorf("want 501, got %d: %s", got, rec.Body.String())
	}
}

View File

@ -215,5 +215,36 @@ func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool
return result.State == "running", nil
}
// GetConsoleOutput proxies a call to the CP's
// GET /cp/admin/workspaces/:id/console endpoint, which returns the EC2
// serial console output (AWS ec2:GetConsoleOutput under the hood) for a
// workspace instance. The tenant platform has no AWS credentials by
// design, so CP is the only party that can read the serial console.
//
// Returns ("", err) when the request cannot be built, on transport
// failure, on a non-2xx status, or when the response body cannot be
// decoded — the caller decides what to render to the user.
//
// NOTE(review): workspaceID is interpolated into the URL path unescaped;
// confirm callers only pass validated identifiers.
func (p *CPProvisioner) GetConsoleOutput(ctx context.Context, workspaceID string) (string, error) {
	endpoint := fmt.Sprintf("%s/cp/admin/workspaces/%s/console", p.baseURL, workspaceID)

	// NewRequestWithContext fails on an unparsable URL (or a nil ctx) and
	// then returns a nil request; bail out before anything touches it.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return "", fmt.Errorf("cp provisioner: console: build request: %w", err)
	}
	p.authHeaders(req)

	resp, err := p.httpClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("cp provisioner: console: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return "", fmt.Errorf("cp provisioner: console: unexpected %d", resp.StatusCode)
	}

	// Cap at 256 KiB — EC2 returns at most 64 KiB of serial console, but
	// allow headroom for CP-side wrapping / metadata.
	var body struct {
		Output string `json:"output"`
	}
	if err := json.NewDecoder(io.LimitReader(resp.Body, 256<<10)).Decode(&body); err != nil {
		return "", fmt.Errorf("cp provisioner: console decode: %w", err)
	}
	return body.Output, nil
}
// Close does nothing and always reports success.
func (p *CPProvisioner) Close() error {
	return nil
}

View File

@ -121,6 +121,16 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAdmin.GET("/workspaces", wh.List)
wsAdmin.POST("/workspaces", wh.Create)
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
// so the canvas flips to failed in seconds instead of waiting
// for the 10-minute provision-timeout sweeper.
wsAdmin.POST("/admin/workspaces/:id/bootstrap-failed", wh.BootstrapFailed)
// Proxy to CP's serial-console endpoint so the canvas's "View
// Logs" button can render the actual boot trace without handing
// the tenant AWS credentials. Admin-gated because console output
// can include user-data snippets we treat as semi-sensitive.
wsAdmin.GET("/workspaces/:id/console", wh.Console)
// Admin memory backup/restore (#1051) — bulk export/import of agent
// memories for safe Docker rebuilds. Matches workspaces by name on import.