fix(core#2693): normalize system:restart-context callerID + register-400 diagnostics #2722

Closed
agent-dev-b wants to merge 1 commits from fix/core2693-restart-context-callerid-register400 into main
5 changed files with 352 additions and 6 deletions
@@ -90,6 +90,47 @@ func isSystemCaller(callerID string) bool {
return false
}
// callerIDToSourceID normalizes a callerID for storage in activity_logs.source_id
// (UUID-typed, see migration 009). Returns nil when the callerID is NOT a valid
// workspace UUID — empty string, a system caller prefix ("system:...",
// "webhook:...", "test:...", "channel:..."), or any other non-UUID shape — so
// the activity_logs INSERT never tries to coerce a synthetic caller like
// "system:restart-context" into the source_id column. The synthetic caller is
// still attributable via the request body + summary (e.g. summary="message/send
// → ws-name (ingest)"), and a future source_kind column could distinguish the
// shapes — for now, nil is the safe, non-poisoning storage.
//
// Why both the system-prefix check AND the UUID parse check: the system prefix
// is the documented allowlist (so we can add a new "channel:foo" tomorrow
// without breaking callers that have a non-UUID identifier), and the UUID parse
// is the structural backstop in case some future caller path forgets the
// prefix and a non-UUID slips through to a logA2A* helper. Defense in depth.
//
// This is the production-code half of #2680/#2693 / controlplane #2530: the
// restart-context path (restart_context.go:296) issues ProxyA2ARequest with
// callerID="system:restart-context", and prior to this helper the literal
// string reached activity_logs.source_id and either failed the UUID INSERT or
// poisoned the column. With this helper, the same flow stores source_id=NULL
// while keeping the request body, summary, and the system-caller trace in
// LogActivity's broadcast payload intact.
func callerIDToSourceID(callerID string) *string {
if callerID == "" {
return nil
}
if isSystemCaller(callerID) {
return nil
}
if _, err := uuid.Parse(callerID); err != nil {
// Non-UUID, non-system caller — treat the same as a system caller:
// the source_id column is structurally UUID-typed, and a non-UUID
// would fail the INSERT or be coerced to nil anyway. We make the
// nil explicit so the LogActivity broadcast and downstream readers
// see a consistent "no source workspace" state.
return nil
}
return &callerID
}
// maxProxyResponseBody is the maximum size of an A2A proxy response body (10MB).
const maxProxyResponseBody = 10 << 20
@@ -315,7 +315,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
SourceID: callerIDToSourceID(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
@@ -343,7 +343,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
SourceID: callerIDToSourceID(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
@@ -416,7 +416,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
SourceID: callerIDToSourceID(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
@@ -757,7 +757,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
LogActivity(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
SourceID: callerIDToSourceID(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
@@ -859,7 +859,7 @@ func (h *WorkspaceHandler) persistUserMessageAtIngest(
LogActivity(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
SourceID: callerIDToSourceID(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
@@ -845,6 +845,55 @@ func TestIsSystemCaller(t *testing.T) {
}
}
// TestCallerIDToSourceID is the production-code regression guard for #2693
// (and the production half of #2680 / #2530 / #2701). The restart-context
// path passes callerID="system:restart-context" through ProxyA2ARequest;
// the logA2A* helpers then take callerID → activity_logs.source_id (UUID-
// typed, see migration 009). Pre-fix the literal "system:restart-context"
// string reached the source_id column and either failed the UUID INSERT
// or poisoned the column. callerIDToSourceID must:
// - return nil for empty / system-prefixed callerIDs (no poison)
// - return nil for any non-UUID caller (defense in depth: future caller
// paths that forget the system: prefix still get a nil, not a 500)
// - return &callerID for a valid workspace UUID
func TestCallerIDToSourceID(t *testing.T) {
validUUID := "550e8400-e29b-41d4-a716-446655440000"
cases := []struct {
name string
caller string
want *string // nil → expect nil; &"..." → expect &caller
}{
{"empty returns nil", "", nil},
{"system prefix returns nil", "system:restart-context", nil},
{"system scheduler returns nil", "system:scheduler", nil},
{"webhook prefix returns nil", "webhook:github", nil},
{"test prefix returns nil", "test:fake", nil},
{"channel prefix returns nil", "channel:telegram", nil},
{"valid UUID returns pointer", validUUID, &validUUID},
{"non-UUID non-system returns nil", "ws-uuid-123", nil},
{"free-form text returns nil", "not-a-uuid-at-all", nil},
{"uuid-like-but-malformed returns nil", "550e8400-e29b-41d4-a716", nil},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := callerIDToSourceID(tc.caller)
if tc.want == nil {
if got != nil {
t.Errorf("callerIDToSourceID(%q) = %v, want nil", tc.caller, *got)
}
return
}
if got == nil {
t.Errorf("callerIDToSourceID(%q) = nil, want %q", tc.caller, *tc.want)
return
}
if *got != *tc.want {
t.Errorf("callerIDToSourceID(%q) = %q, want %q", tc.caller, *got, *tc.want)
}
})
}
}
// ==================== detectPlatformInDocker ====================
func TestDetectPlatformInDocker_EnvVar(t *testing.T) {
+41 -1
View File
@@ -329,11 +329,34 @@ func isPlatformTunnelHostname(h string) bool {
// Register handles POST /registry/register
// Upserts workspace, sets Redis TTL, broadcasts WORKSPACE_ONLINE.
func (h *RegistryHandler) Register(c *gin.Context) {
// #2693 register-400-on-recreate diagnostics. The pre-fix boot_register_failed
// log only reported status=400, which is the same line for five distinct
// causes (invalid body / invalid delivery_mode / invalid kind / url-required
// for push / url fails SSRF). Operators triaging restart-context "Secret
// with name bootstrap not found" downstream effects (#2530 / #2680) had no
// way to tell WHY register returned 400. This captures the specific reason
// (single grep-able "registry_register_400 reason=…" line per 400 path) and
// folds it into the deferred failure log so the same telemetry also lands
// on the canonical boot_register_failed line.
//
// workspaceID is set after ShouldBindJSON succeeds (so the invalid-body
// 400 path can't leak a prior request's ID via the defer); the empty-string
// case is handled cleanly by the log format (`workspace= reason=…`).
var register400Reason, register400Detail, registerWorkspaceID string
defer func() {
if register400Reason != "" {
log.Printf("Registry register: workspace=%s registry_register_400 reason=%s detail=%q", registerWorkspaceID, register400Reason, register400Detail)
}
}()
var payload models.RegisterPayload
if err := c.ShouldBindJSON(&payload); err != nil {
register400Reason = "invalid_request_body"
register400Detail = err.Error()
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
registerWorkspaceID = payload.ID
// #2500 instrumentation: log non-200 boot Register outcomes so operators
// can distinguish 401 (C18 token race), 400 (push-URL invalid/empty),
@@ -343,7 +366,11 @@ func (h *RegistryHandler) Register(c *gin.Context) {
authOK := false
defer func(wsID string) {
if status := c.Writer.Status(); status != http.StatusOK {
log.Printf("Registry register: workspace=%s boot_register_failed status=%d duration=%s", wsID, status, time.Since(registerStart))
if register400Reason != "" {
log.Printf("Registry register: workspace=%s boot_register_failed status=%d reason=%s detail=%q duration=%s", wsID, status, register400Reason, register400Detail, time.Since(registerStart))
} else {
log.Printf("Registry register: workspace=%s boot_register_failed status=%d duration=%s", wsID, status, time.Since(registerStart))
}
// #2530: record register failure so heartbeat can surface degraded status.
// #2585 hardening: only stamp after the caller has authenticated
// (requireWorkspaceToken succeeded). Unauthenticated 401s must NOT
@@ -362,6 +389,8 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// allowed and resolves to the row's existing value (or "push" default)
// in the upsert below. See #2339 for the poll/push split rationale.
if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) {
register400Reason = "invalid_delivery_mode"
register400Detail = payload.DeliveryMode
c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
return
}
@@ -370,6 +399,8 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// resolves to the row's existing value (or "workspace" default) in
// resolveKind below. Only the platform-agent container declares 'platform'.
if payload.Kind != "" && !models.IsValidKind(payload.Kind) {
register400Reason = "invalid_kind"
register400Detail = payload.Kind
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"})
return
}
@@ -460,10 +491,19 @@ func (h *RegistryHandler) Register(c *gin.Context) {
}
}
if effectiveURL == "" {
register400Reason = "url_required_for_push"
c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"})
return
}
if err := validateAgentURL(effectiveURL); err != nil {
register400Reason = "url_validate_failed"
// #2693 PII/secret-leak fix: don't echo the raw URL in the
// detail line. The SSRF classifier (validateAgentURL) already
// returns a friendly CIDR label (e.g. "url resolves to link-local
// 169.254.0.0/16 — disallowed by SSRF safety policy"), so the
// diagnostic value is preserved (operator can see WHICH CIDR
// class failed) without leaking the literal address.
register400Detail = err.Error()
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
@@ -2755,3 +2755,219 @@ func TestRegister_PushModeNoURLNoCardURLStill400(t *testing.T) {
t.Errorf("expected 400 (no url in payload or agent_card), got %d: %s", w.Code, w.Body.String())
}
}
// ==================== #2693 register-400 diagnostics ====================
//
// The pre-fix boot_register_failed line reported status=400, which is the
// same line for FIVE distinct causes (invalid body / invalid delivery_mode /
// invalid kind / url-required-for-push / url-validate-failed). Operators
// triaging the restart-context "Secret with name bootstrap not found" /
// register-400-on-recreate class of issues (#2530 / #2680 / #2693) had no
// way to tell WHY register returned 400. The fix adds a single grep-able
// "registry_register_400 reason=…" line per 400 path (and folds the same
// reason into the deferred boot_register_failed line). These tests pin
// the contract: each 400 path emits the expected reason, AND the deferred
// failure log carries the same reason on a single line.
// captureLogs swaps log output to a buffer for the duration of t, returns
// the buffer (callers should read it AFTER the request has returned so
// all deferred log lines have flushed).
func captureLogs(t *testing.T) *bytes.Buffer {
t.Helper()
var buf bytes.Buffer
oldOutput := log.Writer()
log.SetOutput(&buf)
t.Cleanup(func() { log.SetOutput(oldOutput) })
return &buf
}
func TestRegister_400_InvalidRequestBody_LogsReason(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
buf := captureLogs(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString("not json"))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d", w.Code)
}
logs := buf.String()
if !strings.Contains(logs, "registry_register_400 reason=invalid_request_body") {
t.Errorf("expected invalid_request_body reason, got: %s", logs)
}
// NOTE: the pre-existing boot_register_failed defer is registered
// AFTER the invalid-body 400 return, so it does NOT fire on this
// path. That's a pre-fix limitation; the new registry_register_400
// log fills the gap. The other 400 paths (invalid delivery_mode,
// invalid kind, url_required_for_push, url_validate_failed) return
// later in the function and DO hit the boot_register_failed defer —
// those tests assert both lines.
}
func TestRegister_400_InvalidDeliveryMode_LogsReason(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
buf := captureLogs(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-dm","delivery_mode":"bogus","url":"http://localhost:8000","agent_card":{"name":"x"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
logs := buf.String()
if !strings.Contains(logs, "registry_register_400 reason=invalid_delivery_mode") {
t.Errorf("expected invalid_delivery_mode reason, got: %s", logs)
}
// detail carries the offending value (NOT the URL — delivery_mode is
// a short enum, no PII risk).
if !strings.Contains(logs, `detail="bogus"`) {
t.Errorf("expected detail to include offending delivery_mode, got: %s", logs)
}
}
func TestRegister_400_InvalidKind_LogsReason(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
buf := captureLogs(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-kind","kind":"bogus","url":"http://localhost:8000","agent_card":{"name":"x"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
logs := buf.String()
if !strings.Contains(logs, "registry_register_400 reason=invalid_kind") {
t.Errorf("expected invalid_kind reason, got: %s", logs)
}
if !strings.Contains(logs, `detail="bogus"`) {
t.Errorf("expected detail to include offending kind, got: %s", logs)
}
}
func TestRegister_400_URLRequiredForPush_LogsReason(t *testing.T) {
// Push mode with no URL anywhere — same shape as
// TestRegister_PushModeNoURLNoCardURLStill400, but we assert the
// diagnostic reason lands on the log line.
setupTestDB(t)
setupTestRedis(t)
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-nourl").
WillReturnError(sql.ErrNoRows)
buf := captureLogs(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"ws-nourl","url":"","agent_card":{"name":"x"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d", w.Code)
}
logs := buf.String()
if !strings.Contains(logs, "registry_register_400 reason=url_required_for_push") {
t.Errorf("expected url_required_for_push reason, got: %s", logs)
}
// The deferred boot_register_failed line carries the same reason.
if !strings.Contains(logs, "boot_register_failed status=400 reason=url_required_for_push") {
t.Errorf("expected boot_register_failed with reason, got: %s", logs)
}
}
func TestRegister_400_URLValidateFailed_LogsReasonWithoutRawURL(t *testing.T) {
// Push mode with a URL that fails SSRF safety (link-local 169.254.x.x
// is always blocked by validateAgentURL — see registry.go:466-469).
// We assert:
// - 400 response
// - reason=url_validate_failed
// - detail carries the friendly CIDR label (e.g. "link-local")
// from validateAgentURL's error — NOT the raw URL, so the log
// line is safe to surface in operator dashboards without
// leaking the rejected address.
setupTestDB(t)
setupTestRedis(t)
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs("ws-ssrf").
WillReturnError(sql.ErrNoRows)
buf := captureLogs(t)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
// 169.254.0.1 is link-local; validateAgentURL rejects it.
body := `{"id":"ws-ssrf","url":"http://169.254.0.1:8000","agent_card":{"name":"x"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
logs := buf.String()
if !strings.Contains(logs, "registry_register_400 reason=url_validate_failed") {
t.Errorf("expected url_validate_failed reason, got: %s", logs)
}
// The detail must include the friendly CIDR label, not the raw URL.
// validateAgentURL produces a message like "url resolves to link-local
// 169.254.0.0/16 — disallowed by SSRF safety policy", so the substring
// "link-local" or "SSRF" should appear. We don't pin the exact wording
// to keep the test resilient to friendly-message rephrasing.
if !strings.Contains(logs, "SSRF") && !strings.Contains(logs, "link-local") {
t.Errorf("expected detail to include the SSRF/link-local classifier, got: %s", logs)
}
// The raw URL must NOT appear in the log (the response body still
// contains it because that's what validateAgentURL returned — the
// log is the part the operator can ship to dashboards).
if strings.Contains(logs, "169.254.0.1") {
t.Errorf("raw URL 169.254.0.1 should NOT appear in the log line, got: %s", logs)
}
}
func TestRegister_200_DoesNotEmit400ReasonLog(t *testing.T) {
// Mirror TestRegister_200_DoesNotLogFailure: a successful Register
// must not emit a registry_register_400 line (the deferred log
// guards on register400Reason != ""). Pinning the contract here so
// future refactors that always-log can't silently regress.
mock := setupTestDB(t)
setupTestRedis(t)
buf := captureLogs(t)
// Same happy-path setup as the existing 200 test.
wsID := "ws-ok"
mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(wsID, wsID, "http://localhost:8000", sqlmock.AnyArg(), "push", "").
WillReturnResult(sqlmock.NewResult(1, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"id":"` + wsID + `","url":"http://localhost:8000","agent_card":{"name":"x"}}`
c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
NewRegistryHandler(newTestBroadcaster()).Register(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if strings.Contains(buf.String(), "registry_register_400") {
t.Errorf("200 response must not emit registry_register_400 log, got: %s", buf.String())
}
}