Merge pull request #1263 from Molecule-AI/staging

staging → main: sweeper emits PROVISION_FAILED not _TIMEOUT
This commit is contained in:
Hongming Wang 2026-04-20 20:39:45 -07:00 committed by GitHub
commit 6311c30dd8
26 changed files with 887 additions and 333 deletions

View File

@ -6,12 +6,12 @@ on:
pull_request:
branches: [main, staging]
# Cancel in-progress CI runs when a new commit arrives on the same ref.
# This prevents multiple stale runs from queuing behind each other and
# monopolising the self-hosted macOS arm64 runner.
# Queue new CI runs when a commit arrives on the same ref.
# New runs wait behind the in-flight one instead of cancelling it — a long
# run on the single self-hosted macOS arm64 runner completes rather than
# being repeatedly restarted and wasting its partial work.
concurrency:
group: ci-${{ github.ref }}
cancel-in-progress: true
cancel-in-progress: false
jobs:
# Detect which paths changed so downstream jobs can skip when only

View File

@ -23,6 +23,13 @@ on:
# Weekly run picks up findings in code that hasn't been touched.
- cron: '30 1 * * 0'
# Workflow-level concurrency: only one CodeQL run per branch/PR at a time.
# `cancel-in-progress: false` queues new runs — the 45-min analysis is the
# longest CI occupant and fights the single mac mini runner the hardest.
concurrency:
group: codeql-${{ github.ref }}
cancel-in-progress: false
permissions:
actions: read
contents: read

View File

@ -15,7 +15,7 @@
* - Polling: provisioning orgs schedule a 5s refresh (fake timers)
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { render, screen, waitFor, cleanup } from "@testing-library/react";
import { render, screen, cleanup } from "@testing-library/react";
// ── Hoisted mocks ────────────────────────────────────────────────────────────
// vi.mock factories are hoisted above imports; any captured references must
@ -90,6 +90,8 @@ beforeEach(() => {
// vi.useFakeTimers() in the polling tests would be a no-op — causing
// setTimeout(0) to hang and the test to time out.
vi.useRealTimers();
// Now install fake timers for this test's deterministic timing.
vi.useFakeTimers();
vi.clearAllMocks();
// Reset mock return values so each test starts fresh.
// The mock functions (vi.fn) persist across tests; only their
@ -100,62 +102,47 @@ beforeEach(() => {
});
afterEach(() => {
// Ensure fake timers are never left active after a test — even one that
// failed before reaching its own finally-block.
vi.useRealTimers();
cleanup();
// Restore real timers so subsequent tests (and vitest internals)
// aren't polluted by fake timer state from a previous test.
vi.useRealTimers();
});
// ── Tests ────────────────────────────────────────────────────────────────────
describe("/orgs — auth guard", () => {
it("redirects to login when session is null", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValueOnce(null);
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
expect(mockRedirectToLogin).toHaveBeenCalled();
// Must not attempt to fetch /cp/orgs before auth is established
expect(mockFetch).not.toHaveBeenCalledWith(
expect.stringContaining("/cp/orgs"),
expect.anything()
);
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValueOnce(null);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
expect(mockRedirectToLogin).toHaveBeenCalled();
// Must not attempt to fetch /cp/orgs before auth is established
expect(mockFetch).not.toHaveBeenCalledWith(
expect.stringContaining("/cp/orgs"),
expect.anything()
);
});
});
describe("/orgs — error state", () => {
it("shows error + Retry button when /cp/orgs fails", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(notOk(500, "db down"));
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
expect(screen.getByText(/Error:/)).toBeTruthy();
expect(screen.getByRole("button", { name: /retry/i })).toBeTruthy();
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(notOk(500, "db down"));
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
expect(screen.getByText(/Error:/)).toBeTruthy();
expect(screen.getByRole("button", { name: /retry/i })).toBeTruthy();
});
});
describe("/orgs — empty list", () => {
it("renders EmptyState with CreateOrgForm when user has zero orgs", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
expect(screen.getByText(/don't have any organizations/i)).toBeTruthy();
expect(screen.getByRole("button", { name: /create organization/i })).toBeTruthy();
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
expect(screen.getByText(/don't have any organizations/i)).toBeTruthy();
expect(screen.getByRole("button", { name: /create organization/i })).toBeTruthy();
});
});
@ -163,160 +150,130 @@ describe("/orgs — CTAs by status", () => {
const session = { userId: "u-1" };
it("running → Open link targets {slug}.moleculesai.app", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
const link = screen.getByRole("link", { name: /open/i }) as HTMLAnchorElement;
expect(link.href).toBe("https://acme.moleculesai.app/");
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
const link = screen.getByRole("link", { name: /open/i }) as HTMLAnchorElement;
expect(link.href).toBe("https://acme.moleculesai.app/");
});
it("awaiting_payment → Complete payment link to /pricing?org=<slug>", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-2",
slug: "beta-co",
name: "Beta",
plan: "",
status: "awaiting_payment",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
const link = screen.getByRole("link", {
name: /complete payment/i,
}) as HTMLAnchorElement;
expect(link.getAttribute("href")).toBe("/pricing?org=beta-co");
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-2",
slug: "beta-co",
name: "Beta",
plan: "",
status: "awaiting_payment",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
const link = screen.getByRole("link", {
name: /complete payment/i,
}) as HTMLAnchorElement;
expect(link.getAttribute("href")).toBe("/pricing?org=beta-co");
});
it("failed → mailto support link", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-3",
slug: "boom",
name: "Boom",
plan: "",
status: "failed",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
const link = screen.getByRole("link", {
name: /contact support/i,
}) as HTMLAnchorElement;
expect(link.getAttribute("href")).toBe("mailto:support@moleculesai.app");
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue(session);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-3",
slug: "boom",
name: "Boom",
plan: "",
status: "failed",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
const link = screen.getByRole("link", {
name: /contact support/i,
}) as HTMLAnchorElement;
expect(link.getAttribute("href")).toBe("mailto:support@moleculesai.app");
});
});
describe("/orgs — post-checkout banner", () => {
it("renders CheckoutBanner when ?checkout=success and scrubs the URL", async () => {
vi.useRealTimers();
try {
setLocation("https://moleculesai.app/orgs?checkout=success");
const replaceState = vi.spyOn(window.history, "replaceState");
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
expect(screen.getByText(/Payment confirmed/i)).toBeTruthy();
// URL must be rewritten to drop the ?checkout flag so reload doesn't re-show the banner
expect(replaceState).toHaveBeenCalled();
const callArgs = replaceState.mock.calls[0];
expect(callArgs[2]).toBe("/orgs");
} finally {
vi.useFakeTimers();
}
setLocation("https://moleculesai.app/orgs?checkout=success");
const replaceState = vi.spyOn(window.history, "replaceState");
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
expect(screen.getByText(/Payment confirmed/i)).toBeTruthy();
// URL must be rewritten to drop the ?checkout flag so reload doesn't re-show the banner
expect(replaceState).toHaveBeenCalled();
const callArgs = replaceState.mock.calls[0];
expect(callArgs[2]).toBe("/orgs");
});
it("does NOT render CheckoutBanner without ?checkout=success", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
expect(screen.getByText(/don't have any organizations/i)).toBeTruthy();
expect(screen.queryByText(/Payment confirmed/i)).toBeNull();
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
expect(screen.getByText(/don't have any organizations/i)).toBeTruthy();
expect(screen.queryByText(/Payment confirmed/i)).toBeNull();
});
});
describe("/orgs — fetch includes credentials + timeout signal", () => {
it("/cp/orgs fetch is called with credentials:include and an AbortSignal", async () => {
vi.useRealTimers();
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await new Promise((r) => setTimeout(r, 50));
const callArgs = mockFetch.mock.calls.find((c) =>
String(c[0]).includes("/cp/orgs")
);
expect(callArgs).toBeDefined();
expect(callArgs![1]).toMatchObject({ credentials: "include" });
expect(callArgs![1].signal).toBeInstanceOf(AbortSignal);
} finally {
vi.useFakeTimers();
}
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(okJson({ orgs: [] }));
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(50);
const callArgs = mockFetch.mock.calls.find((c) =>
String(c[0]).includes("/cp/orgs")
);
expect(callArgs).toBeDefined();
expect(callArgs![1]).toMatchObject({ credentials: "include" });
expect(callArgs![1].signal).toBeInstanceOf(AbortSignal);
});
});
@ -328,113 +285,98 @@ describe("/orgs — fetch includes credentials + timeout signal", () => {
describe("/orgs — polling of in-flight orgs", () => {
it("schedules a 5s refetch when at least one org is provisioning", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
// First /cp/orgs returns provisioning orgs so a poll is scheduled.
// Second returns running orgs to observe the state flip stop re-scheduling.
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "provisioning",
created_at: "",
updated_at: "",
},
],
})
);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
// beforeEach already set up fake timers; advance time to fire the 5s poll.
mockFetchSession.mockResolvedValue({ userId: "u-1" });
// First /cp/orgs returns provisioning orgs so a poll is scheduled.
// Second returns running orgs to observe the state flip stop re-scheduling.
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "provisioning",
created_at: "",
updated_at: "",
},
],
})
);
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
// Auto-advancing time fires the 5s poll while we await
await vi.advanceTimersByTimeAsync(5_100);
// First /cp/orgs + second poll /cp/orgs
expect(mockFetch).toHaveBeenCalledTimes(2);
} finally {
vi.useRealTimers();
}
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(5_100);
// First /cp/orgs + second poll /cp/orgs
expect(mockFetch).toHaveBeenCalledTimes(2);
});
it("does NOT schedule a refetch when all orgs are running", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
// Auto-advancing time — no poll fires (stillMoving = false)
await vi.advanceTimersByTimeAsync(10_000);
// Only the initial /cp/orgs
expect(mockFetch).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
// beforeEach already set up fake timers.
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "running",
created_at: "",
updated_at: "",
},
],
})
);
render(<OrgsPage />);
await vi.advanceTimersByTimeAsync(10_000);
// Only the initial /cp/orgs — no poll fires (stillMoving = false)
expect(mockFetch).toHaveBeenCalledTimes(1);
});
it("clears the poll timer on unmount — no fetch after unmount", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
try {
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "awaiting_payment",
created_at: "",
updated_at: "",
},
],
})
);
const { unmount } = render(<OrgsPage />);
// With shouldAdvanceTime, effects are deferred — flush microtasks first
// so the effect runs and schedules the 5s poll before we unmount.
await vi.advanceTimersByTimeAsync(0);
// Now the effect has run (scheduling the poll) but not the poll itself
expect(mockFetch).toHaveBeenCalledTimes(1);
// Tear down — cleanup must clear the 5s timer
unmount();
// Advance timers — the cleanup cleared the 5s timer, so no poll fires
await vi.advanceTimersByTimeAsync(10_000);
expect(mockFetch).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
// beforeEach already set up fake timers.
mockFetchSession.mockResolvedValue({ userId: "u-1" });
mockFetch.mockResolvedValueOnce(
okJson({
orgs: [
{
id: "o-1",
slug: "acme",
name: "Acme",
plan: "pro",
status: "awaiting_payment",
created_at: "",
updated_at: "",
},
],
})
);
const { unmount } = render(<OrgsPage />);
// Flush microtasks so the effect runs and schedules the 5s poll before we unmount.
await vi.advanceTimersByTimeAsync(0);
// Now the effect has run (scheduling the poll) but not the poll itself
expect(mockFetch).toHaveBeenCalledTimes(1);
// Tear down — cleanup must clear the 5s timer
unmount();
// Advance timers — the cleanup cleared the 5s timer, so no poll fires
await vi.advanceTimersByTimeAsync(10_000);
expect(mockFetch).toHaveBeenCalledTimes(1);
});
});

View File

@ -117,6 +117,12 @@ function CanvasInner() {
}
}, [pendingDelete, setPendingDelete, removeNode]);
// Cascade guard: include child count in the warning message when the workspace
// has children, so the user understands the blast radius before clicking Delete All.
const cascadeMessage = pendingDelete?.hasChildren
? `⚠️ Deleting "${pendingDelete.name}" will permanently delete all child workspaces and their data. This cannot be undone.`
: null;
const onNodeDragStop: OnNodeDrag<Node<WorkspaceNodeData>> = useCallback(
(_event, node) => {
const { dragOverNodeId, nodes: allNodes } = useCanvasStore.getState();
@ -381,9 +387,11 @@ function CanvasInner() {
{/* Confirmation dialog for workspace delete — driven by store */}
<ConfirmDialog
open={!!pendingDelete}
title="Delete Workspace"
message={`Permanently delete "${pendingDelete?.name}"? This will stop the container and remove all configuration. This action cannot be undone.`}
confirmLabel="Delete"
title={pendingDelete?.hasChildren ? "Delete Workspace and Children" : "Delete Workspace"}
message={pendingDelete?.hasChildren
? `⚠️ Deleting "${pendingDelete?.name}" will permanently delete all of its child workspaces and their data. This cannot be undone.`
: `Permanently delete "${pendingDelete?.name}"? This will stop the container and remove all configuration. This action cannot be undone.`}
confirmLabel={pendingDelete?.hasChildren ? "Delete All" : "Delete"}
confirmVariant="danger"
onConfirm={confirmDelete}
onCancel={() => setPendingDelete(null)}

View File

@ -164,7 +164,7 @@ export function ContextMenu() {
// it survives ContextMenu unmount. Closing the menu here avoids the
// prior race where the portal dialog's Confirm click was treated as
// "outside" by the menu's outside-click handler.
setPendingDelete({ id: contextMenu.nodeId, name: contextMenu.nodeData.name });
setPendingDelete({ id: contextMenu.nodeId, name: contextMenu.nodeData.name, hasChildren });
closeContextMenu();
}, [contextMenu, setPendingDelete, closeContextMenu]);

View File

@ -98,7 +98,8 @@ export function WorkspaceUsage({ workspaceId }: WorkspaceUsageProps) {
);
}
function formatPeriod(start: string, end: string): string {
function formatPeriod(start: string | undefined, end: string | undefined): string {
if (!start || !end) return "—";
const fmt = (s: string) =>
new Date(s).toLocaleDateString(undefined, {
month: "short",

View File

@ -189,7 +189,7 @@ export function DetailsTab({ workspaceId, data }: Props) {
<Row label="Parent" value={data.parentId || "root"} mono />
<Row label="Active Tasks" value={String(data.activeTasks)} />
{data.status === "degraded" && (
<Row label="Error Rate" value={`${(data.lastErrorRate * 100).toFixed(0)}%`} />
<Row label="Error Rate" value={`${((data.lastErrorRate ?? 0) * 100).toFixed(0)}%`} />
)}
{isRestartable && (
<div className="pt-2">

View File

@ -72,8 +72,8 @@ interface CanvasState {
// handler: clicking Confirm registered as "outside", closed the menu, and
// unmounted the dialog before its onClick fired. Hoisting the state fixes
// that — see fix/context-menu-delete-race.
pendingDelete: { id: string; name: string } | null;
setPendingDelete: (v: { id: string; name: string } | null) => void;
pendingDelete: { id: string; name: string; hasChildren: boolean } | null;
setPendingDelete: (v: { id: string; name: string; hasChildren: boolean } | null) => void;
searchOpen: boolean;
setSearchOpen: (open: boolean) => void;
viewport: { x: number; y: number; zoom: number };

View File

@ -131,6 +131,133 @@ That design lets the platform improve the backend memory boundary without forcin
This matters because Molecule AI wants hierarchy to remain operationally real, not cosmetic.
## Remote Agent Registration (External Workspaces)
External workspaces run outside the platform's Docker infrastructure — on your laptop, a cloud VM, an on-prem server, or a CI/CD agent. They register via the platform API and send heartbeats to stay live on the canvas.
### How it differs from Docker workspaces
| | Docker workspace | External workspace |
|---|---|---|
| Provisioning | Platform spins up a container | You provide the machine; platform just tracks it |
| Liveness | Docker health sweep | Heartbeat TTL (90s offline threshold) |
| Registration | Automatic at container start | Manual: `POST /workspaces` + `POST /registry/register` |
| Token | Inherited from container env | Minted at registration, shown once |
| Secrets | Baked in image or env var | Pulled from platform at boot via `GET /workspaces/:id/secrets/values` |
### Registration flow
**1. Create the workspace:**
```bash
curl -X POST http://localhost:8080/workspaces \
-H "Authorization: Bearer <admin-token>" \
-H "Content-Type: application/json" \
-d '{
"name": "my-remote-agent",
"runtime": "external",
"external": true,
"url": "https://my-agent.example.com/a2a",
"parent_id": "ws-pm-123"
}'
```
Returns `{ "id": "ws-xyz", "platform_url": "http://localhost:8080" }`.
**2. Register the agent with the platform:**
```bash
curl -X POST http://localhost:8080/registry/register \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <admin-token>" \
-d '{
"workspace_id": "ws-xyz",
"name": "my-remote-agent",
"description": "Runs on a cloud VM in us-east-1",
"skills": ["research", "summarization"],
"url": "https://my-agent.example.com/a2a"
}'
```
The platform returns a 256-bit bearer token — save it, it is shown only once.
**3. Pull secrets at boot:**
```bash
curl http://localhost:8080/workspaces/ws-xyz/secrets/values \
-H "Authorization: Bearer <your-token>"
```
Returns `{ "ANTHROPIC_API_KEY": "...", "OPENAI_API_KEY": "..." }`. No credentials baked into images or env files.
**4. Send heartbeats every 30 seconds:**
```bash
curl -X POST http://localhost:8080/registry/heartbeat \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"workspace_id": "ws-xyz",
"status": "online",
"task": "analyzing Q1 sales data",
"error_rate": 0.0
}'
```
If the platform receives no heartbeat for 90 seconds — i.e. three consecutive 30-second heartbeats are missed — the workspace shows offline on the canvas.
**5. A2A with `X-Workspace-ID` header:**
When sending A2A messages to sibling or parent workspaces, include the header so the platform can verify mutual auth:
```bash
curl -X POST http://localhost:8080/workspaces/ws-pm-123/a2a \
-H "Authorization: Bearer <your-token>" \
-H "X-Workspace-ID: ws-xyz" \
-H "Content-Type: application/json" \
-d '{"type": "status_report", "payload": {...}}'
```
### Behind NAT — Cloudflare Tunnel / ngrok
If the agent machine has no public IP, use an outbound tunnel:
```bash
# ngrok
ngrok http 8000 --url https://my-agent.ngrok.io
# Cloudflare Tunnel
cloudflared tunnel run --token <token>
# Register the tunnel URL (not localhost)
curl -X POST http://localhost:8080/registry/update-card \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{"workspace_id": "ws-xyz", "url": "https://my-agent.ngrok.io/a2a"}'
```
Both the tunnel daemon and the agent's WebSocket to the platform are outbound connections — no inbound ports need to be opened on the firewall.
### Revocation and re-registration
To revoke and re-register:
```bash
# Delete the workspace
curl -X DELETE http://localhost:8080/workspaces/ws-xyz \
-H "Authorization: Bearer <admin-token>"
# Create fresh (new workspace_id, new token)
```
Re-registration with the same `workspace_id` does not issue a new token — use the token saved from first registration.
### Related docs
- Full step-by-step: [External Agent Registration Guide](../guides/external-agent-registration.md)
- Tutorial with CI/CD examples: [Register a Remote Agent](../tutorials/register-remote-agent.md)
- API reference: [Registry and Heartbeat](../api-protocol/registry-and-heartbeat.md)
## A2A And Registration
Each workspace exposes an A2A server, builds an Agent Card, and registers with the platform. The platform is used for:

View File

@ -90,6 +90,125 @@ What can you help me with in this workspace?
Responses are delivered through the platform A2A proxy and pushed back to the canvas through WebSocket events, with polling kept only as recovery fallback.
---
## Path 2: Remote Agent (run anywhere)
A remote agent runs on your own machine or a cloud VM — no Docker on the platform side. The agent registers with the platform via API, pulls its secrets at boot, and sends heartbeats to stay live on the canvas.
**Use this path if you:**
- want to run an agent on your laptop for local development
- need an agent on a machine with specific hardware (GPU, on-prem)
- have a data-residency requirement that keeps agent compute off the platform's infra
### Step 0: Prerequisites
- Python 3.10+ and `pip install molecule-agent-sdk`
- Outbound HTTPS access from the agent machine to `https://<your-org>.moleculesai.app`
- A platform admin token (from the canvas, under `Config → Secrets & API Keys → Global`)
### Step 1: Create the workspace
```bash
PLATFORM="https://acme.moleculesai.app"
ADMIN_TOKEN="your-admin-token"
curl -X POST "$PLATFORM/workspaces" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "my-remote-agent",
"runtime": "external",
"external": true,
"url": "https://my-agent.example.com/a2a",
"parent_id": null
}'
```
Save the returned `workspace_id`.
### Step 2: Register the agent
```bash
WORKSPACE_ID="ws-xyz"
curl -X POST "$PLATFORM/registry/register" \
-H "Authorization: Bearer $ADMIN_TOKEN" \
-H "Content-Type: application/json" \
-d "{
\"workspace_id\": \"$WORKSPACE_ID\",
\"name\": \"my-remote-agent\",
\"description\": \"Runs on a cloud VM in us-east-1\",
\"skills\": [\"research\"],
\"url\": \"https://my-agent.example.com/a2a\"
}"
```
The response includes your bearer token — save it now. It is shown only once.
### Step 3: Pull secrets at boot
```bash
AGENT_TOKEN="the-token-from-step-2"
curl "$PLATFORM/workspaces/$WORKSPACE_ID/secrets/values" \
-H "Authorization: Bearer $AGENT_TOKEN"
```
Store the returned secrets in your environment before starting the agent.
### Step 4: Run the agent
```bash
molecule-agent run \
--workspace-id "$WORKSPACE_ID" \
--platform-url "$PLATFORM" \
--agent-token "$AGENT_TOKEN"
```
The agent connects to the platform, appears on the canvas within ~10 seconds, and starts processing tasks.
### Step 5: Configure the agent
Edit `config.yaml` in the agent's working directory:
```yaml
name: my-remote-agent
role: researcher
runtime: python
platform_url: https://acme.moleculesai.app
a2a:
port: 8000
```
### Step 6: Inspect and iterate
The agent appears on the canvas as a workspace card with a **REMOTE** badge. Open the chat tab, send a task, and watch it work. To iterate, stop and restart the agent — it re-registers with the same `workspace_id` and token.
### Behind NAT (no public IP)
If the agent machine has no public IP, use a tunnel:
```bash
# Terminal 1: start a tunnel
ngrok http 8000 --url https://my-agent.ngrok.io
# Update the registered URL
curl -X POST "$PLATFORM/registry/update-card" \
-H "Authorization: Bearer $AGENT_TOKEN" \
-H "Content-Type: application/json" \
-d '{"workspace_id": "'"$WORKSPACE_ID"'", "url": "https://my-agent.ngrok.io/a2a"}'
```
No inbound firewall rules needed — the agent initiates the outbound WebSocket connection.
### Next steps
- [Register a Remote Agent](../tutorials/register-remote-agent.md) — full tutorial with CI/CD examples
- [External Agent Registration Guide](../guides/external-agent-registration.md) — detailed reference
- [Remote Workspaces FAQ](../guides/remote-workspaces-faq.md) — common questions
## What To Try Next
- **Expand to a team:** right-click a workspace and choose `Expand to Team`.

View File

@ -64,7 +64,7 @@ func refreshEnvFromCP() error {
if err != nil {
return fmt.Errorf("do request: %w", err)
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
// 64 KiB cap — the CP only returns small JSON blobs here. An
// unbounded read would be weaponizable if a compromised upstream

View File

@ -288,7 +288,7 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
if err != nil {
return h.handleA2ADispatchError(ctx, workspaceID, callerID, body, a2aMethod, err, durationMs, logActivity)
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
// Read agent response (capped at 10MB).
// #689: Do() succeeded, which means the target received the request and sent

View File

@ -163,7 +163,7 @@ func generateAppInstallationToken() (string, time.Time, error) {
if err != nil {
return "", time.Time{}, err
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
var result struct {
Token string `json:"token"`
ExpiresAt time.Time `json:"expires_at"`

View File

@ -295,6 +295,13 @@ func (h *TemplatesHandler) ReadFile(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "file not found (container offline, no template)"})
return
}
// validateRelPath is already called earlier in this handler for the
// container path, but the fallback below uses filePath directly in
// filepath.Join without any sanitization. Re-validate before the
// host-side read to close the gap.
if err := validateRelPath(filePath); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid path"})
return
}
fullPath := filepath.Join(templateDir, filePath)
data, err := os.ReadFile(fullPath)
if err != nil {

View File

@ -49,7 +49,7 @@ func (h *TracesHandler) List(c *gin.Context) {
c.JSON(http.StatusOK, []interface{}{})
return
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
c.Data(resp.StatusCode, "application/json", body)

View File

@ -111,7 +111,7 @@ func (h *TranscriptHandler) Get(c *gin.Context) {
c.JSON(http.StatusBadGateway, gin.H{"error": "workspace unreachable"})
return
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
// Cap at 1 MB so a giant transcript doesn't melt the canvas.
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))

View File

@ -582,11 +582,7 @@ func (h *WorkspaceHandler) provisionWorkspaceCP(workspaceID, templatePath string
if err := h.envMutators.Run(ctx, workspaceID, envVars); err != nil {
log.Printf("CPProvisioner: env mutator failed for %s: %v", workspaceID, err)
db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'failed', last_sample_error = $2, updated_at = now() WHERE id = $1`,
<<<<<<< HEAD
workspaceID, "plugin env mutator chain failed")
=======
workspaceID, "provisioning failed")
>>>>>>> f9fff93 (fix(security): replace err.Error() leaks with prod-safe messages (#1206))
workspaceID, "plugin env mutator chain failed")
return
}

View File

@ -125,10 +125,15 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
template = findTemplateByName(h.configsDir, wsName)
}
if template != "" {
candidatePath := filepath.Join(h.configsDir, template)
if _, err := os.Stat(candidatePath); err == nil {
candidatePath, resolveErr := resolveInsideRoot(h.configsDir, template)
if resolveErr != nil {
log.Printf("Restart: invalid template %q: %v — proceeding without it", template, resolveErr)
template = "" // clear so findTemplateByName fallback fires
} else if _, err := os.Stat(candidatePath); err == nil {
templatePath = candidatePath
configLabel = template
} else {
log.Printf("Restart: template %q dir not found — proceeding without it", template)
}
}

View File

@ -207,7 +207,7 @@ func verifiedCPSession(cookieHeader string) (valid, presented bool) {
// for the negative-TTL window. Next request retries.
return false, true
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
sessionCachePut(key, false)

View File

@ -60,9 +60,10 @@ func WorkspaceAuth(database *sql.DB) gin.HandlerFunc {
// power surface as ADMIN_TOKEN but named, revocable, audited.
// Check before per-workspace token so an org-key presenter
// doesn't hit the narrower ValidateToken failure path.
if id, prefix, err := orgtoken.Validate(ctx, database, tok); err == nil {
if id, prefix, orgID, err := orgtoken.Validate(ctx, database, tok); err == nil {
c.Set("org_token_id", id)
c.Set("org_token_prefix", prefix)
c.Set("org_id", orgID)
c.Next()
return
} else if !errors.Is(err, orgtoken.ErrInvalidToken) {
@ -181,20 +182,10 @@ func AdminAuth(database *sql.DB) gin.HandlerFunc {
// index with revoked_at IS NULL) + an async last_used_at
// bump. Cost per request: one SELECT + one UPDATE, both
// hitting the same narrow partial index.
if id, prefix, err := orgtoken.Validate(ctx, database, tok); err == nil {
if id, prefix, orgID, err := orgtoken.Validate(ctx, database, tok); err == nil {
c.Set("org_token_id", id)
c.Set("org_token_prefix", prefix)
// F1097: also set org_id from the token's org_id column so that
// requireCallerOwnsOrg can look it up via c.Get("org_id").
// Tokens created before PR #1210 have org_id=NULL — for those,
// the SELECT returns nil and no org_id is set, which is correct:
// requireCallerOwnsOrg already denies access for nil org_id.
var orgID *string
if err := database.QueryRowContext(ctx,
`SELECT org_id::text FROM org_api_tokens WHERE id = $1`, id,
).Scan(&orgID); err == nil && orgID != nil && *orgID != "" {
c.Set("org_id", *orgID)
}
c.Set("org_id", orgID)
c.Next()
return
} else if !errors.Is(err, orgtoken.ErrInvalidToken) {

View File

@ -0,0 +1,326 @@
package middleware
import (
"crypto/sha256"
"database/sql"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// orgTokenValidateQuery is matched (as a regexp by sqlmock) against the
// query issued by orgtoken.Validate in both WorkspaceAuth and AdminAuth
// middleware paths. Since #1218, Validate selects id, prefix AND org_id
// in a single round-trip from org_api_tokens where token_hash matches
// and revoked_at IS NULL — the former secondary org_id SELECT is gone.
const orgTokenValidateQuery = "SELECT id, prefix, org_id FROM org_api_tokens WHERE token_hash"
func TestWorkspaceAuth_ValidOrgToken_SetsOrgIDContext(t *testing.T) {
	// F1097 (#1218): org tokens validated via WorkspaceAuth must have
	// org_id populated on the Gin context so downstream handlers can
	// enforce org isolation without a per-request DB round-trip.
	// orgtoken.Validate now returns org_id directly from its single
	// SELECT, so exactly ONE query expectation is set up here.
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_test_org_token_abc123"
	tokenHash := sha256.Sum256([]byte(orgToken))
	// orgtoken.Validate — returns id, prefix and org_id in one query.
	// (The best-effort last_used_at UPDATE is unexpected by sqlmock but
	// Validate deliberately ignores its error, so no expectation is needed.)
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).
			AddRow("tok-org-abc", "tok_test", "00000000-0000-0000-0000-000000000001"))
	r := gin.New()
	r.GET("/workspaces/:id/secrets", WorkspaceAuth(mockDB), func(c *gin.Context) {
		v, exists := c.Get("org_id")
		if !exists {
			t.Errorf("org_id not set on context for valid org token")
			c.JSON(http.StatusOK, gin.H{"ok": true})
			return
		}
		c.JSON(http.StatusOK, gin.H{"org_id": v})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/workspaces/ws-1/secrets", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	// org_id must appear in the JSON response body.
	body := w.Body.String()
	if body == "" || body == "{}" {
		t.Errorf("org_id missing from response body: %s", body)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
func TestWorkspaceAuth_ValidOrgToken_OrgIDNULL_DoesNotSetContext(t *testing.T) {
	// F1097: pre-migration tokens (org_id=NULL) surface as an EMPTY org_id —
	// orgtoken.Validate maps a NULL column to "" and the middleware sets the
	// key unconditionally. requireCallerOwnsOrg denies empty org_id by
	// default, so "" is the correct deny-by-default signal: the context never
	// carries a usable org identity for these tokens.
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_old_token_no_org"
	tokenHash := sha256.Sum256([]byte(orgToken))
	// orgtoken.Validate — single query; org_id column is NULL.
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).
			AddRow("tok-old-xyz", "tok_old_", nil))
	r := gin.New()
	r.GET("/workspaces/:id/secrets", WorkspaceAuth(mockDB), func(c *gin.Context) {
		// The key may be present, but it must never carry a non-empty org_id.
		if v, exists := c.Get("org_id"); exists && v != "" {
			t.Errorf("org_id must be empty for NULL org_id token, got %v", v)
		}
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/workspaces/ws-1/secrets", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
func TestAdminAuth_ValidOrgToken_SetsOrgIDContext(t *testing.T) {
	// F1097 (#1218): AdminAuth path (used for /org/* routes) must also
	// populate org_id so org-token callers can access their own org's
	// routes. org_id now arrives from orgtoken.Validate's single SELECT —
	// the former per-request secondary org_id lookup is gone, so only the
	// bootstrap gate plus ONE Validate query are expected.
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_admin_path_org_token"
	tokenHash := sha256.Sum256([]byte(orgToken))
	// HasAnyLiveTokenGlobal: at least one workspace auth token exists globally
	// (bootstrap gate — if no tokens exist, AdminAuth grants access to all).
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
	// orgtoken.Validate via AdminAuth — id, prefix and org_id in one query.
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).
			AddRow("tok-admin-org", "tok_adm_", "00000000-0000-0000-0000-000000000042"))
	r := gin.New()
	r.GET("/admin/org-settings", AdminAuth(mockDB), func(c *gin.Context) {
		v, exists := c.Get("org_id")
		if !exists {
			t.Errorf("org_id not set on context for valid org token via AdminAuth")
			c.JSON(http.StatusOK, gin.H{"ok": true})
			return
		}
		c.JSON(http.StatusOK, gin.H{"org_id": v})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/admin/org-settings", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
func TestAdminAuth_ValidOrgToken_OrgIDNULL_DoesNotSetContext(t *testing.T) {
	// F1097: AdminAuth path for a pre-migration org token (org_id=NULL).
	// Validate maps the NULL column to "" and the middleware sets the key
	// unconditionally, so the context must never carry a non-empty org_id —
	// requireCallerOwnsOrg denies empty org_id by default.
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_old_admin_token"
	tokenHash := sha256.Sum256([]byte(orgToken))
	// Bootstrap gate (see TestAdminAuth_ValidOrgToken_SetsOrgIDContext).
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
	// orgtoken.Validate — single query; org_id column is NULL.
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).
			AddRow("tok-old-admin", "tok_old_", nil))
	r := gin.New()
	r.GET("/admin/org-settings", AdminAuth(mockDB), func(c *gin.Context) {
		if v, exists := c.Get("org_id"); exists && v != "" {
			t.Errorf("org_id must be empty for NULL org_id token via AdminAuth, got %v", v)
		}
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/admin/org-settings", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
func TestWorkspaceAuth_OrgToken_DBRowScanError_DoesNotPanic(t *testing.T) {
	// F1097: org_id now arrives from orgtoken.Validate's single SELECT, so
	// there is no separate org_id query whose failure can be tolerated. The
	// failure mode that remains is the Validate query itself erroring; the
	// middleware must not panic and must not accept the token as an org
	// token (Validate collapses all query failures into ErrInvalidToken and
	// the request falls through to the narrower per-workspace token path,
	// which also fails here because no further expectations are queued).
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_token_ok"
	tokenHash := sha256.Sum256([]byte(orgToken))
	// Validate query errors — token must not authenticate as an org token.
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnError(sql.ErrNoRows)
	r := gin.New()
	r.GET("/workspaces/:id/secrets", WorkspaceAuth(mockDB), func(c *gin.Context) {
		// Reached only if some fallback auth path accepted the request.
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/workspaces/ws-1/secrets", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	// gin.New() installs no Recovery middleware, so a panic would fail the
	// test by crashing — completing ServeHTTP is itself the no-panic check.
	r.ServeHTTP(w, req)
	if w.Code == http.StatusOK {
		t.Errorf("expected request rejection when Validate query fails, got 200: %s", w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}
// TestWorkspaceAuth_OrgToken_SetsAllContextKeys verifies the complete set of
// context keys set by WorkspaceAuth for a valid org token (F1097 coverage):
// org_token_id, org_token_prefix, and org_id — all sourced from the single
// orgtoken.Validate query.
func TestWorkspaceAuth_OrgToken_SetsAllContextKeys(t *testing.T) {
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	orgToken := "tok_full_context_token"
	tokenHash := sha256.Sum256([]byte(orgToken))
	expectedOrgID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
	// orgtoken.Validate — id, prefix and org_id all come from this one row.
	mock.ExpectQuery(orgTokenValidateQuery).
		WithArgs(tokenHash[:]).
		WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).
			AddRow("tok-full", "tok_fu_", expectedOrgID))
	r := gin.New()
	r.GET("/workspaces/:id/secrets", WorkspaceAuth(mockDB), func(c *gin.Context) {
		id, ok := c.Get("org_token_id")
		if !ok {
			t.Errorf("org_token_id not set")
		} else if id != "tok-full" {
			t.Errorf("org_token_id: got %v, want tok-full", id)
		}
		prefix, ok := c.Get("org_token_prefix")
		if !ok {
			t.Errorf("org_token_prefix not set")
		} else if prefix != "tok_fu_" {
			t.Errorf("org_token_prefix: got %v, want tok_fu_", prefix)
		}
		orgID, ok := c.Get("org_id")
		if !ok {
			t.Errorf("org_id not set")
		} else if orgID != expectedOrgID {
			t.Errorf("org_id: got %v, want %s", orgID, expectedOrgID)
		}
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})
	w := httptest.NewRecorder()
	req, _ := http.NewRequest(http.MethodGet, "/workspaces/ws-1/secrets", nil)
	req.Header.Set("Authorization", "Bearer "+orgToken)
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet sqlmock expectations: %v", err)
	}
}

View File

@ -94,34 +94,45 @@ func Issue(ctx context.Context, db *sql.DB, name, createdBy, orgID string) (plai
// Validate looks up a presented bearer, returns ErrInvalidToken on
// any mismatch (bad bytes, revoked, deleted). On success, updates
// last_used_at best-effort (the hot path — failure to update doesn't
// fail the request) and returns the token id + display prefix for
// audit logging.
// fail the request) and returns the token id + display prefix + org_id
// for audit logging and org isolation.
//
// Returning the prefix alongside the id lets callers produce audit
// strings that match what users see in the UI (the plaintext prefix,
// not the UUID). Keeps the "who did what" trail visually
// correlatable to the revoke button in the token list.
func Validate(ctx context.Context, db *sql.DB, plaintext string) (id, prefix string, err error) {
//
// The org_id is the workspace UUID of the org that owns this token.
// May be empty for pre-migration tokens minted before #1212. Callers
// that need org isolation should use requireCallerOwnsOrg (which does
// a second lookup) rather than trusting an empty org_id here — this
// avoids a breaking change to the Validate interface while still
// populating the Gin context for callers that don't need it.
func Validate(ctx context.Context, db *sql.DB, plaintext string) (id, prefix, orgID string, err error) {
if plaintext == "" {
return "", "", ErrInvalidToken
return "", "", "", ErrInvalidToken
}
hash := sha256.Sum256([]byte(plaintext))
var orgIDNull sql.NullString
queryErr := db.QueryRowContext(ctx, `
SELECT id, prefix FROM org_api_tokens
SELECT id, prefix, org_id FROM org_api_tokens
WHERE token_hash = $1 AND revoked_at IS NULL
`, hash[:]).Scan(&id, &prefix)
`, hash[:]).Scan(&id, &prefix, &orgIDNull)
if queryErr != nil {
// Collapse all failure shapes into ErrInvalidToken so the
// caller can't accidentally leak "row exists but revoked" vs
// "row never existed" via response shape.
return "", "", ErrInvalidToken
return "", "", "", ErrInvalidToken
}
if orgIDNull.Valid {
orgID = orgIDNull.String
}
// Best-effort last_used_at bump. Failure here is acceptable — the
// request is already authenticated; we don't want a transient DB
// blip to flip a 200 into a 500.
_, _ = db.ExecContext(ctx,
`UPDATE org_api_tokens SET last_used_at = now() WHERE id = $1`, id)
return id, prefix, nil
return id, prefix, orgID, nil
}
// List returns live (non-revoked) tokens newest-first. Safe to

View File

@ -129,7 +129,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
if err != nil {
return "", fmt.Errorf("cp provisioner: send: %w", err)
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
// Cap body read at 64 KiB — the CP only ever returns small JSON
// responses; an unbounded read could be weaponized into log-flood
@ -199,7 +199,7 @@ func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool
if err != nil {
return true, fmt.Errorf("cp provisioner: status: %w", err)
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// Don't leak the body — upstream errors may echo headers.
return true, fmt.Errorf("cp provisioner: status: unexpected %d", resp.StatusCode)
@ -231,7 +231,7 @@ func (p *CPProvisioner) GetConsoleOutput(ctx context.Context, workspaceID string
if err != nil {
return "", fmt.Errorf("cp provisioner: console: %w", err)
}
defer func() { _ = $1 }()
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return "", fmt.Errorf("cp provisioner: console: unexpected %d", resp.StatusCode)
}

View File

@ -127,9 +127,15 @@ func sweepStuckProvisioning(ctx context.Context, emitter ProvisionTimeoutEmitter
continue
}
log.Printf("Provision-timeout sweep: %s stuck in provisioning > %s — marked failed", id, timeout)
if emitErr := emitter.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_TIMEOUT", id, map[string]interface{}{
"error": msg,
"timeout_secs": timeoutSec,
// Emit as WORKSPACE_PROVISION_FAILED, not _TIMEOUT, because the
// canvas event handler only flips node state on the _FAILED case.
// A separate event type was considered but the UI reaction is
// identical either way — operators who need to distinguish can
// tell from the `source` payload field.
if emitErr := emitter.RecordAndBroadcast(ctx, "WORKSPACE_PROVISION_FAILED", id, map[string]interface{}{
"error": msg,
"timeout_secs": timeoutSec,
"source": "provision_timeout_sweep",
}); emitErr != nil {
log.Printf("Provision-timeout sweep: broadcast failed for %s: %v", id, emitErr)
}

View File

@ -58,8 +58,8 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
if emit.count() != 1 {
t.Fatalf("expected 1 event, got %d", emit.count())
}
if emit.events[0].Type != "WORKSPACE_PROVISION_TIMEOUT" {
t.Errorf("event type = %q, want WORKSPACE_PROVISION_TIMEOUT", emit.events[0].Type)
if emit.events[0].Type != "WORKSPACE_PROVISION_FAILED" {
t.Errorf("event type = %q, want WORKSPACE_PROVISION_FAILED", emit.events[0].Type)
}
if emit.events[0].WorkspaceID != "ws-stuck" {
t.Errorf("workspace id = %q, want ws-stuck", emit.events[0].WorkspaceID)
@ -72,7 +72,7 @@ func TestSweepStuckProvisioning_FlipsOverdue(t *testing.T) {
// TestSweepStuckProvisioning_RaceSafe covers the case where UPDATE affects
// 0 rows because the workspace flipped to online (or got restarted) between
// the SELECT and the UPDATE. We should skip the event, not emit a false
// WORKSPACE_PROVISION_TIMEOUT.
// WORKSPACE_PROVISION_FAILED.
func TestSweepStuckProvisioning_RaceSafe(t *testing.T) {
mock := setupTestDB(t)

View File

@ -389,7 +389,15 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
sched.Name, sched.ID, nextErr)
}
_, err := db.DB.ExecContext(ctx, `
// F1089: use a dedicated context with its own 5s deadline for the
// post-fire UPDATE. The outer ctx (fireCtx) may be cancelled if the
// HTTP call timed out or the server is shutting down; using it here
// would silently skip the UPDATE and leave next_run_at stale, causing
// the schedule to be immediately re-fired on the next tick.
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer updateCancel()
_, err := db.DB.ExecContext(updateCtx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
@ -400,7 +408,7 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
WHERE id = $1
`, sched.ID, nextRunPtr, lastStatus, lastError)
if err != nil {
log.Printf("Scheduler: update error for %s: %v", sched.ID, err)
log.Printf("Scheduler: post-fire update error for %s [%s]: %v", sched.ID, sched.Name, err)
}
// Log a dedicated cron_run activity entry with schedule metadata so the