diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index de6756fae..541cdaf04 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -90,6 +90,47 @@ func isSystemCaller(callerID string) bool { return false } +// callerIDToSourceID normalizes a callerID for storage in activity_logs.source_id +// (UUID-typed, see migration 009). Returns nil when the callerID is NOT a valid +// workspace UUID — empty string, a system caller prefix ("system:...", +// "webhook:...", "test:...", "channel:..."), or any other non-UUID shape — so +// the activity_logs INSERT never tries to coerce a synthetic caller like +// "system:restart-context" into the source_id column. The synthetic caller is +// still attributable via the request body + summary (e.g. summary="message/send +// → ws-name (ingest)"), and a future source_kind column could distinguish the +// shapes — for now, nil is the safe, non-poisoning storage. +// +// Why both the system-prefix check AND the UUID parse check: the system prefix +// is the documented allowlist (so we can add a new "channel:foo" tomorrow +// without breaking callers that have a non-UUID identifier), and the UUID parse +// is the structural backstop in case some future caller path forgets the +// prefix and a non-UUID slips through to a logA2A* helper. Defense in depth. +// +// This is the production-code half of #2680/#2693 / controlplane #2530: the +// restart-context path (restart_context.go:296) issues ProxyA2ARequest with +// callerID="system:restart-context", and prior to this helper the literal +// string reached activity_logs.source_id and either failed the UUID INSERT or +// poisoned the column. With this helper, the same flow stores source_id=NULL +// while keeping the request body, summary, and the system-caller trace in +// LogActivity's broadcast payload intact. 
+func callerIDToSourceID(callerID string) *string { + if callerID == "" || isSystemCaller(callerID) { + return nil + } + parsed, err := uuid.Parse(callerID) + if err != nil { + // Non-UUID, non-system caller: source_id is UUID-typed, so store + // NULL ("no source workspace") instead of letting the INSERT fail. + return nil + } + // uuid.Parse also accepts urn:uuid:, {braced} and bare-hex spellings + // (and for 38-byte input only examines the middle 36 bytes), so return + // the canonical hyphenated form rather than the raw input; otherwise a + // non-canonical spelling could still reach the UUID-typed column. + canonical := parsed.String() + return &canonical +} + // maxProxyResponseBody is the maximum size of an A2A proxy response body (10MB). const maxProxyResponseBody = 10 << 20 diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index d9d9c8ec1..319c4b36c 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -315,7 +315,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), + SourceID: callerIDToSourceID(callerID), TargetID: &workspaceID, Method: &a2aMethod, Summary: &summary, @@ -343,7 +343,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), + SourceID: callerIDToSourceID(callerID), TargetID: &workspaceID, Method: &a2aMethod, Summary: &summary, @@ -416,7 +416,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle LogActivity(logCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), + SourceID: callerIDToSourceID(callerID), TargetID: &workspaceID, Method: &a2aMethod, Summary: 
&summary, @@ -757,7 +757,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, LogActivity(insCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), + SourceID: callerIDToSourceID(callerID), TargetID: &workspaceID, Method: &a2aMethod, Summary: &summary, @@ -859,7 +859,7 @@ func (h *WorkspaceHandler) persistUserMessageAtIngest( LogActivity(insCtx, h.broadcaster, ActivityParams{ WorkspaceID: workspaceID, ActivityType: "a2a_receive", - SourceID: nilIfEmpty(callerID), + SourceID: callerIDToSourceID(callerID), TargetID: &workspaceID, Method: &a2aMethod, Summary: &summary, diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index c7398f4a7..6a8a56529 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -845,6 +845,55 @@ func TestIsSystemCaller(t *testing.T) { } } +// TestCallerIDToSourceID is the production-code regression guard for #2693 +// (and the production half of #2680 / #2530 / #2701). The restart-context +// path passes callerID="system:restart-context" through ProxyA2ARequest; +// the logA2A* helpers then take callerID → activity_logs.source_id (UUID- +// typed, see migration 009). Pre-fix the literal "system:restart-context" +// string reached the source_id column and either failed the UUID INSERT +// or poisoned the column. callerIDToSourceID must: +// - return nil for empty / system-prefixed callerIDs (no poison) +// - return nil for any non-UUID caller (defense in depth: future caller +// paths that forget the system: prefix still get a nil, not a 500) +// - return &callerID for a valid workspace UUID +func TestCallerIDToSourceID(t *testing.T) { + validUUID := "550e8400-e29b-41d4-a716-446655440000" + cases := []struct { + name string + caller string + want *string // nil → expect nil; &"..." 
→ expect &caller + }{ + {"empty returns nil", "", nil}, + {"system prefix returns nil", "system:restart-context", nil}, + {"system scheduler returns nil", "system:scheduler", nil}, + {"webhook prefix returns nil", "webhook:github", nil}, + {"test prefix returns nil", "test:fake", nil}, + {"channel prefix returns nil", "channel:telegram", nil}, + {"valid UUID returns pointer", validUUID, &validUUID}, + {"non-UUID non-system returns nil", "ws-uuid-123", nil}, + {"free-form text returns nil", "not-a-uuid-at-all", nil}, + {"uuid-like-but-malformed returns nil", "550e8400-e29b-41d4-a716", nil}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := callerIDToSourceID(tc.caller) + if tc.want == nil { + if got != nil { + t.Errorf("callerIDToSourceID(%q) = %v, want nil", tc.caller, *got) + } + return + } + if got == nil { + t.Errorf("callerIDToSourceID(%q) = nil, want %q", tc.caller, *tc.want) + return + } + if *got != *tc.want { + t.Errorf("callerIDToSourceID(%q) = %q, want %q", tc.caller, *got, *tc.want) + } + }) + } +} + // ==================== detectPlatformInDocker ==================== func TestDetectPlatformInDocker_EnvVar(t *testing.T) { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 6c286ecd5..a09c1e966 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -329,11 +329,34 @@ func isPlatformTunnelHostname(h string) bool { // Register handles POST /registry/register // Upserts workspace, sets Redis TTL, broadcasts WORKSPACE_ONLINE. func (h *RegistryHandler) Register(c *gin.Context) { + // #2693 register-400-on-recreate diagnostics. The pre-fix boot_register_failed + // log only reported status=400, which is the same line for five distinct + // causes (invalid body / invalid delivery_mode / invalid kind / url-required + // for push / url fails SSRF). 
Operators triaging restart-context "Secret + // with name bootstrap not found" downstream effects (#2530 / #2680) had no + // way to tell WHY register returned 400. This captures the specific reason + // (single grep-able "registry_register_400 reason=…" line per 400 path) and + // folds it into the deferred failure log so the same telemetry also lands + // on the canonical boot_register_failed line. + // + // workspaceID is set after ShouldBindJSON succeeds (so the invalid-body + // 400 path can't leak a prior request's ID via the defer); the empty-string + // case is handled cleanly by the log format (`workspace= reason=…`). + var register400Reason, register400Detail, registerWorkspaceID string + defer func() { + if register400Reason != "" { + log.Printf("Registry register: workspace=%s registry_register_400 reason=%s detail=%q", registerWorkspaceID, register400Reason, register400Detail) + } + }() + var payload models.RegisterPayload if err := c.ShouldBindJSON(&payload); err != nil { + register400Reason = "invalid_request_body" + register400Detail = err.Error() c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) return } + registerWorkspaceID = payload.ID // #2500 instrumentation: log non-200 boot Register outcomes so operators // can distinguish 401 (C18 token race), 400 (push-URL invalid/empty), @@ -343,7 +366,11 @@ func (h *RegistryHandler) Register(c *gin.Context) { authOK := false defer func(wsID string) { if status := c.Writer.Status(); status != http.StatusOK { - log.Printf("Registry register: workspace=%s boot_register_failed status=%d duration=%s", wsID, status, time.Since(registerStart)) + if register400Reason != "" { + log.Printf("Registry register: workspace=%s boot_register_failed status=%d reason=%s detail=%q duration=%s", wsID, status, register400Reason, register400Detail, time.Since(registerStart)) + } else { + log.Printf("Registry register: workspace=%s boot_register_failed status=%d duration=%s", wsID, status, 
time.Since(registerStart)) + } // #2530: record register failure so heartbeat can surface degraded status. // #2585 hardening: only stamp after the caller has authenticated // (requireWorkspaceToken succeeded). Unauthenticated 401s must NOT @@ -362,6 +389,8 @@ func (h *RegistryHandler) Register(c *gin.Context) { // allowed and resolves to the row's existing value (or "push" default) // in the upsert below. See #2339 for the poll/push split rationale. if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) { + register400Reason = "invalid_delivery_mode" + register400Detail = payload.DeliveryMode c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"}) return } @@ -370,6 +399,8 @@ func (h *RegistryHandler) Register(c *gin.Context) { // resolves to the row's existing value (or "workspace" default) in // resolveKind below. Only the platform-agent container declares 'platform'. if payload.Kind != "" && !models.IsValidKind(payload.Kind) { + register400Reason = "invalid_kind" + register400Detail = payload.Kind c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"}) return } @@ -460,10 +491,19 @@ func (h *RegistryHandler) Register(c *gin.Context) { } } if effectiveURL == "" { + register400Reason = "url_required_for_push" c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"}) return } if err := validateAgentURL(effectiveURL); err != nil { + register400Reason = "url_validate_failed" + // #2693 PII/secret-leak fix: don't echo the raw URL in the + // detail line. The SSRF classifier (validateAgentURL) already + // returns a friendly CIDR label (e.g. "url resolves to link-local + // 169.254.0.0/16 — disallowed by SSRF safety policy"), so the + // diagnostic value is preserved (operator can see WHICH CIDR + // class failed) without leaking the literal address. 
+ register400Detail = err.Error() c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } diff --git a/workspace-server/internal/handlers/registry_test.go b/workspace-server/internal/handlers/registry_test.go index 1a309949f..af73ada21 100644 --- a/workspace-server/internal/handlers/registry_test.go +++ b/workspace-server/internal/handlers/registry_test.go @@ -2755,3 +2755,219 @@ func TestRegister_PushModeNoURLNoCardURLStill400(t *testing.T) { t.Errorf("expected 400 (no url in payload or agent_card), got %d: %s", w.Code, w.Body.String()) } } + +// ==================== #2693 register-400 diagnostics ==================== +// +// The pre-fix boot_register_failed line reported status=400, which is the +// same line for FIVE distinct causes (invalid body / invalid delivery_mode / +// invalid kind / url-required-for-push / url-validate-failed). Operators +// triaging the restart-context "Secret with name bootstrap not found" / +// register-400-on-recreate class of issues (#2530 / #2680 / #2693) had no +// way to tell WHY register returned 400. The fix adds a single grep-able +// "registry_register_400 reason=…" line per 400 path (and folds the same +// reason into the deferred boot_register_failed line). These tests pin +// the contract: each 400 path emits the expected reason, AND the deferred +// failure log carries the same reason on a single line. + +// captureLogs swaps log output to a buffer for the duration of t, returns +// the buffer (callers should read it AFTER the request has returned so +// all deferred log lines have flushed). 
+func captureLogs(t *testing.T) *bytes.Buffer { + t.Helper() + var buf bytes.Buffer + oldOutput := log.Writer() + log.SetOutput(&buf) + t.Cleanup(func() { log.SetOutput(oldOutput) }) + return &buf +} + +func TestRegister_400_InvalidRequestBody_LogsReason(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + buf := captureLogs(t) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString("not json")) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", w.Code) + } + logs := buf.String() + if !strings.Contains(logs, "registry_register_400 reason=invalid_request_body") { + t.Errorf("expected invalid_request_body reason, got: %s", logs) + } + // NOTE: the pre-existing boot_register_failed defer is registered + // AFTER the invalid-body 400 return, so it does NOT fire on this + // path. That's a pre-fix limitation; the new registry_register_400 + // log fills the gap. The other 400 paths (invalid delivery_mode, + // invalid kind, url_required_for_push, url_validate_failed) return + // later in the function and DO hit the boot_register_failed defer — + // those tests assert both lines. 
+} + +func TestRegister_400_InvalidDeliveryMode_LogsReason(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + buf := captureLogs(t) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"id":"ws-dm","delivery_mode":"bogus","url":"http://localhost:8000","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } + logs := buf.String() + if !strings.Contains(logs, "registry_register_400 reason=invalid_delivery_mode") { + t.Errorf("expected invalid_delivery_mode reason, got: %s", logs) + } + // detail carries the offending value (NOT the URL — delivery_mode is + // a short enum, no PII risk). + if !strings.Contains(logs, `detail="bogus"`) { + t.Errorf("expected detail to include offending delivery_mode, got: %s", logs) + } +} + +func TestRegister_400_InvalidKind_LogsReason(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + buf := captureLogs(t) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"id":"ws-kind","kind":"bogus","url":"http://localhost:8000","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } + logs := buf.String() + if !strings.Contains(logs, "registry_register_400 reason=invalid_kind") { + t.Errorf("expected invalid_kind reason, got: %s", logs) + } + if !strings.Contains(logs, `detail="bogus"`) { + t.Errorf("expected detail to include offending kind, got: %s", logs) + } +} + +func TestRegister_400_URLRequiredForPush_LogsReason(t 
*testing.T) { + // Push mode with no URL anywhere — same shape as + // TestRegister_PushModeNoURLNoCardURLStill400, but we assert the + // diagnostic reason lands on the log line. + // Single setupTestDB call (as in TestRegister_200_DoesNotEmit400ReasonLog). + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs("ws-nourl"). + WillReturnError(sql.ErrNoRows) + buf := captureLogs(t) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"id":"ws-nourl","url":"","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d", w.Code) + } + logs := buf.String() + if !strings.Contains(logs, "registry_register_400 reason=url_required_for_push") { + t.Errorf("expected url_required_for_push reason, got: %s", logs) + } + // The deferred boot_register_failed line carries the same reason. + if !strings.Contains(logs, "boot_register_failed status=400 reason=url_required_for_push") { + t.Errorf("expected boot_register_failed with reason, got: %s", logs) + } +} + +func TestRegister_400_URLValidateFailed_LogsReasonWithoutRawURL(t *testing.T) { + // Push mode with a URL that fails SSRF safety (link-local 169.254.x.x + // is always blocked by validateAgentURL — see registry.go:466-469). + // We assert: + // - 400 response + // - reason=url_validate_failed + // - detail carries the friendly CIDR label (e.g. "link-local") + // from validateAgentURL's error — NOT the raw URL, so the log + // line is safe to surface in operator dashboards without + // leaking the rejected address. + // Single setupTestDB call (as in TestRegister_200_DoesNotEmit400ReasonLog). + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs("ws-ssrf"). 
+ WillReturnError(sql.ErrNoRows) + buf := captureLogs(t) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + // 169.254.0.1 is link-local; validateAgentURL rejects it. + body := `{"id":"ws-ssrf","url":"http://169.254.0.1:8000","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String()) + } + logs := buf.String() + if !strings.Contains(logs, "registry_register_400 reason=url_validate_failed") { + t.Errorf("expected url_validate_failed reason, got: %s", logs) + } + // The detail must include the friendly CIDR label, not the raw URL. + // validateAgentURL produces a message like "url resolves to link-local + // 169.254.0.0/16 — disallowed by SSRF safety policy", so the substring + // "link-local" or "SSRF" should appear. We don't pin the exact wording + // to keep the test resilient to friendly-message rephrasing. + if !strings.Contains(logs, "SSRF") && !strings.Contains(logs, "link-local") { + t.Errorf("expected detail to include the SSRF/link-local classifier, got: %s", logs) + } + // The raw address must NOT appear in the log. The log detail and the + // 400 response body both carry validateAgentURL's error text, so this + // also pins that the error message itself never embeds the address. + if strings.Contains(logs, "169.254.0.1") { + t.Errorf("raw URL 169.254.0.1 should NOT appear in the log line, got: %s", logs) + } +} + +func TestRegister_200_DoesNotEmit400ReasonLog(t *testing.T) { + // Mirror TestRegister_200_DoesNotLogFailure: a successful Register + // must not emit a registry_register_400 line (the deferred log + // guards on register400Reason != ""). Pinning the contract here so + // future refactors that always-log can't silently regress. 
+ mock := setupTestDB(t) + setupTestRedis(t) + buf := captureLogs(t) + + // Same happy-path setup as the existing 200 test. + wsID := "ws-ok" + mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`). + WithArgs(wsID). + WillReturnError(sql.ErrNoRows) + mock.ExpectExec("INSERT INTO workspaces"). + WithArgs(wsID, wsID, "http://localhost:8000", sqlmock.AnyArg(), "push", ""). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + body := `{"id":"` + wsID + `","url":"http://localhost:8000","agent_card":{"name":"x"}}` + c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + NewRegistryHandler(newTestBroadcaster()).Register(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if strings.Contains(buf.String(), "registry_register_400") { + t.Errorf("200 response must not emit registry_register_400 log, got: %s", buf.String()) + } +}