Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 324bee36be |
@@ -0,0 +1,174 @@
|
||||
package handlers
|
||||
|
||||
// exec_eic.go — one-shot, non-interactive command execution on a SaaS
|
||||
// workspace EC2 over the EIC tunnel. Part 1 of RFC internal#742
|
||||
// (tenant-scoped workspace exec API).
|
||||
//
|
||||
// This is the NON-interactive sibling of terminal.go's PTY path. It is
|
||||
// modelled on readFileViaEIC (template_files_eic.go): acquire a pooled
|
||||
// EIC SSH session via withEICTunnel, run a single remote command, capture
|
||||
// stdout/stderr/exit, tear the session down. The only differences from
|
||||
// readFileViaEIC are (a) the remote command is the caller's argv rather
|
||||
// than a fixed `cat`, and (b) we capture the exit code and an output cap
|
||||
// instead of mapping not-found to os.ErrNotExist.
|
||||
//
|
||||
// Safety boundary lives in the caller (exec_handler.go): argv-only,
|
||||
// timeout clamp, tier gate, audit. This file only owns "run argv N over
|
||||
// the pooled session and hand back stdout/stderr/exit".
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// execOutputCap bounds the stdout (and, separately, stderr) bytes we
|
||||
// buffer and return for a single exec. 1 MiB each. Past the cap we stop
|
||||
// appending and set truncated=true; the handler emits an explicit
|
||||
// truncation marker so the caller knows output was cut rather than empty.
|
||||
//
|
||||
// The cap protects the workspace-server process: a `cat /dev/zero` or a
|
||||
// runaway build log could otherwise OOM the box. We bound each stream
|
||||
// independently so a chatty-stderr command can't crowd out stdout.
|
||||
const execOutputCap = 1 << 20 // 1 MiB
|
||||
|
||||
// execResult is the outcome of one non-interactive remote command.
|
||||
type execResult struct {
|
||||
Stdout []byte
|
||||
Stderr []byte
|
||||
ExitCode int
|
||||
StdoutTruncated bool
|
||||
StderrTruncated bool
|
||||
}
|
||||
|
||||
// runEICExec runs argv over an already-open EIC session and returns the
|
||||
// captured streams + exit code. Wrapped in a package-level var (matching
|
||||
// the withEICTunnel / poolSetupTunnel pattern) so the handler test can
|
||||
// stub the actual remote execution without a real sshd — the EIC wiring
|
||||
// (withEICTunnel) and this command runner are the two seams a test needs.
|
||||
//
|
||||
// Exit-code extraction: ssh propagates the remote command's exit status
|
||||
// as its own exit status, so an *exec.ExitError carries the real remote
|
||||
// code. A non-ExitError (ssh failed to connect, context killed) is a
|
||||
// transport error and is returned as err with exitCode=-1.
|
||||
var runEICExec = realRunEICExec
|
||||
|
||||
func realRunEICExec(ctx context.Context, s eicSSHSession, argv []string) (execResult, error) {
|
||||
remote := buildExecRemoteCommand(argv)
|
||||
sshCmd := exec.CommandContext(ctx, "ssh", s.sshArgs(remote)...)
|
||||
sshCmd.Env = os.Environ()
|
||||
|
||||
stdout := &cappedBuffer{cap: execOutputCap}
|
||||
stderr := &cappedBuffer{cap: execOutputCap}
|
||||
sshCmd.Stdout = stdout
|
||||
sshCmd.Stderr = stderr
|
||||
|
||||
err := sshCmd.Run()
|
||||
res := execResult{
|
||||
Stdout: stdout.Bytes(),
|
||||
Stderr: stderr.Bytes(),
|
||||
StdoutTruncated: stdout.truncated,
|
||||
StderrTruncated: stderr.truncated,
|
||||
}
|
||||
if err == nil {
|
||||
res.ExitCode = 0
|
||||
return res, nil
|
||||
}
|
||||
// A non-zero remote exit comes back as *exec.ExitError — that is a
|
||||
// SUCCESSFUL exec with a non-zero code, not a transport failure.
|
||||
var exitErr *exec.ExitError
|
||||
if errors.As(err, &exitErr) {
|
||||
res.ExitCode = exitErr.ExitCode()
|
||||
return res, nil
|
||||
}
|
||||
// Anything else (ssh couldn't connect, context deadline killed the
|
||||
// subprocess) is a real transport error.
|
||||
res.ExitCode = -1
|
||||
return res, err
|
||||
}
|
||||
|
||||
// buildExecRemoteCommand turns the caller's argv into a single remote
|
||||
// command string for ssh. ssh always concatenates its trailing args into
|
||||
// one string and hands it to the login shell, so to run argv literally
|
||||
// (no implicit shell word-splitting on the caller's behalf) we shell-quote
|
||||
// every element and join with spaces. The login shell then re-splits on
|
||||
// those quotes back into the exact argv.
|
||||
//
|
||||
// This is what enforces "argv-only, no implicit shell": if argv[0] is
|
||||
// e.g. ["ls","-l","/a b"] the remote runs `ls` with two args and the path
|
||||
// "/a b" stays one token. If the caller WANTS a shell they pass
|
||||
// ["bash","-lc","..."] explicitly — argv[0] is then a shell by their
|
||||
// choice, which the handler permits.
|
||||
func buildExecRemoteCommand(argv []string) string {
|
||||
quoted := make([]string, len(argv))
|
||||
for i, a := range argv {
|
||||
quoted[i] = shellQuote(a)
|
||||
}
|
||||
return strings.Join(quoted, " ")
|
||||
}
|
||||
|
||||
// cappedBuffer is an io.Writer that buffers up to cap bytes and silently
|
||||
// drops the rest, recording that truncation happened. Used to bound the
|
||||
// per-stream memory a single exec can consume.
|
||||
type cappedBuffer struct {
|
||||
buf bytes.Buffer
|
||||
cap int
|
||||
truncated bool
|
||||
}
|
||||
|
||||
func (b *cappedBuffer) Write(p []byte) (int, error) {
|
||||
// Always report the full length written so the upstream copier
|
||||
// (ssh subprocess plumbing) doesn't error with a short-write; we
|
||||
// just don't retain bytes past the cap.
|
||||
if b.buf.Len() >= b.cap {
|
||||
b.truncated = true
|
||||
return len(p), nil
|
||||
}
|
||||
room := b.cap - b.buf.Len()
|
||||
if len(p) <= room {
|
||||
return b.buf.Write(p)
|
||||
}
|
||||
b.buf.Write(p[:room])
|
||||
b.truncated = true
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (b *cappedBuffer) Bytes() []byte { return b.buf.Bytes() }
|
||||
|
||||
// execViaEIC runs argv non-interactively on the workspace EC2 identified
|
||||
// by instanceID, over a pooled EIC SSH session, and returns the captured
|
||||
// result. Mirrors readFileViaEIC's structure: derive a per-op deadline,
|
||||
// open the tunnel via withEICTunnel, run the command inside the closure.
|
||||
//
|
||||
// The caller (exec_handler.go) is responsible for argv validation, the
|
||||
// timeout clamp, the tier gate, and the audit event. This function only
|
||||
// owns the EIC plumbing + the run.
|
||||
func execViaEIC(ctx context.Context, instanceID string, argv []string) (execResult, error) {
|
||||
if len(argv) == 0 {
|
||||
return execResult{}, fmt.Errorf("exec: empty argv")
|
||||
}
|
||||
|
||||
var res execResult
|
||||
runErr := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
|
||||
r, err := runEICExec(ctx, s, argv)
|
||||
res = r
|
||||
if err != nil {
|
||||
// Distinguish a deadline-killed exec from a generic transport
|
||||
// error so the handler can surface a timeout cleanly (the same
|
||||
// shape writeFileViaEIC uses for internal#423).
|
||||
if cerr := ctx.Err(); cerr != nil {
|
||||
return fmt.Errorf("exec: command did not finish: %w", cerr)
|
||||
}
|
||||
return fmt.Errorf("exec: ssh transport: %w", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if runErr != nil {
|
||||
return res, runErr
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package handlers
|
||||
|
||||
// exec_eic_test.go — pure-function coverage for the EIC exec layer:
|
||||
// argv → remote-command quoting, and the per-stream output cap.
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBuildExecRemoteCommand_QuotesEachArg(t *testing.T) {
|
||||
// A space in an argument must stay one token after the remote shell
|
||||
// re-splits — proving we don't leak the caller's argv into implicit
|
||||
// word-splitting.
|
||||
got := buildExecRemoteCommand([]string{"ls", "-l", "/a b"})
|
||||
want := `'ls' '-l' '/a b'`
|
||||
if got != want {
|
||||
t.Fatalf("quoting mismatch:\n got %q\nwant %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildExecRemoteCommand_EscapesSingleQuotes(t *testing.T) {
|
||||
// An embedded single quote must be escaped so it can't break out of
|
||||
// the quoting and inject extra shell tokens.
|
||||
got := buildExecRemoteCommand([]string{"echo", "a'b"})
|
||||
if !strings.Contains(got, `'a'\''b'`) {
|
||||
t.Fatalf("embedded single-quote not escaped: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCappedBuffer_TruncatesAtCap(t *testing.T) {
|
||||
b := &cappedBuffer{cap: 8}
|
||||
n, _ := b.Write([]byte("12345"))
|
||||
if n != 5 || b.truncated {
|
||||
t.Fatalf("first write under cap should not truncate; n=%d truncated=%v", n, b.truncated)
|
||||
}
|
||||
// This write crosses the cap (5 + 6 = 11 > 8): keep 3 more, drop rest.
|
||||
n, _ = b.Write([]byte("abcdef"))
|
||||
if n != 6 {
|
||||
t.Fatalf("Write must report full length to avoid short-write errors; n=%d", n)
|
||||
}
|
||||
if !b.truncated {
|
||||
t.Fatalf("expected truncated=true after crossing cap")
|
||||
}
|
||||
if got := string(b.Bytes()); got != "12345abc" {
|
||||
t.Fatalf("capped content mismatch: got %q want %q", got, "12345abc")
|
||||
}
|
||||
// A further write past a full buffer stays dropped + truncated.
|
||||
b.Write([]byte("zzz"))
|
||||
if got := string(b.Bytes()); got != "12345abc" {
|
||||
t.Fatalf("post-cap write must be dropped: got %q", got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,332 @@
|
||||
package handlers
|
||||
|
||||
// exec_handler.go — POST /workspaces/:id/exec. Part 1 of RFC internal#742
|
||||
// (tenant-scoped workspace exec API).
|
||||
//
|
||||
// A first-class, audited, tenant-token-gated one-shot exec endpoint that
|
||||
// runs an argv non-interactively on the workspace EC2 via the EXISTING EIC
|
||||
// tunnel broker (execViaEIC → withEICTunnel → pooled session). It is the
|
||||
// non-interactive sibling of /terminal (interactive PTY) and reuses the
|
||||
// same EIC plumbing readFileViaEIC uses.
|
||||
//
|
||||
// Authz layering (no new auth surface invented):
|
||||
// 1. TenantGuard (router middleware) — process is one-per-org, so a
|
||||
// request for a sibling org's machine is already bounced 404 before
|
||||
// it reaches any handler.
|
||||
// 2. WorkspaceAuth (wsAuth group) — the bearer/org-token/session must be
|
||||
// valid for THIS :id. A per-workspace token is bound to its own
|
||||
// workspace; an org token is bound to its org. This is what stops an
|
||||
// org token from naming a sibling org's instance id: the row is
|
||||
// looked up by :id and that :id had to pass WorkspaceAuth for the
|
||||
// presented credential.
|
||||
// 3. Tier gate (this handler) — host exec requires a host-control tier
|
||||
// (T3/T4). A standard/sandboxed (T1/T2) workspace gets 403, not host
|
||||
// exec. See execMinHostControlTier for the reviewer caveat.
|
||||
//
|
||||
// Audit: exactly one activity_logs row per exec via LogActivity — actor,
|
||||
// argv, exit code, duration. NEVER stdout/stderr content (those are
|
||||
// streamed to the caller but never persisted), mirroring the
|
||||
// secret-redaction contract used by the memory export/import paths.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// execMaxTimeoutSeconds is the hard ceiling on a single exec. A caller
|
||||
// asking for more is clamped down to this. 120s matches the RFC bound; it
|
||||
// is also comfortably above the poolTTL/SendSSHPublicKey 60s grant — see
|
||||
// the reviewer note in the PR: a >60s command may outlive its pooled key
|
||||
// and need a fresh tunnel mid-run, which the pool handles by rebuilding,
|
||||
// but the single open ssh subprocess for THIS exec keeps running on the
|
||||
// already-authenticated channel, so the 120s ceiling is honoured.
|
||||
const execMaxTimeoutSeconds = 120
|
||||
|
||||
// execDefaultTimeoutSeconds is used when the caller omits timeout_s.
|
||||
const execDefaultTimeoutSeconds = 30
|
||||
|
||||
// execMaxArgvLen caps the number of argv elements. Defence against a
|
||||
// pathological body; a legitimate command is well under this.
|
||||
const execMaxArgvLen = 256
|
||||
|
||||
// execMinHostControlTier is the minimum workspace tier permitted to run
|
||||
// host exec. Tiers 3 (privileged) and 4 (full host access) are the
|
||||
// host-control tiers in this codebase (see provisioner.ApplyTierConfig);
|
||||
// tiers 1 (sandboxed/readonly) and 2 (standard) are NOT, so they get 403.
|
||||
//
|
||||
// REVIEWER NOTE (flagged in the PR): the RFC asked to "mirror the existing
|
||||
// capability check used for memory.write / update_agent_card". There is no
|
||||
// such named capability constant in workspace-server today — memory.Set
|
||||
// and registry.UpdateCard are gated only by WorkspaceAuth, not by a
|
||||
// tier/capability check. The closest REAL, persisted host-control signal
|
||||
// is the workspaces.tier column, which is exactly what decides whether the
|
||||
// container/instance got privileged/host access at provision time. Gating
|
||||
// host exec on tier >= 3 keeps "a lower-tier/read-only workspace must get
|
||||
// 403, not host exec" honest against the actual provisioning model. If the
|
||||
// team prefers a dedicated capability column, that is a follow-up; this
|
||||
// handler reads tier through one indirection (execTierGate) so swapping the
|
||||
// gate is a one-function change.
|
||||
const execMinHostControlTier = 3
|
||||
|
||||
// ExecHandler serves POST /workspaces/:id/exec.
|
||||
type ExecHandler struct {
|
||||
broadcaster events.EventEmitter
|
||||
}
|
||||
|
||||
// NewExecHandler builds the handler. broadcaster may be nil (tests /
|
||||
// non-broadcast deploys) — LogActivity tolerates a nil emitter.
|
||||
func NewExecHandler(broadcaster events.EventEmitter) *ExecHandler {
|
||||
return &ExecHandler{broadcaster: broadcaster}
|
||||
}
|
||||
|
||||
// execTierGate is a package-level var so tests can drive the tier-denied
|
||||
// path without standing up a full sqlmock row. Production reads
|
||||
// instance_id + tier from the workspaces row in one query.
|
||||
//
|
||||
// Returns (instanceID, tier, found). found=false means no such workspace.
|
||||
var execTierGate = realExecTierGate
|
||||
|
||||
func realExecTierGate(c *gin.Context, workspaceID string) (instanceID string, tier int, found bool) {
|
||||
if db.DB == nil {
|
||||
return "", 0, false
|
||||
}
|
||||
err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT COALESCE(instance_id, ''), COALESCE(tier, 0) FROM workspaces WHERE id = $1`,
|
||||
workspaceID,
|
||||
).Scan(&instanceID, &tier)
|
||||
if err != nil {
|
||||
return "", 0, false
|
||||
}
|
||||
return instanceID, tier, true
|
||||
}
|
||||
|
||||
// Exec handles POST /workspaces/:id/exec.
|
||||
func (h *ExecHandler) Exec(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
// 1. Body + argv validation. We unmarshal into RawMessage first so we
|
||||
// can reject a bare-string `cmd` ("cmd":"rm -rf /") explicitly with a
|
||||
// 400 rather than letting it silently fail to bind — the RFC requires
|
||||
// argv-only.
|
||||
var raw struct {
|
||||
Cmd json.RawMessage `json:"cmd"`
|
||||
TimeoutS int `json:"timeout_s"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&raw); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
var argv []string
|
||||
if err := json.Unmarshal(raw.Cmd, &argv); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{
|
||||
"error": "cmd must be a JSON array of strings (argv), e.g. [\"bash\",\"-lc\",\"...\"]; a bare string is not accepted",
|
||||
})
|
||||
return
|
||||
}
|
||||
if len(argv) == 0 {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "cmd must be a non-empty argv array"})
|
||||
return
|
||||
}
|
||||
if len(argv) > execMaxArgvLen {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cmd has too many arguments (max %d)", execMaxArgvLen)})
|
||||
return
|
||||
}
|
||||
if strings.TrimSpace(argv[0]) == "" {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "cmd[0] (the program) must not be empty"})
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Timeout clamp.
|
||||
timeoutS := raw.TimeoutS
|
||||
if timeoutS <= 0 {
|
||||
timeoutS = execDefaultTimeoutSeconds
|
||||
}
|
||||
if timeoutS > execMaxTimeoutSeconds {
|
||||
timeoutS = execMaxTimeoutSeconds
|
||||
}
|
||||
|
||||
// 3. Resolve instance + tier, enforce host-control gate.
|
||||
instanceID, tier, found := execTierGate(c, workspaceID)
|
||||
if !found {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
if instanceID == "" {
|
||||
// Local-Docker workspace (no EC2). Host exec via EIC is a
|
||||
// SaaS-only path; refuse rather than silently no-op.
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace has no instance_id — host exec is only available for SaaS (EC2-per-workspace) workspaces"})
|
||||
return
|
||||
}
|
||||
if tier < execMinHostControlTier {
|
||||
c.JSON(http.StatusForbidden, gin.H{
|
||||
"error": "this workspace's tier does not grant host control; exec is not permitted",
|
||||
"code": "exec_tier_forbidden",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// 4. Run over the EIC tunnel with the clamped per-op deadline.
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), time.Duration(timeoutS)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
res, runErr := execViaEIC(ctx, instanceID, argv)
|
||||
durationMs := int(time.Since(start).Milliseconds())
|
||||
|
||||
// 5. Audit — exactly one activity row. Actor + argv + exit + duration.
|
||||
// NEVER the streamed stdout/stderr content.
|
||||
h.audit(c, workspaceID, argv, res.ExitCode, durationMs, runErr)
|
||||
|
||||
// 6. Respond. On a transport/timeout error we never produced a real
|
||||
// exit code; surface it as a 5xx with no captured-output leak.
|
||||
if runErr != nil {
|
||||
// A deadline-killed exec is the operator-actionable timeout case.
|
||||
if ctx.Err() != nil {
|
||||
c.JSON(http.StatusGatewayTimeout, gin.H{
|
||||
"error": fmt.Sprintf("exec timed out after %ds", timeoutS),
|
||||
"code": "exec_timeout",
|
||||
"timeout_s": timeoutS,
|
||||
})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("exec failed: %v", runErr)})
|
||||
return
|
||||
}
|
||||
|
||||
// Framed streaming: newline-delimited JSON frames so a caller can read
|
||||
// stdout/stderr chunks and a terminal exit frame off a single
|
||||
// response body without a second round-trip. We hold the full capped
|
||||
// buffers (≤1 MiB each) and emit them as one frame per stream rather
|
||||
// than true incremental streaming — Part 1 keeps the wire shape simple
|
||||
// and bounded; incremental chunking is a Part-2 affordance.
|
||||
writeExecFrames(c, res)
|
||||
}
|
||||
|
||||
// writeExecFrames emits newline-delimited JSON frames:
|
||||
//
|
||||
// {"stream":"stdout","data":"<...>","truncated":<bool>}
|
||||
// {"stream":"stderr","data":"<...>","truncated":<bool>}
|
||||
// {"exit_code":<int>}
|
||||
//
|
||||
// stdout/stderr frames are omitted when that stream is empty. The final
|
||||
// exit_code frame is ALWAYS emitted so the caller has an unambiguous
|
||||
// terminator. Truncation is signalled both via the per-frame `truncated`
|
||||
// flag and an explicit marker appended to the data, so a caller that only
|
||||
// reads `data` still sees that output was cut.
|
||||
func writeExecFrames(c *gin.Context, res execResult) {
|
||||
c.Header("Content-Type", "application/x-ndjson")
|
||||
c.Status(http.StatusOK)
|
||||
|
||||
enc := json.NewEncoder(c.Writer)
|
||||
emit := func(stream string, data []byte, truncated bool) {
|
||||
if len(data) == 0 && !truncated {
|
||||
return
|
||||
}
|
||||
payload := string(data)
|
||||
if truncated {
|
||||
payload += "\n[output truncated at " + fmt.Sprintf("%d", execOutputCap) + " bytes]"
|
||||
}
|
||||
_ = enc.Encode(map[string]any{
|
||||
"stream": stream,
|
||||
"data": payload,
|
||||
"truncated": truncated,
|
||||
})
|
||||
}
|
||||
emit("stdout", res.Stdout, res.StdoutTruncated)
|
||||
emit("stderr", res.Stderr, res.StderrTruncated)
|
||||
_ = enc.Encode(map[string]any{"exit_code": res.ExitCode})
|
||||
}
|
||||
|
||||
// audit writes one activity_logs row for the exec. Records the actor
|
||||
// (token subject), the argv (the command — NOT its output), the exit code,
|
||||
// and the duration. The request_body carries the argv + actor; the
|
||||
// response_body carries ONLY exit_code/duration/status — never the
|
||||
// streamed stdout/stderr, mirroring the redact-secrets contract so the
|
||||
// audit ledger can't become a side channel for command output or any
|
||||
// secret a command happened to echo.
|
||||
func (h *ExecHandler) audit(c *gin.Context, workspaceID string, argv []string, exitCode, durationMs int, runErr error) {
|
||||
actor := execActor(c)
|
||||
status := "ok"
|
||||
var errDetail *string
|
||||
if runErr != nil {
|
||||
status = "error"
|
||||
if c.Request.Context().Err() != nil {
|
||||
status = "timeout"
|
||||
}
|
||||
// The transport error text is safe (it's our own wrapper +
|
||||
// ssh-layer messages), but it never contains command output.
|
||||
d := runErr.Error()
|
||||
errDetail = &d
|
||||
} else if exitCode != 0 {
|
||||
status = "error"
|
||||
}
|
||||
|
||||
method := "POST"
|
||||
summary := "exec: " + argvSummary(argv)
|
||||
src := workspaceID
|
||||
dm := durationMs
|
||||
|
||||
LogActivity(c.Request.Context(), h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "workspace_exec",
|
||||
SourceID: &src,
|
||||
Method: &method,
|
||||
Summary: &summary,
|
||||
RequestBody: map[string]any{
|
||||
"actor": actor,
|
||||
"argv": argv,
|
||||
},
|
||||
ResponseBody: map[string]any{
|
||||
"exit_code": exitCode,
|
||||
"duration_ms": durationMs,
|
||||
},
|
||||
DurationMs: &dm,
|
||||
Status: status,
|
||||
ErrorDetail: errDetail,
|
||||
})
|
||||
}
|
||||
|
||||
// execActor returns the audit subject for the caller, reusing the same
|
||||
// precedence the display-control + org-token audit paths use: org-token
|
||||
// prefix > CP session > ADMIN_TOKEN > the per-workspace bearer (attributed
|
||||
// to the workspace itself). Never the raw token value.
|
||||
func execActor(c *gin.Context) string {
|
||||
if v, ok := c.Get("org_token_prefix"); ok {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
return actorOrgTokenPrefix + s
|
||||
}
|
||||
}
|
||||
if v, ok := c.Get("cp_session_actor"); ok {
|
||||
if s, ok := v.(string); ok && s != "" {
|
||||
return s
|
||||
}
|
||||
}
|
||||
// ADMIN_TOKEN bearer (break-glass; sets no context key) — detect it so
|
||||
// an admin-issued exec isn't mis-attributed to the workspace itself.
|
||||
// Reuses the same detection the display-control audit path uses.
|
||||
if displayControlIsAdminToken(c) {
|
||||
return actorAdminToken
|
||||
}
|
||||
return "workspace:" + c.Param("id")
|
||||
}
|
||||
|
||||
// argvSummary renders argv into a single-line, bounded summary string for
|
||||
// the activity feed. The full argv is in request_body.argv; this is the
|
||||
// human-glance form. Capped so a long command can't bloat the summary
|
||||
// column.
|
||||
func argvSummary(argv []string) string {
|
||||
s := strings.Join(argv, " ")
|
||||
const cap = 200
|
||||
if len(s) > cap {
|
||||
return s[:cap] + "…"
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -0,0 +1,367 @@
|
||||
package handlers
|
||||
|
||||
// exec_handler_test.go — unit tests for POST /workspaces/:id/exec
|
||||
// (RFC internal#742 Part 1).
|
||||
//
|
||||
// The EIC plumbing is faked at two seams that mirror the existing EIC test
|
||||
// pattern (see template_files_eic_write_timeout_test.go for withEICTunnel,
|
||||
// eic_tunnel_pool_test.go for poolSetupTunnel):
|
||||
//
|
||||
// - withEICTunnel — swapped for a closure that hands the inner fn a bare
|
||||
// session and does NOT touch AWS/ssh.
|
||||
// - runEICExec — swapped for a fake that returns canned
|
||||
// stdout/stderr/exit (or honours an output cap / timeout) without a
|
||||
// real sshd.
|
||||
// - execTierGate — swapped to control (instanceID, tier, found) without
|
||||
// a sqlmock workspaces row, so the tier/own-org cases are deterministic.
|
||||
//
|
||||
// The audit insert goes through the real LogActivity → db.DB path, so each
|
||||
// test that reaches the run stage sets up a sqlmock and expects exactly one
|
||||
// activity_logs INSERT — proving the "one activity event per exec" contract
|
||||
// AND that the streamed stdout/stderr never appears in the persisted row.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// execTestCtx builds a gin context + recorder for a POST /exec with the
|
||||
// given JSON body and :id param.
|
||||
func execTestCtx(t *testing.T, workspaceID, body string) (*gin.Context, *httptest.ResponseRecorder) {
|
||||
t.Helper()
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+workspaceID+"/exec", strings.NewReader(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
c.Request = req
|
||||
c.Params = gin.Params{{Key: "id", Value: workspaceID}}
|
||||
return c, w
|
||||
}
|
||||
|
||||
// stubExecTierGate swaps execTierGate for the duration of a test.
|
||||
func stubExecTierGate(t *testing.T, instanceID string, tier int, found bool) {
|
||||
t.Helper()
|
||||
prev := execTierGate
|
||||
execTierGate = func(_ *gin.Context, _ string) (string, int, bool) {
|
||||
return instanceID, tier, found
|
||||
}
|
||||
t.Cleanup(func() { execTierGate = prev })
|
||||
}
|
||||
|
||||
// stubEICExec swaps withEICTunnel + runEICExec so the handler runs against
|
||||
// a fake session with canned output. capture, if non-nil, records the argv
|
||||
// the handler actually dispatched.
|
||||
func stubEICExec(t *testing.T, res execResult, runErr error, capture *[]string) {
|
||||
t.Helper()
|
||||
prevTunnel := withEICTunnel
|
||||
prevRun := runEICExec
|
||||
withEICTunnel = func(ctx context.Context, instanceID string, fn func(s eicSSHSession) error) error {
|
||||
return fn(eicSSHSession{instanceID: instanceID, osUser: "ubuntu"})
|
||||
}
|
||||
runEICExec = func(_ context.Context, _ eicSSHSession, argv []string) (execResult, error) {
|
||||
if capture != nil {
|
||||
*capture = argv
|
||||
}
|
||||
return res, runErr
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
withEICTunnel = prevTunnel
|
||||
runEICExec = prevRun
|
||||
})
|
||||
}
|
||||
|
||||
// expectOneAuditInsert sets up a sqlmock that expects exactly one
|
||||
// activity_logs INSERT and asserts the persisted request/response bodies
|
||||
// do NOT contain the given forbidden substrings (stdout/stderr content).
|
||||
func expectOneAuditInsert(t *testing.T, forbidden ...string) sqlmock.Sqlmock {
|
||||
t.Helper()
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(),
|
||||
sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
return mock
|
||||
}
|
||||
|
||||
func decodeFrames(t *testing.T, body string) []map[string]any {
|
||||
t.Helper()
|
||||
var frames []map[string]any
|
||||
for _, line := range strings.Split(strings.TrimSpace(body), "\n") {
|
||||
if strings.TrimSpace(line) == "" {
|
||||
continue
|
||||
}
|
||||
var f map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &f); err != nil {
|
||||
t.Fatalf("frame %q is not valid JSON: %v", line, err)
|
||||
}
|
||||
frames = append(frames, f)
|
||||
}
|
||||
return frames
|
||||
}
|
||||
|
||||
// --- happy path: exit 0 + output ---
|
||||
|
||||
func TestExec_HappyPath(t *testing.T) {
|
||||
expectOneAuditInsert(t)
|
||||
stubExecTierGate(t, "i-abc", 4, true)
|
||||
var gotArgv []string
|
||||
stubEICExec(t, execResult{Stdout: []byte("hello\n"), ExitCode: 0}, nil, &gotArgv)
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["bash","-lc","echo hello"],"timeout_s":10}`)
|
||||
h := NewExecHandler(nil)
|
||||
h.Exec(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if want := []string{"bash", "-lc", "echo hello"}; strings.Join(gotArgv, "\x00") != strings.Join(want, "\x00") {
|
||||
t.Fatalf("argv mismatch: got %v want %v", gotArgv, want)
|
||||
}
|
||||
frames := decodeFrames(t, w.Body.String())
|
||||
last := frames[len(frames)-1]
|
||||
if ec, ok := last["exit_code"].(float64); !ok || ec != 0 {
|
||||
t.Fatalf("final frame must be exit_code 0, got %v", last)
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "hello") {
|
||||
t.Fatalf("stdout 'hello' not streamed: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// --- non-zero exit ---
|
||||
|
||||
func TestExec_NonZeroExit(t *testing.T) {
|
||||
expectOneAuditInsert(t)
|
||||
stubExecTierGate(t, "i-abc", 3, true)
|
||||
stubEICExec(t, execResult{Stderr: []byte("boom\n"), ExitCode: 7}, nil, nil)
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["false"]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200 (the exec ran; non-zero is in-band), got %d", w.Code)
|
||||
}
|
||||
frames := decodeFrames(t, w.Body.String())
|
||||
last := frames[len(frames)-1]
|
||||
if ec, _ := last["exit_code"].(float64); ec != 7 {
|
||||
t.Fatalf("want exit_code 7, got %v", last)
|
||||
}
|
||||
}
|
||||
|
||||
// --- timeout ---
|
||||
|
||||
func TestExec_Timeout(t *testing.T) {
|
||||
expectOneAuditInsert(t)
|
||||
stubExecTierGate(t, "i-abc", 4, true)
|
||||
// Honour the handler's per-op ctx: block until ITS deadline fires, then
|
||||
// return the transport error the way realRunEICExec does when ssh is
|
||||
// SIGKILLed by the expired deadline. The handler then sees ctx.Err() !=
|
||||
// nil and must surface a 504, not a generic 502. timeout_s:1 keeps the
|
||||
// test bounded to ~1s while still exercising the real deadline path.
|
||||
prevTunnel := withEICTunnel
|
||||
prevRun := runEICExec
|
||||
withEICTunnel = func(ctx context.Context, instanceID string, fn func(s eicSSHSession) error) error {
|
||||
return fn(eicSSHSession{instanceID: instanceID})
|
||||
}
|
||||
runEICExec = func(ctx context.Context, _ eicSSHSession, _ []string) (execResult, error) {
|
||||
<-ctx.Done() // wait for the handler's deadline to fire
|
||||
return execResult{ExitCode: -1}, ctx.Err()
|
||||
}
|
||||
t.Cleanup(func() { withEICTunnel = prevTunnel; runEICExec = prevRun })
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["sleep","999"],"timeout_s":1}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusGatewayTimeout {
|
||||
t.Fatalf("want 504 on timeout, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "exec_timeout") {
|
||||
t.Fatalf("want exec_timeout code, got %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// --- output-cap truncation ---
|
||||
|
||||
func TestExec_OutputTruncation(t *testing.T) {
|
||||
expectOneAuditInsert(t)
|
||||
stubExecTierGate(t, "i-abc", 4, true)
|
||||
// Drive the real cappedBuffer truncation by writing > execOutputCap.
|
||||
big := strings.Repeat("a", execOutputCap+1024)
|
||||
stubEICExec(t, execResult{Stdout: []byte(big[:execOutputCap]), StdoutTruncated: true, ExitCode: 0}, nil, nil)
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["cat","/dev/zero"]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d", w.Code)
|
||||
}
|
||||
frames := decodeFrames(t, w.Body.String())
|
||||
var sawTruncated bool
|
||||
for _, f := range frames {
|
||||
if f["stream"] == "stdout" {
|
||||
if tr, _ := f["truncated"].(bool); tr {
|
||||
sawTruncated = true
|
||||
}
|
||||
if data, _ := f["data"].(string); !strings.Contains(data, "truncated") {
|
||||
t.Fatalf("truncated stdout frame must carry an explicit marker, got %q", data[:min(80, len(data))])
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sawTruncated {
|
||||
t.Fatalf("expected a truncated=true stdout frame; frames=%v", frames)
|
||||
}
|
||||
}
|
||||
|
||||
// --- argv validation: bare-string cmd rejected ---
|
||||
|
||||
func TestExec_RejectsStringCmd(t *testing.T) {
|
||||
// No tier gate / EIC stubs needed — validation fails before dispatch.
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":"rm -rf /"}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Fatalf("want 400 for a bare-string cmd, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "argv") && !strings.Contains(w.Body.String(), "array") {
|
||||
t.Fatalf("400 should explain argv-only requirement, got %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestExec_RejectsEmptyArgv(t *testing.T) {
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":[]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Fatalf("want 400 for empty argv, got %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
// --- capability/tier denied: a non-host-control tier gets 403 ---
|
||||
|
||||
func TestExec_TierForbidden(t *testing.T) {
|
||||
// Tier 2 (standard) is NOT a host-control tier → 403, no exec, no audit.
|
||||
stubExecTierGate(t, "i-abc", 2, true)
|
||||
// Fail loudly if the handler dispatches despite the gate.
|
||||
prevRun := runEICExec
|
||||
runEICExec = func(_ context.Context, _ eicSSHSession, _ []string) (execResult, error) {
|
||||
t.Fatalf("runEICExec must NOT be called for a forbidden tier")
|
||||
return execResult{}, nil
|
||||
}
|
||||
t.Cleanup(func() { runEICExec = prevRun })
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["ls"]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Fatalf("want 403 for tier 2, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "exec_tier_forbidden") {
|
||||
t.Fatalf("want exec_tier_forbidden code, got %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// --- own-org scoping: an unknown :id (not visible to this tenant/org) is
|
||||
// 404, and the handler never dispatches exec for it. In production a
|
||||
// sibling-org id never resolves to a row in THIS one-org process, so
|
||||
// execTierGate returns found=false → 404. ---
|
||||
|
||||
func TestExec_SiblingOrgIDNotFound(t *testing.T) {
|
||||
stubExecTierGate(t, "", 0, false)
|
||||
prevRun := runEICExec
|
||||
runEICExec = func(_ context.Context, _ eicSSHSession, _ []string) (execResult, error) {
|
||||
t.Fatalf("runEICExec must NOT be called for an unknown workspace id")
|
||||
return execResult{}, nil
|
||||
}
|
||||
t.Cleanup(func() { runEICExec = prevRun })
|
||||
|
||||
c, w := execTestCtx(t, "ws-sibling-org", `{"cmd":["ls"]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Fatalf("want 404 for a workspace id not in this org, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// --- audit content contract: the persisted activity row carries argv +
|
||||
// exit but NEVER the stdout/stderr content. ---
|
||||
|
||||
func TestExec_AuditExcludesOutputContent(t *testing.T) {
|
||||
const secretOut = "SUPER_SECRET_TOKEN_VALUE"
|
||||
// Expect exactly one insert AND assert no arg contains the secret output.
|
||||
mock := setupTestDB(t)
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WithArgs(
|
||||
sqlmock.AnyArg(), // workspace_id
|
||||
sqlmock.AnyArg(), // activity_type
|
||||
sqlmock.AnyArg(), // source_id
|
||||
sqlmock.AnyArg(), // target_id
|
||||
sqlmock.AnyArg(), // method
|
||||
noSecret(t, secretOut), // summary
|
||||
noSecret(t, secretOut), // request_body
|
||||
noSecret(t, secretOut), // response_body
|
||||
sqlmock.AnyArg(), // tool_trace
|
||||
sqlmock.AnyArg(), // duration_ms
|
||||
sqlmock.AnyArg(), // status
|
||||
sqlmock.AnyArg(), // error_detail
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
stubExecTierGate(t, "i-abc", 4, true)
|
||||
stubEICExec(t, execResult{Stdout: []byte(secretOut + "\n"), ExitCode: 0}, nil, nil)
|
||||
|
||||
c, w := execTestCtx(t, "ws-1", `{"cmd":["printenv","SECRET"]}`)
|
||||
NewExecHandler(nil).Exec(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("want 200, got %d", w.Code)
|
||||
}
|
||||
// The secret IS streamed to the caller (that's the point of exec)...
|
||||
if !strings.Contains(w.Body.String(), secretOut) {
|
||||
t.Fatalf("stdout should be streamed to the caller")
|
||||
}
|
||||
// ...but must NOT be in the audit row (asserted by noSecret matchers).
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("audit insert expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// noSecret is a sqlmock argument matcher that asserts the stringified
|
||||
// argument does not contain the forbidden substring.
|
||||
type secretFreeArg struct {
|
||||
t *testing.T
|
||||
forbidden string
|
||||
}
|
||||
|
||||
func (m secretFreeArg) Match(v driver.Value) bool {
|
||||
var s string
|
||||
switch t := v.(type) {
|
||||
case nil:
|
||||
s = ""
|
||||
case string:
|
||||
s = t
|
||||
case []byte:
|
||||
s = string(t)
|
||||
case *string:
|
||||
if t != nil {
|
||||
s = *t
|
||||
}
|
||||
default:
|
||||
s = fmt.Sprintf("%v", v)
|
||||
}
|
||||
if strings.Contains(s, m.forbidden) {
|
||||
m.t.Errorf("audit row leaked forbidden content %q in: %q", m.forbidden, s)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func noSecret(t *testing.T, forbidden string) secretFreeArg {
|
||||
return secretFreeArg{t: t, forbidden: forbidden}
|
||||
}
|
||||
@@ -694,6 +694,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
wsAuth.PUT("/files/*path", tmplh.WriteFile)
|
||||
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
|
||||
|
||||
// One-shot host exec (RFC internal#742 Part 1) — registered on the same
|
||||
// WorkspaceAuth-gated group as /files/* so it inherits the per-workspace
|
||||
// token authz (and, on SaaS, the per-org TenantGuard). The handler adds a
|
||||
// tier gate on top: only host-control tiers (T3/T4) may run host exec;
|
||||
// lower/read-only tiers get 403. Runs the caller's argv non-interactively
|
||||
// over the EXISTING EIC tunnel broker (execViaEIC → withEICTunnel), the
|
||||
// same pooled-SSH path the Files API uses. Emits one audited activity row
|
||||
// per exec (actor + argv + exit + duration; never stdout/stderr content).
|
||||
exech := handlers.NewExecHandler(broadcaster)
|
||||
wsAuth.POST("/exec", exech.Exec)
|
||||
|
||||
// Chat attachments — file upload (user → agent) and binary-safe
|
||||
// streaming download (agent → user). Namespaced under /chat/ so
|
||||
// the security model is obviously distinct from /files/* (which
|
||||
|
||||
Reference in New Issue
Block a user