fix(a2a-proxy): normalize system-caller source_id to NULL (core#2680 wedge recovery) #2701
@@ -316,7 +316,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
SourceID: callerIDToSourceID(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
@@ -344,7 +344,7 @@ func (h *WorkspaceHandler) logA2ABusyQueued(ctx context.Context, workspaceID, ca
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
SourceID: callerIDToSourceID(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
@@ -417,7 +417,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
LogActivity(logCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
SourceID: callerIDToSourceID(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
@@ -438,6 +438,27 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
}
|
||||
}
|
||||
|
||||
// nilIfEmpty returns nil for an empty string. The narrowest possible
|
||||
// contract: any other input is returned as a pointer to itself.
|
||||
//
|
||||
// nilIfEmpty is shared across many call sites — SourceID, TargetID,
|
||||
// Method, Summary, ErrorDetail, MessageId, workspace_dir — and
|
||||
// NOT ALL of them are caller identifiers. A system-caller
|
||||
// normalization (the kind callerIDToSourceID does below) would be
|
||||
// WRONG applied to a free-form string field like Method
|
||||
// (the value "system:foo" is a perfectly legitimate method name
|
||||
// that should NOT be silently nulled). System-caller handling is
|
||||
// therefore a SEPARATE helper, scoped to the only field that
|
||||
// actually needs it: the UUID-typed activity_logs.SourceID when
|
||||
// sourced from a callerID.
|
||||
//
|
||||
// Origin: prior #2701 attempt at this fix had the system-caller
|
||||
// check inline in nilIfEmpty; Researcher's RC #11295 caught the
|
||||
// too-generic contract (nilIfEmpty is also used on
|
||||
// method/summary/error-detail/message-id/workspace-dir, none of
|
||||
// which should be subject to system-caller normalization). The
|
||||
// scoped helper callerIDToSourceID is the corrected shape: same
|
||||
// intent, narrower surface, zero collateral on the other 6 callers.
|
||||
func nilIfEmpty(s string) *string {
|
||||
if s == "" {
|
||||
return nil
|
||||
@@ -445,6 +466,44 @@ func nilIfEmpty(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
// callerIDToSourceID normalizes a callerID to *string for use as
|
||||
// activity_logs.SourceID. The column is UUID-typed; system-caller
|
||||
// strings like "system:restart-context" would poison the column
|
||||
// with a non-UUID value, break downstream joins (e.g. the canvas
|
||||
// /activity?source=canvas filter, which keys on source_id IS NULL),
|
||||
// and (most importantly for #2680) break the queue-fallback path
|
||||
// that lets a workspace recover from the post-restart wedge —
|
||||
// the recovery SELECT on activity_logs.message_id returns the
|
||||
// durable row, but every consumer of the row would crash on the
|
||||
// non-UUID source_id.
|
||||
//
|
||||
// Returns nil for:
|
||||
// - empty callerID (no caller — agent initiated, user attribute
|
||||
// is null)
|
||||
// - system-caller prefix (matches isSystemCaller in
|
||||
// a2a_proxy.go:85; preserves the "system caller" semantic via
|
||||
// source_id IS NULL, the same way the canvas /activity
|
||||
// filter already keys on)
|
||||
//
|
||||
// Returns &callerID for any real workspace UUID or op-style id
|
||||
// (preserves the row attribution so a per-workspace activity
|
||||
// filter still works).
|
||||
//
|
||||
// Idempotent + side-effect-free. Called from the 5 LogActivity
|
||||
// sites that previously used `SourceID: nilIfEmpty(callerID)`:
|
||||
// a2a_proxy_helpers.go:319, 347, 420, 863, 965.
|
||||
//
|
||||
// Mirrors the queue-side fix in #2696 (a2a_queue.caller_id) so
|
||||
// BOTH persisted-caller columns follow the same
|
||||
// isSystemCaller() → NULL normalization. Single source of truth
|
||||
// in a2a_proxy.go:84-91.
|
||||
func callerIDToSourceID(callerID string) *string {
|
||||
if callerID == "" || isSystemCaller(callerID) {
|
||||
return nil
|
||||
}
|
||||
return &callerID
|
||||
}
|
||||
|
||||
// validateCallerToken enforces the Phase 30.5 auth-token contract on the
|
||||
// caller of an A2A proxy request. Same lazy-bootstrap shape as
|
||||
// registry.requireWorkspaceToken: if the caller workspace has any live
|
||||
@@ -832,7 +891,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
|
||||
LogActivity(insCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
SourceID: callerIDToSourceID(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
@@ -934,7 +993,7 @@ func (h *WorkspaceHandler) persistUserMessageAtIngest(
|
||||
LogActivity(insCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
SourceID: callerIDToSourceID(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
|
||||
@@ -27,6 +27,100 @@ func TestNilIfEmpty_NonEmptyString(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// System-caller normalization (core#2680, fix/restart-context-callerid-normalize).
|
||||
// A synthetic caller like "system:restart-context" must not be
|
||||
// persisted into the UUID-typed activity_logs.source_id column;
|
||||
// that path is the only one that lets a workspace recover from the
|
||||
// post-restart wedge. Normalizing to NULL preserves the
|
||||
// "system caller" semantic via source_id IS NULL (the existing
|
||||
// canvas /activity?source=canvas filter) and lets the queue-fallback
|
||||
// path find the row by the durable message_id.
|
||||
//
|
||||
// Scoped helper: callerIDToSourceID. Per the Researcher's RC
|
||||
// #11295 on the prior #2701 attempt, system-caller normalization
|
||||
// must NOT be in nilIfEmpty itself — nilIfEmpty is also used on
|
||||
// non-ID fields (Method, Summary, ErrorDetail, MessageId,
|
||||
// workspace_dir), and a method name like "system:foo" is a
|
||||
// legitimate value that should NOT be silently nulled. The
|
||||
// normalization is therefore scoped to the ONLY field that
|
||||
// actually needs it: a callerID being persisted as
|
||||
// activity_logs.SourceID.
|
||||
|
||||
func TestCallerIDToSourceID_SystemCallerPrefixes(t *testing.T) {
|
||||
cases := []string{
|
||||
"system:restart-context",
|
||||
"webhook:github",
|
||||
"test:lifecycle-1",
|
||||
"channel:slack:C0123",
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c, func(t *testing.T) {
|
||||
got := callerIDToSourceID(c)
|
||||
if got != nil {
|
||||
t.Errorf("system caller %q: got %p (%q), want nil", c, got, *got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallerIDToSourceID_RealWorkspaceUUIDStillPreserved(t *testing.T) {
|
||||
// Regression guard: a real workspace UUID must pass through
|
||||
// unchanged. The original #2694 RC closed because the fix
|
||||
// accidentally collapsed real UUIDs to NULL; this case is the
|
||||
// one that would have caught that.
|
||||
cases := []string{
|
||||
"ws-1", // op-style id
|
||||
"01234567-89ab-cdef-0123-456789abcdef", // uuid
|
||||
"agent-dev-b", // agent id (not a system prefix)
|
||||
"canvas_user", // canvas user placeholder
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c, func(t *testing.T) {
|
||||
got := callerIDToSourceID(c)
|
||||
if got == nil {
|
||||
t.Errorf("real caller %q: got nil, want preserved pointer", c)
|
||||
return
|
||||
}
|
||||
if *got != c {
|
||||
t.Errorf("real caller %q: got %q, want preserved", c, *got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCallerIDToSourceID_EmptyString(t *testing.T) {
|
||||
got := callerIDToSourceID("")
|
||||
if got != nil {
|
||||
t.Errorf("empty callerID: got %p, want nil", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestNilIfEmpty_NoSystemCallerNormalization guards the narrow
|
||||
// contract that prompted the RC #11295 fix. nilIfEmpty is used on
|
||||
// many non-ID fields (Method, Summary, ErrorDetail, MessageId,
|
||||
// workspace_dir); the system-caller normalization must NOT leak
|
||||
// into those callers. A method name like "system:foo" must pass
|
||||
// through unchanged.
|
||||
func TestNilIfEmpty_NoSystemCallerNormalization(t *testing.T) {
|
||||
cases := []string{
|
||||
"system:foo", // would-be method name
|
||||
"webhook:github", // would-be method name
|
||||
"channel:slack:C0123", // would-be channel id
|
||||
}
|
||||
for _, c := range cases {
|
||||
t.Run(c, func(t *testing.T) {
|
||||
got := nilIfEmpty(c)
|
||||
if got == nil {
|
||||
t.Errorf("nilIfEmpty on %q: got nil, want preserved pointer (the system-caller normalization must be scoped to callerIDToSourceID only)", c)
|
||||
return
|
||||
}
|
||||
if *got != c {
|
||||
t.Errorf("nilIfEmpty on %q: got %q, want preserved", c, *got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// extractToolTrace tests
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user