diff --git a/tests/e2e/test_poll_mode_e2e.sh b/tests/e2e/test_poll_mode_e2e.sh index 46daa8584..f73a721ee 100755 --- a/tests/e2e/test_poll_mode_e2e.sh +++ b/tests/e2e/test_poll_mode_e2e.sh @@ -300,7 +300,14 @@ rows = json.load(sys.stdin) def text_of(r): body = r.get('request_body') or {} parts = (body.get('params') or {}).get('message', {}).get('parts') or [] - return ''.join(p.get('text','') for p in parts if p.get('type')=='text') + # A2A v0.3 keys the Part discriminator on 'kind'; legacy senders used + # 'type'. ProxyA2A.normalizeA2APayload (#2251) rewrites 'type' -> 'kind' + # on ingest, so the stored request_body carries 'kind' even when the + # caller posted 'type'. Accept EITHER so this parser asserts on the text + # payload, not on which discriminator field the server happened to store. + def is_text(p): + return p.get('kind') == 'text' or p.get('type') == 'text' + return ''.join(p.get('text', '') for p in parts if is_text(p)) if len(rows) < 2: print('NEED2_GOT_'+str(len(rows))) else: @@ -309,6 +316,29 @@ else: check_eq "since_id feed orders ASC (oldest-new first, newest-new last)" \ "hello-from-e2e-2|hello-from-e2e-3" "$ASC_FIRST" +# Wire-contract gate (#2251): the caller posted parts with the LEGACY "type" +# discriminator, but ProxyA2A.normalizeA2APayload rewrites "type" -> "kind" +# (A2A v0.3) BEFORE the row is durably logged. Assert the stored request_body +# carries "kind" and no longer carries "type", so a regression that drops the +# rename — or a feed that stops storing the normalized body — fails loudly here +# instead of silently feeding the polling agent an untagged Part. This is the +# end-to-end half of the Go unit tests in a2a_proxy_test.go (which assert the +# rename in isolation); this proves it survives the durable activity_logs path. +DISC=$(echo "$ASC_RESP" | python3 -c " +import json, sys +rows = json.load(sys.stdin) +kinds, types = [], [] +for r in rows: + body = r.get('request_body') or {} + parts = (body.get('params') or {}).get('message', {}).get('parts') or [] + for p in parts: + if 'kind' in p: kinds.append(p['kind']) + if 'type' in p: types.append(p['type']) +print(('kind' if kinds and not types else 'BAD') + ':' + ','.join(kinds) + '/' + ','.join(types)) +") +check_eq "stored Part uses v0.3 'kind' discriminator, never legacy 'type' (#2251)" \ + "kind:text,text/" "$DISC" + # ---------- Phase 6: stale cursor returns 410 ---------- echo "" echo "--- Phase 6: Stale / unknown cursor returns 410 ---" diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 17321a352..b1fc19ae9 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -801,6 +801,18 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { if _, hasID := msg["messageId"]; !hasID { msg["messageId"] = uuid.New().String() } + // #2251: default params.message.role to "user" when absent. + // The downstream a2a-sdk v0.3 Pydantic validator marks role a + // REQUIRED field; a role-less envelope fails parse with + // "params.message.role Field required". The Go builders + // (mcp_tools/delegation/scheduler/channels) already set it, but + // raw external/canvas POSTs to ProxyA2A may omit it — making this + // the single canonical choke that guarantees a schema-valid role. + // Mirror the messageId default exactly: inject only when missing, + // never overwrite a caller-supplied role (e.g. "agent"). + if _, hasRole := msg["role"]; !hasRole { + msg["role"] = "user" + } _, hasParts := msg["parts"] rawContent, hasContent := msg["content"] if !hasParts { @@ -832,6 +844,27 @@ func normalizeA2APayload(body []byte) ([]byte, string, *proxyA2AError) { } } } + // #2251: wire hygiene — the A2A v0.3 Part discriminator is + // "kind", but some builders/clients emit the legacy "type" key + // (e.g. delegation.go). The v0.3 Pydantic validator keys on + // "kind"; a stray "type" leaves the Part untagged. Rename + // "type" → "kind" on every Part that lacks an explicit "kind" + // so the discriminator is always present on the wire. + if parts, ok := msg["parts"].([]interface{}); ok { + for _, p := range parts { + part, ok := p.(map[string]interface{}) + if !ok { + continue + } + if _, hasKind := part["kind"]; hasKind { + continue + } + if t, hasType := part["type"]; hasType { + part["kind"] = t + delete(part, "type") + } + } + } } } diff --git a/workspace-server/internal/handlers/a2a_proxy_test.go b/workspace-server/internal/handlers/a2a_proxy_test.go index 10b21cbda..d0c5e7d81 100644 --- a/workspace-server/internal/handlers/a2a_proxy_test.go +++ b/workspace-server/internal/handlers/a2a_proxy_test.go @@ -1514,6 +1514,142 @@ func TestNormalizeA2APayload_NoMessageNoCheck(t *testing.T) { } } +// --- #2251: role default + part-kind hygiene contract tests --- +// +// These assert normalizeA2APayload is the single canonical Go choke that +// guarantees a schema-valid outbound message/send envelope: it injects a +// default params.message.role="user" when the sender omitted role (the bug +// that made delegate_task fail the peer's a2a Pydantic validator with +// "params.message.role Field required" while reply_to_workspace worked), and +// it renames the legacy Part discriminator "type"→"kind" for wire hygiene. + +// normMsg is a small helper that runs normalizeA2APayload and returns the +// resolved params.message map, failing the test on any normalization error. +func normMsg(t *testing.T, raw string) map[string]interface{} { + t.Helper() + out, _, perr := normalizeA2APayload([]byte(raw)) + if perr != nil { + t.Fatalf("normalizeA2APayload returned error: %+v", perr) + } + var parsed map[string]interface{} + if err := json.Unmarshal(out, &parsed); err != nil { + t.Fatalf("output not valid JSON: %v", err) + } + params, ok := parsed["params"].(map[string]interface{}) + if !ok { + t.Fatalf("output missing params object: %s", string(out)) + } + msg, ok := params["message"].(map[string]interface{}) + if !ok { + t.Fatalf("output missing params.message object: %s", string(out)) + } + return msg +} + +func TestNormalizeA2APayload_DefaultsRoleWhenMissing(t *testing.T) { + cases := []struct { + name string + raw string + }{ + { + name: "v0.3 parts, no role", + raw: `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`, + }, + { + name: "v0.2 string content, no role", + raw: `{"method":"message/send","params":{"message":{"content":"hi"}}}`, + }, + { + name: "legacy type part, no role", + raw: `{"method":"message/send","params":{"message":{"parts":[{"type":"text","text":"hi"}]}}}`, + }, + { + name: "already wrapped jsonrpc, no role", + raw: `{"jsonrpc":"2.0","id":"x","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + msg := normMsg(t, tc.raw) + if msg["role"] != "user" { + t.Errorf("expected role defaulted to \"user\", got %v", msg["role"]) + } + // Parts must remain valid (non-empty) after normalization. + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) == 0 { + t.Fatalf("expected non-empty parts after normalization, got %v", msg["parts"]) + } + // Every part must carry the v0.3 "kind" discriminator. + for i, p := range parts { + part, ok := p.(map[string]interface{}) + if !ok { + t.Fatalf("part %d is not an object: %v", i, p) + } + if _, hasKind := part["kind"]; !hasKind { + t.Errorf("part %d missing \"kind\" discriminator: %v", i, part) + } + if _, hasType := part["type"]; hasType { + t.Errorf("part %d still has legacy \"type\" key: %v", i, part) + } + } + }) + } +} + +func TestNormalizeA2APayload_PreservesExplicitRole(t *testing.T) { + // A caller-supplied role (e.g. "agent") must NOT be overwritten with "user". + msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"agent","parts":[{"kind":"text","text":"hi"}]}}}`) + if msg["role"] != "agent" { + t.Errorf("explicit role overwritten: expected \"agent\", got %v", msg["role"]) + } +} + +func TestNormalizeA2APayload_RenamesPartTypeToKind(t *testing.T) { + // Mirrors delegation.go's builder which emits {"type":"text",...}. After + // normalization the wire Part must be discriminated by "kind". + msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"type":"text","text":"a"},{"type":"file","uri":"workspace:/x"}]}}}`) + parts := msg["parts"].([]interface{}) + if len(parts) != 2 { + t.Fatalf("expected 2 parts, got %d", len(parts)) + } + wantKind := []string{"text", "file"} + for i, p := range parts { + part := p.(map[string]interface{}) + if part["kind"] != wantKind[i] { + t.Errorf("part %d: expected kind=%q, got %v", i, wantKind[i], part["kind"]) + } + if _, hasType := part["type"]; hasType { + t.Errorf("part %d still carries legacy \"type\": %v", i, part) + } + } +} + +func TestNormalizeA2APayload_DoesNotClobberKindWithType(t *testing.T) { + // If a part has BOTH kind and type, kind wins and is left untouched. + msg := normMsg(t, `{"method":"message/send","params":{"message":{"role":"user","parts":[{"kind":"text","type":"ignored","text":"a"}]}}}`) + part := msg["parts"].([]interface{})[0].(map[string]interface{}) + if part["kind"] != "text" { + t.Errorf("expected kind preserved as \"text\", got %v", part["kind"]) + } +} + +// TestNormalizeA2APayload_RoleDefault_ContractRegression documents the +// pre-fix failure: without the role default, a role-less message/send body +// emerged from normalization still missing params.message.role, which the +// peer's a2a Pydantic validator rejects. This asserts the POST-fix invariant +// (role present) directly; before the a2a_proxy.go change this assertion +// fails (role is absent → msg["role"] == nil). +func TestNormalizeA2APayload_RoleDefault_ContractRegression(t *testing.T) { + msg := normMsg(t, `{"method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"delegate this"}]}}}`) + role, hasRole := msg["role"] + if !hasRole { + t.Fatal("REGRESSION (#2251): params.message.role absent after normalization — peer a2a validator will reject with 'role Field required'") + } + if role != "user" { + t.Errorf("expected default role \"user\", got %v", role) + } +} + // --- resolveAgentURL direct unit tests --- func TestResolveAgentURL_CacheHit(t *testing.T) {