Compare commits

..

1 Commits

Author SHA1 Message Date
claude-ceo-assistant 324bee36be feat(workspace-server): tenant-scoped one-shot workspace exec API (internal#742 Part 1)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
CI / Python Lint & Test (pull_request) Successful in 6s
Check migration collisions / Migration version collision check (pull_request) Successful in 11s
CI / Detect changes (pull_request) Successful in 11s
E2E API Smoke Test / detect-changes (pull_request) Successful in 12s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 13s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (pull_request) Successful in 8s
E2E Chat / detect-changes (pull_request) Successful in 14s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 13s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (pull_request) Successful in 1m5s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 58s
Harness Replays / detect-changes (pull_request) Successful in 4s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 3s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Failing after 1m20s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m4s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m16s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 3s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Failing after 4m9s
review-check-tests / review-check.sh regression tests (pull_request) Successful in 10s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m10s
qa-review / approved (pull_request) Failing after 6s
security-review / approved (pull_request) Failing after 4s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m16s
CI / Canvas (Next.js) (pull_request) Successful in 2s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m42s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m19s
E2E Chat / E2E Chat (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
Harness Replays / Harness Replays (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 2m34s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 2m49s
CI / Platform (Go) (pull_request) Successful in 6m43s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 12m34s
sop-checklist / na-declarations (pull_request) N/A: (none)
gate-check-v3 / gate-check (pull_request) Successful in 19s
sop-checklist / all-items-acked (pull_request) Successful in 18s
sop-checklist / review-refire (pull_request) Has been skipped
sop-tier-check / tier-check (pull_request) Successful in 6s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 1m9s
sop-tier-check / tier-check (pull_request_review) Successful in 3s
Add POST /workspaces/:id/exec — a first-class, audited, tenant-token-gated
one-shot exec endpoint that runs an argv non-interactively on the workspace
EC2 over the EXISTING EIC tunnel broker, streams framed stdout/stderr, and
returns the exit code.

Reuse, not reinvention:
- execViaEIC is modelled on readFileViaEIC: acquire a pooled EIC SSH session
  via withEICTunnel (the refcounted pool keyed by instanceID), run the argv
  non-interactively, capture stdout/stderr/exit, tear down. No new EIC wiring.

Safety / limits:
- argv-only: a bare-string cmd is rejected 400; the program never gets an
  implicit shell unless argv[0] is itself a shell (caller's explicit choice).
- timeout_s clamped to [default 30, max 120].
- per-stream output cap 1 MiB with an explicit truncation marker.

Authz / capability:
- Registered on the same WorkspaceAuth-gated group as /files/*, inheriting
  per-workspace bearer / org-token authz (and per-org TenantGuard on SaaS).
- Host exec additionally gated on the workspace's host-control tier (T3/T4);
  a lower/read-only tier (T1/T2) gets 403, not host exec.

Audit:
- Exactly one activity_logs row per exec via LogActivity — actor (token
  subject), argv, exit code, duration. NEVER stdout/stderr content (streamed
  to the caller, never persisted), mirroring the secret-redaction contract.

Tests cover: happy path (exit 0 + output), non-zero exit, timeout (504),
output-cap truncation, argv-validation rejection of a string cmd, empty argv,
capability-denied 403 for a non-host-control tier, own-org scoping (unknown
:id → 404, no dispatch), and audit-content exclusion (streamed secret never
reaches the persisted row). Plus pure-function coverage for argv quoting and
the capped buffer.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 01:41:40 -07:00
47 changed files with 1066 additions and 1660 deletions
-152
View File
@@ -605,151 +605,6 @@ def file_or_update_red(
sys.stderr.write(f"::warning::label '{RED_LABEL}' not found on repo\n")
def close_stale_red_issues(
current_sha: str,
current_status: dict,
*,
dry_run: bool = False,
) -> int:
"""Close open [main-red] issues whose specific failing contexts have
all recovered on `current_sha`, even though `main` is still red for
other reasons (mc#1789).
When main stays red across consecutive SHAs for *different* causes,
`close_open_red_issues_for_other_shas` never fires (it only runs when
main is green). This function prevents stale issues from accumulating
indefinitely by comparing per-context recovery across SHAs.
An issue is considered stale when every context that was in a failed
state on the issue's SHA is now either `success` on the current HEAD
or absent (workflow removed / renamed). Issues whose original SHA had
a combined-red-with-no-detail (empty statuses list) are skipped — we
cannot verify recovery without per-context data.
Returns the number of issues closed.
"""
open_red = list_open_red_issues()
if not open_red:
return 0
current_statuses = current_status.get("statuses") or []
closed = 0
for issue in open_red:
title = issue.get("title", "")
prefix = f"{TITLE_PREFIX} {REPO}: "
if not title.startswith(prefix):
continue
short_sha = title[len(prefix):]
if short_sha == current_sha[:10]:
continue
# Query status for the old SHA. Short SHA should resolve; if it
# doesn't (GC'd, force-pushed, ambiguous), skip conservatively.
try:
old_status = get_combined_status(short_sha)
except ApiError:
continue
old_red, old_failed = is_red(old_status)
if not old_red:
# Open issue for a now-green SHA — close it via the normal path.
num = issue.get("number")
if isinstance(num, int):
comment = (
f"Commit `{short_sha}` is no longer red. Closing as the "
f"failure context has recovered or expired."
)
if dry_run:
print(
f"::notice::[dry-run] would close issue #{num} "
f"({title}) — old SHA is now green"
)
closed += 1
continue
api(
"POST",
f"/repos/{OWNER}/{NAME}/issues/{num}/comments",
body={"body": comment},
)
api(
"PATCH",
f"/repos/{OWNER}/{NAME}/issues/{num}",
body={"state": "closed"},
)
print(
f"::notice::Closed stale main-red issue #{num} "
f"(old SHA {short_sha} is now green)"
)
closed += 1
continue
if not old_failed:
# Combined red with no per-context detail — can't verify recovery.
continue
# Verify every failed context from the old SHA has recovered.
all_recovered = True
recovered_ctxs: list[str] = []
still_failing_ctxs: list[str] = []
for s in old_failed:
ctx = s.get("context", "")
if not ctx:
continue
current_match = None
for cs in current_statuses:
if isinstance(cs, dict) and cs.get("context") == ctx:
current_match = cs
break
if current_match is None:
recovered_ctxs.append(ctx)
elif _entry_state(current_match) == "success":
recovered_ctxs.append(ctx)
else:
all_recovered = False
still_failing_ctxs.append(ctx)
if not all_recovered:
continue
num = issue.get("number")
if not isinstance(num, int):
continue
comment = (
f"The failing contexts from this SHA (`{short_sha}`) have "
f"recovered on current HEAD `{current_sha[:10]}`: "
f"{', '.join(recovered_ctxs)}. "
f"Main is still red for other reasons; see the current "
f"`[main-red]` issue for `{current_sha[:10]}`."
)
if dry_run:
print(
f"::notice::[dry-run] would close stale issue #{num} "
f"({title}) — contexts recovered"
)
closed += 1
continue
api(
"POST",
f"/repos/{OWNER}/{NAME}/issues/{num}/comments",
body={"body": comment},
)
api(
"PATCH",
f"/repos/{OWNER}/{NAME}/issues/{num}",
body={"state": "closed"},
)
print(
f"::notice::Closed stale main-red issue #{num} "
f"(contexts recovered at {current_sha[:10]})"
)
closed += 1
return closed
def close_open_red_issues_for_other_shas(
current_sha: str,
*,
@@ -920,13 +775,6 @@ def run_once(*, dry_run: bool = False) -> int:
print(f"::warning::main is RED at {sha[:10]} on {WATCH_BRANCH}: "
f"{len(failed)} failed context(s)")
file_or_update_red(sha, failed, debug, dry_run=dry_run)
stale_closed = close_stale_red_issues(sha, recheck_status, dry_run=dry_run)
if stale_closed:
emit_loki_event("main_red_stale_closed", sha, [])
print(
f"::notice::Closed {stale_closed} stale main-red issue(s) "
f"whose contexts recovered at {sha[:10]}"
)
else:
# Green or pending-with-no-real-failures. Close stale issues
# from earlier SHAs when required CI has recovered.
+2 -2
View File
@@ -642,7 +642,7 @@ def load_config(path: str) -> dict[str, Any]:
# requiring the dep, so the ignore is safe: if yaml loads, we use it;
# otherwise we fall back silently.
import yaml # type: ignore[import-not-found]
with open(path, encoding="utf-8") as f:
with open(path) as f:
return yaml.safe_load(f)
except ImportError:
return _load_config_minimal(path)
@@ -656,7 +656,7 @@ def _load_config_minimal(path: str) -> dict[str, Any]:
item map: scalars + lists of scalars. Does NOT support nested lists,
YAML anchors, multi-doc, or flow style.
"""
with open(path, encoding="utf-8") as f:
with open(path) as f:
lines = f.readlines()
return _parse_minimal_yaml(lines)
+1 -1
View File
@@ -33,7 +33,7 @@ def scenario() -> str:
p = os.path.join(STATE_DIR, "scenario")
if not os.path.isfile(p):
return "T1_success"
with open(p, encoding="utf-8") as f:
with open(p) as f:
return f.read().strip()
@@ -40,7 +40,7 @@ def scenario() -> str:
p = os.path.join(STATE_DIR, "scenario")
if not os.path.isfile(p):
return "T1_pr_open"
with open(p, encoding="utf-8") as f:
with open(p) as f:
return f.read().strip()
@@ -258,7 +258,6 @@ def test_run_once_failure_does_not_close(monkeypatch):
monkeypatch.setattr(wd, "file_or_update_red", capture_file)
monkeypatch.setattr(wd, "close_open_red_issues_for_other_shas", lambda *a, **k: 0)
monkeypatch.setattr(wd, "close_stale_red_issues", lambda *a, **k: 0)
assert wd.run_once(dry_run=True) == 0
assert filed == ["abc123"]
+6 -14
View File
@@ -164,20 +164,12 @@ jobs:
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
- if: ${{ needs.changes.outputs.platform == 'true' }}
name: Run tests with coverage (blocking gate)
# Removed -race from the blocking gate per #1184: cold runners
# take 13-25 min to compile with race instrumentation, exceeding
# the 10m step timeout and causing false failures. Race detection
# now runs as a non-blocking advisory step below.
run: go test -timeout 10m -coverprofile=coverage.out ./...
- if: ${{ needs.changes.outputs.platform == 'true' }}
name: Race detection (advisory, non-blocking)
# mc#1184: runs race detector as an advisory check so cold-runner
# compile-time spikes don't block merges. Failures here surface in
# the run log but do not fail the build.
run: go test -race -timeout 10m ./...
continue-on-error: true
name: Run tests with race detection and coverage
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
# full ./... suite with race detection + coverage. A 10m per-step timeout
# lets the suite complete on cold cache (~5-7m) while failing cleanly
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
- if: ${{ needs.changes.outputs.platform == 'true' }}
name: Per-file coverage report
+8 -78
View File
@@ -288,40 +288,6 @@ export function deriveProvidersFromModels(models: ModelSpec[]): string[] {
return out;
}
// billingModeForProvider — maps a selected PROVIDER (vendor key) to the
// LLM billing_mode it implies (internal#703 Gap 2).
//
// Today, picking a non-Platform provider in the Config tab writes the
// credential env (CLAUDE_CODE_OAUTH_TOKEN / vendor key) but leaves
// llm_billing_mode at its resolved default (`platform_managed`). The CP
// tenant_config endpoint then keeps injecting the platform proxy base
// URLs, so the OAuth token / vendor key is never actually used — BYOK
// silently no-ops (the live SEO-Agent symptom in #703). The workspace-
// server even hard-blocks vendor-key writes on platform_managed
// workspaces (secrets.go:87), pointing the user at this exact billing-
// mode switch. Wiring the provider change to also set billing_mode is
// the UI half that makes BYOK take (the CP/workspace-server backend half
// is being fixed in parallel — internal#703 Gap 1).
//
// Mapping:
// - "platform" (the Platform-managed proxy) OR "" (no explicit
// provider override → inherit, defaults to platform) → "platform_managed".
// - any other vendor key ("anthropic-oauth" = Claude Code subscription
// OAuth, "anthropic" = Anthropic API key, "minimax", "openrouter",
// etc.) → "byok".
//
// Returns the billing_mode string the PUT body should carry. The valid
// set is fixed by workspace-server's recognizer (platform_managed | byok
// | disabled); "disabled" is never auto-selected by a provider choice —
// it's an explicit operator action via the LLM Billing section.
export type LLMBillingMode = "platform_managed" | "byok";
export function billingModeForProvider(provider: string): LLMBillingMode {
const v = provider.trim().toLowerCase();
if (v === "" || v === "platform") return "platform_managed";
return "byok";
}
// Fallback used when /templates can't be fetched (offline, older backend).
// Keep in sync with manifest.json workspace_templates as a defensive default.
// Model + env suggestions only flow when the backend is reachable.
@@ -736,36 +702,6 @@ export function ConfigTab({ workspaceId }: Props) {
}
}
// Provider → billing_mode linkage (internal#703 Gap 2). When the
// provider actually changed AND its implied billing_mode differs
// from the previously-selected provider's, push the new mode to
// the per-tenant llm-billing-mode endpoint (same path the LLM
// Billing section uses). Without this, selecting a non-Platform
// provider leaves billing_mode=platform_managed → CP keeps
// injecting the platform proxy → BYOK never takes.
//
// Gated on (a) the provider PUT having succeeded — no point setting
// byok if the credential write failed — and (b) the mode actually
// changing, so an unrelated provider tweak between two BYOK vendors
// (e.g. minimax → openrouter) doesn't re-issue a redundant
// platform_managed→byok PUT and trigger a needless restart.
let billingModeSaveError: string | null = null;
if (providerChanged && !providerSaveError) {
const nextMode = billingModeForProvider(provider);
const prevMode = billingModeForProvider(originalProvider);
if (nextMode !== prevMode) {
try {
await api.put(
`/admin/workspaces/${workspaceId}/llm-billing-mode`,
{ mode: nextMode },
);
} catch (e) {
billingModeSaveError =
e instanceof Error ? e.message : "Billing mode update was rejected";
}
}
}
setOriginalYaml(content);
if (rawMode) {
const parsed = parseYaml(content);
@@ -785,22 +721,16 @@ export function ConfigTab({ workspaceId }: Props) {
} else if (!restart) {
useCanvasStore.getState().updateNodeData(workspaceId, { needsRestart: !providerWillAutoRestart });
}
// Aggregate partial-save errors. modelSaveError, providerSaveError,
// and billingModeSaveError describe rejected updates from
// independent endpoints — show whichever fired so the user knows
// which field reverts on next reload (otherwise they'd see "Saved"
// and be confused why Provider snapped back). The billing-mode case
// is the most important to surface: the provider credential saved
// but BYOK won't actually take until billing_mode flips, so a
// silent failure here is exactly the #703 "selecting a provider has
// no effect" symptom.
// Aggregate partial-save errors. Both modelSaveError and
// providerSaveError describe rejected updates from independent
// endpoints — show whichever fired so the user knows which
// field reverts on next reload (otherwise they'd see "Saved" and
// be confused why Provider snapped back).
const partialError = providerSaveError
? `Other fields saved, but provider update failed: ${providerSaveError}`
: billingModeSaveError
? `Provider saved, but switching billing mode failed — your own provider key/OAuth may not take effect until billing mode is set: ${billingModeSaveError}`
: modelSaveError
? `Other fields saved, but model update failed: ${modelSaveError}`
: null;
: modelSaveError
? `Other fields saved, but model update failed: ${modelSaveError}`
: null;
if (partialError) {
setError(partialError);
} else {
@@ -1,255 +0,0 @@
// @vitest-environment jsdom
//
// Tests for the provider → llm_billing_mode linkage (internal#703 Gap 2).
//
// What this pins: when the operator changes the PROVIDER in the Config
// tab, the workspace's llm_billing_mode must follow — a non-Platform
// provider sets billing_mode=byok; Platform sets platform_managed. Before
// this wiring, selecting "Claude Code subscription (OAuth)" or any vendor
// key wrote the credential env but left billing_mode=platform_managed, so
// CP kept injecting the platform proxy base URL and the OAuth token /
// vendor key was never used — BYOK silently no-op'd (the live jrs-auto
// SEO-Agent symptom in #703).
//
// The billing-mode PUT targets the same per-tenant endpoint the LLM
// Billing section uses: PUT /admin/workspaces/:id/llm-billing-mode with
// body {mode: "byok" | "platform_managed"}.
import { describe, it, expect, vi, afterEach, beforeEach } from "vitest";
import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react";
import React from "react";
afterEach(cleanup);
const apiGet = vi.fn();
const apiPatch = vi.fn();
const apiPut = vi.fn();
vi.mock("@/lib/api", () => ({
api: {
get: (path: string) => apiGet(path),
patch: (path: string, body: unknown) => apiPatch(path, body),
put: (path: string, body: unknown) => apiPut(path, body),
post: vi.fn(),
del: vi.fn(),
},
}));
const storeUpdateNodeData = vi.fn();
const storeRestartWorkspace = vi.fn();
vi.mock("@/store/canvas", () => ({
useCanvasStore: Object.assign(
(selector: (s: unknown) => unknown) =>
selector({ restartWorkspace: storeRestartWorkspace, updateNodeData: storeUpdateNodeData }),
{
getState: () => ({
restartWorkspace: storeRestartWorkspace,
updateNodeData: storeUpdateNodeData,
}),
},
),
}));
vi.mock("../AgentCardSection", () => ({
AgentCardSection: () => <div data-testid="agent-card-stub" />,
}));
import { ConfigTab, billingModeForProvider } from "../ConfigTab";
function wireApi(opts: { providerValue?: string | "missing" }) {
apiGet.mockImplementation((path: string) => {
if (path === `/workspaces/ws-test`) {
return Promise.resolve({ runtime: "hermes" });
}
if (path === `/workspaces/ws-test/model`) {
return Promise.resolve({ model: "nousresearch/hermes-4-70b" });
}
if (path === `/workspaces/ws-test/provider`) {
if (opts.providerValue === "missing") return Promise.reject(new Error("404"));
return Promise.resolve({
provider: opts.providerValue ?? "",
source: opts.providerValue ? "workspace_secrets" : "default",
});
}
if (path === `/workspaces/ws-test/files/config.yaml`) {
return Promise.resolve({ content: "name: ws\nruntime: hermes\n" });
}
if (path === "/templates") return Promise.resolve([]);
return Promise.reject(new Error(`unmocked api.get: ${path}`));
});
}
function billingModeCalls() {
return apiPut.mock.calls.filter(
([path]) => path === "/admin/workspaces/ws-test/llm-billing-mode",
);
}
beforeEach(() => {
apiGet.mockReset();
apiPatch.mockReset();
apiPut.mockReset();
storeUpdateNodeData.mockReset();
storeRestartWorkspace.mockReset();
});
describe("billingModeForProvider — pure mapping (internal#703 Gap 2)", () => {
// Platform / empty → platform_managed. Empty means "no explicit
// override → inherit", which resolves to platform on the backend, so
// it must NOT flip the workspace into byok.
it("maps Platform and empty to platform_managed", () => {
expect(billingModeForProvider("platform")).toBe("platform_managed");
expect(billingModeForProvider("")).toBe("platform_managed");
expect(billingModeForProvider(" ")).toBe("platform_managed");
expect(billingModeForProvider("PLATFORM")).toBe("platform_managed");
});
// Every non-Platform provider → byok. If this regresses to returning
// platform_managed for a vendor, BYOK silently no-ops again (#703).
it("maps non-Platform providers to byok", () => {
expect(billingModeForProvider("anthropic-oauth")).toBe("byok"); // Claude Code subscription
expect(billingModeForProvider("anthropic")).toBe("byok"); // Anthropic API key
expect(billingModeForProvider("minimax")).toBe("byok");
expect(billingModeForProvider("openrouter")).toBe("byok");
expect(billingModeForProvider("openai")).toBe("byok");
});
});
describe("ConfigTab — provider change drives billing_mode (internal#703 Gap 2)", () => {
// The core fix: picking a non-Platform provider (here "anthropic-oauth"
// = Claude Code subscription OAuth) from a fresh/empty provider must
// PUT mode=byok to the per-tenant llm-billing-mode endpoint. This is
// the exact path that was missing — the credential env saved but the
// billing mode never followed, so the proxy stayed engaged.
it("PUTs mode=byok when switching to a non-Platform provider", async () => {
wireApi({ providerValue: "" });
apiPut.mockResolvedValue({ status: "saved" });
render(<ConfigTab workspaceId="ws-test" />);
const input = await screen.findByTestId("provider-input");
fireEvent.change(input, { target: { value: "anthropic-oauth" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
const calls = billingModeCalls();
expect(calls.length).toBe(1);
expect(calls[0][1]).toEqual({ mode: "byok" });
});
// Provider credential PUT still happens too (independent endpoint).
expect(
apiPut.mock.calls.some(([path]) => path === "/workspaces/ws-test/provider"),
).toBe(true);
});
// Switching FROM a byok provider back TO Platform must PUT
// mode=platform_managed so the workspace re-engages the proxy and stops
// expecting a (now-absent) vendor key.
it("PUTs mode=platform_managed when switching back to Platform", async () => {
wireApi({ providerValue: "anthropic-oauth" });
apiPut.mockResolvedValue({ status: "saved" });
render(<ConfigTab workspaceId="ws-test" />);
const input = await screen.findByTestId("provider-input");
await waitFor(() => expect((input as HTMLInputElement).value).toBe("anthropic-oauth"));
fireEvent.change(input, { target: { value: "platform" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
const calls = billingModeCalls();
expect(calls.length).toBe(1);
expect(calls[0][1]).toEqual({ mode: "platform_managed" });
});
});
// Changing between two BYOK vendors (minimax → openrouter) keeps
// billing_mode=byok — the implied mode is unchanged, so re-PUTing it
// would be a wasteful no-op that risks an extra restart. Must NOT fire.
it("does NOT PUT billing-mode when the implied mode is unchanged", async () => {
wireApi({ providerValue: "minimax" });
apiPut.mockResolvedValue({ status: "saved" });
render(<ConfigTab workspaceId="ws-test" />);
const input = await screen.findByTestId("provider-input");
await waitFor(() => expect((input as HTMLInputElement).value).toBe("minimax"));
fireEvent.change(input, { target: { value: "openrouter" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
// Provider PUT fires (vendor changed)...
expect(
apiPut.mock.calls.some(([path]) => path === "/workspaces/ws-test/provider"),
).toBe(true);
});
// ...but billing-mode does NOT (byok → byok is a no-op).
expect(billingModeCalls().length).toBe(0);
});
// A Save that doesn't touch the provider must not PUT billing-mode —
// editing tier/name shouldn't disturb the workspace's billing mode.
it("does NOT PUT billing-mode on a Save that leaves provider unchanged", async () => {
wireApi({ providerValue: "anthropic-oauth" });
apiPut.mockResolvedValue({ status: "saved" });
render(<ConfigTab workspaceId="ws-test" />);
await screen.findByTestId("provider-input");
// Dirty an unrelated field so Save is enabled.
const tierSelect = screen.getByLabelText(/tier/i) as HTMLSelectElement;
fireEvent.change(tierSelect, { target: { value: "3" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
// Some PUT may fire (e.g. /model); just assert billing-mode did not.
expect(billingModeCalls().length).toBe(0);
});
});
// If the provider credential PUT itself fails, we must NOT set byok —
// flipping billing_mode while the credential write failed would leave
// the workspace expecting a key it doesn't have (worse than no-op).
it("does NOT PUT billing-mode when the provider PUT fails", async () => {
wireApi({ providerValue: "" });
apiPut.mockImplementation((path: string) => {
if (path === "/workspaces/ws-test/provider") return Promise.reject(new Error("boom"));
return Promise.resolve({ status: "saved" });
});
render(<ConfigTab workspaceId="ws-test" />);
const input = await screen.findByTestId("provider-input");
fireEvent.change(input, { target: { value: "anthropic-oauth" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
// The provider-failure error is surfaced (getByText throws if absent).
expect(screen.getByText(/provider update failed/i)).toBeTruthy();
});
expect(billingModeCalls().length).toBe(0);
});
// If the credential saved but the billing-mode PUT is rejected, the
// user must be warned that BYOK may not take — a silent failure here
// is precisely the #703 symptom we're fixing.
it("surfaces an error when billing-mode PUT fails after a successful provider save", async () => {
wireApi({ providerValue: "" });
apiPut.mockImplementation((path: string) => {
if (path === "/admin/workspaces/ws-test/llm-billing-mode") {
return Promise.reject(new Error("403 forbidden"));
}
return Promise.resolve({ status: "saved" });
});
render(<ConfigTab workspaceId="ws-test" />);
const input = await screen.findByTestId("provider-input");
fireEvent.change(input, { target: { value: "anthropic-oauth" } });
fireEvent.click(screen.getByRole("button", { name: /^save$/i }));
await waitFor(() => {
expect(screen.getByText(/switching billing mode failed/i)).toBeTruthy();
});
});
});
@@ -335,7 +335,6 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
})
if marshalErr != nil {
log.Printf("Channels %s: json.Marshal a2aBody failed: %v", ch.ChannelType, marshalErr)
return fmt.Errorf("marshal a2a body: %w", marshalErr)
}
callerID := "channel:" + ch.ChannelType
@@ -677,7 +676,6 @@ func (m *Manager) appendHistory(ctx context.Context, key string, username, userM
})
if marshalErr != nil {
log.Printf("appendHistory %s: json.Marshal entry failed: %v", key, marshalErr)
return
}
db.RDB.LPush(ctx, key, string(entry))
db.RDB.LTrim(ctx, key, 0, int64(maxHistoryEntries-1))
@@ -163,7 +163,6 @@ func (s *SlackAdapter) sendBotMessage(ctx context.Context, config map[string]int
body, marshalErr := json.Marshal(payload)
if marshalErr != nil {
log.Printf("slack SendMessage: json.Marshal payload failed: %v", marshalErr)
return fmt.Errorf("slack: marshal payload: %w", marshalErr)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://slack.com/api/chat.postMessage", bytes.NewReader(body))
if err != nil {
@@ -482,14 +482,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
if apiErr.Code == 429 {
retryAfter := time.Duration(apiErr.RetryAfter) * time.Second
log.Printf("Channels: Telegram poll rate-limited, sleeping %s", retryAfter)
timer := time.NewTimer(retryAfter)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-timer.C:
case <-time.After(retryAfter):
continue
}
continue
}
if apiErr.Code == 401 {
invalidateBot(token)
@@ -497,14 +495,12 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
}
}
log.Printf("Channels: Telegram poll error: %v", err)
timer := time.NewTimer(telegramPollInterval)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-timer.C:
case <-time.After(telegramPollInterval):
continue
}
continue
}
for _, update := range updates {
@@ -426,34 +426,16 @@ func nilIfEmpty(s string) *string {
// (their next /registry/register will mint their first token, after
// which this branch never fires again for them).
//
// Post-RFC#637 addition: a request may instead be carrying a HUMAN's
// canvas-user identity (e.g. the 344a2623-… identity workspace from the
// RFC#637 rollout). That human sits OUTSIDE the workspace org hierarchy, so
// the returned isCanvasUser flag lets the A2A proxy bypass CanCommunicate for
// it. Canvas-user classification is decided by isGenuineCanvasUser using
// NON-FORGEABLE credentials only (see that function) — never by the caller's
// X-Workspace-ID alone, and never by a bare same-origin Host/Referer in a
// SaaS image (those are forgeable; see middleware.IsSameOriginCanvas).
//
// #1673: this canvas-user check is now evaluated BEFORE the HasAnyLiveToken
// peer-token contract. Previously it lived only in the !hasLive branch, so a
// canvas-user identity workspace that had acquired live tokens fell into the
// hasLive=true branch, which demands a bearer the canvas frontend never sends
// → silent 401 → the message was dropped before logA2AReceiveQueued wrote the
// activity_logs row, breaking canvas chat for poll-mode workspaces. A genuine
// canvas user is identified by the human's session/admin/org credential, which
// is independent of whether the identity workspace happens to hold peer tokens.
// Post-RFC#637 addition: when the tokenless workspace is accompanied by
// canvas or admin auth (same-origin request, admin bearer, or org-level
// token), the caller is identified as a canvas-user identity rather than
// a legacy peer agent. The returned isCanvasUser flag lets the A2A proxy
// bypass CanCommunicate for human users, who sit outside the workspace
// hierarchy.
//
// On auth failure this writes the 401 via c and returns an error so the
// handler aborts without running the proxy.
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (isCanvasUser bool, err error) {
// Genuine canvas-user identity? Decided independently of the caller
// workspace's token state (the #1673 fix) and using only non-forgeable
// signals (the #1944 escalation guard).
if isGenuineCanvasUser(ctx, c) {
return true, nil
}
hasLive, dbErr := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
if dbErr != nil {
// Fail-open here matches the heartbeat path — A2A caller auth is
@@ -464,10 +446,22 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (
return false, nil
}
if !hasLive {
// Tokenless, non-canvas-user workspace — legacy / pre-upgrade peer.
// Grandfather it through (its next /registry/register mints its
// first token, after which it lands in the hasLive=true branch).
return false, nil
// Tokenless workspace — could be legacy/pre-upgrade caller or
// canvas-user identity. Distinguish by request auth signals.
if middleware.IsSameOriginCanvas(c) {
return true, nil
}
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok != "" {
adminSecret := os.Getenv("ADMIN_TOKEN")
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
return true, nil
}
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
return true, nil
}
}
return false, nil // legacy / pre-upgrade caller
}
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
@@ -481,61 +475,6 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (
return false, nil
}
// isGenuineCanvasUser reports whether the request is a real human acting
// through the canvas UI (RFC#637 canvas-user identity), as opposed to a peer
// workspace agent. A true result lets the A2A proxy bypass CanCommunicate, so
// it MUST only accept signals an attacker on the platform network cannot forge:
//
// - A control-plane-verified canvas session: the WorkOS session cookie is
// confirmed upstream to belong to a MEMBER of THIS tenant's org
// (middleware.IsVerifiedCanvasSession → /cp/auth/tenant-member). This is
// the production SaaS canvas path.
// - An Authorization: Bearer matching ADMIN_TOKEN (break-glass / molecli).
// - An Authorization: Bearer matching a live org_api_tokens row (user-minted
// org-scoped API token).
//
// Deliberately NOT accepted as a canvas-user signal in a SaaS image:
//
// - A bare same-origin Host/Referer/Origin (middleware.IsSameOriginCanvas).
// Those headers are trivially forgeable by any container on the Docker
// network, and the combined-tenant image (CANVAS_PROXY_URL set) is exactly
// where a forged Referer + an arbitrary X-Workspace-ID could otherwise
// bypass CanCommunicate and reach cross-workspace A2A — the PR #1944
// privilege escalation. Same-origin is only honored as a fallback when CP
// session verification is NOT configured (self-hosted / dev), a
// single-tenant topology with no cross-tenant boundary to escalate across;
// even there the org hierarchy still owns intra-org routing.
//
// Note this classification is about the human's credential, not the caller
// workspace's X-Workspace-ID — so it never trusts an attacker-supplied caller
// ID, and it is independent of whether that workspace holds peer tokens.
func isGenuineCanvasUser(ctx context.Context, c *gin.Context) bool {
// Production SaaS: control-plane-verified org-member session cookie.
if middleware.IsVerifiedCanvasSession(c) {
return true
}
if tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization")); tok != "" {
adminSecret := os.Getenv("ADMIN_TOKEN")
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
return true
}
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
return true
}
}
// Self-hosted / dev fallback ONLY: when upstream session verification is
// not configured there is no verified-cookie signal to use, and the
// deployment is single-tenant, so the forgeable same-origin check is an
// acceptable canvas signal. In SaaS (CP session configured) this branch is
// skipped, closing the forged-same-origin escalation.
if !middleware.CPSessionConfigured() && middleware.IsSameOriginCanvas(c) {
return true
}
return false
}
// errInvalidCallerToken is a sentinel for validateCallerToken's "missing
// token" branch so the handler-level guard can detect it without string
// matching (the wsauth errors are typed for the invalid case).
@@ -11,7 +11,6 @@ import (
"net/http"
"net/http/httptest"
"os"
"os/exec"
"strings"
"testing"
"time"
@@ -1245,12 +1244,13 @@ func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
}
func TestValidateCallerToken_CanvasUser_AdminToken(t *testing.T) {
setupTestDB(t)
mock := setupTestDB(t)
setupTestRedis(t)
// #1673/#1944: the genuine-canvas-user check (admin bearer here) now runs
// BEFORE HasAnyLiveToken, so no SELECT COUNT(*) is issued — the human's
// credential, not the caller workspace's token state, decides canvas-user.
// Tokenless workspace
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs("ws-canvas-admin").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
t.Setenv("ADMIN_TOKEN", "admin-secret-42")
@@ -1276,9 +1276,10 @@ func TestValidateCallerToken_CanvasUser_OrgToken(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
// #1673/#1944: the genuine-canvas-user check (org token here) now runs
// BEFORE HasAnyLiveToken, so the first DB query is orgtoken.Validate's
// lookup — there is no SELECT COUNT(*) expectation anymore.
// Tokenless workspace
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs("ws-canvas-org").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// orgtoken.Validate lookup
mock.ExpectQuery(`SELECT id, prefix, org_id FROM org_api_tokens WHERE token_hash = .* AND revoked_at IS NULL`).
@@ -2340,197 +2341,6 @@ func TestProxyA2A_PollMode_ShortCircuits_NoSSRF_NoDispatch(t *testing.T) {
}
}
// stubVerifiedCPSession points VerifiedCPSession at a stub control-plane that
// confirms the given cookie belongs to a tenant-member, so tests can exercise
// the genuine (non-forgeable) canvas-session path end-to-end without a live CP.
// It sets CP_UPSTREAM_URL + MOLECULE_ORG_SLUG for the test's lifetime; the
// real middleware.VerifiedCPSession HTTP+cache code path runs unchanged.
func stubVerifiedCPSession(t *testing.T, member bool) {
t.Helper()
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if member {
fmt.Fprint(w, `{"member":true,"user_id":"user-canvas-1"}`)
} else {
w.WriteHeader(http.StatusForbidden)
fmt.Fprint(w, `{"member":false}`)
}
}))
t.Cleanup(srv.Close)
t.Setenv("CP_UPSTREAM_URL", srv.URL)
t.Setenv("MOLECULE_ORG_SLUG", "test-tenant")
}
// TestProxyA2A_PollMode_CanvasUserWithVerifiedSession is the #1673 regression
// guard. A poll-mode canvas-user identity workspace that HAS acquired live
// tokens (the exact condition that made #1673 fire) sends a canvas message
// carrying a control-plane-verified session cookie but no bearer token. The
// fix must classify it as a canvas user BEFORE the HasAnyLiveToken peer-token
// contract, so the request is queued (200) and logA2AReceiveQueued writes the
// activity_logs row — instead of the pre-fix silent 401 that dropped the
// message before any row landed (breaking canvas chat + chat-history).
//
// Runs in a subprocess with CANVAS_PROXY_URL set so middleware.canvasProxyActive
// is true at package-init time (matching the combined-tenant image), proving the
// fix does not depend on disabling same-origin detection.
func TestProxyA2A_PollMode_CanvasUserWithVerifiedSession(t *testing.T) {
if os.Getenv("CANVAS_PROXY_URL") == "" {
cmd := exec.Command(os.Args[0], "-test.run=^TestProxyA2A_PollMode_CanvasUserWithVerifiedSession$", "-test.v")
cmd.Env = append(os.Environ(), "CANVAS_PROXY_URL=http://localhost")
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("subprocess test failed: %v\n%s", err, out)
}
return
}
stubVerifiedCPSession(t, true)
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsTarget = "ws-poll-canvas-target"
const wsCanvasUser = "ws-canvas-user-344a"
// CRUCIAL: no SELECT COUNT(*) FROM workspace_auth_tokens expectation. The
// genuine-canvas-user check (verified session) must short-circuit BEFORE
// HasAnyLiveToken — that is the #1673 regression path. An identity
// workspace that already holds live tokens must NOT fall into the
// hasLive=true bearer-required branch.
// isCanvasUser=true → CanCommunicate is skipped (no parent_id lookups).
expectBudgetCheck(mock, wsTarget)
mock.ExpectQuery("SELECT delivery_mode FROM workspaces WHERE id").
WithArgs(wsTarget).
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// logA2AReceiveQueued must fire synchronously and write the row.
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsTarget}}
body := `{"jsonrpc":"2.0","id":"canvas-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hello from canvas"}]}}}`
req := httptest.NewRequest("POST", "/workspaces/"+wsTarget+"/a2a", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Workspace-ID", wsCanvasUser)
// Verified canvas session cookie (the genuine, non-forgeable signal).
req.Header.Set("Cookie", "wos-session=valid-canvas-session-cookie")
// Same-origin headers, present as a real canvas request would send them —
// but they are NOT what authorizes the bypass here (the verified session is).
req.Host = "localhost"
req.Header.Set("Referer", "https://localhost/")
c.Request = req
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 (queued) for canvas-user with verified session, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response is not valid JSON: %v", err)
}
if resp["status"] != "queued" {
t.Errorf("response.status = %v, want %q", resp["status"], "queued")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations (activity_logs row must be written): %v", err)
}
}
// TestProxyA2A_ForgedSameOrigin_CannotBypassCanCommunicate is the security
// crux of the #1673 fix and the reason PR #1944 was held. In the combined-
// tenant SaaS image (CANVAS_PROXY_URL set, CP session verification configured),
// an attacker forges a same-origin request — correct Host + a matching
// `Referer: https://<host>/` — and supplies an arbitrary X-Workspace-ID naming
// a workspace it does not control, targeting a workspace it is NOT authorized
// to reach. It presents NO verified session cookie, NO admin token, NO org
// token.
//
// PR #1944's same-origin bypass would have classified this as a canvas user and
// skipped CanCommunicate, granting cross-workspace A2A — a privilege
// escalation. The safe fix must instead fall through to the standard
// peer-token contract and CanCommunicate, which rejects the cross-hierarchy
// call with 403. This test proves the escalation is closed.
func TestProxyA2A_ForgedSameOrigin_CannotBypassCanCommunicate(t *testing.T) {
if os.Getenv("CANVAS_PROXY_URL") == "" {
cmd := exec.Command(os.Args[0], "-test.run=^TestProxyA2A_ForgedSameOrigin_CannotBypassCanCommunicate$", "-test.v")
cmd.Env = append(os.Environ(), "CANVAS_PROXY_URL=http://localhost")
out, err := cmd.CombinedOutput()
if err != nil {
t.Fatalf("subprocess test failed: %v\n%s", err, out)
}
return
}
// SaaS image with CP session verification configured. The stub CP rejects
// any cookie as a non-member; the attacker sends none anyway. This asserts
// that with verification configured, same-origin alone is NOT a canvas
// signal (CPSessionConfigured()==true disables the dev fallback).
stubVerifiedCPSession(t, false)
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsTarget = "ws-victim-target"
const wsForgedCaller = "ws-attacker-caller"
// validateCallerToken: not a genuine canvas user (no verified session, no
// admin/org token, and the dev same-origin fallback is disabled in SaaS).
// So it consults the peer-token contract: HasAnyLiveToken for the forged
// caller. Return 0 → tokenless legacy peer → grandfathered through token
// validation (isCanvasUser stays false). The request must then still be
// gated by CanCommunicate.
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs(wsForgedCaller).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// CanCommunicate MUST run (the escalation guard) and DENY: caller and
// target sit under different parents.
mockCanCommunicate(mock, wsForgedCaller, wsTarget, false)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsTarget}}
body := `{"jsonrpc":"2.0","id":"exploit-1","method":"message/send","params":{"message":{"role":"user","parts":[{"text":"cross-workspace exploit"}]}}}`
req := httptest.NewRequest("POST", "/workspaces/"+wsTarget+"/a2a", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
// Arbitrary caller workspace the attacker does not own.
req.Header.Set("X-Workspace-ID", wsForgedCaller)
// Forged same-origin signals (the #1944 bypass vector).
req.Host = "localhost"
req.Header.Set("Referer", "https://localhost/")
req.Header.Set("Origin", "https://localhost")
// No Cookie / Authorization — no genuine canvas credential.
c.Request = req
handler.ProxyA2A(c)
if w.Code != http.StatusForbidden {
t.Fatalf("ESCALATION NOT CLOSED: forged same-origin + arbitrary X-Workspace-ID "+
"reached an unauthorized target with status %d (want 403): %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("body not JSON: %v", err)
}
if !strings.Contains(fmt.Sprint(resp["error"]), "access denied") {
t.Errorf("expected an access-denied error from CanCommunicate, got %v", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations — CanCommunicate must have been consulted: %v", err)
}
}
// TestProxyA2A_PushMode_NoShortCircuit verifies the symmetric contract:
// a push-mode workspace (default) is NOT affected by the new short-circuit.
// It still proceeds to resolveAgentURL + dispatch. Without this guard, a
@@ -425,7 +425,6 @@ func (h *WorkspaceHandler) stitchDrainResponseToDelegation(ctx context.Context,
})
if marshalErr != nil {
log.Printf("a2aQueue stitch %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
return
}
res, err := db.DB.ExecContext(ctx, `
UPDATE activity_logs
@@ -153,15 +153,7 @@ func queueRowAuthFields(ctx context.Context, queueID string) (callerID, workspac
if err != nil {
return "", "", err
}
callerID = ""
if callerNS.Valid {
callerID = callerNS.String
}
workspaceID = ""
if workspaceNS.Valid {
workspaceID = workspaceNS.String
}
return callerID, workspaceID, nil
return callerNS.String, workspaceNS.String, nil
}
// GetA2AQueueStatus handles GET /workspaces/:id/a2a/queue/:queue_id.
@@ -1,62 +1,9 @@
package handlers
import (
"context"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
// TestQueueRowAuthFields_NilSafeScan proves queueRowAuthFields returns empty
// strings (not a panic / garbage) when the a2a_queue row has NULL caller_id
// or workspace_id. Before the fix it dereferenced NullString.String directly,
// which is only the zero value when Valid is false but masked the NULL-vs-""
// distinction; the guard makes the intent explicit and safe.
func TestQueueRowAuthFields_NilSafeScan(t *testing.T) {
mock := setupTestDB(t)
queueID := "queue-123"
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs(queueID).
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).AddRow(nil, nil))
caller, workspace, err := queueRowAuthFields(context.Background(), queueID)
if err != nil {
t.Fatalf("queueRowAuthFields returned error: %v", err)
}
if caller != "" {
t.Errorf("callerID = %q, want empty string for NULL caller_id", caller)
}
if workspace != "" {
t.Errorf("workspaceID = %q, want empty string for NULL workspace_id", workspace)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestQueueRowAuthFields_PopulatedRow confirms the non-NULL path still returns
// the scanned values unchanged.
func TestQueueRowAuthFields_PopulatedRow(t *testing.T) {
mock := setupTestDB(t)
queueID := "queue-456"
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id = \$1`).
WithArgs(queueID).
WillReturnRows(sqlmock.NewRows([]string{"caller_id", "workspace_id"}).AddRow("caller-x", "ws-y"))
caller, workspace, err := queueRowAuthFields(context.Background(), queueID)
if err != nil {
t.Fatalf("queueRowAuthFields returned error: %v", err)
}
if caller != "caller-x" || workspace != "ws-y" {
t.Fatalf("got caller=%q workspace=%q, want caller-x / ws-y", caller, workspace)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestExtractExpiresInSeconds covers the JSON parser used at enqueue time
// to honor a caller-specified TTL. Zero return = "no TTL" — caller leaves
// expires_at NULL on the queue row.
@@ -167,7 +167,6 @@ func (w *AgentMessageWriter) Send(
respJSON, marshalErr := json.Marshal(respPayload)
if marshalErr != nil {
log.Printf("AgentMessageWriter %s: json.Marshal respPayload failed: %v", workspaceID, marshalErr)
return nil
}
preview := textutil.TruncateRunes(message, 80)
if _, err := w.db.ExecContext(ctx, `
@@ -347,7 +347,6 @@ func computeAuditHMAC(key []byte, ev *auditEventRow) string {
payload, marshalErr := json.Marshal(canonical) // compact, sorted keys
if marshalErr != nil {
log.Printf("auditChainHash: json.Marshal canonical failed: %v", marshalErr)
return ""
}
mac := hmac.New(sha256.New, key)
mac.Write(payload)
@@ -172,14 +172,10 @@ func (h *ChannelHandler) Create(c *gin.Context) {
configJSON, marshalErr := json.Marshal(body.Config)
if marshalErr != nil {
log.Printf("Channels create %s: json.Marshal config failed: %v", workspaceID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"})
return
}
allowedJSON, marshalErr := json.Marshal(body.AllowedUsers)
if marshalErr != nil {
log.Printf("Channels create %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"})
return
}
enabled := true
if body.Enabled != nil {
@@ -238,8 +234,6 @@ func (h *ChannelHandler) Update(c *gin.Context) {
j, marshalErr := json.Marshal(body.Config)
if marshalErr != nil {
log.Printf("Channels update %s: json.Marshal config failed: %v", workspaceID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal config failed"})
return
}
configArg = string(j)
}
@@ -247,8 +241,6 @@ func (h *ChannelHandler) Update(c *gin.Context) {
j, marshalErr := json.Marshal(body.AllowedUsers)
if marshalErr != nil {
log.Printf("Channels update %s: json.Marshal allowed_users failed: %v", workspaceID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "marshal allowed_users failed"})
return
}
allowedArg = string(j)
}
@@ -60,14 +60,12 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st
respJSON, marshalErr := json.Marshal(respPayload)
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respPayload failed: %v", delegationID, marshalErr)
return
}
reqJSON, marshalErr := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal reqPayload failed: %v", delegationID, marshalErr)
return
}
logStatus := "ok"
if status == "failed" {
@@ -321,7 +319,6 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", delegationID, marshalErr)
return insertTrackingUnavailable
}
// Store delegation_id in response_body so agent check_delegation_status
// (which reads response_body->>delegation_id) can locate this row even
@@ -331,7 +328,6 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
return insertTrackingUnavailable
}
var idemArg interface{}
if body.IdempotencyKey != "" {
@@ -435,12 +431,10 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
if proxyErr != nil && isTransientProxyError(proxyErr) && len(respBody) == 0 {
log.Printf("Delegation %s: first attempt failed (%s) — retrying in %s after reactive URL refresh",
delegationID, proxyErr.Error(), delegationRetryDelay)
timer := time.NewTimer(delegationRetryDelay)
select {
case <-ctx.Done():
timer.Stop()
// outer timeout hit before retry window elapsed
case <-timer.C:
case <-time.After(delegationRetryDelay):
status, respBody, proxyErr = h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true, false)
}
}
@@ -511,13 +505,12 @@ handleSuccess:
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal queuedJSON failed: %v", delegationID, marshalErr)
} else {
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
}
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'queued')
`, sourceID, sourceID, targetID, "Delegation queued — target at capacity", string(queuedJSON)); err != nil {
log.Printf("Delegation %s: failed to insert queued log: %v", delegationID, err)
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationStatus), sourceID, map[string]interface{}{
"delegation_id": delegationID, "target_id": targetID, "status": "queued",
@@ -538,13 +531,12 @@ handleSuccess:
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
} else {
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
`, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil {
log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err)
}
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4, $5::jsonb, 'completed')
`, sourceID, sourceID, targetID, "Delegation completed ("+textutil.TruncateBytes(responseText, 80)+")", string(respJSON)); err != nil {
log.Printf("Delegation %s: failed to insert success log: %v", delegationID, err)
}
log.Printf("Delegation %s: step=recording_ledger_completed", delegationID)
@@ -627,8 +619,6 @@ func (h *DelegationHandler) Record(c *gin.Context) {
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal taskJSON failed: %v", body.DelegationID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal task"})
return
}
// Store delegation_id in response_body so agent check_delegation_status
// can locate this row. Fixes mc#984.
@@ -637,8 +627,6 @@ func (h *DelegationHandler) Record(c *gin.Context) {
})
if marshalErr != nil {
log.Printf("Delegation %s: json.Marshal respJSON failed: %v", body.DelegationID, marshalErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to marshal response"})
return
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
@@ -709,13 +697,12 @@ func (h *DelegationHandler) UpdateStatus(c *gin.Context) {
})
if marshalErr != nil {
log.Printf("Delegation UpdateStatus %s: json.Marshal respJSON failed: %v", delegationID, marshalErr)
} else {
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
`, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil {
log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err)
}
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, response_body, status)
VALUES ($1, 'delegation', 'delegate_result', $2, $3, $4::jsonb, 'completed')
`, sourceID, sourceID, "Delegation completed ("+textutil.TruncateBytes(body.ResponsePreview, 80)+")", string(respJSON)); err != nil {
log.Printf("Delegation UpdateStatus: result insert failed for %s: %v", delegationID, err)
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventDelegationComplete), sourceID, map[string]interface{}{
"delegation_id": delegationID,
@@ -0,0 +1,174 @@
package handlers
// exec_eic.go — one-shot, non-interactive command execution on a SaaS
// workspace EC2 over the EIC tunnel. Part 1 of RFC internal#742
// (tenant-scoped workspace exec API).
//
// This is the NON-interactive sibling of terminal.go's PTY path. It is
// modelled on readFileViaEIC (template_files_eic.go): acquire a pooled
// EIC SSH session via withEICTunnel, run a single remote command, capture
// stdout/stderr/exit, tear the session down. The only differences from
// readFileViaEIC are (a) the remote command is the caller's argv rather
// than a fixed `cat`, and (b) we capture the exit code and an output cap
// instead of mapping not-found to os.ErrNotExist.
//
// Safety boundary lives in the caller (exec_handler.go): argv-only,
// timeout clamp, tier gate, audit. This file only owns "run argv N over
// the pooled session and hand back stdout/stderr/exit".
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"os/exec"
"strings"
)
// execOutputCap bounds the stdout (and, separately, stderr) bytes we
// buffer and return for a single exec. 1 MiB each. Past the cap we stop
// appending and set truncated=true; the handler emits an explicit
// truncation marker so the caller knows output was cut rather than empty.
//
// The cap protects the workspace-server process: a `cat /dev/zero` or a
// runaway build log could otherwise OOM the box. We bound each stream
// independently so a chatty-stderr command can't crowd out stdout.
const execOutputCap = 1 << 20 // 1 MiB
// execResult is the outcome of one non-interactive remote command.
type execResult struct {
Stdout []byte
Stderr []byte
ExitCode int
StdoutTruncated bool
StderrTruncated bool
}
// runEICExec runs argv over an already-open EIC session and returns the
// captured streams + exit code. Wrapped in a package-level var (matching
// the withEICTunnel / poolSetupTunnel pattern) so the handler test can
// stub the actual remote execution without a real sshd — the EIC wiring
// (withEICTunnel) and this command runner are the two seams a test needs.
//
// Exit-code extraction: ssh propagates the remote command's exit status
// as its own exit status, so an *exec.ExitError carries the real remote
// code. A non-ExitError (ssh failed to connect, context killed) is a
// transport error and is returned as err with exitCode=-1.
var runEICExec = realRunEICExec
func realRunEICExec(ctx context.Context, s eicSSHSession, argv []string) (execResult, error) {
remote := buildExecRemoteCommand(argv)
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(remote)...)
sshCmd.Env = os.Environ()
stdout := &cappedBuffer{cap: execOutputCap}
stderr := &cappedBuffer{cap: execOutputCap}
sshCmd.Stdout = stdout
sshCmd.Stderr = stderr
err := sshCmd.Run()
res := execResult{
Stdout: stdout.Bytes(),
Stderr: stderr.Bytes(),
StdoutTruncated: stdout.truncated,
StderrTruncated: stderr.truncated,
}
if err == nil {
res.ExitCode = 0
return res, nil
}
// A non-zero remote exit comes back as *exec.ExitError — that is a
// SUCCESSFUL exec with a non-zero code, not a transport failure.
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
res.ExitCode = exitErr.ExitCode()
return res, nil
}
// Anything else (ssh couldn't connect, context deadline killed the
// subprocess) is a real transport error.
res.ExitCode = -1
return res, err
}
// buildExecRemoteCommand turns the caller's argv into a single remote
// command string for ssh. ssh always concatenates its trailing args into
// one string and hands it to the login shell, so to run argv literally
// (no implicit shell word-splitting on the caller's behalf) we shell-quote
// every element and join with spaces. The login shell then re-splits on
// those quotes back into the exact argv.
//
// This is what enforces "argv-only, no implicit shell": if argv[0] is
// e.g. ["ls","-l","/a b"] the remote runs `ls` with two args and the path
// "/a b" stays one token. If the caller WANTS a shell they pass
// ["bash","-lc","..."] explicitly — argv[0] is then a shell by their
// choice, which the handler permits.
func buildExecRemoteCommand(argv []string) string {
quoted := make([]string, len(argv))
for i, a := range argv {
quoted[i] = shellQuote(a)
}
return strings.Join(quoted, " ")
}
// cappedBuffer is an io.Writer that buffers up to cap bytes and silently
// drops the rest, recording that truncation happened. Used to bound the
// per-stream memory a single exec can consume.
type cappedBuffer struct {
buf bytes.Buffer
cap int
truncated bool
}
func (b *cappedBuffer) Write(p []byte) (int, error) {
// Always report the full length written so the upstream copier
// (ssh subprocess plumbing) doesn't error with a short-write; we
// just don't retain bytes past the cap.
if b.buf.Len() >= b.cap {
b.truncated = true
return len(p), nil
}
room := b.cap - b.buf.Len()
if len(p) <= room {
return b.buf.Write(p)
}
b.buf.Write(p[:room])
b.truncated = true
return len(p), nil
}
func (b *cappedBuffer) Bytes() []byte { return b.buf.Bytes() }
// execViaEIC runs argv non-interactively on the workspace EC2 identified
// by instanceID, over a pooled EIC SSH session, and returns the captured
// result. Mirrors readFileViaEIC's structure: derive a per-op deadline,
// open the tunnel via withEICTunnel, run the command inside the closure.
//
// The caller (exec_handler.go) is responsible for argv validation, the
// timeout clamp, the tier gate, and the audit event. This function only
// owns the EIC plumbing + the run.
func execViaEIC(ctx context.Context, instanceID string, argv []string) (execResult, error) {
if len(argv) == 0 {
return execResult{}, fmt.Errorf("exec: empty argv")
}
var res execResult
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
r, err := runEICExec(ctx, s, argv)
res = r
if err != nil {
// Distinguish a deadline-killed exec from a generic transport
// error so the handler can surface a timeout cleanly (the same
// shape writeFileViaEIC uses for internal#423).
if cerr := ctx.Err(); cerr != nil {
return fmt.Errorf("exec: command did not finish: %w", cerr)
}
return fmt.Errorf("exec: ssh transport: %w", err)
}
return nil
})
if runErr != nil {
return res, runErr
}
return res, nil
}
@@ -0,0 +1,53 @@
package handlers
// exec_eic_test.go — pure-function coverage for the EIC exec layer:
// argv → remote-command quoting, and the per-stream output cap.
import (
"strings"
"testing"
)
func TestBuildExecRemoteCommand_QuotesEachArg(t *testing.T) {
// A space in an argument must stay one token after the remote shell
// re-splits — proving we don't leak the caller's argv into implicit
// word-splitting.
got := buildExecRemoteCommand([]string{"ls", "-l", "/a b"})
want := `'ls' '-l' '/a b'`
if got != want {
t.Fatalf("quoting mismatch:\n got %q\nwant %q", got, want)
}
}
func TestBuildExecRemoteCommand_EscapesSingleQuotes(t *testing.T) {
// An embedded single quote must be escaped so it can't break out of
// the quoting and inject extra shell tokens.
got := buildExecRemoteCommand([]string{"echo", "a'b"})
if !strings.Contains(got, `'a'\''b'`) {
t.Fatalf("embedded single-quote not escaped: %q", got)
}
}
func TestCappedBuffer_TruncatesAtCap(t *testing.T) {
b := &cappedBuffer{cap: 8}
n, _ := b.Write([]byte("12345"))
if n != 5 || b.truncated {
t.Fatalf("first write under cap should not truncate; n=%d truncated=%v", n, b.truncated)
}
// This write crosses the cap (5 + 6 = 11 > 8): keep 3 more, drop rest.
n, _ = b.Write([]byte("abcdef"))
if n != 6 {
t.Fatalf("Write must report full length to avoid short-write errors; n=%d", n)
}
if !b.truncated {
t.Fatalf("expected truncated=true after crossing cap")
}
if got := string(b.Bytes()); got != "12345abc" {
t.Fatalf("capped content mismatch: got %q want %q", got, "12345abc")
}
// A further write past a full buffer stays dropped + truncated.
b.Write([]byte("zzz"))
if got := string(b.Bytes()); got != "12345abc" {
t.Fatalf("post-cap write must be dropped: got %q", got)
}
}
@@ -0,0 +1,332 @@
package handlers
// exec_handler.go — POST /workspaces/:id/exec. Part 1 of RFC internal#742
// (tenant-scoped workspace exec API).
//
// A first-class, audited, tenant-token-gated one-shot exec endpoint that
// runs an argv non-interactively on the workspace EC2 via the EXISTING EIC
// tunnel broker (execViaEIC → withEICTunnel → pooled session). It is the
// non-interactive sibling of /terminal (interactive PTY) and reuses the
// same EIC plumbing readFileViaEIC uses.
//
// Authz layering (no new auth surface invented):
// 1. TenantGuard (router middleware) — process is one-per-org, so a
// request for a sibling org's machine is already bounced 404 before
// it reaches any handler.
// 2. WorkspaceAuth (wsAuth group) — the bearer/org-token/session must be
// valid for THIS :id. A per-workspace token is bound to its own
// workspace; an org token is bound to its org. This is what stops an
// org token from naming a sibling org's instance id: the row is
// looked up by :id and that :id had to pass WorkspaceAuth for the
// presented credential.
// 3. Tier gate (this handler) — host exec requires a host-control tier
// (T3/T4). A standard/sandboxed (T1/T2) workspace gets 403, not host
// exec. See execMinHostControlTier for the reviewer caveat.
//
// Audit: exactly one activity_logs row per exec via LogActivity — actor,
// argv, exit code, duration. NEVER stdout/stderr content (those are
// streamed to the caller but never persisted), mirroring the
// secret-redaction contract used by the memory export/import paths.
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/gin-gonic/gin"
)
// execMaxTimeoutSeconds is the hard ceiling on a single exec. A caller
// asking for more is clamped down to this. 120s matches the RFC bound; it
// is also comfortably above the poolTTL/SendSSHPublicKey 60s grant — see
// the reviewer note in the PR: a >60s command may outlive its pooled key
// and need a fresh tunnel mid-run, which the pool handles by rebuilding,
// but the single open ssh subprocess for THIS exec keeps running on the
// already-authenticated channel, so the 120s ceiling is honoured.
const execMaxTimeoutSeconds = 120
// execDefaultTimeoutSeconds is used when the caller omits timeout_s.
const execDefaultTimeoutSeconds = 30
// execMaxArgvLen caps the number of argv elements. Defence against a
// pathological body; a legitimate command is well under this.
const execMaxArgvLen = 256
// execMinHostControlTier is the minimum workspace tier permitted to run
// host exec. Tiers 3 (privileged) and 4 (full host access) are the
// host-control tiers in this codebase (see provisioner.ApplyTierConfig);
// tiers 1 (sandboxed/readonly) and 2 (standard) are NOT, so they get 403.
//
// REVIEWER NOTE (flagged in the PR): the RFC asked to "mirror the existing
// capability check used for memory.write / update_agent_card". There is no
// such named capability constant in workspace-server today — memory.Set
// and registry.UpdateCard are gated only by WorkspaceAuth, not by a
// tier/capability check. The closest REAL, persisted host-control signal
// is the workspaces.tier column, which is exactly what decides whether the
// container/instance got privileged/host access at provision time. Gating
// host exec on tier >= 3 keeps "a lower-tier/read-only workspace must get
// 403, not host exec" honest against the actual provisioning model. If the
// team prefers a dedicated capability column, that is a follow-up; this
// handler reads tier through one indirection (execTierGate) so swapping the
// gate is a one-function change.
const execMinHostControlTier = 3
// ExecHandler serves POST /workspaces/:id/exec.
type ExecHandler struct {
broadcaster events.EventEmitter
}
// NewExecHandler builds the handler. broadcaster may be nil (tests /
// non-broadcast deploys) — LogActivity tolerates a nil emitter.
func NewExecHandler(broadcaster events.EventEmitter) *ExecHandler {
return &ExecHandler{broadcaster: broadcaster}
}
// execTierGate is a package-level var so tests can drive the tier-denied
// path without standing up a full sqlmock row. Production reads
// instance_id + tier from the workspaces row in one query.
//
// Returns (instanceID, tier, found). found=false means no such workspace.
var execTierGate = realExecTierGate
func realExecTierGate(c *gin.Context, workspaceID string) (instanceID string, tier int, found bool) {
if db.DB == nil {
return "", 0, false
}
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT COALESCE(instance_id, ''), COALESCE(tier, 0) FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&instanceID, &tier)
if err != nil {
return "", 0, false
}
return instanceID, tier, true
}
// Exec handles POST /workspaces/:id/exec.
func (h *ExecHandler) Exec(c *gin.Context) {
workspaceID := c.Param("id")
// 1. Body + argv validation. We unmarshal into RawMessage first so we
// can reject a bare-string `cmd` ("cmd":"rm -rf /") explicitly with a
// 400 rather than letting it silently fail to bind — the RFC requires
// argv-only.
var raw struct {
Cmd json.RawMessage `json:"cmd"`
TimeoutS int `json:"timeout_s"`
}
if err := c.ShouldBindJSON(&raw); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
var argv []string
if err := json.Unmarshal(raw.Cmd, &argv); err != nil {
c.JSON(http.StatusBadRequest, gin.H{
"error": "cmd must be a JSON array of strings (argv), e.g. [\"bash\",\"-lc\",\"...\"]; a bare string is not accepted",
})
return
}
if len(argv) == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "cmd must be a non-empty argv array"})
return
}
if len(argv) > execMaxArgvLen {
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cmd has too many arguments (max %d)", execMaxArgvLen)})
return
}
if strings.TrimSpace(argv[0]) == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "cmd[0] (the program) must not be empty"})
return
}
// 2. Timeout clamp.
timeoutS := raw.TimeoutS
if timeoutS <= 0 {
timeoutS = execDefaultTimeoutSeconds
}
if timeoutS > execMaxTimeoutSeconds {
timeoutS = execMaxTimeoutSeconds
}
// 3. Resolve instance + tier, enforce host-control gate.
instanceID, tier, found := execTierGate(c, workspaceID)
if !found {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if instanceID == "" {
// Local-Docker workspace (no EC2). Host exec via EIC is a
// SaaS-only path; refuse rather than silently no-op.
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace has no instance_id — host exec is only available for SaaS (EC2-per-workspace) workspaces"})
return
}
if tier < execMinHostControlTier {
c.JSON(http.StatusForbidden, gin.H{
"error": "this workspace's tier does not grant host control; exec is not permitted",
"code": "exec_tier_forbidden",
})
return
}
// 4. Run over the EIC tunnel with the clamped per-op deadline.
ctx, cancel := context.WithTimeout(c.Request.Context(), time.Duration(timeoutS)*time.Second)
defer cancel()
start := time.Now()
res, runErr := execViaEIC(ctx, instanceID, argv)
durationMs := int(time.Since(start).Milliseconds())
// 5. Audit — exactly one activity row. Actor + argv + exit + duration.
// NEVER the streamed stdout/stderr content.
h.audit(c, workspaceID, argv, res.ExitCode, durationMs, runErr)
// 6. Respond. On a transport/timeout error we never produced a real
// exit code; surface it as a 5xx with no captured-output leak.
if runErr != nil {
// A deadline-killed exec is the operator-actionable timeout case.
if ctx.Err() != nil {
c.JSON(http.StatusGatewayTimeout, gin.H{
"error": fmt.Sprintf("exec timed out after %ds", timeoutS),
"code": "exec_timeout",
"timeout_s": timeoutS,
})
return
}
c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("exec failed: %v", runErr)})
return
}
// Framed streaming: newline-delimited JSON frames so a caller can read
// stdout/stderr chunks and a terminal exit frame off a single
// response body without a second round-trip. We hold the full capped
// buffers (≤1 MiB each) and emit them as one frame per stream rather
// than true incremental streaming — Part 1 keeps the wire shape simple
// and bounded; incremental chunking is a Part-2 affordance.
writeExecFrames(c, res)
}
// writeExecFrames emits newline-delimited JSON frames:
//
// {"stream":"stdout","data":"<...>","truncated":<bool>}
// {"stream":"stderr","data":"<...>","truncated":<bool>}
// {"exit_code":<int>}
//
// stdout/stderr frames are omitted when that stream is empty. The final
// exit_code frame is ALWAYS emitted so the caller has an unambiguous
// terminator. Truncation is signalled both via the per-frame `truncated`
// flag and an explicit marker appended to the data, so a caller that only
// reads `data` still sees that output was cut.
func writeExecFrames(c *gin.Context, res execResult) {
c.Header("Content-Type", "application/x-ndjson")
c.Status(http.StatusOK)
enc := json.NewEncoder(c.Writer)
emit := func(stream string, data []byte, truncated bool) {
if len(data) == 0 && !truncated {
return
}
payload := string(data)
if truncated {
payload += "\n[output truncated at " + fmt.Sprintf("%d", execOutputCap) + " bytes]"
}
_ = enc.Encode(map[string]any{
"stream": stream,
"data": payload,
"truncated": truncated,
})
}
emit("stdout", res.Stdout, res.StdoutTruncated)
emit("stderr", res.Stderr, res.StderrTruncated)
_ = enc.Encode(map[string]any{"exit_code": res.ExitCode})
}
// audit writes one activity_logs row for the exec. Records the actor
// (token subject), the argv (the command — NOT its output), the exit code,
// and the duration. The request_body carries the argv + actor; the
// response_body carries ONLY exit_code/duration/status — never the
// streamed stdout/stderr, mirroring the redact-secrets contract so the
// audit ledger can't become a side channel for command output or any
// secret a command happened to echo.
func (h *ExecHandler) audit(c *gin.Context, workspaceID string, argv []string, exitCode, durationMs int, runErr error) {
actor := execActor(c)
status := "ok"
var errDetail *string
if runErr != nil {
status = "error"
if c.Request.Context().Err() != nil {
status = "timeout"
}
// The transport error text is safe (it's our own wrapper +
// ssh-layer messages), but it never contains command output.
d := runErr.Error()
errDetail = &d
} else if exitCode != 0 {
status = "error"
}
method := "POST"
summary := "exec: " + argvSummary(argv)
src := workspaceID
dm := durationMs
LogActivity(c.Request.Context(), h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "workspace_exec",
SourceID: &src,
Method: &method,
Summary: &summary,
RequestBody: map[string]any{
"actor": actor,
"argv": argv,
},
ResponseBody: map[string]any{
"exit_code": exitCode,
"duration_ms": durationMs,
},
DurationMs: &dm,
Status: status,
ErrorDetail: errDetail,
})
}
// execActor returns the audit subject for the caller, reusing the same
// precedence the display-control + org-token audit paths use: org-token
// prefix > CP session > ADMIN_TOKEN > the per-workspace bearer (attributed
// to the workspace itself). Never the raw token value.
func execActor(c *gin.Context) string {
if v, ok := c.Get("org_token_prefix"); ok {
if s, ok := v.(string); ok && s != "" {
return actorOrgTokenPrefix + s
}
}
if v, ok := c.Get("cp_session_actor"); ok {
if s, ok := v.(string); ok && s != "" {
return s
}
}
// ADMIN_TOKEN bearer (break-glass; sets no context key) — detect it so
// an admin-issued exec isn't mis-attributed to the workspace itself.
// Reuses the same detection the display-control audit path uses.
if displayControlIsAdminToken(c) {
return actorAdminToken
}
return "workspace:" + c.Param("id")
}
// argvSummary renders argv into a single-line, bounded summary string for
// the activity feed. The full argv is in request_body.argv; this is the
// human-glance form. Capped so a long command can't bloat the summary
// column.
func argvSummary(argv []string) string {
s := strings.Join(argv, " ")
const cap = 200
if len(s) > cap {
return s[:cap] + "…"
}
return s
}
@@ -0,0 +1,367 @@
package handlers
// exec_handler_test.go — unit tests for POST /workspaces/:id/exec
// (RFC internal#742 Part 1).
//
// The EIC plumbing is faked at two seams that mirror the existing EIC test
// pattern (see template_files_eic_write_timeout_test.go for withEICTunnel,
// eic_tunnel_pool_test.go for poolSetupTunnel):
//
// - withEICTunnel — swapped for a closure that hands the inner fn a bare
// session and does NOT touch AWS/ssh.
// - runEICExec — swapped for a fake that returns canned
// stdout/stderr/exit (or honours an output cap / timeout) without a
// real sshd.
// - execTierGate — swapped to control (instanceID, tier, found) without
// a sqlmock workspaces row, so the tier/own-org cases are deterministic.
//
// The audit insert goes through the real LogActivity → db.DB path, so each
// test that reaches the run stage sets up a sqlmock and expects exactly one
// activity_logs INSERT — proving the "one activity event per exec" contract
// AND that the streamed stdout/stderr never appears in the persisted row.
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// execTestCtx builds a gin context + recorder for a POST /exec with the
// given JSON body and :id param.
func execTestCtx(t *testing.T, workspaceID, body string) (*gin.Context, *httptest.ResponseRecorder) {
t.Helper()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+workspaceID+"/exec", strings.NewReader(body))
req.Header.Set("Content-Type", "application/json")
c.Request = req
c.Params = gin.Params{{Key: "id", Value: workspaceID}}
return c, w
}
// stubExecTierGate swaps execTierGate for the duration of a test.
func stubExecTierGate(t *testing.T, instanceID string, tier int, found bool) {
t.Helper()
prev := execTierGate
execTierGate = func(_ *gin.Context, _ string) (string, int, bool) {
return instanceID, tier, found
}
t.Cleanup(func() { execTierGate = prev })
}
// stubEICExec swaps withEICTunnel + runEICExec so the handler runs against
// a fake session with canned output. capture, if non-nil, records the argv
// the handler actually dispatched.
func stubEICExec(t *testing.T, res execResult, runErr error, capture *[]string) {
t.Helper()
prevTunnel := withEICTunnel
prevRun := runEICExec
withEICTunnel = func(ctx context.Context, instanceID string, fn func(s eicSSHSession) error) error {
return fn(eicSSHSession{instanceID: instanceID, osUser: "ubuntu"})
}
runEICExec = func(_ context.Context, _ eicSSHSession, argv []string) (execResult, error) {
if capture != nil {
*capture = argv
}
return res, runErr
}
t.Cleanup(func() {
withEICTunnel = prevTunnel
runEICExec = prevRun
})
}
// expectOneAuditInsert sets up a sqlmock that expects exactly one
// activity_logs INSERT and asserts the persisted request/response bodies
// do NOT contain the given forbidden substrings (stdout/stderr content).
func expectOneAuditInsert(t *testing.T, forbidden ...string) sqlmock.Sqlmock {
t.Helper()
mock := setupTestDB(t)
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(),
sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(),
sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(1, 1))
return mock
}
func decodeFrames(t *testing.T, body string) []map[string]any {
t.Helper()
var frames []map[string]any
for _, line := range strings.Split(strings.TrimSpace(body), "\n") {
if strings.TrimSpace(line) == "" {
continue
}
var f map[string]any
if err := json.Unmarshal([]byte(line), &f); err != nil {
t.Fatalf("frame %q is not valid JSON: %v", line, err)
}
frames = append(frames, f)
}
return frames
}
// --- happy path: exit 0 + output ---
func TestExec_HappyPath(t *testing.T) {
expectOneAuditInsert(t)
stubExecTierGate(t, "i-abc", 4, true)
var gotArgv []string
stubEICExec(t, execResult{Stdout: []byte("hello\n"), ExitCode: 0}, nil, &gotArgv)
c, w := execTestCtx(t, "ws-1", `{"cmd":["bash","-lc","echo hello"],"timeout_s":10}`)
h := NewExecHandler(nil)
h.Exec(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
}
if want := []string{"bash", "-lc", "echo hello"}; strings.Join(gotArgv, "\x00") != strings.Join(want, "\x00") {
t.Fatalf("argv mismatch: got %v want %v", gotArgv, want)
}
frames := decodeFrames(t, w.Body.String())
last := frames[len(frames)-1]
if ec, ok := last["exit_code"].(float64); !ok || ec != 0 {
t.Fatalf("final frame must be exit_code 0, got %v", last)
}
if !strings.Contains(w.Body.String(), "hello") {
t.Fatalf("stdout 'hello' not streamed: %s", w.Body.String())
}
}
// --- non-zero exit ---
func TestExec_NonZeroExit(t *testing.T) {
expectOneAuditInsert(t)
stubExecTierGate(t, "i-abc", 3, true)
stubEICExec(t, execResult{Stderr: []byte("boom\n"), ExitCode: 7}, nil, nil)
c, w := execTestCtx(t, "ws-1", `{"cmd":["false"]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200 (the exec ran; non-zero is in-band), got %d", w.Code)
}
frames := decodeFrames(t, w.Body.String())
last := frames[len(frames)-1]
if ec, _ := last["exit_code"].(float64); ec != 7 {
t.Fatalf("want exit_code 7, got %v", last)
}
}
// --- timeout ---
func TestExec_Timeout(t *testing.T) {
expectOneAuditInsert(t)
stubExecTierGate(t, "i-abc", 4, true)
// Honour the handler's per-op ctx: block until ITS deadline fires, then
// return the transport error the way realRunEICExec does when ssh is
// SIGKILLed by the expired deadline. The handler then sees ctx.Err() !=
// nil and must surface a 504, not a generic 502. timeout_s:1 keeps the
// test bounded to ~1s while still exercising the real deadline path.
prevTunnel := withEICTunnel
prevRun := runEICExec
withEICTunnel = func(ctx context.Context, instanceID string, fn func(s eicSSHSession) error) error {
return fn(eicSSHSession{instanceID: instanceID})
}
runEICExec = func(ctx context.Context, _ eicSSHSession, _ []string) (execResult, error) {
<-ctx.Done() // wait for the handler's deadline to fire
return execResult{ExitCode: -1}, ctx.Err()
}
t.Cleanup(func() { withEICTunnel = prevTunnel; runEICExec = prevRun })
c, w := execTestCtx(t, "ws-1", `{"cmd":["sleep","999"],"timeout_s":1}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusGatewayTimeout {
t.Fatalf("want 504 on timeout, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "exec_timeout") {
t.Fatalf("want exec_timeout code, got %s", w.Body.String())
}
}
// --- output-cap truncation ---
func TestExec_OutputTruncation(t *testing.T) {
expectOneAuditInsert(t)
stubExecTierGate(t, "i-abc", 4, true)
// Drive the real cappedBuffer truncation by writing > execOutputCap.
big := strings.Repeat("a", execOutputCap+1024)
stubEICExec(t, execResult{Stdout: []byte(big[:execOutputCap]), StdoutTruncated: true, ExitCode: 0}, nil, nil)
c, w := execTestCtx(t, "ws-1", `{"cmd":["cat","/dev/zero"]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d", w.Code)
}
frames := decodeFrames(t, w.Body.String())
var sawTruncated bool
for _, f := range frames {
if f["stream"] == "stdout" {
if tr, _ := f["truncated"].(bool); tr {
sawTruncated = true
}
if data, _ := f["data"].(string); !strings.Contains(data, "truncated") {
t.Fatalf("truncated stdout frame must carry an explicit marker, got %q", data[:min(80, len(data))])
}
}
}
if !sawTruncated {
t.Fatalf("expected a truncated=true stdout frame; frames=%v", frames)
}
}
// --- argv validation: bare-string cmd rejected ---
func TestExec_RejectsStringCmd(t *testing.T) {
// No tier gate / EIC stubs needed — validation fails before dispatch.
c, w := execTestCtx(t, "ws-1", `{"cmd":"rm -rf /"}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("want 400 for a bare-string cmd, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "argv") && !strings.Contains(w.Body.String(), "array") {
t.Fatalf("400 should explain argv-only requirement, got %s", w.Body.String())
}
}
func TestExec_RejectsEmptyArgv(t *testing.T) {
c, w := execTestCtx(t, "ws-1", `{"cmd":[]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("want 400 for empty argv, got %d", w.Code)
}
}
// --- capability/tier denied: a non-host-control tier gets 403 ---
func TestExec_TierForbidden(t *testing.T) {
// Tier 2 (standard) is NOT a host-control tier → 403, no exec, no audit.
stubExecTierGate(t, "i-abc", 2, true)
// Fail loudly if the handler dispatches despite the gate.
prevRun := runEICExec
runEICExec = func(_ context.Context, _ eicSSHSession, _ []string) (execResult, error) {
t.Fatalf("runEICExec must NOT be called for a forbidden tier")
return execResult{}, nil
}
t.Cleanup(func() { runEICExec = prevRun })
c, w := execTestCtx(t, "ws-1", `{"cmd":["ls"]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusForbidden {
t.Fatalf("want 403 for tier 2, got %d: %s", w.Code, w.Body.String())
}
if !strings.Contains(w.Body.String(), "exec_tier_forbidden") {
t.Fatalf("want exec_tier_forbidden code, got %s", w.Body.String())
}
}
// --- own-org scoping: an unknown :id (not visible to this tenant/org) is
// 404, and the handler never dispatches exec for it. In production a
// sibling-org id never resolves to a row in THIS one-org process, so
// execTierGate returns found=false → 404. ---
func TestExec_SiblingOrgIDNotFound(t *testing.T) {
stubExecTierGate(t, "", 0, false)
prevRun := runEICExec
runEICExec = func(_ context.Context, _ eicSSHSession, _ []string) (execResult, error) {
t.Fatalf("runEICExec must NOT be called for an unknown workspace id")
return execResult{}, nil
}
t.Cleanup(func() { runEICExec = prevRun })
c, w := execTestCtx(t, "ws-sibling-org", `{"cmd":["ls"]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusNotFound {
t.Fatalf("want 404 for a workspace id not in this org, got %d: %s", w.Code, w.Body.String())
}
}
// --- audit content contract: the persisted activity row carries argv +
// exit but NEVER the stdout/stderr content. ---
func TestExec_AuditExcludesOutputContent(t *testing.T) {
const secretOut = "SUPER_SECRET_TOKEN_VALUE"
// Expect exactly one insert AND assert no arg contains the secret output.
mock := setupTestDB(t)
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
sqlmock.AnyArg(), // workspace_id
sqlmock.AnyArg(), // activity_type
sqlmock.AnyArg(), // source_id
sqlmock.AnyArg(), // target_id
sqlmock.AnyArg(), // method
noSecret(t, secretOut), // summary
noSecret(t, secretOut), // request_body
noSecret(t, secretOut), // response_body
sqlmock.AnyArg(), // tool_trace
sqlmock.AnyArg(), // duration_ms
sqlmock.AnyArg(), // status
sqlmock.AnyArg(), // error_detail
).
WillReturnResult(sqlmock.NewResult(1, 1))
stubExecTierGate(t, "i-abc", 4, true)
stubEICExec(t, execResult{Stdout: []byte(secretOut + "\n"), ExitCode: 0}, nil, nil)
c, w := execTestCtx(t, "ws-1", `{"cmd":["printenv","SECRET"]}`)
NewExecHandler(nil).Exec(c)
if w.Code != http.StatusOK {
t.Fatalf("want 200, got %d", w.Code)
}
// The secret IS streamed to the caller (that's the point of exec)...
if !strings.Contains(w.Body.String(), secretOut) {
t.Fatalf("stdout should be streamed to the caller")
}
// ...but must NOT be in the audit row (asserted by noSecret matchers).
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("audit insert expectations: %v", err)
}
}
// noSecret is a sqlmock argument matcher that asserts the stringified
// argument does not contain the forbidden substring.
type secretFreeArg struct {
t *testing.T
forbidden string
}
func (m secretFreeArg) Match(v driver.Value) bool {
var s string
switch t := v.(type) {
case nil:
s = ""
case string:
s = t
case []byte:
s = string(t)
case *string:
if t != nil {
s = *t
}
default:
s = fmt.Sprintf("%v", v)
}
if strings.Contains(s, m.forbidden) {
m.t.Errorf("audit row leaked forbidden content %q in: %q", m.forbidden, s)
}
return true
}
func noSecret(t *testing.T, forbidden string) secretFreeArg {
return secretFreeArg{t: t, forbidden: forbidden}
}
@@ -167,9 +167,6 @@ func generateAppInstallationToken() (string, time.Time, error) {
return "", time.Time{}, err
}
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusCreated {
return "", time.Time{}, fmt.Errorf("github token endpoint returned status %d", resp.StatusCode)
}
var result struct {
Token string `json:"token"`
ExpiresAt time.Time `json:"expires_at"`
@@ -280,92 +280,6 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
}
}
// TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy proves the
// extracted #1933 fix: when the A2A body fails to marshal, the detached
// goroutine returns early and never calls proxyA2ARequest with a nil/empty
// body. Before the fix the goroutine logged the error and fell through,
// dispatching a malformed A2A request.
func TestMCPHandler_DelegateTaskAsync_MarshalFailureDoesNotCallProxy(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
parentID := "33333333-3333-3333-3333-333333333333"
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Force the (otherwise near-impossible) marshal failure for the A2A body.
origMarshal := marshalA2ABody
marshalA2ABody = func(any) ([]byte, error) {
return nil, errors.New("forced marshal failure")
}
t.Cleanup(func() { marshalA2ABody = origMarshal })
proxyCalled := make(chan struct{}, 1)
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, proxyCallerID string, logActivity bool) (int, []byte, error) {
proxyCalled <- struct{}{}
return 200, []byte(`{}`), nil
}
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "async work",
})
if err != nil {
t.Fatalf("delegate_task_async returned error: %v", err)
}
if !strings.Contains(out, `"status":"dispatched"`) {
t.Fatalf("delegate_task_async response = %s", out)
}
// Wait for the detached goroutine to finish, then assert the proxy was
// never reached because of the early return on marshal failure.
waitGlobalAsyncForTest()
select {
case <-proxyCalled:
t.Fatal("proxyA2ARequest was called after marshal failure; expected early return")
default:
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestMCPHandler_CheckTaskStatus_NullStatusDefaultsToUnknown proves the
// extracted #1933 hardening: when the activity_logs row has a NULL status,
// check_task_status reports "unknown" instead of an empty string (the old
// status.String zero value).
func TestMCPHandler_CheckTaskStatus_NullStatusDefaultsToUnknown(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
taskID := "task-abc"
mock.ExpectQuery(`(?s)SELECT status, error_detail, response_body.*FROM activity_logs`).
WithArgs(callerID, targetID, taskID).
WillReturnRows(sqlmock.NewRows([]string{"status", "error_detail", "response_body"}).
AddRow(nil, nil, nil))
out, err := h.toolCheckTaskStatus(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task_id": taskID,
})
if err != nil {
t.Fatalf("check_task_status returned error: %v", err)
}
if !strings.Contains(out, `"status": "unknown"`) {
t.Fatalf("expected status \"unknown\" for NULL status row, got: %s", out)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// notifications/initialized
// ─────────────────────────────────────────────────────────────────────────────
@@ -20,11 +20,6 @@ import (
"github.com/google/uuid"
)
// marshalA2ABody marshals the JSON-RPC body for an async A2A dispatch.
// Indirected through a package var so tests can force the (otherwise
// near-impossible) marshal-failure path and assert the early return.
var marshalA2ABody = json.Marshal
// insertMCPDelegationRow writes a delegation activity row so the canvas
// Agent Comms tab can show the task text for MCP-initiated delegations.
// Mirrors insertDelegationRow (delegation.go) for the MCP tool path.
@@ -149,7 +144,6 @@ func (h *MCPHandler) toolListPeers(ctx context.Context, workspaceID string) (str
b, marshalErr := json.MarshalIndent(peers, "", " ")
if marshalErr != nil {
log.Printf("toolListPeers: json.MarshalIndent peers failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -183,7 +177,6 @@ func (h *MCPHandler) toolGetWorkspaceInfo(ctx context.Context, workspaceID strin
b, marshalErr := json.MarshalIndent(info, "", " ")
if marshalErr != nil {
log.Printf("toolGetWorkspaceInfo %s: json.MarshalIndent info failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -276,7 +269,7 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
bgCtx, cancel := context.WithTimeout(context.Background(), mcpAsyncCallTimeout)
defer cancel()
a2aBody, marshalErr := marshalA2ABody(map[string]interface{}{
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": delegationID,
"method": "message/send",
@@ -290,9 +283,6 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
})
if marshalErr != nil {
log.Printf("toolDelegateTask %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
// Bail out: proceeding would call proxyA2ARequest with a
// nil/empty body, dispatching a malformed A2A request.
return
}
status, _, err := h.proxyA2ARequest(bgCtx, targetID, a2aBody, callerID, true)
@@ -340,13 +330,9 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
result := map[string]interface{}{
"task_id": taskID,
"status": status.String,
"target_id": targetID,
}
if status.Valid {
result["status"] = status.String
} else {
result["status"] = "unknown"
}
if errorDetail.Valid && errorDetail.String != "" {
result["error"] = errorDetail.String
}
@@ -356,7 +342,6 @@ func (h *MCPHandler) toolCheckTaskStatus(ctx context.Context, callerID string, a
b, marshalErr := json.MarshalIndent(result, "", " ")
if marshalErr != nil {
log.Printf("toolCheckTaskStatus: json.MarshalIndent result failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -194,7 +194,6 @@ func (h *MCPHandler) recallMemoryLegacyShim(ctx context.Context, workspaceID str
b, marshalErr := json.MarshalIndent(out, "", " ")
if marshalErr != nil {
log.Printf("toolRecallMemory: json.MarshalIndent out failed: %v", marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -166,7 +166,6 @@ func (h *MCPHandler) toolCommitMemoryV2(ctx context.Context, workspaceID string,
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitMemoryV2 %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -224,7 +223,6 @@ func (h *MCPHandler) toolSearchMemory(ctx context.Context, workspaceID string, a
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolSearchMemory %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -283,7 +281,6 @@ func (h *MCPHandler) toolCommitSummary(ctx context.Context, workspaceID string,
out, marshalErr := json.Marshal(resp)
if marshalErr != nil {
log.Printf("toolCommitSummary %s: json.Marshal resp failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(out), nil
}
@@ -303,7 +300,6 @@ func (h *MCPHandler) toolListWritableNamespaces(ctx context.Context, workspaceID
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListWritableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -319,7 +315,6 @@ func (h *MCPHandler) toolListReadableNamespaces(ctx context.Context, workspaceID
b, marshalErr := json.MarshalIndent(ns, "", " ")
if marshalErr != nil {
log.Printf("toolListReadableNamespaces %s: json.MarshalIndent ns failed: %v", workspaceID, marshalErr)
return "", fmt.Errorf("marshal response: %w", marshalErr)
}
return string(b), nil
}
@@ -247,14 +247,13 @@ func (h *MemoriesHandler) Commit(c *gin.Context) {
})
if marshalErr != nil {
log.Printf("Commit %s: json.Marshal auditBody failed: %v", workspaceID, marshalErr)
} else {
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
if _, auditErr := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
VALUES ($1, $2, $3, $4, $5::jsonb, $6)
`, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil {
log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr)
}
}
summary := "GLOBAL memory written: id=" + memoryID + " namespace=" + nsName
if _, auditErr := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, summary, request_body, status)
VALUES ($1, $2, $3, $4, $5::jsonb, $6)
`, workspaceID, "memory_write_global", workspaceID, summary, string(auditBody), "ok"); auditErr != nil {
log.Printf("Commit: GLOBAL memory audit log failed for %s/%s: %v", workspaceID, memoryID, auditErr)
}
}
@@ -345,16 +345,8 @@ func (h *RegistryHandler) Register(c *gin.Context) {
if qErr := db.DB.QueryRowContext(ctx,
`SELECT name, role FROM workspaces WHERE id = $1`, payload.ID,
).Scan(&dbName, &dbRole); qErr == nil {
name := ""
if dbName.Valid {
name = dbName.String
}
role := ""
if dbRole.Valid {
role = dbRole.String
}
if rc, did := reconcileAgentCardIdentity(
payload.AgentCard, payload.ID, name, role,
payload.AgentCard, payload.ID, dbName.String, dbRole.String,
); did {
reconciledCard = rc
log.Printf("Registry register: reconciled agent_card identity for %s from workspaces row", payload.ID)
@@ -177,12 +177,10 @@ func waitForWorkspaceOnline(ctx context.Context, workspaceID string, timeout tim
).Scan(&status); err == nil && status == "online" {
return true
}
timer := time.NewTimer(restartContextOnlinePollInterval)
select {
case <-ctx.Done():
timer.Stop()
return false
case <-timer.C:
case <-time.After(restartContextOnlinePollInterval):
}
}
return false
@@ -215,12 +213,10 @@ func waitForFreshHeartbeat(ctx context.Context, workspaceID string, restartStart
lastHB.Valid && lastHB.Time.After(restartStartTs) {
return true
}
timer := time.NewTimer(restartContextOnlinePollInterval)
select {
case <-ctx.Done():
timer.Stop()
return false
case <-timer.C:
case <-time.After(restartContextOnlinePollInterval):
}
}
return false
@@ -160,14 +160,13 @@ func (h *ScheduleHandler) Create(c *gin.Context) {
}
// Validate timezone
loc, err := time.LoadLocation(body.Timezone)
if err != nil {
if _, err := time.LoadLocation(body.Timezone); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timezone: " + body.Timezone})
return
}
// Validate and compute next run
nextRun, err := scheduler.ComputeNextRun(body.CronExpr, body.Timezone, time.Now().In(loc))
nextRun, err := scheduler.ComputeNextRun(body.CronExpr, body.Timezone, time.Now())
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
@@ -261,12 +260,11 @@ func (h *ScheduleHandler) Update(c *gin.Context) {
if body.Timezone != nil {
tz = *body.Timezone
}
loc, err := time.LoadLocation(tz)
if err != nil {
if _, err := time.LoadLocation(tz); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid timezone: " + tz})
return
}
nextRun, err := scheduler.ComputeNextRun(cronExpr, tz, time.Now().In(loc))
nextRun, err := scheduler.ComputeNextRun(cronExpr, tz, time.Now())
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
@@ -574,12 +574,7 @@ func (h *WorkspaceHandler) CascadeDelete(ctx context.Context, id string) ([]stri
var stopErrs []error
stopAndRemove := func(wsID string) {
// Delete-path stop uses bounded retry (matches the restart path) and
// records a durable structure_events row on exhaustion so a leaked /
// pending EC2 is queryable and handed off to the CP-orphan-sweeper —
// rather than the bare one-shot StopWorkspaceAuto that produced the
// silent-leak class (task #15 / workspace-ec2-leak).
if err := h.stopWorkspaceForDelete(cleanupCtx, wsID); err != nil {
if err := h.StopWorkspaceAuto(cleanupCtx, wsID); err != nil {
log.Printf("CascadeDelete %s stop failed: %v — leaving cleanup for orphan sweeper", wsID, err)
stopErrs = append(stopErrs, fmt.Errorf("stop %s: %w", wsID, err))
return
@@ -1,102 +0,0 @@
package handlers
// workspace_delete_stop_retry_test.go — pins the contract of the
// delete-path EC2 stop retry (task #15 / workspace-ec2-leak).
//
// Background (Phase 1 evidence): the DELETE path's StopWorkspaceAuto →
// cpProv.Stop had NO retry, while the restart path used cpStopWithRetry
// (bounded exponential backoff). A transient CP/AWS hiccup on delete left
// the workspace row at status='removed' with instance_id still populated,
// returned a 500, and relied entirely on the 60s CP-orphan-sweeper to
// re-drive the terminate. For a cascade *descendant* whose own row is
// already 'removed', the inline retry-via-client-replay is defeated by
// CascadeDelete's `status != 'removed'` CTE filter — so the only inline
// recovery is this bounded retry.
//
// Contract of stopWorkspaceForDelete:
// - CP path: bounded retry (cpStopRetryAttempts, exp backoff) on
// cpProv.Stop; returns nil on eventual success.
// - On retry exhaustion: returns the terminal error AND emits a
// `workspace.delete.terminate_retry_exhausted` structure_events row so
// the leak decision is queryable (structured-logging gate), not just a
// log.Printf. The row is the durable pending-terminate signal: the row
// stays status='removed' with instance_id populated, which is exactly
// what the CP-orphan-sweeper (registry/cp_orphan_sweeper.go) re-drives.
// - Docker path: single Stop, no retry (local daemon failure won't heal
// on retry — matches RestartWorkspaceAuto's Docker rationale).
// - No backend wired: nil (nothing to stop).
import (
"context"
"errors"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
)
func TestStopWorkspaceForDelete_CPRetriesTransientThenSucceeds(t *testing.T) {
shrinkRetryBackoff(t)
buf := captureLog(t)
// 2 transient failures then success — within the 3-attempt budget.
stub := &scriptedCPStop{errs: []error{
errors.New("cp 503 attempt 1"),
errors.New("cp 503 attempt 2"),
}}
h := &WorkspaceHandler{cpProv: stub}
err := h.stopWorkspaceForDelete(context.Background(), "ws-del-1")
if err != nil {
t.Fatalf("expected nil error on eventual success, got %v", err)
}
if stub.calls != 3 {
t.Errorf("expected 3 Stop calls (2 fails + 1 success), got %d", stub.calls)
}
if strings.Contains(buf.String(), "terminate_retry_exhausted") {
t.Errorf("eventual success must NOT log retry-exhausted; got %q", buf.String())
}
}
func TestStopWorkspaceForDelete_CPExhaustsEmitsDurableEventAndReturnsError(t *testing.T) {
shrinkRetryBackoff(t)
mock := setupTestDB(t)
buf := captureLog(t)
stub := &scriptedCPStop{errs: []error{
errors.New("cp 502 attempt 1"),
errors.New("cp 502 attempt 2"),
errors.New("cp 502 final"),
}}
h := &WorkspaceHandler{cpProv: stub}
// On exhaustion the helper persists a durable pending-terminate row so
// the leak decision is queryable. structure_events is the audit-of-record.
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
err := h.stopWorkspaceForDelete(context.Background(), "ws-doomed")
if err == nil {
t.Fatal("expected terminal error on retry exhaustion, got nil")
}
if stub.calls != cpStopRetryAttempts {
t.Errorf("expected %d Stop calls when all fail, got %d", cpStopRetryAttempts, stub.calls)
}
if !strings.Contains(err.Error(), "cp 502 final") {
t.Errorf("returned error should wrap the LAST attempt's error, got %v", err)
}
if e := mock.ExpectationsWereMet(); e != nil {
t.Fatalf("expected structure_events INSERT on exhaustion: %v", e)
}
// The LEAK-SUSPECT line stays the operator-facing prose bridge to the
// orphan reconciler; assert it carries the delete source so triage can
// distinguish delete-leaks from restart-leaks.
if !strings.Contains(buf.String(), "LEAK-SUSPECT") {
t.Errorf("expected LEAK-SUSPECT log on exhaustion, got %q", buf.String())
}
}
func TestStopWorkspaceForDelete_NoBackendIsNoOp(t *testing.T) {
h := &WorkspaceHandler{} // cpProv nil, provisioner nil
if err := h.stopWorkspaceForDelete(context.Background(), "ws-x"); err != nil {
t.Errorf("expected nil no-op with no backend, got %v", err)
}
}
@@ -31,11 +31,9 @@ package handlers
import (
"context"
"encoding/json"
"log"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/provlog"
)
@@ -209,86 +207,6 @@ func (h *WorkspaceHandler) StopWorkspaceAuto(ctx context.Context, workspaceID st
return nil
}
// stopWorkspaceForDelete is the DELETE-path stop dispatcher. It differs
// from StopWorkspaceAuto in exactly one way: the CP (EC2) path gets the
// same bounded retry the restart path uses (cpStopWithRetryErr), and on
// retry exhaustion it persists a durable `workspace.delete.terminate_retry_exhausted`
// event to structure_events (the structured-logging gate) so the leak
// decision is queryable, not just stdout prose.
//
// Why retry here (task #15 / workspace-ec2-leak): the bare cpProv.Stop on
// delete left a transient CP/AWS hiccup as an immediate 500 with no inline
// recovery. For a cascade *descendant* the "client retries → replays
// terminate" recovery is defeated by CascadeDelete's `status != 'removed'`
// CTE filter (the descendant's row is already 'removed', so a retry walks
// zero descendant rows). Bounded retry absorbs the transient class inline;
// the durable event + the row staying status='removed'+instance_id is the
// hand-off to the 60s CP-orphan-sweeper (registry/cp_orphan_sweeper.go) for
// the (rarer) sustained-outage case.
//
// We deliberately do NOT clear status='removed' on exhaustion — the
// CP-orphan-sweeper's recovery query keys on exactly that state, so
// reverting it would break the existing backstop. The error is still
// returned so the HTTP Delete handler surfaces the retryable 500.
//
// Docker path: single Stop, no retry — a local daemon that fails to stop a
// container won't heal on retry (matches RestartWorkspaceAuto's Docker
// rationale); the orphan-container sweeper (registry/orphan_sweeper.go) is
// the Docker-side backstop.
func (h *WorkspaceHandler) stopWorkspaceForDelete(ctx context.Context, workspaceID string) error {
if h.cpProv != nil {
if err := h.cpStopWithRetryErr(ctx, workspaceID, "Delete"); err != nil {
h.emitDeleteTerminateRetryExhausted(ctx, workspaceID, err)
return err
}
return nil
}
if h.provisioner != nil {
return h.provisioner.Stop(ctx, workspaceID)
}
return nil
}
// emitDeleteTerminateRetryExhausted persists a durable record that the
// delete-path EC2 terminate could not be completed inline after the full
// retry budget. Per the §Persistent structured logging gate: a
// state-mutating decision (we are leaving a known-leaked-or-pending EC2 for
// the orphan sweeper) must land in structure_events, not just log.Printf.
//
// Event-type taxonomy (append-only; never rename):
//
// workspace.delete.terminate_retry_exhausted — delete-path cpProv.Stop
// exhausted its retry budget; row stays status='removed' with
// instance_id populated for the CP-orphan-sweeper to re-drive.
//
// Telemetry never blocks the request path: marshal / INSERT failures are
// logged and swallowed.
func (h *WorkspaceHandler) emitDeleteTerminateRetryExhausted(ctx context.Context, workspaceID string, cause error) {
payload := map[string]any{
"workspace_id": workspaceID,
"attempts": cpStopRetryAttempts,
"last_error": cause.Error(),
// recovery_path documents WHO is expected to finish the terminate,
// so a reader of the audit row doesn't have to grep the code to
// know the EC2 isn't simply abandoned.
"recovery_path": "cp_orphan_sweeper",
}
payloadJSON, err := json.Marshal(payload)
if err != nil {
log.Printf("emitDeleteTerminateRetryExhausted: marshal payload failed for %s: %v", workspaceID, err)
return
}
if db.DB == nil {
return
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO structure_events (event_type, workspace_id, payload, created_at)
VALUES ($1, $2, $3, now())
`, "workspace.delete.terminate_retry_exhausted", workspaceID, payloadJSON); err != nil {
log.Printf("emitDeleteTerminateRetryExhausted: insert failed for %s: %v", workspaceID, err)
}
}
// RestartWorkspaceAuto stops the running workload (with retry semantics
// tuned for the restart hot path) then starts provisioning again, in a
// detached goroutine. Returns true when a backend was kicked off, false
@@ -953,24 +953,14 @@ func applyPlatformManagedLLMEnv(ctx context.Context, envVars map[string]string,
log.Printf("workspace_provision: resolve billing mode workspace=%s err=%v (defaulting to platform_managed)", workspaceID, resolveErr)
}
log.Printf("workspace_provision: billing mode workspace=%s resolved=%s source=%s org_default=%s", workspaceID, res.ResolvedMode, res.Source, res.OrgDefault)
// internal#703: MOLECULE_LLM_BILLING_MODE in the container must reflect the
// RESOLVED per-workspace mode, not a hardcoded literal. Pre-fix this var was
// only emitted (hardcoded "platform_managed") on the strip path below, so a
// byok/disabled container never carried a truthful billing-mode value — only
// MOLECULE_LLM_BILLING_MODE_RESOLVED. Emit both here, resolver-driven, for
// every mode so the value is correct on the byok/disabled early-return path
// too (and downstream consumers / debug shells see byok, not platform_managed).
envVars["MOLECULE_LLM_BILLING_MODE"] = res.ResolvedMode
// Observability: surface the resolved mode in the container env so the
// agent / debug shell can answer "why is my key being stripped" without
// pulling logs or hitting the admin route.
envVars["MOLECULE_LLM_BILLING_MODE_RESOLVED"] = res.ResolvedMode
if res.ResolvedMode != LLMBillingModePlatformManaged {
// byok or disabled — DO NOT strip vendor keys, DO NOT force-route to CP,
// DO NOT override the workspace own ANTHROPIC_BASE_URL / OAuth token.
// byok or disabled — DO NOT strip vendor keys, DO NOT force-route to CP.
// Leave envVars alone so CLAUDE_CODE_OAUTH_TOKEN / vendor API keys
// pulled from workspace_secrets survive into the container, and the
// workspace talks to its own provider directly (internal#703).
// pulled from workspace_secrets survive into the container.
return
}
baseURL := firstNonEmptyEnv("MOLECULE_LLM_BASE_URL", "OPENAI_BASE_URL")
@@ -981,8 +971,7 @@ func applyPlatformManagedLLMEnv(ctx context.Context, envVars map[string]string,
}
stripPlatformManagedLLMBypassEnv(envVars)
// MOLECULE_LLM_BILLING_MODE is already set to res.ResolvedMode (==
// platform_managed on this path) above (internal#703); no hardcode here.
envVars["MOLECULE_LLM_BILLING_MODE"] = "platform_managed"
envVars["MOLECULE_LLM_BASE_URL"] = baseURL
envVars["MOLECULE_LLM_USAGE_TOKEN"] = token
if anthropicBaseURL != "" {
@@ -1015,7 +1004,7 @@ func stripPlatformManagedLLMBypassEnv(envVars map[string]string) {
}
func runtimeUsesAnthropicNativeProxy(runtime string) bool {
return strings.EqualFold(strings.TrimSpace(runtime), "claude-code")
return strings.TrimSpace(strings.ToLower(runtime)) == "claude-code"
}
func firstNonEmptyEnv(names ...string) string {
@@ -1106,112 +1106,6 @@ func TestApplyPlatformManagedLLMEnv_NoopsOutsidePlatformManaged(t *testing.T) {
}
}
// TestApplyPlatformManagedLLMEnv_ClaudeCodeByokKeepsOwnProviderEnv is the
// internal#703 regression guard: a per-workspace byok override (org-level
// MOLECULE_LLM_BILLING_MODE left at the platform_managed bootstrap floor)
// must resolve to byok and leave the workspace own provider env intact —
// the CP-injected proxy ANTHROPIC_BASE_URL / usage token must NOT be forced,
// the OAuth token must NOT be stripped, and MOLECULE_LLM_BILLING_MODE in the
// container must read the RESOLVED mode (byok), not the hardcoded literal.
//
// This is the discriminating test for the byok end-to-end fix: pre-fix the
// strip path was the only emitter of MOLECULE_LLM_BILLING_MODE (hardcoded
// "platform_managed"), so a byok container carried no truthful billing mode.
func TestApplyPlatformManagedLLMEnv_ClaudeCodeByokKeepsOwnProviderEnv(t *testing.T) {
const wsID = "77777777-7777-7777-7777-777777777777"
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT llm_billing_mode FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"llm_billing_mode"}).AddRow(LLMBillingModeBYOK))
// Org-level env left at the bootstrap floor — the per-workspace override
// is what must flip this workspace to byok (the realistic prod shape).
t.Setenv("MOLECULE_LLM_BILLING_MODE", LLMBillingModePlatformManaged)
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "https://api.example.test/api/v1/internal/llm/anthropic")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
// The workspace brought its own Claude Code OAuth token (BYOK via the
// subscription provider). It must survive untouched.
envVars := map[string]string{
"CLAUDE_CODE_OAUTH_TOKEN": "user-oauth-token",
"MODEL": "sonnet",
}
applyPlatformManagedLLMEnv(context.Background(), envVars, wsID, "claude-code", "")
// 1. OAuth token intact — not stripped.
if got := envVars["CLAUDE_CODE_OAUTH_TOKEN"]; got != "user-oauth-token" {
t.Fatalf("CLAUDE_CODE_OAUTH_TOKEN = %q, want it left intact for byok", got)
}
// 2. No CP proxy base URL / usage token forced onto the workspace.
if got, ok := envVars["ANTHROPIC_BASE_URL"]; ok {
t.Fatalf("ANTHROPIC_BASE_URL must NOT be injected for byok, got %q", got)
}
if got, ok := envVars["ANTHROPIC_API_KEY"]; ok {
t.Fatalf("ANTHROPIC_API_KEY must NOT be injected for byok, got %q", got)
}
if got, ok := envVars["MOLECULE_LLM_ANTHROPIC_BASE_URL"]; ok {
t.Fatalf("MOLECULE_LLM_ANTHROPIC_BASE_URL must NOT be injected for byok, got %q", got)
}
if got, ok := envVars["MOLECULE_LLM_USAGE_TOKEN"]; ok {
t.Fatalf("MOLECULE_LLM_USAGE_TOKEN must NOT be injected for byok, got %q", got)
}
// 3. Billing mode in the container reflects the RESOLVED mode (byok).
if got := envVars["MOLECULE_LLM_BILLING_MODE"]; got != LLMBillingModeBYOK {
t.Fatalf("MOLECULE_LLM_BILLING_MODE = %q, want %q (resolver-driven, not hardcoded)", got, LLMBillingModeBYOK)
}
if got := envVars["MOLECULE_LLM_BILLING_MODE_RESOLVED"]; got != LLMBillingModeBYOK {
t.Fatalf("MOLECULE_LLM_BILLING_MODE_RESOLVED = %q, want %q", got, LLMBillingModeBYOK)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestApplyPlatformManagedLLMEnv_PlatformManagedStillEmitsResolvedMode is the
// no-regression companion: a workspace that resolves to platform_managed must
// still strip + force the proxy AND emit MOLECULE_LLM_BILLING_MODE=
// platform_managed (now resolver-driven, internal#703). Proves the byok fix
// did not alter the platform_managed contract.
func TestApplyPlatformManagedLLMEnv_PlatformManagedStillEmitsResolvedMode(t *testing.T) {
const wsID = "88888888-8888-8888-8888-888888888888"
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT llm_billing_mode FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"llm_billing_mode"}).AddRow(LLMBillingModePlatformManaged))
t.Setenv("MOLECULE_LLM_BILLING_MODE", LLMBillingModePlatformManaged)
t.Setenv("MOLECULE_LLM_BASE_URL", "https://api.example.test/api/v1/internal/llm/openai/v1")
t.Setenv("MOLECULE_LLM_ANTHROPIC_BASE_URL", "https://api.example.test/api/v1/internal/llm/anthropic")
t.Setenv("MOLECULE_LLM_USAGE_TOKEN", "tenant-admin-token")
envVars := map[string]string{
"CLAUDE_CODE_OAUTH_TOKEN": "user-oauth-token",
"MODEL": "sonnet",
}
applyPlatformManagedLLMEnv(context.Background(), envVars, wsID, "claude-code", "")
// OAuth stripped, proxy forced — unchanged platform_managed contract.
if _, ok := envVars["CLAUDE_CODE_OAUTH_TOKEN"]; ok {
t.Fatalf("CLAUDE_CODE_OAUTH_TOKEN should be stripped for platform_managed")
}
if got := envVars["ANTHROPIC_BASE_URL"]; got != "https://api.example.test/api/v1/internal/llm/anthropic" {
t.Fatalf("ANTHROPIC_BASE_URL = %q, want proxy forced for platform_managed", got)
}
if got := envVars["ANTHROPIC_API_KEY"]; got != "tenant-admin-token" {
t.Fatalf("ANTHROPIC_API_KEY = %q, want usage token for platform_managed", got)
}
if got := envVars["MOLECULE_LLM_BILLING_MODE"]; got != LLMBillingModePlatformManaged {
t.Fatalf("MOLECULE_LLM_BILLING_MODE = %q, want %q", got, LLMBillingModePlatformManaged)
}
if got := envVars["MOLECULE_LLM_BILLING_MODE_RESOLVED"]; got != LLMBillingModePlatformManaged {
t.Fatalf("MOLECULE_LLM_BILLING_MODE_RESOLVED = %q, want %q", got, LLMBillingModePlatformManaged)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestApplyRuntimeModelEnv_PersonaEnvMODELSecretPreserved locks in the
// 2026-05-08 fix that prevents the MODEL_PROVIDER-as-slug fallback from
// silently overwriting a per-persona MODEL workspace_secret on restart,
@@ -1616,28 +1616,3 @@ func (*mockResolver) Scheme() string { return "" }
func (m *mockResolver) Fetch(_ context.Context, _, _ string) (string, error) {
return m.fetchName, m.fetchErr
}
// TestRuntimeUsesAnthropicNativeProxy_CaseAndWhitespace proves the
// strings.EqualFold hardening: the runtime check now matches "claude-code"
// case-insensitively (and after trimming whitespace) instead of relying on
// a lowercased exact compare.
func TestRuntimeUsesAnthropicNativeProxy_CaseAndWhitespace(t *testing.T) {
cases := []struct {
runtime string
want bool
}{
{"claude-code", true},
{"Claude-Code", true},
{"CLAUDE-CODE", true},
{" claude-code ", true},
{"\tClaude-Code\n", true},
{"claude-code-x", false},
{"codex", false},
{"", false},
}
for _, c := range cases {
if got := runtimeUsesAnthropicNativeProxy(c.runtime); got != c.want {
t.Errorf("runtimeUsesAnthropicNativeProxy(%q) = %v, want %v", c.runtime, got, c.want)
}
}
}
@@ -721,31 +721,8 @@ var cpStopRetryBaseDelay = 1 * time.Second
//
// Returns nothing — caller's contract is unchanged.
func (h *WorkspaceHandler) cpStopWithRetry(ctx context.Context, workspaceID, source string) {
// Restart's contract is "make the workspace alive again": it proceeds
// with reprovision regardless of the Stop outcome, so it discards the
// terminal error. The delete path needs the error (it must keep the
// row recoverable for the orphan-sweeper + emit a durable event), so
// the actual retry loop lives in cpStopWithRetryErr below.
_ = h.cpStopWithRetryErr(ctx, workspaceID, source)
}
// cpStopWithRetryErr is the shared bounded-retry core for cpProv.Stop.
// It returns the terminal error so callers that need to react to a leak
// (the DELETE path's stopWorkspaceForDelete) can do so, while
// cpStopWithRetry keeps its void contract for the restart paths.
//
// Behaviour (unchanged from the original cpStopWithRetry loop):
// - cpProv nil → nil (no-op; nothing to stop).
// - success on attempt N → nil; logs a retry-success line when N > 1.
// - ctx cancelled mid-retry → returns ctx.Err(); logs an "abandoned"
// line and deliberately does NOT emit LEAK-SUSPECT (operator-initiated
// drain is a different signal than "we tried hard and failed").
// - all attempts fail → returns the LAST attempt's error and emits the
// stable `LEAK-SUSPECT cpProv.Stop ...` log line so the CP-side orphan
// reconciler can correlate by workspace_id.
func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID, source string) error {
if h.cpProv == nil {
return nil
return
}
var lastErr error
delay := cpStopRetryBaseDelay
@@ -755,7 +732,7 @@ func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID,
if attempt > 1 {
log.Printf("%s: cpProv.Stop(%s) succeeded on attempt %d", source, workspaceID, attempt)
}
return nil
return
}
lastErr = err
if attempt == cpStopRetryAttempts {
@@ -763,14 +740,12 @@ func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID,
}
// Sleep with ctx awareness so a cancelled ctx exits early instead
// of stalling the goroutine through the remaining backoff.
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
log.Printf("%s: cpProv.Stop(%s) abandoned mid-retry: ctx cancelled (last_err=%v)",
source, workspaceID, lastErr)
return ctx.Err()
case <-timer.C:
return
case <-time.After(delay):
}
delay *= 2
}
@@ -778,7 +753,6 @@ func (h *WorkspaceHandler) cpStopWithRetryErr(ctx context.Context, workspaceID,
// so logs are greppable / parseable for the CP-side orphan reconciler.
log.Printf("LEAK-SUSPECT cpProv.Stop workspace_id=%s source=%s attempts=%d last_err=%q",
workspaceID, source, cpStopRetryAttempts, lastErr.Error())
return lastErr
}
// runRestartCycle does the actual stop+provision work for one restart
@@ -248,13 +248,8 @@ func TestRestart_CPStopOnlyInsideRetryHelper(t *testing.T) {
if !ok || fn.Body == nil || fn.Recv == nil {
continue
}
// cpStopWithRetryErr is the ONE allowed home for h.cpProv.Stop
// the bounded-retry loop. cpStopWithRetry is the void-returning
// wrapper (restart path) that delegates to it; the delete path uses
// cpStopWithRetryErr directly via stopWorkspaceForDelete to capture
// the terminal error (task #15). Both wrappers are exempt from this
// gate; any OTHER direct cpProv.Stop is the silent-leak regression.
if fn.Name.Name == "cpStopWithRetry" || fn.Name.Name == "cpStopWithRetryErr" {
// cpStopWithRetry is the ONE allowed home for h.cpProv.Stop.
if fn.Name.Name == "cpStopWithRetry" {
continue
}
ast.Inspect(fn.Body, func(n ast.Node) bool {
@@ -412,40 +412,3 @@ func isSameOriginCanvas(c *gin.Context) bool {
origin := c.GetHeader("Origin")
return origin == "https://"+host || origin == "http://"+host
}
// cpSessionConfigured reports whether this platform is wired for upstream
// session-cookie verification — i.e. it runs as a SaaS tenant image with
// both CP_UPSTREAM_URL and MOLECULE_ORG_SLUG set. When false (self-hosted /
// dev), VerifiedCPSession can never succeed, so callers that want a
// non-forgeable canvas signal in SaaS while still working in dev can use
// this to decide whether the forgeable same-origin fallback is acceptable.
func cpSessionConfigured() bool {
return os.Getenv("CP_UPSTREAM_URL") != "" && tenantSlug() != ""
}
// CPSessionConfigured is the exported form of cpSessionConfigured for callers
// outside this package (e.g. the A2A proxy's canvas-user classification).
func CPSessionConfigured() bool {
return cpSessionConfigured()
}
// IsVerifiedCanvasSession returns true ONLY when the request carries a WorkOS
// session cookie that the control plane confirms belongs to a member of THIS
// tenant's org (via /cp/auth/tenant-member). Unlike IsSameOriginCanvas — whose
// Host/Referer/Origin inputs are trivially forgeable by any container on the
// Docker network and which is therefore documented as cosmetic-only (see
// AdminAuth / CanvasOrBearer comments above, #623/#194) — this is a real,
// upstream-verified authentication boundary. It is the correct gate for
// non-cosmetic actions such as A2A dispatch on behalf of a canvas user.
//
// Returns false (no network call) in self-hosted / dev deployments where
// CP_UPSTREAM_URL / MOLECULE_ORG_SLUG are unset; callers should treat that as
// "no verified canvas session available" and fall back accordingly.
func IsVerifiedCanvasSession(c *gin.Context) bool {
cookie := c.GetHeader("Cookie")
if cookie == "" {
return false
}
valid, _ := VerifiedCPSession(cookie)
return valid
}
@@ -202,9 +202,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
// - Rejects symlinks at the template root (prevents bypass via symlink traversal)
// - Skips symlinks during WalkDir (prevents /etc/passwd etc. inclusion)
// - Validates all paths are relative and non-escaping
// - Caps total size at cpConfigFilesMaxBytes (a transport-DoS guard,
// not the retired 12 KiB user-data ceiling — config now ships off
// user-data via the CP's Secrets-Manager seeding path)
// - Caps total size at 12 KiB to prevent payload bloat
configFiles, err := collectCPConfigFiles(cfg)
if err != nil {
return "", fmt.Errorf("cp provisioner: collect config files: %w", err)
@@ -279,27 +277,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
return result.InstanceID, nil
}
// cpConfigFilesMaxBytes bounds the aggregate config bundle this tenant
// ships to the control plane. It is a transport-DoS guard, NOT the old
// EC2-user-data ceiling.
//
// History: this was 12 KiB (12<<10) because the CP embedded the bundle in
// EC2 user-data, which AWS caps at 16 KiB (the cap left ~4 KiB for bootstrap
// overhead). That ceiling failed real customers — the jrs-auto SEO Agent's
// config (long SEO system prompt + SERVICES_REPO_WEBSITE + a 12-schedule
// block baked into config.yaml) exceeds 12 KiB, so Start() rejected it
// client-side with "config files exceed 12288 bytes" and the workspace
// could never provision.
//
// Config delivery now goes OFF user-data: the CP stages the bundle to AWS
// Secrets Manager (molecule/workspace/<id>/config) at provision time and the
// workspace fetches it into /configs at boot (mirrors the proven tenant
// bootstrap-secrets pattern). The bundle travels here only inside the JSON
// HTTP request body to the CP, which has no 16 KiB limit. The remaining
// bound exists purely so a buggy/hostile tenant can't stream an unbounded
// body and OOM the CP provision path — set generous (256 KiB) so legitimate
// growth (more schedules, longer prompts, more skills) never re-hits a wall.
const cpConfigFilesMaxBytes = 256 << 10
const cpConfigFilesMaxBytes = 12 << 10
// isCPTemplateConfigFile restricts which files from a template directory are
// eligible for transport to the control plane. Only config.yaml (the runtime
@@ -1,151 +0,0 @@
package provisioner
import (
"context"
"encoding/base64"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
)
// TestStart_OversizedConfigBundleProvisions is the Prove-It reproduction for
// the jrs-auto SEO Agent provisioning failure:
//
// CPProvisioner: workspace start failed: cp provisioner: collect config
// files: config files exceed 12288 bytes
//
// Root cause: collectCPConfigFiles hard-capped the *eligible* config bundle
// (config.yaml + prompts/*) at 12 KiB because the controlplane embedded it in
// EC2 user-data (16 KiB AWS ceiling bootstrap overhead). The SEO agent's
// config (long SEO system prompt + SERVICES_REPO_WEBSITE + the 12-schedule
// block baked into config.yaml) exceeds 12 KiB, so Start() failed before it
// ever reached the wire — blocking a paying customer from provisioning.
//
// After moving config delivery OFF user-data and onto the persistent
// secondary volume (CP stages the bundle to Secrets Manager; the workspace
// fetches it at boot into /configs), the 12 KiB ceiling is obsolete: the
// bundle travels in the JSON HTTP body to CP, which has no 16 KiB limit. This
// test pins that a realistically-oversized (>12288 B) config bundle now
// reaches the CP request body intact instead of being rejected client-side.
func TestStart_OversizedConfigBundleProvisions(t *testing.T) {
// SEO-sized config.yaml: a 12-schedule block + SERVICES_REPO_WEBSITE +
// a long system prompt, comfortably over the retired 12 KiB cap.
var sb strings.Builder
sb.WriteString("name: jrs-auto-seo\nruntime: claude-code\n")
sb.WriteString("env:\n SERVICES_REPO_WEBSITE: https://example.com/jrs-auto/website-repo\n")
sb.WriteString("schedules:\n")
for i := 0; i < 12; i++ {
sb.WriteString(" - id: seo-task-")
sb.WriteString(strings.Repeat("x", 8))
sb.WriteString("\n cron: \"0 */2 * * *\"\n prompt: |\n")
sb.WriteString(" Run the SEO audit pass, refresh keyword rankings, regenerate the\n")
sb.WriteString(" sitemap, and publish the digest to the marketing channel.\n")
}
configYAML := sb.String()
seoPrompt := strings.Repeat(
"You are an expert SEO agent. Audit pages, find ranking gaps, and act. ", 200)
cfg := map[string][]byte{
"config.yaml": []byte(configYAML),
"prompts/system.md": []byte(seoPrompt),
}
total := len(configYAML) + len(seoPrompt)
if total <= 12<<10 {
t.Fatalf("fixture not representative: bundle is %d bytes, must exceed 12288 to reproduce the failure", total)
}
t.Logf("oversized config bundle: %d bytes (> old 12288 cap)", total)
var body cpProvisionRequest
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("decode request: %v", err)
}
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, `{"instance_id":"i-seo","state":"pending"}`)
}))
defer srv.Close()
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-seo", httpClient: srv.Client()}
_, err := p.Start(context.Background(), WorkspaceConfig{
WorkspaceID: "ws-seo",
Runtime: "claude-code",
Tier: 4,
PlatformURL: "http://tenant",
ConfigFiles: cfg,
})
if err != nil {
t.Fatalf("Start with oversized config bundle failed: %v — the 12288-byte cap must be gone now config delivery is off user-data", err)
}
// The full bundle must have reached the CP request body intact.
wantCfg := base64.StdEncoding.EncodeToString([]byte(configYAML))
if got := body.ConfigFiles["config.yaml"]; got != wantCfg {
t.Errorf("config.yaml not delivered intact to CP (len got=%d want=%d)", len(got), len(wantCfg))
}
wantPrompt := base64.StdEncoding.EncodeToString([]byte(seoPrompt))
if got := body.ConfigFiles["prompts/system.md"]; got != wantPrompt {
t.Errorf("prompts/system.md not delivered intact to CP (len got=%d want=%d)", len(got), len(wantPrompt))
}
}
// TestCollectCPConfigFiles_DoSGuardStillBounds pins that retiring the 12 KiB
// cap did NOT remove the bound entirely — an absurdly large bundle (a buggy
// or hostile tenant) is still rejected so a compromised workspace-server
// can't OOM the CP request path. The guard just moved from a 12 KiB
// user-data ceiling to a generous transport-DoS ceiling.
func TestCollectCPConfigFiles_DoSGuardStillBounds(t *testing.T) {
huge := make([]byte, cpConfigFilesMaxBytes+1)
for i := range huge {
huge[i] = 'a'
}
_, err := collectCPConfigFiles(WorkspaceConfig{
ConfigFiles: map[string][]byte{"config.yaml": huge},
})
if err == nil {
t.Fatalf("expected the DoS guard to reject a %d-byte bundle, got nil", len(huge))
}
if !strings.Contains(err.Error(), "config files exceed") {
t.Errorf("unexpected error %q, want the size-guard message", err.Error())
}
}
// TestCollectCPConfigFiles_AcceptsSEOSizedBundle is the unit-level companion:
// collectCPConfigFiles itself (not just Start) must accept the SEO-sized
// bundle. Guards the exact constant that caused the outage.
func TestCollectCPConfigFiles_AcceptsSEOSizedBundle(t *testing.T) {
// 30 KiB of eligible config — far over the retired 12288 cap, far under
// the new DoS guard.
cfgBlob := make([]byte, 18<<10)
for i := range cfgBlob {
cfgBlob[i] = 'c'
}
promptBlob := make([]byte, 12<<10)
for i := range promptBlob {
promptBlob[i] = 'p'
}
files, err := collectCPConfigFiles(WorkspaceConfig{
ConfigFiles: map[string][]byte{
"config.yaml": cfgBlob,
"prompts/system.md": promptBlob,
},
})
if err != nil {
t.Fatalf("collectCPConfigFiles rejected a %d-byte SEO-sized bundle: %v", len(cfgBlob)+len(promptBlob), err)
}
if len(files) != 2 {
t.Fatalf("expected 2 files collected, got %d", len(files))
}
// Also confirm a template-dir path stays size-bounded the same way.
tmpl := t.TempDir()
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), cfgBlob, 0o600); err != nil {
t.Fatal(err)
}
if _, err := collectCPConfigFiles(WorkspaceConfig{TemplatePath: tmpl}); err != nil {
t.Fatalf("collectCPConfigFiles rejected an SEO-sized template config.yaml: %v", err)
}
}
@@ -694,6 +694,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.PUT("/files/*path", tmplh.WriteFile)
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
// One-shot host exec (RFC internal#742 Part 1) — registered on the same
// WorkspaceAuth-gated group as /files/* so it inherits the per-workspace
// token authz (and, on SaaS, the per-org TenantGuard). The handler adds a
// tier gate on top: only host-control tiers (T3/T4) may run host exec;
// lower/read-only tiers get 403. Runs the caller's argv non-interactively
// over the EXISTING EIC tunnel broker (execViaEIC → withEICTunnel), the
// same pooled-SSH path the Files API uses. Emits one audited activity row
// per exec (actor + argv + exit + duration; never stdout/stderr content).
exech := handlers.NewExecHandler(broadcaster)
wsAuth.POST("/exec", exech.Exec)
// Chat attachments — file upload (user → agent) and binary-safe
// streaming download (agent → user). Namespaced under /chat/ so
// the security model is obviously distinct from /files/* (which
@@ -418,7 +418,6 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal a2aBody failed: %v", sched.Name, marshalErr)
return
}
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
@@ -604,24 +603,23 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
} else {
// #152: persist lastError into error_detail on the activity_logs row
// so GET /workspaces/:id/schedules/:id/history can surface why a run
// failed (previously dropped — history returned status without any
// error context, making root-cause debugging impossible).
// #2026: bounded Background() context — this INSERT was observed wedging
// indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in
// tick() and stalling the whole scheduler. Now: 10s deadline, survives
// outer ctx cancellation, and every string is UTF-8 sanitized.
insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, insErr := db.DB.ExecContext(insertCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil {
log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr)
}
insertCancel()
}
// #152: persist lastError into error_detail on the activity_logs row
// so GET /workspaces/:id/schedules/:id/history can surface why a run
// failed (previously dropped — history returned status without any
// error context, making root-cause debugging impossible).
// #2026: bounded Background() context — this INSERT was observed wedging
// indefinitely on invalid-UTF-8 jsonb payloads, blocking wg.Wait() in
// tick() and stalling the whole scheduler. Now: 10s deadline, survives
// outer ctx cancellation, and every string is UTF-8 sanitized.
insertCtx, insertCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, insErr := db.DB.ExecContext(insertCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron: "+sched.Name), string(cronMeta), lastStatus, sanitizeUTF8(lastError)); insErr != nil {
log.Printf("Scheduler: activity_logs insert failed for '%s' (%s): %v", sched.Name, sched.ID, insErr)
}
insertCancel()
if s.broadcaster != nil {
s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronExecuted), sched.WorkspaceID, map[string]interface{}{
@@ -695,18 +693,17 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
})
if marshalErr != nil {
log.Printf("Scheduler '%s': json.Marshal cronMeta failed: %v", sched.Name, marshalErr)
} else {
// #2026: bounded Background() context on the skipped activity log INSERT
// for the same reason as the fireSchedule activity_logs INSERT above.
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, err := db.DB.ExecContext(skipInsCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
}
skipInsCancel()
}
// #2026: bounded Background() context on the skipped activity log INSERT
// for the same reason as the fireSchedule activity_logs INSERT above.
skipInsCtx, skipInsCancel := context.WithTimeout(context.Background(), dbQueryTimeout)
if _, err := db.DB.ExecContext(skipInsCtx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, sanitizeUTF8("Cron skipped: "+sched.Name), string(cronMeta), sanitizeUTF8(reason)); err != nil {
log.Printf("Scheduler: '%s' skip activity log failed: %v", sched.Name, err)
}
skipInsCancel()
if s.broadcaster != nil {
_ = s.broadcaster.RecordAndBroadcast(ctx, string(events.EventCronSkipped), sched.WorkspaceID, map[string]interface{}{
@@ -60,12 +60,10 @@ func RunWithRecover(ctx context.Context, name string, fn func(context.Context))
}
// Panic → back off and restart.
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
case <-time.After(backoff):
}
if backoff < maxBackoff {
backoff *= 2