feat(display): proxy native desktop streams after takeover
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 8s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 8s
CI / Detect changes (pull_request) Successful in 16s
CI / Python Lint & Test (pull_request) Successful in 7s
E2E API Smoke Test / detect-changes (pull_request) Successful in 19s
E2E Chat / detect-changes (pull_request) Successful in 11s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 14s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 42s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
Handlers Postgres Integration / detect-changes (pull_request) Successful in 6s
Harness Replays / detect-changes (pull_request) Successful in 3s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 5s
gate-check-v3 / gate-check (pull_request) Successful in 3s
qa-review / approved (pull_request) Successful in 4s
security-review / approved (pull_request) Successful in 3s
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request) Successful in 4s
sop-checklist / review-refire (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request) Successful in 4s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 8s
E2E Chat / E2E Chat (pull_request) Successful in 9s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 13s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m15s
Harness Replays / Harness Replays (pull_request) Successful in 10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m49s
CI / Platform (Go) (pull_request) Successful in 6m56s
CI / Canvas (Next.js) (pull_request) Successful in 7m23s
CI / all-required (pull_request) Successful in 25m7s
audit-force-merge / audit (pull_request) Successful in 8s

This commit is contained in:
claude-ceo-assistant
2026-05-23 16:28:14 -07:00
parent ef42e17224
commit 3c82b39f3d
13 changed files with 601 additions and 65 deletions
+7
View File
@@ -8,6 +8,7 @@
"name": "molecule-monorepo-canvas",
"version": "0.1.0",
"dependencies": {
"@novnc/novnc": "^1.7.0",
"@radix-ui/react-alert-dialog": "^1.1.15",
"@radix-ui/react-dialog": "^1.1.15",
"@radix-ui/react-tabs": "^1.1.12",
@@ -1110,6 +1111,12 @@
"node": ">= 10"
}
},
"node_modules/@novnc/novnc": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/@novnc/novnc/-/novnc-1.7.0.tgz",
"integrity": "sha512-ucEJOx4T2avIRCleodk7YobZj5O2Ga2AeLfQ69A/yjG9HHba2+PDgwSkN3FttrmG+70ZGx21sElNFouK13RzyA==",
"license": "MPL-2.0"
},
"node_modules/@oxc-project/types": {
"version": "0.127.0",
"resolved": "https://registry.npmjs.org/@oxc-project/types/-/types-0.127.0.tgz",
+1
View File
@@ -11,6 +11,7 @@
"test:coverage": "vitest run --coverage"
},
"dependencies": {
"@novnc/novnc": "^1.7.0",
"@radix-ui/react-alert-dialog": "^1.1.15",
"@radix-ui/react-dialog": "^1.1.15",
"@radix-ui/react-tabs": "^1.1.12",
+72 -12
View File
@@ -2,6 +2,7 @@
import { useEffect, useRef, useState } from "react";
import { api } from "@/lib/api";
import type RFB from "@novnc/novnc";
interface DisplayStatus {
available: boolean;
@@ -11,13 +12,13 @@ interface DisplayStatus {
protocol?: string;
width?: number;
height?: number;
viewer_url?: string;
}
interface DisplayControlStatus {
controller: "none" | "user" | "agent";
controlled_by?: string;
expires_at?: string;
session_url?: string;
}
interface Props {
@@ -30,6 +31,7 @@ export function DisplayTab({ workspaceId }: Props) {
const [error, setError] = useState<string | null>(null);
const [controlError, setControlError] = useState<string | null>(null);
const [controlBusy, setControlBusy] = useState(false);
const [sessionUrl, setSessionUrl] = useState<string | null>(null);
const requestGeneration = useRef(0);
useEffect(() => {
@@ -38,6 +40,7 @@ export function DisplayTab({ workspaceId }: Props) {
let cancelled = false;
setStatus(null);
setControl(null);
setSessionUrl(null);
setError(null);
setControlError(null);
setControlBusy(false);
@@ -78,6 +81,7 @@ export function DisplayTab({ workspaceId }: Props) {
});
if (requestGeneration.current !== generation) return;
setControl(next);
setSessionUrl(next.session_url || null);
} catch (err) {
if (requestGeneration.current !== generation) return;
setControlError("Failed to take control");
@@ -103,6 +107,7 @@ export function DisplayTab({ workspaceId }: Props) {
const next = await api.post<DisplayControlStatus>(`${controlPath}/release`, {});
if (requestGeneration.current !== generation) return;
setControl(next);
setSessionUrl(null);
} catch (err) {
if (requestGeneration.current !== generation) return;
setControlError("Failed to release control");
@@ -224,24 +229,19 @@ export function DisplayTab({ workspaceId }: Props) {
control={control}
controlBusy={controlBusy}
controlError={controlError}
hasSession={!!sessionUrl}
onAcquire={acquireControl}
onRelease={releaseControl}
/>
</div>
{status.viewer_url ? (
<iframe
title="Workspace desktop"
src={status.viewer_url}
className="min-h-0 flex-1 border-0 bg-black"
allow="clipboard-read; clipboard-write; fullscreen; pointer-lock"
referrerPolicy="no-referrer"
/>
{sessionUrl ? (
<DesktopStream sessionUrl={sessionUrl} />
) : (
<div className="flex flex-1 items-center justify-center p-8 text-center">
<div>
<h3 className="mb-1.5 text-sm font-medium text-ink">Display session is not ready.</h3>
<h3 className="mb-1.5 text-sm font-medium text-ink">Take control to open the desktop.</h3>
<p className="max-w-xs text-[11px] leading-relaxed text-ink-mid">
This workspace has display configuration, but the desktop session URL is not available yet.
The display service is ready. Control access opens a short-lived desktop stream.
</p>
</div>
</div>
@@ -254,12 +254,14 @@ function DisplayControlBar({
control,
controlBusy,
controlError,
hasSession,
onAcquire,
onRelease,
}: {
control: DisplayControlStatus | null;
controlBusy: boolean;
controlError: string | null;
hasSession: boolean;
onAcquire: () => void;
onRelease: () => void;
}) {
@@ -280,7 +282,8 @@ function DisplayControlBar({
{controlError && <p className="mt-0.5 text-[10px] text-red-200">{controlError}</p>}
</div>
)}
{control?.controller === "none" && (
{(control?.controller === "none" ||
(control?.controller === "user" && control.controlled_by === "admin-token" && !hasSession)) && (
<button
type="button"
onClick={onAcquire}
@@ -304,6 +307,63 @@ function DisplayControlBar({
);
}
function DesktopStream({ sessionUrl }: { sessionUrl: string }) {
const containerRef = useRef<HTMLDivElement | null>(null);
const [streamError, setStreamError] = useState<string | null>(null);
useEffect(() => {
let cancelled = false;
let rfb: RFB | null = null;
async function connect() {
setStreamError(null);
try {
const mod = await import("@novnc/novnc");
if (cancelled || !containerRef.current) return;
const stream = displayWebSocketConnection(sessionUrl);
rfb = new mod.default(containerRef.current, stream.url, {
wsProtocols: ["binary", `molecule-display-token.${stream.token}`],
});
rfb.scaleViewport = true;
rfb.resizeSession = true;
rfb.focusOnClick = true;
rfb.addEventListener("disconnect", (event: Event) => {
const detail = (event as CustomEvent<{ clean?: boolean }>).detail;
if (!cancelled && !detail?.clean) setStreamError("Desktop stream disconnected.");
});
} catch {
if (!cancelled) setStreamError("Desktop stream could not be opened.");
}
}
connect();
return () => {
cancelled = true;
rfb?.disconnect();
};
}, [sessionUrl]);
return (
<div className="relative min-h-0 flex-1 bg-black">
<div ref={containerRef} title="Workspace desktop" className="h-full w-full overflow-hidden bg-black" />
{streamError && (
<div className="absolute inset-x-4 top-4 rounded border border-red-500/30 bg-red-950/80 px-3 py-2 text-[11px] text-red-100">
{streamError}
</div>
)}
</div>
);
}
function displayWebSocketConnection(sessionUrl: string): { url: string; token: string } {
const url = new URL(sessionUrl, window.location.href);
const token = new URLSearchParams(url.hash.replace(/^#/, "")).get("token") ?? "";
if (!token) throw new Error("display session token missing");
url.hash = "";
url.protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
return { url: url.toString(), token };
}
function displayControlActorLabel(control: DisplayControlStatus): string {
if (control.controller === "agent") return "Agent";
if (control.controlled_by === "admin-token") return "Admin";
@@ -2,7 +2,11 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { cleanup, fireEvent, render, screen, waitFor } from "@testing-library/react";
const { mockGet, mockPost } = vi.hoisted(() => ({ mockGet: vi.fn(), mockPost: vi.fn() }));
const { mockGet, mockPost, mockRFBConstructor } = vi.hoisted(() => ({
mockGet: vi.fn(),
mockPost: vi.fn(),
mockRFBConstructor: vi.fn(),
}));
vi.mock("@/lib/api", () => ({
api: {
@@ -11,6 +15,25 @@ vi.mock("@/lib/api", () => ({
},
}));
vi.mock("@novnc/novnc", () => ({
default: class MockRFB extends EventTarget {
scaleViewport = false;
resizeSession = false;
focusOnClick = false;
target: HTMLElement;
url: string;
options?: { wsProtocols?: string[] };
constructor(target: HTMLElement, url: string, options?: { wsProtocols?: string[] }) {
super();
this.target = target;
this.url = url;
this.options = options;
mockRFBConstructor(target, url, options);
}
disconnect() {}
},
}));
import { DisplayTab } from "../DisplayTab";
describe("DisplayTab", () => {
@@ -18,6 +41,7 @@ describe("DisplayTab", () => {
cleanup();
mockGet.mockReset();
mockPost.mockReset();
mockRFBConstructor.mockReset();
});
it("renders unavailable state for non-display workspaces", async () => {
@@ -71,15 +95,14 @@ describe("DisplayTab", () => {
});
});
it("renders the desktop stream when a display session is available", async () => {
it("waits for takeover before opening a ready display stream", async () => {
mockGet
.mockResolvedValueOnce({
available: true,
mode: "desktop-control",
protocol: "dcv",
protocol: "novnc",
width: 1920,
height: 1080,
viewer_url: "https://display.example.test/session/ws-display",
})
.mockResolvedValueOnce({
controller: "none",
@@ -87,12 +110,51 @@ describe("DisplayTab", () => {
render(<DisplayTab workspaceId="ws-display" />);
await waitFor(() => {
expect(screen.getByText("Take control to open the desktop.")).toBeTruthy();
});
expect(screen.getByRole("button", { name: "Take control" })).toBeTruthy();
});
it("opens the trusted noVNC client after takeover returns a stream URL", async () => {
mockGet
.mockResolvedValueOnce({
available: true,
mode: "desktop-control",
protocol: "novnc",
width: 1920,
height: 1080,
})
.mockResolvedValueOnce({
controller: "none",
});
mockPost.mockResolvedValueOnce({
controller: "user",
controlled_by: "admin-token",
expires_at: "2026-05-23T08:48:27Z",
session_url: "/workspaces/ws-display/display/session/websockify#token=signed",
});
render(<DisplayTab workspaceId="ws-display" />);
await waitFor(() => {
expect(screen.getByRole("button", { name: "Take control" })).toBeTruthy();
});
fireEvent.click(screen.getByRole("button", { name: "Take control" }));
await waitFor(() => {
expect(screen.getByTitle("Workspace desktop")).toBeTruthy();
});
const frame = screen.getByTitle("Workspace desktop") as HTMLIFrameElement;
expect(frame.src).toBe("https://display.example.test/session/ws-display");
expect(screen.getByRole("button", { name: "Take control" })).toBeTruthy();
expect(mockPost).toHaveBeenCalledWith("/workspaces/ws-display/display/control/acquire", {
controller: "user",
ttl_seconds: 300,
});
expect(mockRFBConstructor).toHaveBeenCalledWith(
expect.any(HTMLElement),
expect.stringContaining("/workspaces/ws-display/display/session/websockify"),
{ wsProtocols: ["binary", "molecule-display-token.signed"] },
);
expect(mockRFBConstructor.mock.calls[0][1]).not.toContain("token=");
});
it("releases user display control", async () => {
@@ -100,8 +162,7 @@ describe("DisplayTab", () => {
.mockResolvedValueOnce({
available: true,
mode: "desktop-control",
protocol: "dcv",
viewer_url: "https://display.example.test/session/ws-display",
protocol: "novnc",
})
.mockResolvedValueOnce({
controller: "user",
+9
View File
@@ -0,0 +1,9 @@
declare module "@novnc/novnc" {
export default class RFB extends EventTarget {
scaleViewport: boolean;
resizeSession: boolean;
focusOnClick: boolean;
constructor(target: HTMLElement, url: string, options?: { wsProtocols?: string[]; [key: string]: unknown });
disconnect(): void;
}
}
@@ -30,7 +30,7 @@ func TestRefreshEnvFromCP_AppliesCPResponse(t *testing.T) {
t.Errorf("org id header: got %q", got)
}
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"MOLECULE_CP_SHARED_SECRET":"new-secret","MOLECULE_CP_URL":"https://api.moleculesai.app"}`)
fmt.Fprint(w, `{"MOLECULE_CP_SHARED_SECRET":"new-secret","MOLECULE_CP_URL":"https://api.moleculesai.app","DISPLAY_SESSION_SIGNING_SECRET":"display-secret"}`)
}))
defer srv.Close()
@@ -45,6 +45,9 @@ func TestRefreshEnvFromCP_AppliesCPResponse(t *testing.T) {
if got := os.Getenv("MOLECULE_CP_SHARED_SECRET"); got != "new-secret" {
t.Errorf("SHARED_SECRET: want new-secret, got %q", got)
}
if got := os.Getenv("DISPLAY_SESSION_SIGNING_SECRET"); got != "display-secret" {
t.Errorf("DISPLAY_SESSION_SIGNING_SECRET: want display-secret, got %q", got)
}
}
// TestRefreshEnvFromCP_CPUnreachableDoesNotFailBoot: network errors must
@@ -6,9 +6,6 @@ import (
"encoding/json"
"fmt"
"log"
"net/url"
"os"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
@@ -32,7 +29,6 @@ type workspaceDisplayResponse struct {
Width int `json:"width,omitempty"`
Height int `json:"height,omitempty"`
Status string `json:"status,omitempty"`
ViewerURL string `json:"viewer_url,omitempty"`
}
var workspaceComputeInstanceAllowlist = map[string]struct{}{
@@ -177,17 +173,13 @@ func withStoredCompute(ctx context.Context, workspaceID string, payload models.C
}
// Display handles GET /workspaces/:id/display.
//
// Phase 1 only exposes the product contract and the non-display unavailable
// state. Future desktop-control work will replace the display-enabled branch
// with short-lived proxied DCV session details.
func (h *WorkspaceHandler) Display(c *gin.Context) {
workspaceID := c.Param("id")
var raw string
var raw, instanceID string
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT COALESCE(compute, '{}'::jsonb) FROM workspaces WHERE id = $1`,
`SELECT COALESCE(compute, '{}'::jsonb), COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&raw)
).Scan(&raw, &instanceID)
if err != nil {
if err == sql.ErrNoRows {
c.JSON(404, gin.H{"error": "workspace not found"})
@@ -217,7 +209,7 @@ func (h *WorkspaceHandler) Display(c *gin.Context) {
})
return
}
if viewerURL := workspaceDisplayViewerURL(workspaceID); viewerURL != "" {
if instanceID != "" {
c.JSON(200, workspaceDisplayResponse{
Available: true,
Mode: compute.Display.Mode,
@@ -225,7 +217,6 @@ func (h *WorkspaceHandler) Display(c *gin.Context) {
Width: compute.Display.Width,
Height: compute.Display.Height,
Status: "ready",
ViewerURL: viewerURL,
})
return
}
@@ -239,15 +230,3 @@ func (h *WorkspaceHandler) Display(c *gin.Context) {
Status: "not_configured",
})
}
func workspaceDisplayViewerURL(workspaceID string) string {
base := strings.TrimRight(os.Getenv("DISPLAY_VIEWER_BASE_URL"), "/")
if base == "" {
return ""
}
parsed, err := url.Parse(base)
if err != nil || parsed.Scheme != "https" || parsed.Host == "" {
return ""
}
return base + "/" + url.PathEscape(workspaceID)
}
@@ -6,8 +6,10 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
@@ -202,9 +204,9 @@ func TestWorkspaceDisplay_NonDisplayWorkspaceReturnsUnavailable(t *testing.T) {
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-no-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{}`, ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -236,9 +238,9 @@ func TestWorkspaceDisplay_DisplayConfiguredReturnsSessionUnavailableContract(t *
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`, ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -277,15 +279,14 @@ func TestWorkspaceDisplay_DisplayConfiguredReturnsSessionUnavailableContract(t *
}
}
func TestWorkspaceDisplay_DisplayConfiguredWithViewerBaseReturnsAvailableSession(t *testing.T) {
func TestWorkspaceDisplay_DisplayConfiguredWithInstanceReturnsAvailableSession(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
t.Setenv("DISPLAY_VIEWER_BASE_URL", "https://display.example.test/sessions")
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`, "i-display123"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -304,8 +305,8 @@ func TestWorkspaceDisplay_DisplayConfiguredWithViewerBaseReturnsAvailableSession
if resp["available"] != true {
t.Fatalf("available = %v, want true", resp["available"])
}
if resp["viewer_url"] != "https://display.example.test/sessions/ws-display" {
t.Fatalf("viewer_url = %v, want workspace viewer URL", resp["viewer_url"])
if resp["viewer_url"] != nil {
t.Fatalf("viewer_url = %v, want omitted; stream URL is minted by Take control", resp["viewer_url"])
}
if resp["reason"] != nil {
t.Fatalf("reason = %v, want omitted", resp["reason"])
@@ -315,16 +316,15 @@ func TestWorkspaceDisplay_DisplayConfiguredWithViewerBaseReturnsAvailableSession
}
}
func TestWorkspaceDisplay_DisplayConfiguredWithInvalidViewerBaseReturnsUnavailable(t *testing.T) {
func TestWorkspaceDisplay_DisplayConfiguredWithoutInstanceReturnsUnavailable(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
t.Setenv("DISPLAY_VIEWER_BASE_URL", "http://display.example.test/sessions")
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
workspaceID := "ws-display"
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs(workspaceID).
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`, ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -359,9 +359,9 @@ func TestWorkspaceDisplay_IgnoresUnrelatedStoredComputeSizingDrift(t *testing.T)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display-sizing-drift").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"instance_type":"old.large","display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{"instance_type":"old.large","display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`, ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -390,9 +390,9 @@ func TestWorkspaceDisplay_InvalidStoredDisplayConfigReturnsServerError(t *testin
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-invalid-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"vnc"}}`))
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"vnc"}}`, ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -415,3 +415,113 @@ func TestWorkspaceDisplay_InvalidStoredDisplayConfigReturnsServerError(t *testin
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplaySession_ProxiesThroughDisplayForward(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
t.Setenv("DISPLAY_SESSION_SIGNING_SECRET", "display-session-test-secret")
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
var upstreamAuth, upstreamCookie, upstreamProtocol, gotInstanceID string
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/websockify" {
t.Errorf("upstream path = %q, want /websockify", r.URL.Path)
}
if r.URL.RawQuery != "" {
t.Errorf("upstream raw query = %q, want stripped", r.URL.RawQuery)
}
upstreamAuth = r.Header.Get("Authorization")
upstreamCookie = r.Header.Get("Cookie")
upstreamProtocol = r.Header.Get("Sec-WebSocket-Protocol")
_, _ = w.Write([]byte("websockify"))
}))
defer upstream.Close()
upstreamURL, err := url.Parse(upstream.URL)
if err != nil {
t.Fatalf("parse upstream URL: %v", err)
}
prevForward := displayForward
displayForward = func(_ context.Context, instanceID string, fn func(target *url.URL) error) error {
gotInstanceID = instanceID
return fn(upstreamURL)
}
t.Cleanup(func() { displayForward = prevForward })
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(
`{"display":{"mode":"desktop-control","protocol":"novnc","width":1920,"height":1080}}`,
"i-display123",
))
expiresAt := time.Now().Add(5 * time.Minute).UTC()
mock.ExpectQuery(`SELECT controller, controlled_by, expires_at FROM workspace_display_control_locks WHERE workspace_id = \$1 AND expires_at > now\(\)`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"controller", "controlled_by", "expires_at"}).AddRow("user", "admin-token", expiresAt))
token := signDisplaySessionToken("ws-display", "admin-token", expiresAt)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "ws-display"},
{Key: "proxyPath", Value: "/websockify"},
}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-display/display/session/websockify", nil)
c.Request.Header.Set("Authorization", "Bearer should-not-reach-upstream")
c.Request.Header.Set("Cookie", "session=should-not-reach-upstream")
c.Request.Header.Set("Sec-WebSocket-Protocol", "binary, molecule-display-token."+token)
handler.DisplaySession(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
if gotInstanceID != "i-display123" {
t.Fatalf("displayForward instanceID = %q, want i-display123", gotInstanceID)
}
if w.Body.String() != "websockify" {
t.Fatalf("body = %q, want websockify", w.Body.String())
}
if upstreamAuth != "" || upstreamCookie != "" {
t.Fatalf("proxied credentials leaked upstream: auth=%q cookie=%q", upstreamAuth, upstreamCookie)
}
if upstreamProtocol != "binary" {
t.Fatalf("upstream websocket protocol = %q, want binary without display token", upstreamProtocol)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplaySession_NonDisplayWorkspaceDoesNotProxy(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
prevForward := displayForward
displayForward = func(_ context.Context, _ string, _ func(target *url.URL) error) error {
t.Fatal("displayForward must not run for non-display workspaces")
return nil
}
t.Cleanup(func() { displayForward = prevForward })
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\), COALESCE\(instance_id, ''\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-no-display").
WillReturnRows(sqlmock.NewRows([]string{"compute", "instance_id"}).AddRow(`{}`, "i-display123"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{
{Key: "id", Value: "ws-no-display"},
{Key: "proxyPath", Value: "/websockify"},
}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-no-display/display/session/websockify", nil)
handler.DisplaySession(c)
if w.Code != http.StatusNotFound {
t.Fatalf("expected status 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -2,13 +2,19 @@ package handlers
import (
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/subtle"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@@ -27,6 +33,7 @@ type workspaceDisplayControlResponse struct {
Controller string `json:"controller"`
ControlledBy string `json:"controlled_by,omitempty"`
ExpiresAt time.Time `json:"expires_at"`
SessionURL string `json:"session_url,omitempty"`
}
type workspaceDisplayControlNoneResponse struct {
@@ -89,6 +96,10 @@ func (h *WorkspaceHandler) AcquireDisplayControl(c *gin.Context) {
c.JSON(http.StatusForbidden, gin.H{"error": "display control requires admin-token or org-token auth"})
return
}
if displaySessionSigningSecret() == "" {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "display session signing secret is not configured"})
return
}
workspaceID := c.Param("id")
startedAt := time.Now()
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.started", workspaceID, map[string]any{
@@ -113,6 +124,7 @@ RETURNING controller, controlled_by, expires_at`,
workspaceID, req.Controller, controlledBy, req.TTLSeconds,
).Scan(&lock.Controller, &lock.ControlledBy, &lock.ExpiresAt)
if err == nil {
lock.SessionURL = signedDisplaySessionURL(workspaceID, lock.ControlledBy, lock.ExpiresAt)
emitDisplayControlEvent(c.Request.Context(), "display.control.acquire.completed", workspaceID, map[string]any{
"controller": lock.Controller,
"controlled_by": lock.ControlledBy,
@@ -358,3 +370,50 @@ func emitDisplayControlEvent(ctx context.Context, eventType string, workspaceID
log.Printf("emitDisplayControlEvent: insert %s failed: %v", eventType, err)
}
}
func signedDisplaySessionURL(workspaceID, controlledBy string, expiresAt time.Time) string {
token := signDisplaySessionToken(workspaceID, controlledBy, expiresAt)
if token == "" {
return ""
}
return fmt.Sprintf("/workspaces/%s/display/session/websockify#token=%s", url.PathEscape(workspaceID), token)
}
func signDisplaySessionToken(workspaceID, controlledBy string, expiresAt time.Time) string {
secret := displaySessionSigningSecret()
if secret == "" || workspaceID == "" || controlledBy == "" || expiresAt.IsZero() {
return ""
}
payload := strings.Join([]string{workspaceID, controlledBy, strconv.FormatInt(expiresAt.Unix(), 10)}, "|")
mac := hmac.New(sha256.New, []byte(secret))
_, _ = mac.Write([]byte(payload))
return base64.RawURLEncoding.EncodeToString([]byte(payload)) + "." + base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}
func validateDisplaySessionToken(token, workspaceID, controlledBy string, expiresAt time.Time) bool {
secret := displaySessionSigningSecret()
parts := strings.Split(token, ".")
if secret == "" || len(parts) != 2 || workspaceID == "" || controlledBy == "" || expiresAt.IsZero() || time.Now().After(expiresAt) {
return false
}
payloadBytes, err := base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return false
}
payload := string(payloadBytes)
wantPayload := strings.Join([]string{workspaceID, controlledBy, strconv.FormatInt(expiresAt.Unix(), 10)}, "|")
if subtle.ConstantTimeCompare([]byte(payload), []byte(wantPayload)) != 1 {
return false
}
sig, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
return false
}
mac := hmac.New(sha256.New, []byte(secret))
_, _ = mac.Write([]byte(payload))
return hmac.Equal(sig, mac.Sum(nil))
}
func displaySessionSigningSecret() string {
return os.Getenv("DISPLAY_SESSION_SIGNING_SECRET")
}
@@ -2,10 +2,15 @@ package handlers
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/base64"
"encoding/json"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
@@ -54,6 +59,7 @@ func TestWorkspaceDisplayControl_NoActiveLockReturnsNone(t *testing.T) {
func TestWorkspaceDisplayControlAcquire_ClaimsUnlockedDisplay(t *testing.T) {
mock := setupTestDB(t)
t.Setenv("DISPLAY_SESSION_SIGNING_SECRET", "display-session-test-secret")
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
expiresAt := time.Date(2026, 5, 23, 18, 30, 0, 0, time.UTC)
@@ -87,13 +93,39 @@ func TestWorkspaceDisplayControlAcquire_ClaimsUnlockedDisplay(t *testing.T) {
if resp["expires_at"] == "" {
t.Fatalf("expires_at missing in response: %#v", resp)
}
sessionURL, ok := resp["session_url"].(string)
if !ok || !strings.HasPrefix(sessionURL, "/workspaces/ws-display/display/session/websockify#token=") {
t.Fatalf("session_url = %#v, want signed websockify URL fragment", resp["session_url"])
}
if strings.Contains(sessionURL, "?token=") {
t.Fatalf("session_url must not put display token in logged query string: %q", sessionURL)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestDisplaySessionToken_RequiresDedicatedSigningSecret(t *testing.T) {
t.Setenv("ADMIN_TOKEN", "client-exposed-admin-token")
t.Setenv("DISPLAY_SESSION_SIGNING_SECRET", "")
expiresAt := time.Now().Add(5 * time.Minute)
if token := signDisplaySessionToken("ws-display", "admin-token", expiresAt); token != "" {
t.Fatalf("signDisplaySessionToken minted token with no dedicated signing secret: %q", token)
}
payload := "ws-display|admin-token|" + strconv.FormatInt(expiresAt.Unix(), 10)
mac := hmac.New(sha256.New, []byte(""))
_, _ = mac.Write([]byte(payload))
forged := base64.RawURLEncoding.EncodeToString([]byte(payload)) + "." + base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
if validateDisplaySessionToken(forged, "ws-display", "admin-token", expiresAt) {
t.Fatal("validateDisplaySessionToken accepted empty-secret forged token")
}
}
func TestWorkspaceDisplayControlAcquire_ActiveLockReturnsConflict(t *testing.T) {
mock := setupTestDB(t)
t.Setenv("DISPLAY_SESSION_SIGNING_SECRET", "display-session-test-secret")
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
expiresAt := time.Date(2026, 5, 23, 18, 30, 0, 0, time.UTC)
@@ -136,6 +168,32 @@ func TestWorkspaceDisplayControlAcquire_ActiveLockReturnsConflict(t *testing.T)
}
}
func TestWorkspaceDisplayControlAcquire_RejectsMissingSessionSigningSecret(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
mock.ExpectQuery(`SELECT COALESCE\(compute, '\{\}'::jsonb\) FROM workspaces WHERE id = \$1`).
WithArgs("ws-display").
WillReturnRows(sqlmock.NewRows([]string{"compute"}).AddRow(`{"display":{"mode":"desktop-control","protocol":"dcv","width":1920,"height":1080}}`))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-display"}}
c.Request = httptest.NewRequest("POST", "/workspaces/ws-display/display/control/acquire", bytes.NewBufferString(`{"controller":"user","ttl_seconds":300}`))
c.Request.Header.Set("Content-Type", "application/json")
attachDisplayControlAdminToken(t, c)
t.Setenv("DISPLAY_SESSION_SIGNING_SECRET", "")
handler.AcquireDisplayControl(c)
if w.Code != http.StatusServiceUnavailable {
t.Fatalf("expected status 503, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDisplayControlAcquire_RejectsDisplayDisabledWorkspace(t *testing.T) {
mock := setupTestDB(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
@@ -0,0 +1,168 @@
package handlers
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/gin-gonic/gin"
)
const workspaceDisplaySessionTimeout = 12 * time.Hour
const displaySessionTokenProtocolPrefix = "molecule-display-token."
var displayForward = realDisplayForward
// DisplaySession proxies noVNC/websockify requests for a display-enabled EC2
// workspace through the existing EIC SSH path. The EC2 :6080 listener stays
// private to the VPC; the browser only sees this same-origin route.
func (h *WorkspaceHandler) DisplaySession(c *gin.Context) {
workspaceID := c.Param("id")
display, instanceID, err := loadWorkspaceDisplaySessionTarget(c.Request.Context(), workspaceID)
if err != nil {
if err == sql.ErrNoRows {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
log.Printf("DisplaySession: load target for %s failed: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display session"})
return
}
if display.Mode == "" || display.Mode == "none" {
c.JSON(http.StatusNotFound, gin.H{"error": "display not enabled"})
return
}
if instanceID == "" {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "display session unavailable"})
return
}
proxyPath := c.Param("proxyPath")
if proxyPath != "/websockify" {
c.JSON(http.StatusNotFound, gin.H{"error": "display session path not found"})
return
}
lock, found, err := h.loadActiveDisplayControl(c, workspaceID)
if err != nil {
log.Printf("DisplaySession: load active lock for %s failed: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to load display control"})
return
}
if !found || !validateDisplaySessionToken(displaySessionTokenFromRequest(c.Request), workspaceID, lock.ControlledBy, lock.ExpiresAt) {
c.JSON(http.StatusForbidden, gin.H{"error": "display control required"})
return
}
ctx, cancel := context.WithTimeout(c.Request.Context(), workspaceDisplaySessionTimeout)
defer cancel()
err = displayForward(ctx, instanceID, func(target *url.URL) error {
proxy := newDisplaySessionReverseProxy(target)
proxy.ServeHTTP(c.Writer, c.Request.WithContext(ctx))
return nil
})
if err != nil {
log.Printf("DisplaySession: proxy for %s instance=%s failed: %v", workspaceID, instanceID, err)
if !c.Writer.Written() {
c.JSON(http.StatusBadGateway, gin.H{"error": "display session proxy failed"})
}
}
}
func loadWorkspaceDisplaySessionTarget(ctx context.Context, workspaceID string) (models.WorkspaceComputeDisplay, string, error) {
var raw, instanceID string
err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(compute, '{}'::jsonb), COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&raw, &instanceID)
if err != nil {
return models.WorkspaceComputeDisplay{}, "", err
}
var compute models.WorkspaceCompute
if raw != "" && raw != "{}" {
if err := json.Unmarshal([]byte(raw), &compute); err != nil {
return models.WorkspaceComputeDisplay{}, "", fmt.Errorf("invalid compute JSON: %w", err)
}
if err := validateWorkspaceDisplayConfig(compute.Display); err != nil {
return models.WorkspaceComputeDisplay{}, "", err
}
}
return compute.Display, instanceID, nil
}
func newDisplaySessionReverseProxy(target *url.URL) *httputil.ReverseProxy {
return &httputil.ReverseProxy{
Director: func(req *http.Request) {
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path = "/websockify"
req.URL.RawPath = ""
req.URL.RawQuery = ""
req.Host = target.Host
req.Header.Del("Authorization")
req.Header.Del("Cookie")
req.Header.Set("Sec-WebSocket-Protocol", "binary")
},
ErrorHandler: func(w http.ResponseWriter, _ *http.Request, err error) {
log.Printf("DisplaySession: upstream proxy error: %v", err)
http.Error(w, "display session proxy failed", http.StatusBadGateway)
},
}
}
func displaySessionTokenFromRequest(r *http.Request) string {
for _, part := range strings.Split(r.Header.Get("Sec-WebSocket-Protocol"), ",") {
protocol := strings.TrimSpace(part)
if strings.HasPrefix(protocol, displaySessionTokenProtocolPrefix) {
return strings.TrimPrefix(protocol, displaySessionTokenProtocolPrefix)
}
}
return ""
}
func realDisplayForward(ctx context.Context, instanceID string, fn func(target *url.URL) error) error {
if instanceID == "" {
return fmt.Errorf("workspace has no instance_id")
}
return withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
localPort, err := pickFreePort()
if err != nil {
return fmt.Errorf("pick display forward port: %w", err)
}
cmd := exec.CommandContext(ctx, "ssh",
"-i", s.keyPath,
"-o", "StrictHostKeyChecking=no",
"-o", "UserKnownHostsFile=/dev/null",
"-o", "LogLevel=ERROR",
"-o", "ExitOnForwardFailure=yes",
"-N",
"-L", fmt.Sprintf("%d:127.0.0.1:6080", localPort),
"-p", fmt.Sprintf("%d", s.localPort),
fmt.Sprintf("%s@127.0.0.1", s.osUser),
)
cmd.Env = os.Environ()
if err := cmd.Start(); err != nil {
return fmt.Errorf("display forward start: %w", err)
}
defer func() {
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
_ = cmd.Wait()
}()
if err := waitForPort(ctx, "127.0.0.1", localPort, 10*time.Second); err != nil {
return fmt.Errorf("display forward never listened: %w", err)
}
return fn(&url.URL{Scheme: "http", Host: fmt.Sprintf("127.0.0.1:%d", localPort)})
})
}
@@ -182,6 +182,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// URLs, so keep the endpoint admin-gated from the first unavailable
// state rather than widening it later.
wsAdmin.GET("/workspaces/:id/display", wh.Display)
wsAdmin.GET("/workspaces/:id/display/session/*proxyPath", wh.DisplaySession)
wsAdmin.GET("/workspaces/:id/display/control", wh.DisplayControl)
wsAdmin.POST("/workspaces/:id/display/control/acquire", wh.AcquireDisplayControl)
wsAdmin.POST("/workspaces/:id/display/control/release", wh.ReleaseDisplayControl)
@@ -18,6 +18,7 @@ func buildWorkspaceDisplayEngine(t *testing.T) *gin.Engine {
r := gin.New()
wh := handlers.NewWorkspaceHandler(nil, nil, "http://localhost:8080", t.TempDir())
r.GET("/workspaces/:id/display", middleware.AdminAuth(db.DB), wh.Display)
r.GET("/workspaces/:id/display/session/*proxyPath", middleware.AdminAuth(db.DB), wh.DisplaySession)
r.POST("/workspaces/:id/display/control/acquire", middleware.AdminAuth(db.DB), wh.AcquireDisplayControl)
return r
}
@@ -59,3 +60,22 @@ func TestWorkspaceDisplayControlRoute_RequiresAdminAuth(t *testing.T) {
t.Errorf("sqlmock unmet: %v", err)
}
}
func TestWorkspaceDisplaySessionRoute_RequiresAdminAuth(t *testing.T) {
t.Setenv("ADMIN_TOKEN", "test-admin-secret-not-presented-by-caller")
mock := setupRouterTestDB(t)
mock.ExpectQuery("SELECT COUNT.*FROM workspace_auth_tokens").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
r := buildWorkspaceDisplayEngine(t)
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "/workspaces/ws-display/display/session/vnc.html", nil)
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401 for unauthenticated request, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock unmet: %v", err)
}
}