fix(registry): add register-400 diagnostics + restart-context callerID regression guard (#2680 residual) #2710

Merged
devops-engineer merged 3 commits from fix/restart-context-register-400-diagnostics into main 2026-06-13 06:20:18 +00:00
3 changed files with 423 additions and 0 deletions
@@ -335,3 +335,44 @@ func TestParseUsageFromA2AResponse_MissingTokensInUsageObject(t *testing.T) {
t.Errorf("missing tokens: got (%d, %d), want (0, 0)", in, out)
}
}
// TestRestartContext_SystemCallerDoesNotPoisonSourceID pins the contract
// of callerIDToSourceID for synthetic (non-UUID) caller IDs. It is the
// regression guard for the #2680 residual, criterion (a).
//
// Background: the restart-context production path
// (restart_context.go:sendRestartContext) invokes ProxyA2ARequest with
// callerID="system:restart-context". That value travels
//
//	sendRestartContext
//	  → ProxyA2ARequest(..., "system:restart-context", ...)
//	  → persistUserMessageAtIngest(..., "system:restart-context", ...)
//	  → LogActivityWithResult({SourceID: callerIDToSourceID("system:restart-context")})
//	  → activity_logs INSERT with SourceID = NULL
//
// and must never land in the UUID-typed activity_logs.source_id column.
// The #2701 fix added the scoped helper callerIDToSourceID, which yields
// nil for every system-caller prefix (mirroring isSystemCaller in
// a2a_proxy.go).
//
// Should that helper be removed or weakened, or the call site reverted,
// the UUID cast on activity_logs.source_id fails with
// "pq: invalid input syntax for type uuid", the post-restart
// queue-fallback path answers 503, and the workspace stays degraded.
func TestRestartContext_SystemCallerDoesNotPoisonSourceID(t *testing.T) {
	// One representative caller ID per system-caller prefix. A nil
	// result for each of them means the INSERT binds NULL, the UUID
	// cast is never attempted, and the row cannot be poisoned.
	systemCallers := [...]string{
		"system:restart-context", // the specific offender
		"webhook:github",
		"test:lifecycle-1",
		"channel:slack:C0123",
	}

	for i := range systemCallers {
		callerID := systemCallers[i]
		t.Run(callerID, func(t *testing.T) {
			if sourceID := callerIDToSourceID(callerID); sourceID != nil {
				t.Errorf("system caller %q: got non-nil pointer; would poison activity_logs.source_id (UUID cast fail → degraded wedge)", callerID)
			}
		})
	}
}
@@ -331,6 +331,12 @@ func isPlatformTunnelHostname(h string) bool {
func (h *RegistryHandler) Register(c *gin.Context) {
var payload models.RegisterPayload
if err := c.ShouldBindJSON(&payload); err != nil {
// pre-ctx: the workspace's existing row state isn't fetched yet
// (we don't have ctx + we don't know the workspace ID until
// parse succeeds). Log with an empty diagnostics struct —
// the row state defaults to "(new)" in the log line, which is
// the right framing for a fresh-register parse failure.
logRegister400Reason("invalid_json", "", models.RegisterPayload{}, registerDiagnostics{}, err.Error())
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
@@ -362,6 +368,9 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// allowed and resolves to the row's existing value (or "push" default)
// in the upsert below. See #2339 for the poll/push split rationale.
if payload.DeliveryMode != "" && !models.IsValidDeliveryMode(payload.DeliveryMode) {
// pre-existingState (see L398): pass an empty struct; the row
// state defaults to "(new)" in the log line.
logRegister400Reason("invalid_delivery_mode", payload.ID, payload, registerDiagnostics{}, "payload.delivery_mode="+payload.DeliveryMode)
c.JSON(http.StatusBadRequest, gin.H{"error": "delivery_mode must be 'push' or 'poll'"})
return
}
@@ -370,12 +379,29 @@ func (h *RegistryHandler) Register(c *gin.Context) {
// resolves to the row's existing value (or "workspace" default) in
// resolveKind below. Only the platform-agent container declares 'platform'.
if payload.Kind != "" && !models.IsValidKind(payload.Kind) {
logRegister400Reason("invalid_kind", payload.ID, payload, registerDiagnostics{}, "payload.kind="+payload.Kind)
c.JSON(http.StatusBadRequest, gin.H{"error": "kind must be 'workspace' or 'platform'"})
return
}
ctx := c.Request.Context()
// #2680 residual: register-400 diagnostics on the recreate path.
// When a recreated container's first /registry/register call returns
// 400, we need to know WHICH validation step fired (URL missing? URL
// in a private range? delivery_mode? kind? invalid JSON?) AND what
// the workspace's existing row state was at the time. Without this,
// the next restart run produces a 400 with no actionable signal —
// the deferred boot_register_failed log at the end of the function
// only fires AFTER the validation has already returned, so by the
// time it logs we have the status code but not the reason.
//
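// Helper: logRegister400Reason captures the failure reason + the
// workspace's existing row state in a single grep-able line. Called
// by every 400 path in this handler (the early parse/enum checks pass
// an empty diagnostics struct because the row has not been fetched
// yet). Its only side effect is the log.Printf call; it does not
// mutate request or row state.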
existingState := h.fetchExistingWorkspaceStateForDiagnostics(ctx, payload.ID)
// C18: prevent workspace URL hijacking on re-registration.
//
// An attacker can overwrite any workspace's agent_card URL by calling
@@ -460,10 +486,20 @@ func (h *RegistryHandler) Register(c *gin.Context) {
}
}
if effectiveURL == "" {
// Detail: which surface had a URL (so the operator can
// tell "no URL anywhere" from "URL in agent_card but
// not in payload"). NEVER log the raw URL (see RC
// #11335).
logRegister400Reason("url_required_for_push", payload.ID, payload, existingState, "effective_url_empty (payload_url_present="+urlPresence(payload.URL)+", agent_card_url_present="+urlPresence(agentCardURL(payload.AgentCard))+")")
c.JSON(http.StatusBadRequest, gin.H{"error": "url is required for push-mode workspaces"})
return
}
if err := validateAgentURL(effectiveURL); err != nil {
// validateAgentURL returns a friendly CIDR label
// (e.g. "url targets a blocked address: RFC-1918
// private address") that does NOT contain the actual
// address. Safe to log.
logRegister400Reason("url_validate_failed", payload.ID, payload, existingState, err.Error())
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
@@ -1171,3 +1207,112 @@ func (h *RegistryHandler) requireWorkspaceToken(
// see "gin.Context.Request.Context() is what we want" without re-typing
// the import-heavy standard type.
type gincontext = context.Context
// fetchExistingWorkspaceStateForDiagnostics reads the url, kind and
// delivery_mode columns of the workspaces row identified by workspaceID
// so the register-400 diagnostic line can show what the row looked like
// when the request was rejected.
//
// The lookup is strictly best-effort and never fails the request:
//   - empty workspaceID      → zero value, no query issued
//   - no matching row        → zero value, nothing logged
//   - any other query error  → zero value, error logged here and dropped
//   - NULL column            → that field stays ""
//
// A zero-value field is rendered as "(new)" by logRegister400Reason, so
// a failed fetch is indistinguishable from a fresh registration in the
// diagnostic line itself; the separate "diagnostics fetch failed" line
// emitted here is the only trace of the DB error.
//
// Why best-effort: the diagnostic must not introduce a new failure
// path. A loud error here would stack a second failure on top of the
// original 400 and mask its cause.
func (h *RegistryHandler) fetchExistingWorkspaceStateForDiagnostics(ctx context.Context, workspaceID string) registerDiagnostics {
	if workspaceID == "" {
		return registerDiagnostics{}
	}

	var rowURL, rowKind, rowMode sql.NullString
	row := db.DB.QueryRowContext(ctx,
		`SELECT url, kind, delivery_mode FROM workspaces WHERE id = $1`,
		workspaceID,
	)
	if err := row.Scan(&rowURL, &rowKind, &rowMode); err != nil {
		if !errors.Is(err, sql.ErrNoRows) {
			// workspaceID comes straight from the request payload and
			// has not been validated at this point, so quote it: %q
			// escapes newlines/control characters that could otherwise
			// forge additional log lines.
			log.Printf("Registry register: diagnostics fetch failed for %q: %v", workspaceID, err)
		}
		return registerDiagnostics{}
	}

	var state registerDiagnostics
	if rowURL.Valid {
		state.ExistingURL = rowURL.String
	}
	if rowKind.Valid {
		state.ExistingKind = rowKind.String
	}
	if rowMode.Valid {
		state.ExistingDeliveryMode = rowMode.String
	}
	return state
}
// registerDiagnostics is a snapshot of a workspace's stored row taken
// while handling a register request, so a rejected payload can be
// compared against what the row already held.
//
// Every field is best-effort. The empty string stands for "not
// available": the row did not exist, the column was NULL, or the
// lookup failed.
type registerDiagnostics struct {
	// Stored url, kind and delivery_mode column values, respectively.
	ExistingURL, ExistingKind, ExistingDeliveryMode string
}
// logRegister400Reason emits one grep-able log line for a 400 response
// from /registry/register. The line always starts with the stable
// marker `registry_register_400` so operators can search for the class.
//
// Fields, in order:
//
//	workspace              — the requested ID (whitespace/control
//	                         characters replaced with '_', see below)
//	reason                 — short key (invalid_json | invalid_delivery_mode |
//	                         invalid_kind | url_required_for_push |
//	                         url_validate_failed)
//	payload_url            — "present"/"absent": whether the payload carried a URL
//	payload_card_url       — "present"/"absent": whether the agent_card carried a URL
//	payload_kind           — the kind the agent sent (quoted)
//	payload_delivery_mode  — the delivery_mode the agent sent (quoted)
//	existing_url           — "present" if the row has a URL, otherwise "(new)"
//	existing_kind          — the row's kind, or "(new)" (quoted)
//	existing_delivery_mode — the row's delivery_mode, or "(new)" (quoted)
//	detail                 — failure-specific detail (quoted)
//
// URL values themselves are never written; only their presence is
// (RC #11335). "(new)" is printed for any empty field of existing, which
// per registerDiagnostics also covers a NULL column or a failed lookup,
// not only a genuinely new workspace.
//
// Context (#2680 residual): a recreated container's first register call
// was returning 400 with no actionable signal. Callers invoke this
// inside each 400 branch, before writing the response, so the reason is
// recorded alongside the rejection.
//
// workspaceID originates from the unauthenticated request body. It is
// printed as a bare token (tests and existing greps rely on
// `workspace=<id>`), so it is passed through
// sanitizeRegisterLogToken to stop a crafted ID from injecting extra
// fields or whole forged lines. Well-formed IDs are printed unchanged.
func logRegister400Reason(reason, workspaceID string, payload models.RegisterPayload, existing registerDiagnostics, detail string) {
	// orNew substitutes the "(new)" placeholder for an empty row value.
	orNew := func(v string) string {
		if v == "" {
			return "(new)"
		}
		return v
	}

	existingURL := "(new)"
	if existing.ExistingURL != "" {
		existingURL = "present" // redacted: presence only, never the value
	}

	log.Printf("registry_register_400 workspace=%s reason=%s payload_url=%s payload_card_url=%s payload_kind=%q payload_delivery_mode=%q existing_url=%s existing_kind=%q existing_delivery_mode=%q detail=%q",
		sanitizeRegisterLogToken(workspaceID), reason,
		urlPresence(payload.URL), urlPresence(agentCardURL(payload.AgentCard)), payload.Kind, payload.DeliveryMode,
		existingURL, orNew(existing.ExistingKind), orNew(existing.ExistingDeliveryMode),
		detail,
	)
}

// sanitizeRegisterLogToken returns s with every whitespace or control
// character (ASCII <= 0x20, DEL, NEL, and the Unicode line/paragraph
// separators) replaced by '_', so the result is safe to print as an
// unquoted key=value token on a single log line. Strings without such
// characters are returned with identical content.
func sanitizeRegisterLogToken(s string) string {
	out := []rune(s)
	for i, r := range out {
		if r <= ' ' || r == 0x7f || r == 0x85 || r == 0x2028 || r == 0x2029 {
			out[i] = '_'
		}
	}
	return string(out)
}
// urlPresence maps a URL string to a redacted marker: "absent" for the
// empty string, "present" for anything else. The URL value itself is
// never returned, which lets logRegister400Reason describe URL fields
// without writing them to the log (RC #11335: workspace URLs can be
// private addresses, and an earlier implementation exposed them to
// anyone with Loki read access).
func urlPresence(url string) string {
	if len(url) > 0 {
		return "present"
	}
	return "absent"
}
@@ -2811,3 +2811,240 @@ func TestRegister_PushModeNoURLNoCardURLStill400(t *testing.T) {
t.Errorf("expected 400 (no url in payload or agent_card), got %d: %s", w.Code, w.Body.String())
}
}
// TestRegister_400_LogsDiagnosticsReason is the #2680 residual regression
// guard. When a recreated container's first /registry/register call
// returns 400, the operator needs the failing-reason key
// (invalid_json | invalid_delivery_mode | invalid_kind |
// url_required_for_push | url_validate_failed) in a single grep-able
// `registry_register_400` log line.
//
// Each subtest drives one of those five 400 paths and asserts that the
// line starts with `registry_register_400 workspace=<id> reason=<key>`.
// For invalid_json the workspace ID is empty because the handler logs
// before the payload has been parsed.
//
// The two URL cases run after the existing-row diagnostics fetch, so
// their mock setup declares that query explicitly (returning no row)
// instead of relying on the handler swallowing sqlmock's
// "unexpected query" error. Population of the `existing_*` fields from
// a real row is covered by TestRegister_400_LogsExistingRowState.
func TestRegister_400_LogsDiagnosticsReason(t *testing.T) {
	// expectFreshRegisterQueries declares, in execution order, the DB
	// traffic of a register attempt for a workspace with no row that is
	// rejected by URL validation.
	expectFreshRegisterQueries := func(mock sqlmock.Sqlmock, workspaceID string) {
		// Diagnostics fetch: no existing row.
		mock.ExpectQuery(`SELECT url, kind, delivery_mode FROM workspaces WHERE id`).
			WithArgs(workspaceID).
			WillReturnError(sql.ErrNoRows)
		// C18 token gate: fresh-register path, no live tokens.
		mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
			WithArgs(workspaceID).
			WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
		mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
			WithArgs(workspaceID).
			WillReturnError(sql.ErrNoRows)
		// Defer boot_register_failed path: UPDATE failure timestamp.
		mock.ExpectExec("UPDATE workspaces SET last_register_failure_at").
			WithArgs(workspaceID).
			WillReturnResult(sqlmock.NewResult(0, 1))
	}

	cases := []struct {
		name           string
		body           string
		workspaceID    string // ID expected in the log line ("" when the body never parsed)
		expectedReason string
		expectStatus   int
		setup          func(mock sqlmock.Sqlmock, workspaceID string)
	}{
		{
			name:           "invalid_json",
			body:           `{"id":"ws-1","url":`,
			workspaceID:    "",
			expectedReason: "invalid_json",
			expectStatus:   http.StatusBadRequest,
		},
		{
			name:           "invalid_delivery_mode",
			body:           `{"id":"ws-1","url":"http://localhost:8000","delivery_mode":"foo","agent_card":{"name":"x"}}`,
			workspaceID:    "ws-1",
			expectedReason: "invalid_delivery_mode",
			expectStatus:   http.StatusBadRequest,
		},
		{
			name:           "invalid_kind",
			body:           `{"id":"ws-1","url":"http://localhost:8000","kind":"foo","agent_card":{"name":"x"}}`,
			workspaceID:    "ws-1",
			expectedReason: "invalid_kind",
			expectStatus:   http.StatusBadRequest,
		},
		{
			name:           "url_required_for_push",
			body:           `{"id":"ws-1","delivery_mode":"push","agent_card":{"name":"x"}}`,
			workspaceID:    "ws-1",
			expectedReason: "url_required_for_push",
			expectStatus:   http.StatusBadRequest,
			setup:          expectFreshRegisterQueries,
		},
		{
			name:           "url_validate_failed_link_local",
			body:           `{"id":"ws-1","url":"http://169.254.169.254:8000","delivery_mode":"push","agent_card":{"name":"x"}}`,
			workspaceID:    "ws-1",
			expectedReason: "url_validate_failed",
			expectStatus:   http.StatusBadRequest,
			setup:          expectFreshRegisterQueries,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			mock := setupTestDB(t)
			setupTestRedis(t)
			broadcaster := newTestBroadcaster()
			handler := NewRegistryHandler(broadcaster)

			// Capture the standard logger for this subtest only.
			var buf bytes.Buffer
			oldOutput := log.Writer()
			log.SetOutput(&buf)
			t.Cleanup(func() { log.SetOutput(oldOutput) })

			if tc.setup != nil {
				tc.setup(mock, tc.workspaceID)
			}

			w := httptest.NewRecorder()
			c, _ := gin.CreateTestContext(w)
			c.Request = httptest.NewRequest("POST", "/registry/register", bytes.NewBufferString(tc.body))
			c.Request.Header.Set("Content-Type", "application/json")
			handler.Register(c)

			if w.Code != tc.expectStatus {
				t.Errorf("expected status %d, got %d: %s", tc.expectStatus, w.Code, w.Body.String())
			}
			logs := buf.String()
			want := "registry_register_400 workspace=" + tc.workspaceID + " reason=" + tc.expectedReason
			if !strings.Contains(logs, want) {
				t.Errorf("expected diagnostic log %q, got: %s", want, logs)
			}
		})
	}
}
// TestRegister_400_LogsExistingRowState checks that the
// `registry_register_400` line reflects the workspace's stored row
// (url presence, kind, delivery_mode) at the moment of rejection, so an
// operator can compare the rejected payload against the row.
//
// The request is rejected on the url_validate_failed path (link-local
// URL). That path, like url_required_for_push, runs after the
// existing-row fetch and therefore logs the fetched state; the
// invalid_json, invalid_delivery_mode and invalid_kind rejections happen
// before the fetch and log an empty diagnostics struct.
//
// Per RC #11335 neither the row's URL nor the payload's URL may appear
// in the log; only "present"/"absent" markers are allowed.
func TestRegister_400_LogsExistingRowState(t *testing.T) {
	const workspaceID = "ws-existing"

	mock := setupTestDB(t)
	setupTestRedis(t)
	handler := NewRegistryHandler(newTestBroadcaster())

	// Redirect the standard logger into a buffer for the duration of
	// the test.
	var captured bytes.Buffer
	previous := log.Writer()
	log.SetOutput(&captured)
	defer log.SetOutput(previous)

	// Expected DB traffic, in the order the handler issues it:
	//   1. diagnostics fetch (url, kind, delivery_mode) — after the
	//      body parse and the delivery_mode/kind checks;
	//   2. C18 token gate — fresh-register path, no live tokens;
	//   3. resolveDeliveryMode (delivery_mode, runtime);
	//   4. deferred boot_register_failed UPDATE — after the 400.
	existingRow := sqlmock.NewRows([]string{"url", "kind", "delivery_mode"}).
		AddRow("https://ws-existing.example.com", "workspace", "push")
	mock.ExpectQuery(`SELECT url, kind, delivery_mode FROM workspaces WHERE id`).
		WithArgs(workspaceID).
		WillReturnRows(existingRow)

	noTokens := sqlmock.NewRows([]string{"count"}).AddRow(0)
	mock.ExpectQuery("SELECT COUNT\\(\\*\\) FROM workspace_auth_tokens").
		WithArgs(workspaceID).
		WillReturnRows(noTokens)

	modeRow := sqlmock.NewRows([]string{"delivery_mode", "runtime"}).
		AddRow("push", "external")
	mock.ExpectQuery(`SELECT delivery_mode, runtime FROM workspaces WHERE id`).
		WithArgs(workspaceID).
		WillReturnRows(modeRow)

	mock.ExpectExec("UPDATE workspaces SET last_register_failure_at").
		WithArgs(workspaceID).
		WillReturnResult(sqlmock.NewResult(0, 1))

	// A link-local URL is what drives the request into the
	// url_validate_failed branch.
	body := `{"id":"ws-existing","url":"http://169.254.169.254:8000","delivery_mode":"push","agent_card":{"name":"x"}}`
	recorder := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(recorder)
	ginCtx.Request = httptest.NewRequest("POST", "/registry/register", strings.NewReader(body))
	ginCtx.Request.Header.Set("Content-Type", "application/json")

	handler.Register(ginCtx)

	if recorder.Code != http.StatusBadRequest {
		t.Errorf("expected 400, got %d: %s", recorder.Code, recorder.Body.String())
	}

	logs := captured.String()

	// Each entry is one substring check against the captured log.
	// forbidden=false: the substring must appear; forbidden=true: it
	// must not. kind and delivery_mode are formatted with %q by the
	// handler, hence the embedded double quotes.
	checks := []struct {
		needle    string
		forbidden bool
		failFmt   string
	}{
		{"registry_register_400", false, "expected registry_register_400 diagnostic, got: %s"},
		{"reason=url_validate_failed", false, "expected reason=url_validate_failed, got: %s"},
		{"workspace=ws-existing", false, "expected workspace=ws-existing, got: %s"},
		{`existing_kind="workspace"`, false, "expected existing_kind=\"workspace\" (from the row), got: %s"},
		{`existing_delivery_mode="push"`, false, "expected existing_delivery_mode=\"push\" (from the row), got: %s"},
		// The row's URL is redacted to the literal "present".
		{"existing_url=present", false, "expected existing_url=present (URL redacted per RC #11335), got: %s"},
		// Neither raw URL may leak into the line.
		{"ws-existing.example.com", true, "REGRESSION: raw existing URL leaked into diagnostic log line (RC #11335): %s"},
		{"169.254.169.254", true, "REGRESSION: raw payload URL leaked into diagnostic log line (RC #11335): %s"},
		// The agent did send a URL, so its presence marker is "present".
		{"payload_url=present", false, "expected payload_url=present (agent sent a URL), got: %s"},
		// detail carries validateAgentURL's address-class label rather
		// than the address itself.
		{"blocked address", false, "expected friendly CIDR label in detail (validateAgentURL's message), got: %s"},
	}
	for _, chk := range checks {
		if strings.Contains(logs, chk.needle) == chk.forbidden {
			t.Errorf(chk.failFmt, logs)
		}
	}
}