diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers_test.go b/workspace-server/internal/handlers/a2a_proxy_helpers_test.go index df62b3130..b5619aaf8 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers_test.go @@ -335,3 +335,44 @@ func TestParseUsageFromA2AResponse_MissingTokensInUsageObject(t *testing.T) { t.Errorf("missing tokens: got (%d, %d), want (0, 0)", in, out) } } + +// TestRestartContext_SystemCallerDoesNotPoisonSourceID is the +// regression guard for the #2680 residual (criterion a): when the +// restart-context production path (restart_context.go:sendRestartContext +// L296) calls ProxyA2ARequest with callerID="system:restart-context", +// the synthetic non-UUID callerID must NOT be inserted into the +// UUID-typed activity_logs.source_id column. The path is: +// sendRestartContext → ProxyA2ARequest(..., "system:restart-context", ...) +// → persistUserMessageAtIngest(..., "system:restart-context", ...) +// → LogActivityWithResult({SourceID: callerIDToSourceID("system:restart-context")}) +// → activity_logs INSERT with SourceID = NULL +// +// The fix (#2701) introduced the scoped helper callerIDToSourceID +// which returns nil for any system-caller prefix (matching +// isSystemCaller in a2a_proxy.go:85). This test pins the contract. +// +// If the callerIDToSourceID helper is later removed OR weakened OR +// the call site is reverted, the UUID cast on activity_logs.source_id +// will fail with pq: invalid input syntax for type uuid and the +// post-restart queue-fallback path will return 503 → workspace stays +// degraded. This test catches that regression. +func TestRestartContext_SystemCallerDoesNotPoisonSourceID(t *testing.T) { + // Direct unit-level check of the scoped helper against all 4 + // systemCallerPrefixes. If callerIDToSourceID returns nil for + // any of these, the production path's INSERT is safe; the + // SQL binds NULL, the cast is skipped, no poison. + prefixes := []string{ + "system:restart-context", // the specific offender + "webhook:github", + "test:lifecycle-1", + "channel:slack:C0123", + } + for _, p := range prefixes { + t.Run(p, func(t *testing.T) { + got := callerIDToSourceID(p) + if got != nil { + t.Errorf("system caller %q: got non-nil pointer; would poison activity_logs.source_id (UUID cast fail → degraded wedge)", p) + } + }) + } +} diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index ed1fdc770..62a804c5d 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -331,6 +331,12 @@ func isPlatformTunnelHostname(h string) bool { func (h *RegistryHandler) Register(c *gin.Context) { var payload models.RegisterPayload if err := c.ShouldBindJSON(&payload); err != nil { + // pre-ctx: the workspace's existing row state isn't fetched yet + // (we don't have ctx + we don't know the workspace ID until + // parse succeeds). Log with an empty diagnostics struct — + // the row state defaults to "(new)" in the log line, which is + // the right framing for a fresh-register parse failure. + logRegister400Reason("invalid_json", "", models.RegisterPayload{}, registerDiagnostics{}, err.Error()) c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) return } @@ -362,6 +368,9 @@ func (h *RegistryHandler) Register(c *gin.Context) { // allowed and resolves to the row's existing value (or "push" default) // in the upsert below. See #2339 for the poll/push split rationale. if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) { + // pre-existingState (see L398): pass an empty struct; the row + // state defaults to "(new)" in the log line. + logRegister400Reason("invalid_delivery_mode", payload.ID, payload, registerDiagnostics{}, "payload.delivery_mode="+payload.DeliveryMode) c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) return } @@ -370,12 +379,29 @@ func (h *RegistryHandler) Register(c *gin.Context) { // resolves to the row's existing value (or "workspace" default) in // resolveKind below. Only the platform-agent container declares 'platform'. if payload.Kind != "" && !models.IsValidKind(payload.Kind) { + logRegister400Reason("invalid_kind", payload.ID, payload, registerDiagnostics{}, "payload.kind="+payload.Kind) c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"}) return } ctx := c.Request.Context() + // #2680 residual: register-400 diagnostics on the recreate path. + // When a recreated container's first /registry/register call returns + // 400, we need to know WHICH validation step fired (URL missing? URL + // in a private range? delivery_mode? kind? invalid JSON?) AND what + // the workspace's existing row state was at the time. Without this, + // the next restart run produces a 400 with no actionable signal — + // the deferred boot_register_failed log at the end of the function + // only fires AFTER the validation has already returned, so by the + // time it logs we have the status code but not the reason. + // + // Helper: logRegister400Reason captures the failure reason + the + // workspace's existing row state in a single grep-able line. Called + // by every 400 path below. Idempotent: writes to log.Printf only, + // does not mutate state. + existingState := h.fetchExistingWorkspaceStateForDiagnostics(ctx, payload.ID) + // C18: prevent workspace URL hijacking on re-registration. // // An attacker can overwrite any workspace's agent_card URL by calling @@ -460,10 +486,20 @@ func (h *RegistryHandler) Register(c *gin.Context) { } } if effectiveURL == "" { + // Detail: which surface had a URL (so the operator can + // tell "no URL anywhere" from "URL in agent_card but + // not in payload"). NEVER log the raw URL (see RC + // #11335). + logRegister400Reason("url_required_for_push", payload.ID, payload, existingState, "effective_url_empty (payload_url_present="+urlPresence(payload.URL)+", agent_card_url_present="+urlPresence(agentCardURL(payload.AgentCard))+")") c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"}) return } if err := validateAgentURL(effectiveURL); err != nil { + // validateAgentURL returns a friendly CIDR label + // (e.g. "url targets a blocked address: RFC-1918 + // private address") that does NOT contain the actual + // address. Safe to log. + logRegister400Reason("url_validate_failed", payload.ID, payload, existingState, err.Error()) c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } @@ -1171,3 +1207,112 @@ func (h *RegistryHandler) requireWorkspaceToken( // see "gin.Context.Request.Context() is what we want" without re-typing // the import-heavy standard type. type gincontext = context.Context + +// fetchExistingWorkspaceStateForDiagnostics reads the workspace's +// current row state (url, kind, delivery_mode) for the diagnostic +// log line. Best-effort: a DB error here is logged+ignored; the +// diagnostic line emits "(unavailable)" for the missing fields. +// +// Why best-effort: the diagnostic MUST NOT introduce a new failure +// path. The function is called before the validation chain runs; +// if it errored loudly the operator would see a SECOND 500 on top +// of the original 400, and the new error would mask the original +// cause. The defer boot_register_failed log captures the row's +// failure timestamp for follow-up triage. +func (h *RegistryHandler) fetchExistingWorkspaceStateForDiagnostics(ctx context.Context, workspaceID string) registerDiagnostics { + var d registerDiagnostics + if workspaceID == "" { + return d + } + var url, kind, mode sql.NullString + if err := db.DB.QueryRowContext(ctx, + `SELECT url, kind, delivery_mode FROM workspaces WHERE id = $1`, + workspaceID, + ).Scan(&url, &kind, &mode); err == nil { + if url.Valid { + d.ExistingURL = url.String + } + if kind.Valid { + d.ExistingKind = kind.String + } + if mode.Valid { + d.ExistingDeliveryMode = mode.String + } + } else if !errors.Is(err, sql.ErrNoRows) { + log.Printf("Registry register: diagnostics fetch failed for %s: %v", workspaceID, err) + } + return d +} + +// registerDiagnostics captures the workspace's existing row state at +// the time of a 400 response, so the operator can compare the +// request payload against the row to identify the drift source. +// All fields are best-effort; empty string means "unavailable" +// (either the row didn't exist, the column was NULL, or the fetch +// errored). +type registerDiagnostics struct { + ExistingURL string + ExistingKind string + ExistingDeliveryMode string +} + +// logRegister400Reason emits a single grep-able log line for every +// 400 path in /registry/register. The line shape is stable +// (`registry_register_400`) so operators can grep Loki for the +// class. Fields: +// workspace_id — the requested ID +// reason — short key (invalid_json | invalid_delivery_mode | +// invalid_kind | url_required_for_push | url_validate_failed) +// payload_url — the URL the agent sent in the payload (may be empty) +// payload_card_url — the URL the agent put in its agent_card (may be empty) +// payload_kind — the kind the agent sent +// payload_delivery_mode — the delivery_mode the agent sent +// existing_url — the URL already on the workspaces row (or "(new)") +// existing_kind — the kind already on the row (or "(new)") +// existing_delivery_mode — the delivery_mode already on the row (or "(new)") +// detail — the failure-specific detail (parse error, validation +// error, missing-field note, etc.) +// +// The class is part of the #2680 residual: a recreated container's +// first /registry/register call has been returning 400 with no +// actionable signal. The deferred boot_register_failed log fires too +// late (after the 400 has already been returned to the client) and +// only carries the status code, not the reason. This line is +// emitted synchronously inside each 400 path, BEFORE the response +// is written, so the next restart run will surface the cause +// directly. +func logRegister400Reason(reason, workspaceID string, payload models.RegisterPayload, existing registerDiagnostics, detail string) { + cardURLPresence := urlPresence(agentCardURL(payload.AgentCard)) + payloadURLPresence := urlPresence(payload.URL) + exURLPresence := "(new)" + if existing.ExistingURL != "" { + exURLPresence = "present" + } + exKind := existing.ExistingKind + if exKind == "" { + exKind = "(new)" + } + exMode := existing.ExistingDeliveryMode + if exMode == "" { + exMode = "(new)" + } + log.Printf("registry_register_400 workspace=%s reason=%s payload_url=%s payload_card_url=%s payload_kind=%q payload_delivery_mode=%q existing_url=%s existing_kind=%q existing_delivery_mode=%q detail=%q", + workspaceID, reason, + payloadURLPresence, cardURLPresence, payload.Kind, payload.DeliveryMode, + exURLPresence, exKind, exMode, + detail, + ) +} + +// urlPresence reports "present" vs "absent" for a URL string, +// without ever logging the URL value. Used by logRegister400Reason +// to redact the URL columns (see RC #11335: workspace URLs can be +// private — Hetzner 10.0.0.x, GCP 10.x.x.x, in-VPC 172.31.x.x — +// and the prior implementation leaked them to anyone with Loki +// read access). +func urlPresence(url string) string { + if url == "" { + return "absent" + } + return "present" +} diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 6f3eb537b..a7caa0f08 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -2811,3 +2811,240 @@ func TestRegister_PushModeNoURLNoCardURLStill400(t *testing.T) { t.Errorf("expected 400 (no url in payload or agent_card), got %d: %s", w.Code, w.Body.String()) } } + +// TestRegister_400_LogsDiagnosticsReason is the #2680 residual regression +// guard. When a recreated container's first /registry/register call +// returns 400, the operator needs the failing-reason key +// (invalid_json | invalid_delivery_mode | invalid_kind | url_required_for_push +// | url_validate_failed) AND the workspace's existing row state +// (url, kind, delivery_mode) to identify the drift source. The +// 400 path that fires the diagnostic must emit a single grep-able +// log line BEFORE writing the response so the next restart run +// surfaces the cause directly. +// +// Each subtest exercises one of the 5 documented 400 paths. The +// existing row state is mocked where needed to verify the +// `existing_*` log fields are populated. +func TestRegister_400_LogsDiagnosticsReason(t *testing.T) { + cases := []struct { + name string + body string + expectedReason string + expectStatus int + setup func(mock sqlmock.Sqlmock, workspaceID string) + }{ + { + name: "invalid_delivery_mode", + body: `{"id":"ws-1","url":"http://localhost:8000","delivery_mode":"foo","agent_card":{"name":"x"}}`, + expectedReason: "invalid_delivery_mode", + expectStatus: http.StatusBadRequest, + }, + { + name: "invalid_kind", + body: `{"id":"ws-1","url":"http://localhost:8000","kind":"foo","agent_card":{"name":"x"}}`, + expectedReason: "invalid_kind", + expectStatus: http.StatusBadRequest, + }, + { + name: "url_required_for_push", + body: `{"id":"ws-1","delivery_mode":"push","agent_card":{"name":"x"}}`, + expectedReason: "url_required_for_push", + expectStatus: http.StatusBadRequest, + setup: func(mock sqlmock.Sqlmock, workspaceID string) { + // C18 token gate: fresh-register path, no live tokens. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(workspaceID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs(workspaceID). + WillReturnError(sql.ErrNoRows) + // Defer boot_register_failed path: UPDATE failure timestamp. + mock.ExpectExec("UPDATE workspaces SET last_register_failure_at"). + WithArgs(workspaceID). + WillReturnResult(sqlmock.NewResult(0, 1)) + }, + }, + { + name: "url_validate_failed_link_local", + body: `{"id":"ws-1","url":"http://169.254.169.254:8000","delivery_mode":"push","agent_card":{"name":"x"}}`, + expectedReason: "url_validate_failed", + expectStatus: http.StatusBadRequest, + setup: func(mock sqlmock.Sqlmock, workspaceID string) { + // C18 token gate: fresh-register path, no live tokens. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs(workspaceID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs(workspaceID). + WillReturnError(sql.ErrNoRows) + mock.ExpectExec("UPDATE workspaces SET last_register_failure_at"). + WithArgs(workspaceID). + WillReturnResult(sqlmock.NewResult(0, 1)) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + var buf bytes.Buffer + oldOutput := log.Writer() + log.SetOutput(&buf) + defer log.SetOutput(oldOutput) + + if tc.setup != nil { + tc.setup(mock, "ws-1") + } + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(tc.body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != tc.expectStatus { + t.Errorf("expected status %d, got %d: %s", tc.expectStatus, w.Code, w.Body.String()) + } + + logs := buf.String() + want := "registry_register_400 workspace=ws-1 reason=" + tc.expectedReason + if !strings.Contains(logs, want) { + t.Errorf("expected diagnostic log %q, got: %s", want, logs) + } + }) + } +} + +// TestRegister_400_LogsExistingRowState verifies that the diagnostic +// log line captures the workspace's existing row state (URL, kind, +// delivery_mode) at the time of the 400 — so the operator can +// compare the rejected payload against the row to identify the +// drift source. The fetchExistingWorkspaceStateForDiagnostics query +// is mocked; the test asserts the log line carries the expected +// `existing_*` values (NOT the raw URL per RC #11335). +// +// Uses the url_validate_failed path (link-local URL) because it's +// the only 400 path that runs AFTER the fetch — invalid_json, +// invalid_delivery_mode, and invalid_kind all run before the +// existingState fetch (pre-ctx) and pass an empty diagnostics. +// url_required_for_push + url_validate_failed both run post-ctx +// and carry the fetched state. +func TestRegister_400_LogsExistingRowState(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewRegistryHandler(broadcaster) + + var buf bytes.Buffer + oldOutput := log.Writer() + log.SetOutput(&buf) + defer log.SetOutput(oldOutput) + + // Existing row with the documented (recreated container) state. + // The trigger is a 400 on the url_validate_failed path, which + // runs AFTER the existing-state fetch — so the test exercises + // the real fetch path. + // + // Order of queries in the actual code path: + // 1. fetchExistingWorkspaceStateForDiagnostics + // (SELECT url, kind, delivery_mode) — happens at L398, AFTER + // the ShouldBindJSON + delivery_mode/kind early checks. + // 2. C18 token gate (Phase 30.1) + // (SELECT COUNT(*) FROM workspace_auth_tokens) — happens at L390, + // AFTER the fetch. Fresh-register path, no live tokens. + // 3. resolveDeliveryMode (SELECT delivery_mode, runtime) — happens + // at L427, AFTER the C18 check. + // 4. defer boot_register_failed UPDATE — happens AFTER the 400 + // is returned. + // + // fetchExistingWorkspaceStateForDiagnostics reads url, kind, + // delivery_mode from the same row. + mock.ExpectQuery(`SELECT url, kind, delivery_mode FROM workspaces WHERE id`). + WithArgs("ws-existing"). + WillReturnRows(sqlmock.NewRows([]string{"url", "kind", "delivery_mode"}). + AddRow("https://ws-existing.example.com", "workspace", "push")) + // C18 token gate: fresh-register path, no live tokens. + mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens"). + WithArgs("ws-existing"). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + // resolveDeliveryMode reads delivery_mode + runtime from the + // row. + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs("ws-existing"). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode", "runtime"}). + AddRow("push", "external")) + // Defer boot_register_failed path: UPDATE failure timestamp + // (authOK=true after the body parses). + mock.ExpectExec("UPDATE workspaces SET last_register_failure_at"). + WithArgs("ws-existing"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + // Link-local URL — always blocked by validateAgentURL in any + // deploy mode (RC #11335's chosen test URL). This drives the + // url_validate_failed path so the fetch is exercised. + body := `{"id":"ws-existing","url":"http://169.254.169.254:8000","delivery_mode":"push","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Register(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String()) + } + + logs := buf.String() + // The diagnostic log must include the existing row's kind + // (workspace) and delivery_mode (push) as the basis for the + // operator's drift analysis. existing_url is REDACTED (RC #11335) + // — it must be the literal "present", NOT the raw URL. + if !strings.Contains(logs, "registry_register_400") { + t.Errorf("expected registry_register_400 diagnostic, got: %s", logs) + } + if !strings.Contains(logs, "reason=url_validate_failed") { + t.Errorf("expected reason=url_validate_failed, got: %s", logs) + } + if !strings.Contains(logs, "workspace=ws-existing") { + t.Errorf("expected workspace=ws-existing, got: %s", logs) + } + // existing_kind and existing_delivery_mode use %q in the format + // string (quoted), so the value is wrapped in double-quotes in + // the log line. Asserting for the substring with quotes is the + // correct shape. + if !strings.Contains(logs, `existing_kind="workspace"`) { + t.Errorf("expected existing_kind=\"workspace\" (from the row), got: %s", logs) + } + if !strings.Contains(logs, `existing_delivery_mode="push"`) { + t.Errorf("expected existing_delivery_mode=\"push\" (from the row), got: %s", logs) + } + if !strings.Contains(logs, "existing_url=present") { + t.Errorf("expected existing_url=present (URL redacted per RC #11335), got: %s", logs) + } + // Critical: the raw URL MUST NOT appear in the log line + // (RC #11335). The agent's URL is also a private link-local + // (169.254.169.254) which is itself PII-adjacent. + if strings.Contains(logs, "ws-existing.example.com") { + t.Errorf("REGRESSION: raw existing URL leaked into diagnostic log line (RC #11335): %s", logs) + } + if strings.Contains(logs, "169.254.169.254") { + t.Errorf("REGRESSION: raw payload URL leaked into diagnostic log line (RC #11335): %s", logs) + } + // The payload URL is present (the agent sent http://169.254.169.254:8000); + // it's the row's URL that may-or-may-not be present (we set it + // present in the mock). + if !strings.Contains(logs, "payload_url=present") { + t.Errorf("expected payload_url=present (agent sent a URL), got: %s", logs) + } + // The detail field carries validateAgentURL's friendly CIDR label, + // NOT the raw URL. (RC #11335's allowed exception for the detail + // field.) + if !strings.Contains(logs, "blocked address") { + t.Errorf("expected friendly CIDR label in detail (validateAgentURL's message), got: %s", logs) + } +}