test(a2a): contract test that MCP delegate_task produces schema-valid SendMessageRequest (#2251) #2260
+51
-31
@@ -234,30 +234,44 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
"Authorization": `Bearer ${tenantToken}`,
|
||||
"X-Molecule-Org-Id": orgID,
|
||||
};
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
// Provider-registry SSOT (internal#718) registers ONLY Kimi models for
|
||||
// the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed
|
||||
// entry (workspace-server/internal/providers/providers.yaml, hermes ->
|
||||
// platform). The old `gpt-4o` was never a registered hermes model and
|
||||
// now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace
|
||||
// defaults closed to platform_managed (see the boot-shape note below),
|
||||
// so a platform-namespaced model id is the registry-correct choice.
|
||||
model: "moonshot/kimi-k2.6",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 400 || !ws.body?.id) {
|
||||
throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`);
|
||||
// Retry workspace creation on transient 5xx / timeout — staging CP can
|
||||
// return 502/503/504 under load and a single-shot failure kills the
|
||||
// entire E2E run. Up to 3 attempts with exponential backoff between them
|
||||
// (3s, then 6s) give ~9s of total backoff, well inside the 20-min provision envelope.
|
||||
let workspaceId = "";
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
// Provider-registry SSOT (internal#718) registers ONLY Kimi models for
|
||||
// the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed
|
||||
// entry (workspace-server/internal/providers/providers.yaml, hermes ->
|
||||
// platform). The old `gpt-4o` was never a registered hermes model and
|
||||
// now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace
|
||||
// defaults closed to platform_managed (see the boot-shape note below),
|
||||
// so a platform-namespaced model id is the registry-correct choice.
|
||||
model: "moonshot/kimi-k2.6",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 200 && ws.status < 300 && ws.body?.id) {
|
||||
workspaceId = ws.body.id as string;
|
||||
break;
|
||||
}
|
||||
const isTransient = ws.status >= 500 || ws.status === 0;
|
||||
if (!isTransient || attempt === 3) {
|
||||
throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`);
|
||||
}
|
||||
const backoff = 3000 * Math.pow(2, attempt - 1);
|
||||
console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`);
|
||||
await new Promise((r) => setTimeout(r, backoff));
|
||||
}
|
||||
const workspaceId = ws.body.id as string;
|
||||
console.log(`[staging-setup] Workspace created: ${workspaceId}`);
|
||||
|
||||
// 6. Wait for workspace RENDERABLE.
|
||||
// 6. Wait for workspace online
|
||||
//
|
||||
// This harness exists to verify the canvas *tab UI* renders (staging-
|
||||
// tabs.spec.ts: open each of the 13 workspace-panel tabs, assert no hard
|
||||
@@ -266,6 +280,16 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
// it needs is a workspace ROW that the canvas lists so the node renders
|
||||
// and the side-panel tabs open. A fully-`online` agent is NOT required.
|
||||
//
|
||||
// Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes
|
||||
// install + npm browser-tools). The controlplane bootstrap-watcher
|
||||
// deadline fires at 5 min and sets status=failed prematurely; heartbeat
|
||||
// then transitions failed → online after install.sh finishes. The ONLY
|
||||
// failed shape we tolerate is the pre-start credential-abort
|
||||
// (uptime_seconds=0, no last_sample_error) — the agent never ran. Real
|
||||
// boot regressions (image pull error, panic, PYTHONPATH, etc.) still
|
||||
// hard-throw immediately so triage gets detail without waiting for a
|
||||
// polling timeout. See test_staging_full_saas.sh step 7/11 and issue #2632.
|
||||
//
|
||||
// That distinction became load-bearing on 2026-06-03: workspace-server
|
||||
// #2162 (fix(provision): platform-managed workspace must fail-closed when
|
||||
// CP proxy env absent) made a platform_managed workspace ABORT AT BOOT
|
||||
@@ -287,8 +311,10 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
// the node + tabs render, proceed. We do NOT mask a real boot regression:
|
||||
// any `failed` carrying a last_sample_error, OR a non-zero uptime (the
|
||||
// agent started then crashed — image pull, panic, PYTHONPATH, etc.),
|
||||
// still hard-throws. Genuine *infra* provision failure is already caught
|
||||
// loud one step earlier at the org level (instance_status === "failed").
|
||||
// still hard-throws immediately so triage gets boot_stage / last_error /
|
||||
// image fields without waiting for a polling timeout.
|
||||
// Genuine *infra* provision failure is already caught loud one step
|
||||
// earlier at the org level (instance_status === "failed").
|
||||
await waitFor<boolean>(
|
||||
async () => {
|
||||
const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, {
|
||||
@@ -315,13 +341,7 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
);
|
||||
return true;
|
||||
}
|
||||
// last_sample_error is often empty when the failure happens before
|
||||
// the agent emits a sample (e.g. boot crash, image pull error,
|
||||
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
|
||||
// body gives triage the boot_stage / last_error / image fields it
|
||||
// needs without a second probe. Otherwise this propagates as a
|
||||
// bare "Workspace failed: " — the exact useless message that
|
||||
// sent #2632 to the issue tracker.
|
||||
// Real boot regression — hard-throw immediately with full detail.
|
||||
const detail = sampleErr
|
||||
? sampleErr
|
||||
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
|
||||
@@ -333,7 +353,7 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
10_000,
|
||||
"workspace online",
|
||||
);
|
||||
console.log(`[staging-setup] Workspace renderable`);
|
||||
console.log(`[staging-setup] Workspace online`);
|
||||
|
||||
// 7. Hand state off to tests + teardown — overwrite the slug-only
|
||||
// bootstrap state with the full state spec tests need.
|
||||
|
||||
@@ -173,20 +173,8 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
|
||||
// check_task_status returned status='queued' forever even after a
|
||||
// real reply landed). messageId mirrors delegation_id so the
|
||||
// platform's idempotency-key extraction also keys off the same id.
|
||||
a2aBody, marshalErr := json.Marshal(map[string]interface{}{
|
||||
"method": "message/send",
|
||||
"params": map[string]interface{}{
|
||||
"message": map[string]interface{}{
|
||||
"role": "user",
|
||||
"messageId": delegationID,
|
||||
// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) —
|
||||
// a `type`-keyed Part is dropped by the receiver's v0.3
|
||||
// validator, silently losing the delegated task.
|
||||
"parts": []map[string]interface{}{{"kind": "text", "text": body.Task}},
|
||||
"metadata": map[string]interface{}{"delegation_id": delegationID},
|
||||
},
|
||||
},
|
||||
})
|
||||
// Build A2A payload via helper so contract tests can assert the envelope shape.
|
||||
a2aBody, marshalErr := buildDelegateA2ABody(delegationID, body.Task)
|
||||
if marshalErr != nil {
|
||||
log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr)
|
||||
}
|
||||
@@ -374,6 +362,27 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
|
||||
return insertTrackingUnavailable
|
||||
}
|
||||
|
||||
// buildDelegateA2ABody serializes the A2A "message/send" payload used to hand
// a delegated task to another workspace.
//
// The message carries role="user", a messageId equal to delegationID, a single
// text part holding task, and metadata.delegation_id equal to delegationID.
// It is a pure function (no HTTP, no DB) so contract tests can assert the
// envelope shape directly.
//
// NOTE(review): no "jsonrpc" or "id" members are set here — presumably they
// are added downstream before the request goes on the wire; confirm.
//
// Returns the marshalled JSON bytes, or the error reported by json.Marshal.
func buildDelegateA2ABody(delegationID, task string) ([]byte, error) {
	// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) — a
	// `type`-keyed Part is dropped by the receiver's v0.3 validator,
	// silently losing the delegated task.
	textPart := map[string]interface{}{"kind": "text", "text": task}

	message := map[string]interface{}{
		"role":      "user",
		"messageId": delegationID,
		"parts":     []map[string]interface{}{textPart},
		"metadata":  map[string]interface{}{"delegation_id": delegationID},
	}

	envelope := map[string]interface{}{
		"method": "message/send",
		"params": map[string]interface{}{"message": message},
	}

	return json.Marshal(envelope)
}
|
||||
|
||||
// executeDelegation runs in a goroutine — sends A2A and stores the result.
|
||||
// Updates delegation status through: pending → dispatched → received → completed/failed
|
||||
// delegationRetryDelay is the pause between the first failed proxy attempt
|
||||
|
||||
@@ -1762,3 +1762,74 @@ func TestListDelegations_LedgerFailedIncludesErrorDetail(t *testing.T) {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- buildDelegateA2ABody: schema-valid SendMessageRequest ----------
|
||||
|
||||
// TestBuildDelegateA2ABody_SchemaValidSendMessageRequest pins the contract
|
||||
// requested by issue #2251: delegate_task must produce a schema-valid A2A
|
||||
// SendMessageRequest with role="user", messageId, parts, and metadata.
|
||||
func TestBuildDelegateA2ABody_SchemaValidSendMessageRequest(t *testing.T) {
|
||||
delegationID := "del-2251-test"
|
||||
task := "write a contract test"
|
||||
|
||||
body, err := buildDelegateA2ABody(delegationID, task)
|
||||
if err != nil {
|
||||
t.Fatalf("buildDelegateA2ABody failed: %v", err)
|
||||
}
|
||||
|
||||
var envelope map[string]interface{}
|
||||
if err := json.Unmarshal(body, &envelope); err != nil {
|
||||
t.Fatalf("body is not valid JSON: %v", err)
|
||||
}
|
||||
|
||||
// Top-level envelope shape
|
||||
if envelope["method"] != "message/send" {
|
||||
t.Errorf("method = %v, want message/send", envelope["method"])
|
||||
}
|
||||
|
||||
params, ok := envelope["params"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("params missing or not a map: %T", envelope["params"])
|
||||
}
|
||||
|
||||
msg, ok := params["message"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("message missing or not a map: %T", params["message"])
|
||||
}
|
||||
|
||||
// Issue #2251: role is required
|
||||
if msg["role"] != "user" {
|
||||
t.Errorf("message.role = %v, want \"user\"", msg["role"])
|
||||
}
|
||||
|
||||
// messageId must be present and match delegationID
|
||||
if msg["messageId"] != delegationID {
|
||||
t.Errorf("message.messageId = %v, want %s", msg["messageId"], delegationID)
|
||||
}
|
||||
|
||||
// parts must be a non-empty list with a text part
|
||||
parts, ok := msg["parts"].([]interface{})
|
||||
if !ok || len(parts) == 0 {
|
||||
t.Fatalf("message.parts missing or empty: %T", msg["parts"])
|
||||
}
|
||||
firstPart, ok := parts[0].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("first part is not a map: %T", parts[0])
|
||||
}
|
||||
// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251)
|
||||
if firstPart["kind"] != "text" {
|
||||
t.Errorf("first part kind = %v, want text", firstPart["kind"])
|
||||
}
|
||||
if firstPart["text"] != task {
|
||||
t.Errorf("first part text = %v, want %q", firstPart["text"], task)
|
||||
}
|
||||
|
||||
// metadata.delegation_id must match
|
||||
meta, ok := msg["metadata"].(map[string]interface{})
|
||||
if !ok {
|
||||
t.Fatalf("metadata missing or not a map: %T", msg["metadata"])
|
||||
}
|
||||
if meta["delegation_id"] != delegationID {
|
||||
t.Errorf("metadata.delegation_id = %v, want %s", meta["delegation_id"], delegationID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,55 @@ func mcpPost(t *testing.T, h *MCPHandler, workspaceID string, body interface{})
|
||||
return w
|
||||
}
|
||||
|
||||
// assertA2ASendMessageSchema checks that body decodes to the A2A
// SendMessageRequest shape expected from delegate_task (issue #2251 contract):
//
//   - jsonrpc == "2.0" and method == "message/send"
//   - params.message is an object with role == "user"
//   - params.message.messageId is a non-empty string
//   - params.message.parts is a non-empty list whose first entry is an object
//     with kind == "text" and text == wantTask
//
// Structural problems (invalid JSON, missing objects, empty parts) abort the
// calling test via t.Fatalf; field mismatches are reported with t.Errorf so
// several can surface in one run.
func assertA2ASendMessageSchema(t *testing.T, body []byte, wantTask string) {
	t.Helper()
	var envelope map[string]interface{}
	if err := json.Unmarshal(body, &envelope); err != nil {
		t.Fatalf("A2A body is not valid JSON: %v", err)
	}
	if envelope["jsonrpc"] != "2.0" {
		t.Errorf("jsonrpc = %v, want 2.0", envelope["jsonrpc"])
	}
	if envelope["method"] != "message/send" {
		t.Errorf("method = %v, want message/send", envelope["method"])
	}

	params, ok := envelope["params"].(map[string]interface{})
	if !ok {
		t.Fatalf("params missing or not a map: %T", envelope["params"])
	}
	msg, ok := params["message"].(map[string]interface{})
	if !ok {
		t.Fatalf("message missing or not a map: %T", params["message"])
	}

	if msg["role"] != "user" {
		t.Errorf("message.role = %v, want \"user\"", msg["role"])
	}
	// Comparing the raw interface value against "" is not enough: an absent
	// key decodes to nil, which is != "", so a request with no messageId at
	// all would slip through. Require a string AND require it non-empty.
	if messageID, isString := msg["messageId"].(string); !isString || messageID == "" {
		t.Errorf("message.messageId is missing or empty (got %v)", msg["messageId"])
	}

	parts, ok := msg["parts"].([]interface{})
	if !ok || len(parts) == 0 {
		t.Fatalf("message.parts missing or empty: %T", msg["parts"])
	}
	firstPart, ok := parts[0].(map[string]interface{})
	if !ok {
		t.Fatalf("first part is not a map: %T", parts[0])
	}
	// A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251)
	if firstPart["kind"] != "text" {
		t.Errorf("first part kind = %v, want text", firstPart["kind"])
	}
	if firstPart["text"] != wantTask {
		t.Errorf("first part text = %v, want %q", firstPart["text"], wantTask)
	}
}
|
||||
|
||||
func expectCanCommunicateSiblings(mock sqlmock.Sqlmock, callerID, targetID, parentID string) {
|
||||
mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`).
|
||||
WithArgs(callerID).
|
||||
@@ -209,9 +258,7 @@ func TestMCPHandler_DelegateTask_RoutesThroughPlatformA2AProxy(t *testing.T) {
|
||||
if !logActivity {
|
||||
t.Fatal("delegate_task should log through platform A2A proxy")
|
||||
}
|
||||
if !strings.Contains(string(body), "do work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "do work")
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
|
||||
}
|
||||
|
||||
@@ -252,9 +299,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
if !strings.Contains(string(body), "async work") {
|
||||
t.Fatalf("A2A body missing task text: %s", string(body))
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "async work")
|
||||
called <- struct{}{}
|
||||
return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil
|
||||
}
|
||||
@@ -304,10 +349,8 @@ func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) {
|
||||
if workspaceID != targetID || proxyCallerID != callerID {
|
||||
t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID)
|
||||
}
|
||||
assertA2ASendMessageSchema(t, body, "review this video")
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"text":"review this video"`) {
|
||||
t.Fatalf("A2A body missing task text: %s", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"kind":"video"`) {
|
||||
t.Fatalf("A2A body missing video attachment kind: %s", bodyStr)
|
||||
}
|
||||
@@ -386,6 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) {
|
||||
waitGlobalAsyncForTest()
|
||||
select {
|
||||
case body := <-called:
|
||||
assertA2ASendMessageSchema(t, body, "async work with image")
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"kind":"image"`) {
|
||||
t.Fatalf("A2A body missing image attachment kind: %s", bodyStr)
|
||||
|
||||
Reference in New Issue
Block a user