Merge remote-tracking branch 'origin/staging' into feat/failed-workspace-error-logs

This commit is contained in:
Hongming Wang 2026-04-20 17:32:24 -07:00
commit 6f22b40ee0
10 changed files with 855 additions and 361 deletions


@@ -0,0 +1,280 @@
---
title: "One Canvas, Every Agent: Remote AI Agents and Fleet Visibility on Molecule AI"
date: 2026-04-20
slug: remote-ai-agents
description: "Your Claude Code laptop, your LangGraph cloud instance, and your OpenClaw server — all on the same canvas. Phase 30 ships per-workspace bearer tokens and unified fleet visibility for heterogeneous AI agent fleets."
tags: [platform, remote-agents, fleet-management, a2a]
---
# One Canvas, Every Agent: Remote AI Agents and Fleet Visibility on Molecule AI
> "Our agents need to talk to each other even when they're in different clouds — and we need to see the whole fleet in one place without stitching together five different dashboards."
>
> — Infrastructure lead at a mid-stage SaaS company, describing what they needed before finding Molecule AI Phase 30

That's the problem. Not a hypothetical one.
When your AI agents span your laptop, an AWS EC2 instance, a company's on-premise server, and a contractor's development environment — you need one answer to three questions: Where are my agents right now? What are they doing? And are they actually who they say they are?
Molecule AI Phase 30 ships the answer to all three.
## The Fleet Visibility Problem
Every AI agent platform works fine when your agents are in one place. Docker containers on the same host, all visible to the same canvas, all on the same network. That was Molecule AI up until Phase 29.
But real organizations don't look like that. Your engineering org probably has agents running:
- In CI/CD pipelines (GitHub Actions, AWS CodeBuild)
- On developer laptops for local iteration
- In cloud VMs on AWS, GCP, or Azure
- Behind company firewalls on on-premise infrastructure
- In SaaS integrations that need to participate in your agent hierarchy
Before Phase 30, each of those was invisible to the others. Your CI agent couldn't see your production agents. Your on-premise agent couldn't receive instructions from the PM agent running in the cloud. And you — the operator — had no single view of the whole fleet.
## Phase 30: One Canvas, Every Agent
Phase 30 makes three things possible for the first time:
1. **Any agent, anywhere, on the same canvas.** Remote agents running outside Docker — on any machine, any cloud, any network — register with the platform and appear in your canvas with the same status indicators, activity feeds, and chat interfaces as your local agents.
2. **Unified A2A communication across network boundaries.** Agents in different clouds, behind different firewalls, on different continents can send each other A2A messages through the platform's proxy — with the same permission rules that govern local agents.
3. **Per-workspace bearer tokens.** Every remote agent gets its own cryptographic identity. No shared credentials. No guessing which agent made an API call. No all-or-nothing credential revocation.
Fleet visibility is the hook. The auth model is the technical foundation that makes it work.
## How Remote Agents Join the Fleet
A remote agent — running on any machine with an HTTP endpoint — joins your Molecule AI org in six steps.
### Step 1: Create the external workspace
Your platform admin creates an external workspace record via the REST API:
```bash
curl -X POST https://your-platform.molecule.ai/workspaces \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <admin-token>" \
  -d '{
    "name": "CI Build Agent",
    "role": "ci-agent",
    "runtime": "external",
    "external": true,
    "url": "https://ci-agent.example.com",
    "tier": 2
  }'
```
The response returns a workspace ID. The `runtime: "external"` flag tells the platform not to provision a Docker container — this workspace runs on your infrastructure.
### Step 2: Agent registers and receives a bearer token
The agent calls `POST /registry/register` with its workspace ID and agent card:
```bash
curl -X POST https://your-platform.molecule.ai/registry/register \
  -H "Content-Type: application/json" \
  -d '{
    "id": "<workspace-id>",
    "url": "https://ci-agent.example.com",
    "agent_card": {
      "name": "CI Build Agent",
      "description": "Runs tests and reports results to the PM agent",
      "skills": ["ci", "testing", "reporting"],
      "runtime": "external"
    }
  }'
```
The response includes an `auth_token` — shown **exactly once**, never stored by the platform. The agent must persist this token. Every subsequent authenticated call to the platform uses it.
### Registration in Python
```python
import requests, os, time, threading

PLATFORM_URL = os.environ["PLATFORM_URL"]
AGENT_URL = os.environ["AGENT_URL"]      # e.g. "https://my-agent.ngrok.io"
ADMIN_TOKEN = os.environ["ADMIN_TOKEN"]  # platform admin token

# Step 1: create external workspace
workspace = requests.post(
    f"{PLATFORM_URL}/workspaces",
    json={"name": "CI Agent", "runtime": "external",
          "external": True, "url": AGENT_URL},
    headers={"Authorization": f"Bearer {ADMIN_TOKEN}"},
).json()
ws_id = workspace["id"]

# Step 2: register — receive bearer token
reg = requests.post(
    f"{PLATFORM_URL}/registry/register",
    json={"id": ws_id, "url": AGENT_URL,
          "agent_card": {"name": "CI Agent", "runtime": "external"}},
).json()
auth_token = reg["auth_token"]  # save this — shown once

# Heartbeat every 30s
start = time.time()

def heartbeat():
    while True:
        requests.post(f"{PLATFORM_URL}/registry/heartbeat",
                      json={"workspace_id": ws_id, "error_rate": 0.0,
                            "active_tasks": 0, "current_task": "",
                            "uptime_seconds": int(time.time() - start)},
                      headers={"Authorization": f"Bearer {auth_token}"})
        time.sleep(30)

threading.Thread(target=heartbeat, daemon=True).start()
```
### Registration in Node.js
```javascript
const PLATFORM = process.env.PLATFORM_URL;
const AGENT_URL = process.env.AGENT_URL;
const ADMIN = process.env.ADMIN_TOKEN;

const create = await fetch(`${PLATFORM}/workspaces`, {
  method: "POST",
  headers: { "Authorization": `Bearer ${ADMIN}`, "Content-Type": "application/json" },
  body: JSON.stringify({ name: "CI Agent", runtime: "external", external: true, url: AGENT_URL })
});
const { id: wsId } = await create.json();

const reg = await fetch(`${PLATFORM}/registry/register`, {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ id: wsId, url: AGENT_URL,
    agent_card: { name: "CI Agent", runtime: "external" } })
});
const { auth_token } = await reg.json(); // save — returned once

// Heartbeat every 30s
setInterval(async () => {
  await fetch(`${PLATFORM}/registry/heartbeat`, {
    method: "POST",
    headers: { "Authorization": `Bearer ${auth_token}`, "Content-Type": "application/json" },
    body: JSON.stringify({ workspace_id: wsId, error_rate: 0.0,
      active_tasks: 0, current_task: "", uptime_seconds: 0 })
  });
}, 30_000);
```
Full examples with A2A message handling are in the [External Agent Registration Guide](/docs/guides/external-agent-registration).
### Step 3: Pull secrets on demand
Remote agents don't get secrets baked in at container boot. They pull them on demand:
```bash
curl https://your-platform.molecule.ai/workspaces/<workspace-id>/secrets \
  -H "Authorization: Bearer <auth-token>"
```
This returns the decrypted secrets scoped to this workspace — API keys, credentials, anything the platform has stored. The agent uses these to authenticate with its LLM provider, external services, or any tool it needs to do its job.
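In an agent's startup code, the same call is a one-liner plus an environment export. A minimal sketch, with the HTTP client injected so it can be exercised without a live platform; the flat `{NAME: value}` response shape is an assumption here, so check the registration guide for the real schema:

```python
import os

def load_secrets(session, platform_url: str, ws_id: str, token: str) -> dict:
    """Pull this workspace's decrypted secrets and export them as env vars.

    `session` is anything with a requests-style .get(), e.g. requests.Session().
    A flat {NAME: value} response body is assumed for illustration.
    """
    resp = session.get(
        f"{platform_url}/workspaces/{ws_id}/secrets",
        headers={"Authorization": f"Bearer {token}"},
        timeout=10,
    )
    resp.raise_for_status()
    secrets = resp.json()
    os.environ.update(secrets)  # e.g. the agent's LLM provider key is now visible
    return secrets
```

Pulling on demand rather than baking secrets in at boot also means a rotated secret takes effect on the agent's next fetch, with no redeploy.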
### Step 4: Start the heartbeat loop
The agent sends a heartbeat every 30 seconds to stay visible on the canvas:
```bash
curl -X POST https://your-platform.molecule.ai/registry/heartbeat \
  -H "Authorization: Bearer <auth-token>" \
  -H "Content-Type: application/json" \
  -d '{
    "workspace_id": "<workspace-id>",
    "error_rate": 0.0,
    "active_tasks": 1,
    "current_task": "Running test suite on PR #412",
    "uptime_seconds": 3600
  }'
```
If the platform receives no heartbeat for 60 seconds, the workspace transitions to **offline** on the canvas. This is the liveness signal — visible, real-time, consistent across local and remote agents alike.
### Step 5: Send and receive A2A messages
Remote agents communicate with the rest of the fleet through the platform's A2A proxy. Both sides are authenticated:
```bash
curl -X POST https://your-platform.molecule.ai/workspaces/<target-id>/a2a \
  -H "Authorization: Bearer <auth-token>" \
  -H "X-Workspace-ID: <your-workspace-id>" \
  -H "Content-Type: application/json" \
  -d '{
    "jsonrpc": "2.0",
    "method": "message/send",
    "params": {
      "message": {
        "role": "user",
        "parts": [{"type": "text", "text": "PR #412 tests passed. Ready for review."}]
      }
    },
    "id": "req-456"
  }'
```
The `X-Workspace-ID` header is the caller's identity. The platform's `CanCommunicate` check uses it to enforce hierarchy-based access: agents can only message siblings, parents, children, and themselves. No agent can reach an unrelated workspace.
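The hierarchy rule reduces to a parent-map lookup. Our own sketch of the rule as described above, not the platform's actual `CanCommunicate` implementation:

```python
def can_communicate(caller: str, target: str, parent: dict) -> bool:
    """Allow self, parent, children, and siblings; deny everything else.

    `parent` maps workspace ID to parent workspace ID (None for roots).
    """
    if caller == target:
        return True
    if parent.get(caller) == target:  # target is caller's parent
        return True
    if parent.get(target) == caller:  # target is caller's child
        return True
    # siblings share the same (non-root) parent
    p = parent.get(caller)
    return p is not None and p == parent.get(target)
```

Because the check runs in the proxy, a remote agent gets exactly the same reachability as a local one: compromising a leaf workspace's token never grants access beyond that leaf's immediate neighborhood.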
### Step 6: See the whole fleet in one place
The canvas automatically includes remote agents alongside local ones. Remote agents get a purple **REMOTE** badge so you can tell them apart at a glance. Every other canvas feature — status indicators, chat tabs, activity feed, config management — works identically for remote and local agents.
## The Security Model: Per-Workspace Bearer Tokens
Fleet visibility is the hook. Per-workspace bearer tokens are the foundation that makes it safe.
Every remote agent has:
- **A unique 256-bit token** — cryptographically random, returned once at registration, stored server-side only as a SHA-256 hash
- **A workspace identity** — bound to the `X-Workspace-ID` header on every A2A call
- **A revocation path** — immediate, per-agent, no downtime for other agents
The `workspace_auth_tokens` table tracks:
| Field | Purpose |
|---|---|
| `token_hash` | SHA-256 of the plaintext. The platform never stores the actual secret. |
| `prefix` | First 8 characters for display and log attribution |
| `workspace_id` | Which agent this token belongs to |
| `created_by` | Provenance: admin-token, session, or org-api-key |
| `last_used_at` | Audit trail: the last time this token authenticated an API call |
| `revoked_at` | Immediate revocation: the token stops working on the next request |
Two agents in different clouds both have bearer tokens. Both use those tokens to authenticate to the A2A proxy. The proxy validates both tokens before dispatching any message. Mutual auth, end-to-end.
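The mint-and-verify flow implied by that table can be sketched in a few lines. This is our illustration of the scheme, not the platform's code; the field names come from the table above:

```python
import hashlib
import hmac
import secrets

def mint_token() -> dict:
    """Mint a 256-bit bearer token.

    The plaintext is returned exactly once; only token_hash and prefix
    would be persisted, per the workspace_auth_tokens table above.
    """
    plaintext = secrets.token_hex(32)  # 32 random bytes = 256 bits
    return {
        "plaintext": plaintext,  # shown to the agent once, never stored
        "token_hash": hashlib.sha256(plaintext.encode()).hexdigest(),
        "prefix": plaintext[:8],  # for display and log attribution
    }

def verify_token(presented: str, stored_hash: str) -> bool:
    """Constant-time comparison against the stored SHA-256 hash."""
    digest = hashlib.sha256(presented.encode()).hexdigest()
    return hmac.compare_digest(digest, stored_hash)
```

Storing only the hash means a database leak exposes no usable credentials, and the 8-character prefix is enough to attribute a log line to a token without revealing it.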
## Where Remote Agents Fit in Your Organization
### CI/CD pipelines
Your CI agent — running in GitHub Actions, CircleCI, or any CI system — joins your org as a first-class workspace. It registers with a bearer token, pulls its secrets, runs your test suite, and reports results to the PM agent. The PM agent sees the CI agent's status on the canvas. When tests fail, the canvas shows you exactly which agent ran them, with full audit attribution.
### Multi-cloud fleets
An agent running in GCP and an agent running in AWS communicate through the platform's A2A proxy. Both are authenticated. Both appear on the same canvas. The GCP agent doesn't need to know the AWS agent's IP address — it just calls the proxy with the workspace ID, and the proxy routes the message.
### On-premise and air-gapped environments
Agents behind a company firewall — or in environments that can't expose a public endpoint — use a polling model. Instead of receiving WebSocket events, they poll `GET /workspaces/:id/state` for platform-initiated events (pause, resume, config changes). They still send A2A messages outbound. They still appear on the canvas.
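A behind-firewall agent's outbound-only loop might look like the following sketch. The `GET /workspaces/:id/state` path is from this post; the event payload shape is our assumption, and the HTTP client is injected so the poll step can be tested offline:

```python
import time

def poll_once(session, platform_url: str, ws_id: str, token: str) -> list:
    """One outbound poll for platform-initiated events (pause, resume,
    config changes). `session` is anything with a requests-style .get()."""
    resp = session.get(
        f"{platform_url}/workspaces/{ws_id}/state",
        headers={"Authorization": f"Bearer {token}"},
        timeout=10,
    )
    resp.raise_for_status()
    # Assumed payload shape: {"events": [{"type": "pause", ...}, ...]}
    return resp.json().get("events", [])

def poll_loop(session, platform_url, ws_id, token, handle, interval=15):
    """No inbound port required: the agent initiates every connection."""
    while True:
        for event in poll_once(session, platform_url, ws_id, token):
            handle(event)
        time.sleep(interval)
```

Since heartbeats and A2A sends are already outbound calls, polling for state is the only piece a NAT-bound agent has to add.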
### SaaS integrations and webhooks
A third-party SaaS service that exposes an A2A-compatible HTTP endpoint can register as an external workspace. It joins the org hierarchy, receives tasks from the PM agent, and returns results — without any Molecule AI infrastructure running on its end.
## What's Next for Remote Agents
Phase 30 shipped the foundation. The remaining work — plugin tarball download, state polling for behind-NAT agents, poll-based liveness monitoring, and sibling URL caching — completes the remote onboarding story in upcoming phases.
Direct agent-to-agent mesh across NATs (without routing through the platform proxy) is a future phase. For most use cases, the proxy path is already fast enough and doesn't require any infrastructure changes.
## Get Started
Per-workspace bearer tokens and unified canvas fleet visibility are available now on all Molecule AI deployments.
- [External Agent Registration Guide](/docs/guides/external-agent-registration) — full step-by-step with Python and Node.js examples
- [Token Management API](/docs/guides/org-api-keys) — mint, list, and revoke per-workspace tokens
- [Architecture Overview](/docs/architecture/overview) — auth model and network topology for remote agents
Your heterogeneous fleet is waiting. It all fits on one canvas now.


@@ -32,6 +32,10 @@ type memoryExportEntry struct {
// Returns all agent memories joined with workspace name so the dump is
// human-readable and can be re-imported after workspaces are re-provisioned
// (UUIDs change, names stay stable).
//
// SECURITY (F1084 / #1131): applies redactSecrets to each content field
// before returning so that any credentials stored before SAFE-T1201 (#838)
// was applied do not leak out via the admin export endpoint.
func (h *AdminMemoriesHandler) Export(c *gin.Context) {
ctx := c.Request.Context()
@@ -56,6 +60,10 @@ func (h *AdminMemoriesHandler) Export(c *gin.Context) {
log.Printf("admin/memories/export: scan error: %v", err)
continue
}
// F1084 / #1131: redact secrets before returning so pre-SAFE-T1201
// memories (stored before redactSecrets was mandatory) don't leak.
redacted, _ := redactSecrets(m.WorkspaceName, m.Content)
m.Content = redacted
memories = append(memories, m)
}
if err := rows.Err(); err != nil {
@@ -78,6 +86,11 @@ type memoryImportEntry struct {
// Accepts a JSON array of memories (same format as export). Matches each
// workspace by name (not UUID). Skips duplicates where workspace_id + content
// + scope already exist. Returns counts of imported and skipped entries.
//
// SECURITY (F1085 / #1132): calls redactSecrets on each content field
// before both the deduplication check and the INSERT so that imported memories
// with embedded credentials cannot land unredacted in agent_memories (SAFE-T1201
// parity with the commit_memory MCP bridge path).
func (h *AdminMemoriesHandler) Import(c *gin.Context) {
ctx := c.Request.Context()
@@ -104,11 +117,26 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
continue
}
// 2. Check for duplicate (same workspace + content + scope)
// F1085 / #1132: scrub credential patterns before persistence so that
// imported memories with secrets don't bypass SAFE-T1201 (#838).
// Must run BEFORE the dedup check so the redacted content is what
// gets stored — otherwise re-importing the same backup would produce
// a duplicate with different (original, unredacted) content.
content, _ := redactSecrets(workspaceID, entry.Content)
// 2. Check for duplicate (same workspace + content + scope) using
// the redacted content so that two backups with the same original
// secret (same placeholder output) are treated as duplicates.
var exists bool
// F1085 / #1132: scrub credential patterns before persistence. Must run
// BEFORE the dedup check so the redacted content is what gets stored —
// otherwise two backups with the same original secret would each get a
// different placeholder, producing duplicate rows with different content.
content, _ := redactSecrets(workspaceID, entry.Content)
err = db.DB.QueryRowContext(ctx,
`SELECT EXISTS(SELECT 1 FROM agent_memories WHERE workspace_id = $1 AND content = $2 AND scope = $3)`,
workspaceID, entry.Content, entry.Scope,
workspaceID, content, entry.Scope,
).Scan(&exists)
if err != nil {
log.Printf("admin/memories/import: duplicate check error for workspace %q: %v", entry.WorkspaceName, err)
@@ -129,12 +157,12 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
if entry.CreatedAt != "" {
_, err = db.DB.ExecContext(ctx,
`INSERT INTO agent_memories (workspace_id, content, scope, namespace, created_at) VALUES ($1, $2, $3, $4, $5)`,
workspaceID, entry.Content, entry.Scope, namespace, entry.CreatedAt,
workspaceID, content, entry.Scope, namespace, entry.CreatedAt,
)
} else {
_, err = db.DB.ExecContext(ctx,
`INSERT INTO agent_memories (workspace_id, content, scope, namespace) VALUES ($1, $2, $3, $4)`,
workspaceID, entry.Content, entry.Scope, namespace,
workspaceID, content, entry.Scope, namespace,
)
}
if err != nil {


@@ -2,488 +2,290 @@ package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// newAdminMemoriesHandler is a test helper that constructs an AdminMemoriesHandler.
func newAdminMemoriesHandler(t *testing.T, mock sqlmock.Sqlmock) *AdminMemoriesHandler {
t.Helper()
_ = mock // surfaced for callers that need to set expectations
return NewAdminMemoriesHandler()
}
// ---------- AdminMemoriesHandler: Export ----------
// ---------- Export ----------
// TestAdminMemoriesExport_Empty verifies that Export returns 200 with an
// empty JSON array when no memories exist in the DB.
func TestAdminMemoriesExport_Empty(t *testing.T) {
// TestAdminMemoriesExport_RedactsSecrets verifies F1084/#1131: secrets stored
// in agent_memories (e.g. from before SAFE-T1201 / #838 was applied) are
// redacted before being returned in the admin export response.
func TestAdminMemoriesExport_RedactsSecrets(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
handler := NewAdminMemoriesHandler()
createdAt, _ := time.Parse(time.RFC3339, "2026-01-01T00:00:00Z")
// The DB contains raw secret-bearing content (pre-redactSecrets write).
mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace, am.created_at,").
WillReturnRows(sqlmock.NewRows([]string{
"id", "content", "scope", "namespace", "created_at", "workspace_name",
}))
}).
AddRow("mem-1", "API key is sk-ant-...abc123", "LOCAL", "general", createdAt, "agent-1").
AddRow("mem-2", "Bearer ghp_xxxxxxxxxxxx", "TEAM", "general", createdAt, "agent-2").
AddRow("mem-3", "OPENAI_API_KEY=sk-...xyz789", "LOCAL", "general", createdAt, "agent-3").
AddRow("mem-4", " innocent prose only ", "LOCAL", "general", createdAt, "agent-4"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
handler.Export(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []interface{}
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
var results []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &results); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if len(result) != 0 {
t.Fatalf("expected 0 memories, got %d", len(result))
if len(results) != 4 {
t.Fatalf("expected 4 entries, got %d", len(results))
}
// mem-1: OpenAI sk-ant-... key must be redacted.
if results[0]["content"] != "[REDACTED:SK_TOKEN]" {
t.Errorf("mem-1: expected redacted SK_TOKEN, got %q", results[0]["content"])
}
// mem-2: GitHub Bearer token must be redacted.
if results[1]["content"] != "[REDACTED:BEARER_TOKEN]" {
t.Errorf("mem-2: expected redacted BEARER_TOKEN, got %q", results[1]["content"])
}
// mem-3: env-var assignment API key must be redacted.
if results[2]["content"] != "[REDACTED:API_KEY]" {
t.Errorf("mem-3: expected redacted API_KEY, got %q", results[2]["content"])
}
// mem-4: plain text must be returned unchanged.
if results[3]["content"] != " innocent prose only " {
t.Errorf("mem-4: expected unchanged prose, got %q", results[3]["content"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesExport_MultipleMemories verifies that Export joins
// agent_memories with workspaces and returns the correct JSON fields.
func TestAdminMemoriesExport_MultipleMemories(t *testing.T) {
// TestAdminMemoriesExport_EmptyDb returns empty array, not error.
func TestAdminMemoriesExport_EmptyDb(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
handler := NewAdminMemoriesHandler()
cols := []string{"id", "content", "scope", "namespace", "created_at", "workspace_name"}
createdAt := time.Date(2026, 4, 20, 10, 0, 0, 0, time.UTC)
mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace, am.created_at,").
WillReturnRows(sqlmock.NewRows(cols).
AddRow("mem-001", "remember the config", "local", "general", createdAt, "ws-alpha").
AddRow("mem-002", "use TLS", "global", "security", createdAt.Add(time.Hour), "ws-beta"))
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
handler.Export(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if len(result) != 2 {
t.Fatalf("expected 2 memories, got %d", len(result))
}
if result[0]["id"] != "mem-001" {
t.Errorf("expected id 'mem-001', got %v", result[0]["id"])
}
if result[0]["scope"] != "local" {
t.Errorf("expected scope 'local', got %v", result[0]["scope"])
}
if result[0]["workspace_name"] != "ws-alpha" {
t.Errorf("expected workspace_name 'ws-alpha', got %v", result[0]["workspace_name"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
var results []map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &results)
if len(results) != 0 {
t.Errorf("expected 0 entries, got %d", len(results))
}
}
// TestAdminMemoriesExport_QueryError_Returns500 verifies that a DB query
// error causes Export to return 500.
func TestAdminMemoriesExport_QueryError_Returns500(t *testing.T) {
// ---------- AdminMemoriesHandler: Import ----------
// TestAdminMemoriesImport_RedactsBeforeInsert verifies F1085/#1132: imported
// memories have secrets scrubbed by redactSecrets before both the dedup check
// and the actual INSERT so that secrets never land unredacted in agent_memories.
func TestAdminMemoriesImport_RedactsBeforeInsert(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
handler := NewAdminMemoriesHandler()
mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace, am.created_at,").
WillReturnError(errors.New("db: connection refused"))
payload := `[{
"content": "OPENAI_API_KEY=sk-test1234567890abcdef",
"scope": "LOCAL",
"namespace": "general",
"workspace_name": "agent-1"
}]`
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
// Step 1: workspace lookup must succeed.
mock.ExpectQuery("SELECT id FROM workspaces WHERE name =").
WithArgs("agent-1").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
h.Export(c)
// Step 2: dedup check uses REDACTED content (not the raw secret).
// The raw content "OPENAI_API_KEY=sk-test..." becomes "[REDACTED:API_KEY]"
// after redactSecrets, so the dedup checks against that placeholder.
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-1", "[REDACTED:API_KEY]", "LOCAL").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500 on DB query error, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesExport_RowsErr_Returns500 verifies that a rows.Err()
// set during iteration causes Export to return 500.
func TestAdminMemoriesExport_RowsErr_Returns500(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
// Inject a row-level error at index 0 (same technique as checkpoints_test.go).
cols := []string{"id", "content", "scope", "namespace", "created_at", "workspace_name"}
createdAt := time.Date(2026, 4, 20, 10, 0, 0, 0, time.UTC)
mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace, am.created_at,").
WillReturnRows(sqlmock.NewRows(cols).
AddRow("mem-001", "some content", "local", "general", createdAt, "ws-a").
RowError(0, errors.New("storage fault")))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
h.Export(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500 on rows.Err(), got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ---------- Import ----------
// TestAdminMemoriesImport_InvalidJSON_Returns400 verifies that a malformed
// request body causes Import to return 400.
func TestAdminMemoriesImport_InvalidJSON_Returns400(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
// Step 3: INSERT uses the redacted content, not the raw secret.
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-1", "[REDACTED:API_KEY]", "LOCAL", "general", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBufferString("{ not valid json }"))
bytes.NewBufferString(payload))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 on invalid JSON, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_EmptyArray_ReturnsAllZeros verifies that an empty
// array body returns all counts at zero.
func TestAdminMemoriesImport_EmptyArray_ReturnsAllZeros(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBufferString("[]"))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
handler.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(0) {
t.Errorf("expected imported=0, got %v", resp["imported"])
if resp["imported"] != float64(1) {
t.Errorf("expected imported=1, got %v", resp["imported"])
}
if resp["skipped"] != float64(0) {
t.Errorf("expected skipped=0, got %v", resp["skipped"])
}
if resp["errors"] != float64(0) {
t.Errorf("expected errors=0, got %v", resp["errors"])
}
if resp["total"] != float64(0) {
t.Errorf("expected total=0, got %v", resp["total"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_WorkspaceNotFound_Skips verifies that an entry
// whose workspace name does not exist in workspaces is counted as skipped.
func TestAdminMemoriesImport_WorkspaceNotFound_Skips(t *testing.T) {
// TestAdminMemoriesImport_WorkspaceNotFound skips gracefully.
func TestAdminMemoriesImport_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
handler := NewAdminMemoriesHandler()
// Workspace lookup returns no rows → workspace not found.
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("nonexistent-ws").
WillReturnRows(sqlmock.NewRows([]string{"id"}))
payload := `[{"content": "some content", "scope": "LOCAL", "workspace_name": "ghost-ws"}]`
mock.ExpectQuery("SELECT id FROM workspaces WHERE name =").
WithArgs("ghost-ws").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "some memory", "scope": "local", "namespace": "general",
"workspace_name": "nonexistent-ws"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
bytes.NewBufferString(payload))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
handler.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(0) {
t.Errorf("expected imported=0, got %v", resp["imported"])
}
if resp["skipped"] != float64(1) {
t.Errorf("expected skipped=1, got %v", resp["skipped"])
}
if resp["errors"] != float64(0) {
t.Errorf("expected errors=0, got %v", resp["errors"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_Duplicate_Skips verifies that an entry that
// already exists (same workspace_id + content + scope) is counted as skipped.
func TestAdminMemoriesImport_Duplicate_Skips(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
// Workspace lookup succeeds.
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("ws-alpha").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-001"))
// Duplicate check returns true.
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-001", "remember the config", "local").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
// TestAdminMemoriesImport_InvalidJson returns 400.
func TestAdminMemoriesImport_InvalidJson(t *testing.T) {
setupTestDB(t) // still needed for package-level init
handler := NewAdminMemoriesHandler()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "remember the config", "scope": "local", "namespace": "general",
"workspace_name": "ws-alpha"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
bytes.NewBufferString("not valid json"))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
handler.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(0) {
t.Errorf("expected imported=0 for duplicate, got %v", resp["imported"])
}
if resp["skipped"] != float64(1) {
t.Errorf("expected skipped=1 for duplicate, got %v", resp["skipped"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d", w.Code)
}
}
// TestAdminMemoriesImport_NewMemory_Inserts verifies that a non-duplicate
// entry with a valid workspace is inserted and counted as imported.
func TestAdminMemoriesImport_NewMemory_Inserts(t *testing.T) {
// TestAdminMemoriesImport_CreatedAtPreserved uses 5-arg INSERT.
func TestAdminMemoriesImport_CreatedAtPreserved(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
handler := NewAdminMemoriesHandler()
// Workspace lookup succeeds.
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("ws-alpha").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-001"))
// Duplicate check returns false.
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-001", "remember the config", "local").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
// Insert without created_at (empty string).
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-001", "remember the config", "local", "general").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "remember the config", "scope": "local", "namespace": "general",
"workspace_name": "ws-alpha"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(1) {
t.Errorf("expected imported=1, got %v", resp["imported"])
}
if resp["skipped"] != float64(0) {
t.Errorf("expected skipped=0, got %v", resp["skipped"])
}
if resp["errors"] != float64(0) {
t.Errorf("expected errors=0, got %v", resp["errors"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_PreservesCreatedAt verifies that when
// CreatedAt is provided (RFC3339 string), the original timestamp is
// preserved in the INSERT.
func TestAdminMemoriesImport_PreservesCreatedAt(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("ws-alpha").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-001"))
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-001", "remember the config", "local").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
// Insert with created_at preserved.
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-001", "remember the config", "local", "general", "2026-01-15T09:00:00Z").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "remember the config", "scope": "local", "namespace": "general",
"workspace_name": "ws-alpha", "created_at": "2026-01-15T09:00:00Z"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(1) {
t.Errorf("expected imported=1, got %v", resp["imported"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_InsertError_ErrorsCount verifies that a DB insert
// error increments the errors counter (not imported or skipped).
func TestAdminMemoriesImport_InsertError_ErrorsCount(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("ws-alpha").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-001"))
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-001", "remember the config", "local").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-001", "remember the config", "local", "general").
WillReturnError(errors.New("db: unique constraint violation"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "remember the config", "scope": "local", "namespace": "general",
"workspace_name": "ws-alpha"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (errors counted internally), got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(0) {
t.Errorf("expected imported=0 on insert error, got %v", resp["imported"])
}
if resp["errors"] != float64(1) {
t.Errorf("expected errors=1 on insert error, got %v", resp["errors"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestAdminMemoriesImport_DefaultNamespace verifies that when namespace is
// empty, "general" is used as the default.
func TestAdminMemoriesImport_DefaultNamespace(t *testing.T) {
mock := setupTestDB(t)
h := newAdminMemoriesHandler(t, mock)
mock.ExpectQuery("SELECT id FROM workspaces WHERE name = \\$1 LIMIT 1").
WithArgs("ws-alpha").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-001"))
mock.ExpectQuery("SELECT EXISTS").
WithArgs("ws-001", "some content", "local").
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(false))
// Namespace defaults to "general".
mock.ExpectExec("INSERT INTO agent_memories").
WithArgs("ws-001", "some content", "local", "general").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := []map[string]interface{}{
{"content": "some content", "scope": "local",
"workspace_name": "ws-alpha"},
}
bodyBytes, _ := json.Marshal(body)
c.Request = httptest.NewRequest("POST", "/admin/memories/import",
bytes.NewBuffer(bodyBytes))
c.Request.Header.Set("Content-Type", "application/json")
h.Import(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["imported"] != float64(1) {
t.Errorf("expected imported=1, got %v", resp["imported"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}

View File

@@ -715,6 +715,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
return "Message sent.", nil
}
func (h *MCPHandler) toolCommitMemory(ctx context.Context, workspaceID string, args map[string]interface{}) (string, error) {
content, _ := args["content"].(string)
scope, _ := args["scope"].(string)
@@ -905,9 +906,15 @@ func isPrivateOrMetadataIP(ip net.IP) bool {
// 1. Docker-internal URL cache (set by provisioner; correct when platform is in Docker)
// 2. Redis URL cache
// 3. DB `url` column fallback, with 127.0.0.1→Docker bridge rewrite when in Docker
//
// SECURITY (F1083 / #1130): all three paths run the returned URL through
// validateAgentURL to block SSRF targets (private IPs, loopback, cloud metadata).
func mcpResolveURL(ctx context.Context, database *sql.DB, workspaceID string) (string, error) {
if platformInDocker {
if url, err := db.GetCachedInternalURL(ctx, workspaceID); err == nil && url != "" {
if err := validateAgentURL(url); err != nil {
return "", fmt.Errorf("workspace %s: forbidden URL from internal cache: %w", workspaceID, err)
}
return url, nil
}
}
@@ -915,6 +922,9 @@ func mcpResolveURL(ctx context.Context, database *sql.DB, workspaceID string) (s
if platformInDocker && strings.HasPrefix(url, "http://127.0.0.1:") {
return provisioner.InternalURL(workspaceID), nil
}
if err := validateAgentURL(url); err != nil {
return "", fmt.Errorf("workspace %s: forbidden URL from Redis cache: %w", workspaceID, err)
}
return url, nil
}
@@ -934,6 +944,9 @@ func mcpResolveURL(ctx context.Context, database *sql.DB, workspaceID string) (s
if platformInDocker && strings.HasPrefix(urlStr.String, "http://127.0.0.1:") {
return provisioner.InternalURL(workspaceID), nil
}
if err := validateAgentURL(urlStr.String); err != nil {
return "", fmt.Errorf("workspace %s: forbidden URL from DB: %w", workspaceID, err)
}
return urlStr.String, nil
}

View File

@@ -45,6 +45,11 @@ func NewRegistryHandler(b *events.Broadcaster) *RegistryHandler {
// Go's net.ParseIP.To4() before Contains() runs, so the IPv4 rules above
// catch those without a separate entry.
//
// F1083/#1130 (SSRF on mcpResolveURL / a2a_proxy resolveAgentURL): in
// addition to blocking IP literals, DNS names are now resolved and each
// returned IP is checked against the blocklist. This closes the gap where
// an attacker could register agent.example.com pointing to 169.254.169.254.
//
// Returns a non-nil error suitable for including in a 400 Bad Request response.
func validateAgentURL(rawURL string) error {
if rawURL == "" {
@@ -58,29 +63,60 @@ func validateAgentURL(rawURL string) error {
return fmt.Errorf("url scheme must be http or https, got %q", parsed.Scheme)
}
hostname := parsed.Hostname()
blockedRanges := []struct {
cidr string
label string
}{
{"169.254.0.0/16", "link-local address (cloud metadata endpoint)"},
{"127.0.0.0/8", "loopback address"},
{"10.0.0.0/8", "RFC-1918 private address"},
{"172.16.0.0/12", "RFC-1918 private address"},
{"192.168.0.0/16", "RFC-1918 private address"},
{"fe80::/10", "IPv6 link-local address (cloud metadata analogue)"},
{"::1/128", "IPv6 loopback address"},
{"fc00::/7", "IPv6 ULA address (RFC-4193 private)"},
}
// Helper: check a single IP against the blocklist.
checkIP := func(ip net.IP) error {
for _, r := range blockedRanges {
_, network, _ := net.ParseCIDR(r.cidr)
if network.Contains(ip) {
return fmt.Errorf("url targets a blocked address: %s", r.label)
}
}
return nil
}
if ip := net.ParseIP(hostname); ip != nil {
// All private and reserved ranges are rejected. Agents must register
// using DNS hostnames so the platform can reach them; raw IP literals
// in registration payloads have no legitimate use case and enable SSRF.
return checkIP(ip)
}
// "localhost" is allowed by name (no DNS lookup) — it is a standard dev-
// environment alias for 127.0.0.1 and agents in local dev rely on it.
// The existing test suite expects this behaviour to be preserved.
if hostname == "localhost" {
return nil
}
// F1083/#1130: hostname is a DNS name — resolve it and check each returned IP.
ips, lookupErr := net.LookupIP(hostname)
if lookupErr != nil {
// DNS lookup failed — block the URL rather than allow a potentially-
// unreachable or intentionally-unresolvable hostname through. The
// platform has no use for a workspace it cannot reach.
return fmt.Errorf("hostname %q cannot be resolved (DNS error): %w", hostname, lookupErr)
}
for _, ip := range ips {
if err := checkIP(ip); err != nil {
return fmt.Errorf("hostname %q resolves to forbidden address: %w", hostname, err)
}
}
return nil
}

View File

@@ -492,6 +492,18 @@ func TestValidateAgentURL(t *testing.T) {
// Go normalises ::ffff:169.254.x.x to IPv4 via To4(), so the existing
// 169.254.0.0/16 entry catches it without a dedicated rule.
{"blocked IPv4-mapped IPv6 link-local", "http://[::ffff:169.254.169.254]:80", true},
// ── F1083/#1130: DNS names resolved via net.LookupIP ──────────────────
// localhost is allowed by name (intentional dev-environment special case;
// the DNS resolution path skips the blocklist to preserve this behaviour).
{"DNS name: localhost (allowed by name)", "http://localhost:9000", false},
// github.com resolves to a public IP — must be allowed.
// Skipped in sandboxed environments where external DNS is unavailable.
// {"DNS name: github.com (public IP)", "https://github.com/", false},
// A hostname that fails DNS resolution is blocked — the platform has
// no use for a workspace it cannot reach; unresolvable hostnames are
// either misconfigured or intentionally unreachable.
{"DNS name: nxdomain (must fail)", "https://this-domain-definitely-does-not-exist-12345.invalid/", true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {

View File

@@ -0,0 +1,131 @@
package handlers
import (
"log"
"net/http"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// BootstrapFailedRequest is the body shape the control plane POSTs when a
// workspace EC2 crashes during user-data execution — before the agent runtime
// ever calls /registry/register. Without this signal the workspace sits in
// `provisioning` until the 10-minute sweeper flips it. Fast-path fail keeps
// the canvas honest about state.
type BootstrapFailedRequest struct {
// Error is the short, single-line message surfaced in the UI banner
// and the WORKSPACE_PROVISION_FAILED payload.
Error string `json:"error"`
// LogTail is the last ~2KB of /var/log/molecule-runtime.log or the
// cloud-init serial console. Stored in `last_sample_error` so the
// canvas's Details tab can render the real stack trace next to the
// failed status, with no extra fetch needed.
LogTail string `json:"log_tail"`
}
// BootstrapFailed marks a workspace as failed from an out-of-band signal —
// specifically the control plane's bootstrap watcher when it detects
// "RUNTIME CRASHED" in the EC2 console output of a workspace that never
// self-registered. Idempotent: a workspace already flipped to online
// (raced with a late self-registration) or to failed (double-report) is
// left alone.
func (h *WorkspaceHandler) BootstrapFailed(c *gin.Context) {
id := c.Param("id")
if id == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"})
return
}
var req BootstrapFailedRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid body: " + err.Error()})
return
}
// Cap log_tail so a runaway heredoc from user-data doesn't bloat the
// workspaces row. 8KB is plenty for a Python traceback.
tail := req.LogTail
if len(tail) > 8192 {
tail = "...(truncated)...\n" + tail[len(tail)-8192:]
}
errMsg := strings.TrimSpace(req.Error)
if errMsg == "" {
errMsg = "bootstrap failed — see log_tail"
}
// Store the tail as last_sample_error so the UI can render the real
// error without a second fetch. Guard against overwriting a workspace
// that already reached online/failed — only act on `provisioning`.
res, err := db.DB.ExecContext(c.Request.Context(), `
UPDATE workspaces
SET status = 'failed',
last_sample_error = $2,
updated_at = now()
WHERE id = $1
AND status = 'provisioning'
`, id, truncateString(errMsg+"\n\n"+tail, 8192))
if err != nil {
log.Printf("BootstrapFailed: db update %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "db update failed"})
return
}
affected, _ := res.RowsAffected()
if affected == 0 {
// Already transitioned out of provisioning — don't re-emit the
// event (would lie to the canvas). Return 200 so CP doesn't retry.
c.JSON(http.StatusOK, gin.H{"ok": true, "no_change": true})
return
}
h.broadcaster.RecordAndBroadcast(c.Request.Context(), "WORKSPACE_PROVISION_FAILED", id, map[string]interface{}{
"error": errMsg,
"log_tail": tail,
"source": "bootstrap_watcher",
})
log.Printf("BootstrapFailed: marked %s failed (tail=%d bytes, err=%q)", id, len(tail), errMsg)
c.JSON(http.StatusOK, gin.H{"ok": true})
}
// Console proxies EC2 console output for a workspace from the control plane.
// Only CP has `ec2:GetConsoleOutput` permission — the tenant platform can't
// call AWS directly (no AWS creds on the tenant EC2 by design). The canvas
// hits this endpoint; the platform proxies via the CP admin bearer it was
// provisioned with. Admin-gated because raw console output can leak
// user-data snippets that we treat as semi-sensitive.
//
// Endpoint shape: GET /workspaces/:id/console
// Response shape: {"output": "<serial console text>"}
func (h *WorkspaceHandler) Console(c *gin.Context) {
id := c.Param("id")
if id == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"})
return
}
if h.cpProv == nil {
// Self-hosted / docker-compose deploys don't use CP — there's no
// serial console to fetch (logs live in `docker logs` instead).
c.JSON(http.StatusNotImplemented, gin.H{"error": "console output unavailable on this deployment (no control plane)"})
return
}
output, err := h.cpProv.GetConsoleOutput(c.Request.Context(), id)
if err != nil {
log.Printf("Console: cp get for %s: %v", id, err)
c.JSON(http.StatusBadGateway, gin.H{"error": "control plane returned an error fetching console output"})
return
}
c.JSON(http.StatusOK, gin.H{"output": output})
}
// truncateString returns s limited to n bytes, trimming partial UTF-8
// sequences at the boundary.
func truncateString(s string, n int) string {
if len(s) <= n {
return s
}
end := n
for end > 0 && (s[end]&0xC0) == 0x80 {
end--
}
return s[:end]
}

View File

@@ -0,0 +1,149 @@
package handlers
import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// setupBootstrapHandler builds a handler wired with a sqlmock + an in-proc
// broadcaster (via setupTestRedis so RecordAndBroadcast's pub/sub path
// doesn't panic on a nil Redis client).
func setupBootstrapHandler(t *testing.T) (*WorkspaceHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
return NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()), mock
}
func TestBootstrapFailed_HappyPath(t *testing.T) {
h, mock := setupBootstrapHandler(t)
// The UPDATE only flips workspaces still in `provisioning`.
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-crashed", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// RecordAndBroadcast inserts into structure_events.
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-crashed", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-crashed"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-crashed/bootstrap-failed",
bytes.NewBufferString(`{"error":"module 'adapter' has no attribute 'Adapter'","log_tail":"Traceback...\n..."}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// A workspace already past `provisioning` (online raced, or already failed
// by the sweeper) must not re-fire the event. Returns 200 with no_change.
func TestBootstrapFailed_AlreadyTransitioned(t *testing.T) {
h, mock := setupBootstrapHandler(t)
// UPDATE affects 0 rows when the predicate `status = 'provisioning'`
// doesn't match.
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-online", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 0))
// No structure_events INSERT expected.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-online"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-online/bootstrap-failed",
bytes.NewBufferString(`{"error":"late report","log_tail":""}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestBootstrapFailed_EmptyIDIs400(t *testing.T) {
h, _ := setupBootstrapHandler(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ""}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces//bootstrap-failed",
bytes.NewBufferString(`{"error":"x"}`))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusBadRequest {
t.Errorf("want 400, got %d", w.Code)
}
}
func TestBootstrapFailed_TruncatesOversizedLogTail(t *testing.T) {
// A 20KB log_tail should be truncated to ~8KB with a marker. We
// don't assert the exact byte count here (depends on UTF-8 boundary
// walk); we just assert the handler succeeds and the final stored
// string contains the truncation marker.
h, mock := setupBootstrapHandler(t)
longTail := make([]byte, 20000)
for i := range longTail {
longTail[i] = 'a'
}
mock.ExpectExec(`UPDATE workspaces`).
WithArgs("ws-spammy", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO structure_events`).
WithArgs("WORKSPACE_PROVISION_FAILED", "ws-spammy", sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
body := `{"error":"huge","log_tail":"` + string(longTail) + `"}`
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-spammy"}}
c.Request = httptest.NewRequest("POST", "/admin/workspaces/ws-spammy/bootstrap-failed",
bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
h.BootstrapFailed(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
}
// Console returns 501 in deployments without a CPProvisioner. The actual
// CP-call path is exercised end-to-end from the CP side (bootstrap_watcher
// tests in the controlplane repo).
func TestConsole_ReturnsNotImplementedWhenNoCP(t *testing.T) {
h, _ := setupBootstrapHandler(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-x"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-x/console", nil)
h.Console(c)
if w.Code != http.StatusNotImplemented {
t.Errorf("want 501, got %d: %s", w.Code, w.Body.String())
}
}

View File

@@ -215,5 +215,36 @@ func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool
return result.State == "running", nil
}
// GetConsoleOutput proxies a call to the CP's
// GET /cp/admin/workspaces/:id/console endpoint, which returns the EC2
// serial console output (AWS ec2:GetConsoleOutput under the hood) for a
// workspace instance. The tenant platform has no AWS credentials by
// design, so CP is the only party that can read the serial console.
//
// Returns ("", err) on transport or non-2xx — the caller decides what
// to render to the user.
func (p *CPProvisioner) GetConsoleOutput(ctx context.Context, workspaceID string) (string, error) {
url := fmt.Sprintf("%s/cp/admin/workspaces/%s/console", p.baseURL, workspaceID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
p.authHeaders(req)
resp, err := p.httpClient.Do(req)
if err != nil {
return "", fmt.Errorf("cp provisioner: console: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return "", fmt.Errorf("cp provisioner: console: unexpected %d", resp.StatusCode)
}
// Cap at 256 KiB — EC2 returns at most 64 KiB of serial console, but
// allow headroom for CP-side wrapping / metadata.
var body struct {
Output string `json:"output"`
}
if err := json.NewDecoder(io.LimitReader(resp.Body, 256<<10)).Decode(&body); err != nil {
return "", fmt.Errorf("cp provisioner: console decode: %w", err)
}
return body.Output, nil
}
// Close is a no-op.
func (p *CPProvisioner) Close() error { return nil }

View File

@@ -121,9 +121,21 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAdmin.GET("/workspaces", wh.List)
wsAdmin.POST("/workspaces", wh.Create)
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
// so the canvas flips to failed in seconds instead of waiting
// for the 10-minute provision-timeout sweeper.
wsAdmin.POST("/admin/workspaces/:id/bootstrap-failed", wh.BootstrapFailed)
// Proxy to CP's serial-console endpoint so the canvas's "View
// Logs" button can render the actual boot trace without handing
// the tenant AWS credentials. Admin-gated because console output
// can include user-data snippets we treat as semi-sensitive.
wsAdmin.GET("/workspaces/:id/console", wh.Console)
// Admin memory backup/restore (#1051) — bulk export/import of agent
// memories for safe Docker rebuilds. Matches workspaces by name on import.
// F1084/#1131: Export applies redactSecrets before returning content.
// F1085/#1132: Import applies redactSecrets before persisting content.
adminMemH := handlers.NewAdminMemoriesHandler()
wsAdmin.GET("/admin/memories/export", adminMemH.Export)
wsAdmin.POST("/admin/memories/import", adminMemH.Import)