fix(a2a): default message.role in normalizeA2APayload single-choke (#2251) #2255
@@ -300,7 +300,14 @@ rows = json.load(sys.stdin)
|
||||
def text_of(r):
|
||||
body = r.get('request_body') or {}
|
||||
parts = (body.get('params') or {}).get('message', {}).get('parts') or []
|
||||
return ''.join(p.get('text','') for p in parts if p.get('type')=='text')
|
||||
# A2A v0.3 keys the Part discriminator on 'kind'; legacy senders used
|
||||
# 'type'. ProxyA2A.normalizeA2APayload (#2251) rewrites 'type' -> 'kind'
|
||||
# on ingest, so the stored request_body carries 'kind' even when the
|
||||
# caller posted 'type'. Accept EITHER so this parser asserts on the text
|
||||
# payload, not on which discriminator field the server happened to store.
|
||||
def is_text(p):
|
||||
return p.get('kind') == 'text' or p.get('type') == 'text'
|
||||
return ''.join(p.get('text', '') for p in parts if is_text(p))
|
||||
if len(rows) < 2:
|
||||
print('NEED2_GOT_'+str(len(rows)))
|
||||
else:
|
||||
@@ -309,6 +316,29 @@ else:
|
||||
check_eq "since_id feed orders ASC (oldest-new first, newest-new last)" \
|
||||
"hello-from-e2e-2|hello-from-e2e-3" "$ASC_FIRST"
|
||||
|
||||
# Wire-contract gate (#2251): the caller posted parts with the LEGACY "type"
|
||||
# discriminator, but ProxyA2A.normalizeA2APayload rewrites "type" -> "kind"
|
||||
# (A2A v0.3) BEFORE the row is durably logged. Assert the stored request_body
|
||||
# carries "kind" and no longer carries "type", so a regression that drops the
|
||||
# rename — or a feed that stops storing the normalized body — fails loudly here
|
||||
# instead of silently feeding the polling agent an untagged Part. This is the
|
||||
# end-to-end half of the Go unit tests in a2a_proxy_test.go (which assert the
|
||||
# rename in isolation); this proves it survives the durable activity_logs path.
|
||||
DISC=$(echo "$ASC_RESP" | python3 -c "
|
||||
import json, sys
|
||||
rows = json.load(sys.stdin)
|
||||
kinds, types = [], []
|
||||
for r in rows:
|
||||
body = r.get('request_body') or {}
|
||||
parts = (body.get('params') or {}).get('message', {}).get('parts') or []
|
||||
for p in parts:
|
||||
if 'kind' in p: kinds.append(p['kind'])
|
||||
if 'type' in p: types.append(p['type'])
|
||||
print(('kind' if kinds and not types else 'BAD') + ':' + ','.join(kinds) + '/' + ','.join(types))
|
||||
")
|
||||
check_eq "stored Part uses v0.3 'kind' discriminator, never legacy 'type' (#2251)" \
|
||||
"kind:text,text/" "$DISC"
|
||||
|
||||
# ---------- Phase 6: stale cursor returns 410 ----------
|
||||
echo ""
|
||||
echo "--- Phase 6: Stale / unknown cursor returns 410 ---"
|
||||
|
||||
@@ -801,6 +801,18 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
if _, hasID := msg["messageId"]; !hasID {
|
||||
msg["messageId"] = uuid.New().String()
|
||||
}
|
||||
// #2251: default params.message.role to "user" when absent.
|
||||
// The downstream a2a-sdk v0.3 Pydantic validator marks role a
|
||||
// REQUIRED field; a role-less envelope fails parse with
|
||||
// "params.message.role Field required". The Go builders
|
||||
// (mcp_tools/delegation/scheduler/channels) already set it, but
|
||||
// raw external/canvas POSTs to ProxyA2A may omit it — making this
|
||||
// the single canonical choke that guarantees a schema-valid role.
|
||||
// Mirror the messageId default exactly: inject only when missing,
|
||||
// never overwrite a caller-supplied role (e.g. "agent").
|
||||
if _, hasRole := msg["role"]; !hasRole {
|
||||
msg["role"] = "user"
|
||||
}
|
||||
_, hasParts := msg["parts"]
|
||||
rawContent, hasContent := msg["content"]
|
||||
if !hasParts {
|
||||
@@ -832,6 +844,27 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) {
|
||||
}
|
||||
}
|
||||
}
|
||||
// #2251: wire hygiene — the A2A v0.3 Part discriminator is
|
||||
// "kind", but some builders/clients emit the legacy "type" key
|
||||
// (e.g. delegation.go). The v0.3 Pydantic validator keys on
|
||||
// "kind"; a stray "type" leaves the Part untagged. Rename
|
||||
// "type" → "kind" on every Part that lacks an explicit "kind"
|
||||
// so the discriminator is always present on the wire.
|
||||
if parts, ok := msg["parts"].([]interface{}); ok {
|
||||
for _, p := range parts {
|
||||
part, ok := p.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if _, hasKind := part["kind"]; hasKind {
|
||||
continue
|
||||
}
|
||||
if t, hasType := part["type"]; hasType {
|
||||
part["kind"] = t
|
||||
delete(part, "type")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1514,6 +1514,142 @@ func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// --- #2251: role default + part-kind hygiene contract tests ---
|
||||
//
|
||||
// These assert normalizeA2APayload is the single canonical Go choke that
|
||||
// guarantees a schema-valid outbound message/send envelope: it injects a
|
||||
// default params.message.role="user" when the sender omitted role (the bug
|
||||
// that made delegate_task fail the peer's a2a Pydantic validator with
|
||||
// "params.message.role Field required" while reply_to_workspace worked), and
|
||||
// it renames the legacy Part discriminator "type"→"kind" for wire hygiene.
|
||||
|
||||
// normMsg is a small helper that runs normalizeA2APayload and returns the
|
||||
// resolved params.message map, failing the test on any normalization error.
|
||||
func normMsg(t *testing.T, raw string) map[string]interface{} {
|
||||
t.Helper()
|
||||
out, _, perr := normalizeA2APayload([]byte(raw))
|
||||
if perr != nil {
|
||||
t.Fatalf("normalizeA2APayload returned error: %+v", perr)
|
||||
}
|
||||
var parsed map[string]interface{}
|
||||
if err := json.Unmarshal(out, &parsed); err != nil {
|
||||
t.Fatalf("output not valid JSON: %v", err)
|
||||
}
|
||||
params, ok := parsed["params"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("output missing params object: %s", string(out))
|
||||
}
|
||||
msg, ok := params["message"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("output missing params.message object: %s", string(out))
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_DefaultsRoleWhenMissing(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
raw string
|
||||
}{
|
||||
{
|
||||
name: "v0.3 parts, no role",
|
||||
raw: `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`,
|
||||
},
|
||||
{
|
||||
name: "v0.2 string content, no role",
|
||||
raw: `{"method":"message/send","params":{"message":{"content":"hi"}}}`,
|
||||
},
|
||||
{
|
||||
name: "legacy type part, no role",
|
||||
raw: `{"method":"message/send","params":{"message":{"parts":[{"type":"text","text":"hi"}]}}}`,
|
||||
},
|
||||
{
|
||||
name: "already wrapped jsonrpc, no role",
|
||||
raw: `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
msg := normMsg(t, tc.raw)
|
||||
if msg["role"] != "user" {
|
||||
t.Errorf("expected role defaulted to \"user\", got %v", msg["role"])
|
||||
}
|
||||
// Parts must remain valid (non-empty) after normalization.
|
||||
parts, ok := msg["parts"].([]interface{})
|
||||
if !ok || len(parts) == 0 {
|
||||
t.Fatalf("expected non-empty parts after normalization, got %v", msg["parts"])
|
||||
}
|
||||
// Every part must carry the v0.3 "kind" discriminator.
|
||||
for i, p := range parts {
|
||||
part, ok := p.(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("part %d is not an object: %v", i, p)
|
||||
}
|
||||
if _, hasKind := part["kind"]; !hasKind {
|
||||
t.Errorf("part %d missing \"kind\" discriminator: %v", i, part)
|
||||
}
|
||||
if _, hasType := part["type"]; hasType {
|
||||
t.Errorf("part %d still has legacy \"type\" key: %v", i, part)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_PreservesExplicitRole(t *testing.T) {
|
||||
// A caller-supplied role (e.g. "agent") must NOT be overwritten with "user".
|
||||
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"agent","parts":[{"kind":"text","text":"hi"}]}}}`)
|
||||
if msg["role"] != "agent" {
|
||||
t.Errorf("explicit role overwritten: expected \"agent\", got %v", msg["role"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_RenamesPartTypeToKind(t *testing.T) {
|
||||
// Mirrors delegation.go's builder which emits {"type":"text",...}. After
|
||||
// normalization the wire Part must be discriminated by "kind".
|
||||
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"type":"text","text":"a"},{"type":"file","uri":"workspace:/x"}]}}}`)
|
||||
parts := msg["parts"].([]interface{})
|
||||
if len(parts) != 2 {
|
||||
t.Fatalf("expected 2 parts, got %d", len(parts))
|
||||
}
|
||||
wantKind := []string{"text", "file"}
|
||||
for i, p := range parts {
|
||||
part := p.(map[string]interface{})
|
||||
if part["kind"] != wantKind[i] {
|
||||
t.Errorf("part %d: expected kind=%q, got %v", i, wantKind[i], part["kind"])
|
||||
}
|
||||
if _, hasType := part["type"]; hasType {
|
||||
t.Errorf("part %d still carries legacy \"type\": %v", i, part)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeA2APayload_DoesNotClobberKindWithType(t *testing.T) {
|
||||
// If a part has BOTH kind and type, kind wins and is left untouched.
|
||||
msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","type":"ignored","text":"a"}]}}}`)
|
||||
part := msg["parts"].([]interface{})[0].(map[string]interface{})
|
||||
if part["kind"] != "text" {
|
||||
t.Errorf("expected kind preserved as \"text\", got %v", part["kind"])
|
||||
}
|
||||
}
|
||||
|
||||
// TestNormalizeA2APayload_RoleDefault_ContractRegression documents the
|
||||
// pre-fix failure: without the role default, a role-less message/send body
|
||||
// emerged from normalization still missing params.message.role, which the
|
||||
// peer's a2a Pydantic validator rejects. This asserts the POST-fix invariant
|
||||
// (role present) directly; before the a2a_proxy.go change this assertion
|
||||
// fails (role is absent → msg["role"] == nil).
|
||||
func TestNormalizeA2APayload_RoleDefault_ContractRegression(t *testing.T) {
|
||||
msg := normMsg(t, `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"delegate this"}]}}}`)
|
||||
role, hasRole := msg["role"]
|
||||
if !hasRole {
|
||||
t.Fatal("REGRESSION (#2251): params.message.role absent after normalization — peer a2a validator will reject with 'role Field required'")
|
||||
}
|
||||
if role != "user" {
|
||||
t.Errorf("expected default role \"user\", got %v", role)
|
||||
}
|
||||
}
|
||||
|
||||
// --- resolveAgentURL direct unit tests ---
|
||||
|
||||
func TestResolveAgentURL_CacheHit(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user