fix(a2a): default message.role in normalizeA2APayload single-choke (#2251) #2255

Merged
core-devops merged 2 commits from fix/a2a-2251-go-role-default into main 2026-06-05 04:51:11 +00:00
3 changed files with 200 additions and 1 deletions
+31 -1
View File
@@ -300,7 +300,14 @@ rows = json.load(sys.stdin)
def text_of(r):
body = r.get('request_body') or {}
parts = (body.get('params') or {}).get('message', {}).get('parts') or []
return ''.join(p.get('text','') for p in parts if p.get('type')=='text')
# A2A v0.3 keys the Part discriminator on 'kind'; legacy senders used
# 'type'. ProxyA2A.normalizeA2APayload (#2251) rewrites 'type' -> 'kind'
# on ingest, so the stored request_body carries 'kind' even when the
# caller posted 'type'. Accept EITHER so this parser asserts on the text
# payload, not on which discriminator field the server happened to store.
def is_text(p):
return p.get('kind') == 'text' or p.get('type') == 'text'
return ''.join(p.get('text', '') for p in parts if is_text(p))
if len(rows) < 2:
print('NEED2_GOT_'+str(len(rows)))
else:
@@ -309,6 +316,29 @@ else:
check_eq "since_id feed orders ASC (oldest-new first, newest-new last)" \
"hello-from-e2e-2|hello-from-e2e-3" "$ASC_FIRST"
# Wire-contract gate (#2251): the caller posted parts with the LEGACY "type"
# discriminator, but ProxyA2A.normalizeA2APayload rewrites "type" -> "kind"
# (A2A v0.3) BEFORE the row is durably logged. Assert the stored request_body
# carries "kind" and no longer carries "type", so a regression that drops the
# rename — or a feed that stops storing the normalized body — fails loudly here
# instead of silently feeding the polling agent an untagged Part. This is the
# end-to-end half of the Go unit tests in a2a_proxy_test.go (which assert the
# rename in isolation); this proves it survives the durable activity_logs path.
DISC=$(echo "$ASC_RESP" | python3 -c "
import json, sys
rows = json.load(sys.stdin)
kinds, types = [], []
for r in rows:
body = r.get('request_body') or {}
parts = (body.get('params') or {}).get('message', {}).get('parts') or []
for p in parts:
if 'kind' in p: kinds.append(p['kind'])
if 'type' in p: types.append(p['type'])
print(('kind' if kinds and not types else 'BAD') + ':' + ','.join(kinds) + '/' + ','.join(types))
")
check_eq "stored Part uses v0.3 'kind' discriminator, never legacy 'type' (#2251)" \
"kind:text,text/" "$DISC"
# ---------- Phase 6: stale cursor returns 410 ----------
echo ""
echo "--- Phase 6: Stale / unknown cursor returns 410 ---"
@@ -801,6 +801,18 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
if _, hasID := msg["messageId"]; !hasID {
msg["messageId"] = uuid.New().String()
}
// #2251: default params.message.role to "user" when absent.
// The downstream a2a-sdk v0.3 Pydantic validator marks role a
// REQUIRED field; a role-less envelope fails parse with
// "params.message.role Field required". The Go builders
// (mcp_tools/delegation/scheduler/channels) already set it, but
// raw external/canvas POSTs to ProxyA2A may omit it — making this
// the single canonical choke that guarantees a schema-valid role.
// Mirror the messageId default exactly: inject only when missing,
// never overwrite a caller-supplied role (e.g. "agent").
if _, hasRole := msg["role"]; !hasRole {
msg["role"] = "user"
}
_, hasParts := msg["parts"]
rawContent, hasContent := msg["content"]
if !hasParts {
@@ -832,6 +844,27 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
}
}
}
// #2251: wire hygiene — the A2A v0.3 Part discriminator is
// "kind", but some builders/clients emit the legacy "type" key
// (e.g. delegation.go). The v0.3 Pydantic validator keys on
// "kind"; a stray "type" leaves the Part untagged. Rename
// "type" → "kind" on every Part that lacks an explicit "kind"
// so the discriminator is always present on the wire.
if parts, ok := msg["parts"].([]interface{}); ok {
for _, p := range parts {
part, ok := p.(map[string]interface{})
if !ok {
continue
}
if _, hasKind := part["kind"]; hasKind {
continue
}
if t, hasType := part["type"]; hasType {
part["kind"] = t
delete(part, "type")
}
}
}
}
}
@@ -1514,6 +1514,142 @@ func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) {
}
}
// --- #2251: role default + part-kind hygiene contract tests ---
//
// These assert normalizeA2APayload is the single canonical Go choke that
// guarantees a schema-valid outbound message/send envelope: it injects a
// default params.message.role="user" when the sender omitted role (the bug
// that made delegate_task fail the peer's a2a Pydantic validator with
// "params.message.role Field required" while reply_to_workspace worked), and
// it renames the legacy Part discriminator "type"→"kind" for wire hygiene.
// normMsg is a small helper that runs normalizeA2APayload and returns the
// resolved params.message map, failing the test on any normalization error.
func normMsg(t *testing.T, raw string) map[string]interface{} {
t.Helper()
out, _, perr := normalizeA2APayload([]byte(raw))
if perr != nil {
t.Fatalf("normalizeA2APayload returned error: %+v", perr)
}
var parsed map[string]interface{}
if err := json.Unmarshal(out, &parsed); err != nil {
t.Fatalf("output not valid JSON: %v", err)
}
params, ok := parsed["params"].(map[string]interface{})
if !ok {
t.Fatalf("output missing params object: %s", string(out))
}
msg, ok := params["message"].(map[string]interface{})
if !ok {
t.Fatalf("output missing params.message object: %s", string(out))
}
return msg
}
func TestNormalizeA2APayload_DefaultsRoleWhenMissing(t *testing.T) {
cases := []struct {
name string
raw string
}{
{
name: "v0.3 parts, no role",
raw: `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`,
},
{
name: "v0.2 string content, no role",
raw: `{"method":"message/send","params":{"message":{"content":"hi"}}}`,
},
{
name: "legacy type part, no role",
raw: `{"method":"message/send","params":{"message":{"parts":[{"type":"text","text":"hi"}]}}}`,
},
{
name: "already wrapped jsonrpc, no role",
raw: `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
msg := normMsg(t, tc.raw)
if msg["role"] != "user" {
t.Errorf("expected role defaulted to \"user\", got %v", msg["role"])
}
// Parts must remain valid (non-empty) after normalization.
parts, ok := msg["parts"].([]interface{})
if !ok || len(parts) == 0 {
t.Fatalf("expected non-empty parts after normalization, got %v", msg["parts"])
}
// Every part must carry the v0.3 "kind" discriminator.
for i, p := range parts {
part, ok := p.(map[string]interface{})
if !ok {
t.Fatalf("part %d is not an object: %v", i, p)
}
if _, hasKind := part["kind"]; !hasKind {
t.Errorf("part %d missing \"kind\" discriminator: %v", i, part)
}
if _, hasType := part["type"]; hasType {
t.Errorf("part %d still has legacy \"type\" key: %v", i, part)
}
}
})
}
}
func TestNormalizeA2APayload_PreservesExplicitRole(t *testing.T) {
// A caller-supplied role (e.g. "agent") must NOT be overwritten with "user".
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"agent","parts":[{"kind":"text","text":"hi"}]}}}`)
if msg["role"] != "agent" {
t.Errorf("explicit role overwritten: expected \"agent\", got %v", msg["role"])
}
}
func TestNormalizeA2APayload_RenamesPartTypeToKind(t *testing.T) {
// Mirrors delegation.go's builder which emits {"type":"text",...}. After
// normalization the wire Part must be discriminated by "kind".
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"type":"text","text":"a"},{"type":"file","uri":"workspace:/x"}]}}}`)
parts := msg["parts"].([]interface{})
if len(parts) != 2 {
t.Fatalf("expected 2 parts, got %d", len(parts))
}
wantKind := []string{"text", "file"}
for i, p := range parts {
part := p.(map[string]interface{})
if part["kind"] != wantKind[i] {
t.Errorf("part %d: expected kind=%q, got %v", i, wantKind[i], part["kind"])
}
if _, hasType := part["type"]; hasType {
t.Errorf("part %d still carries legacy \"type\": %v", i, part)
}
}
}
func TestNormalizeA2APayload_DoesNotClobberKindWithType(t *testing.T) {
// If a part has BOTH kind and type, kind wins and is left untouched.
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","type":"ignored","text":"a"}]}}}`)
part := msg["parts"].([]interface{})[0].(map[string]interface{})
if part["kind"] != "text" {
t.Errorf("expected kind preserved as \"text\", got %v", part["kind"])
}
}
// TestNormalizeA2APayload_RoleDefault_ContractRegression documents the
// pre-fix failure: without the role default, a role-less message/send body
// emerged from normalization still missing params.message.role, which the
// peer's a2a Pydantic validator rejects. This asserts the POST-fix invariant
// (role present) directly; before the a2a_proxy.go change this assertion
// fails (role is absent → msg["role"] == nil).
func TestNormalizeA2APayload_RoleDefault_ContractRegression(t *testing.T) {
msg := normMsg(t, `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"delegate this"}]}}}`)
role, hasRole := msg["role"]
if !hasRole {
t.Fatal("REGRESSION (#2251): params.message.role absent after normalization — peer a2a validator will reject with 'role Field required'")
}
if role != "user" {
t.Errorf("expected default role \"user\", got %v", role)
}
}
// --- resolveAgentURL direct unit tests ---
func TestResolveAgentURL_CacheHit(t *testing.T) {