diff --git a/canvas/e2e/staging-setup.ts b/canvas/e2e/staging-setup.ts index f81816c4b..feb5988b2 100644 --- a/canvas/e2e/staging-setup.ts +++ b/canvas/e2e/staging-setup.ts @@ -234,30 +234,44 @@ export default async function globalSetup(_config: FullConfig): Promise { "Authorization": `Bearer ${tenantToken}`, "X-Molecule-Org-Id": orgID, }; - const ws = await jsonFetch(`${tenantURL}/workspaces`, { - method: "POST", - headers: tenantAuth, - body: JSON.stringify({ - name: "E2E Canvas Test", - runtime: "hermes", - tier: 2, - // Provider-registry SSOT (internal#718) registers ONLY Kimi models for - // the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed - // entry (workspace-server/internal/providers/providers.yaml, hermes -> - // platform). The old `gpt-4o` was never a registered hermes model and - // now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace - // defaults closed to platform_managed (see the boot-shape note below), - // so a platform-namespaced model id is the registry-correct choice. - model: "moonshot/kimi-k2.6", - }), - }); - if (ws.status >= 400 || !ws.body?.id) { - throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`); + // Retry workspace creation on transient 5xx / timeout — staging CP can + // return 502/503/504 under load and a single-shot failure kills the + // entire E2E run. 3 attempts with 3s exponential backoff (3s, 6s, 12s) + // gives ~21s total budget, well inside the 20-min provision envelope. + let workspaceId = ""; + for (let attempt = 1; attempt <= 3; attempt++) { + const ws = await jsonFetch(`${tenantURL}/workspaces`, { + method: "POST", + headers: tenantAuth, + body: JSON.stringify({ + name: "E2E Canvas Test", + runtime: "hermes", + tier: 2, + // Provider-registry SSOT (internal#718) registers ONLY Kimi models for + // the hermes runtime — `moonshot/kimi-k2.6` is the platform-managed + // entry (workspace-server/internal/providers/providers.yaml, hermes -> + // platform). The old `gpt-4o` was never a registered hermes model and + // now 422s UNREGISTERED_MODEL_FOR_RUNTIME (core#2225). This workspace + // defaults closed to platform_managed (see the boot-shape note below), + // so a platform-namespaced model id is the registry-correct choice. + model: "moonshot/kimi-k2.6", + }), + }); + if (ws.status >= 200 && ws.status < 300 && ws.body?.id) { + workspaceId = ws.body.id as string; + break; + } + const isTransient = ws.status >= 500 || ws.status === 0; + if (!isTransient || attempt === 3) { + throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`); + } + const backoff = 3000 * Math.pow(2, attempt - 1); + console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`); + await new Promise((r) => setTimeout(r, backoff)); } - const workspaceId = ws.body.id as string; console.log(`[staging-setup] Workspace created: ${workspaceId}`); - // 6. Wait for workspace RENDERABLE. + // 6. Wait for workspace online // // This harness exists to verify the canvas *tab UI* renders (staging- // tabs.spec.ts: open each of the 13 workspace-panel tabs, assert no hard @@ -266,6 +280,16 @@ export default async function globalSetup(_config: FullConfig): Promise { // it needs is a workspace ROW that the canvas lists so the node renders // and the side-panel tabs open. A fully-`online` agent is NOT required. // + // Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes + // install + npm browser-tools). The controlplane bootstrap-watcher + // deadline fires at 5 min and sets status=failed prematurely; heartbeat + // then transitions failed → online after install.sh finishes. The ONLY + // failed shape we tolerate is the pre-start credential-abort + // (uptime_seconds=0, no last_sample_error) — the agent never ran. Real + // boot regressions (image pull error, panic, PYTHONPATH, etc.) still + // hard-throw immediately so triage gets detail without waiting for a + // polling timeout. See test_staging_full_saas.sh step 7/11 and issue #2632. + // // That distinction became load-bearing on 2026-06-03: workspace-server // #2162 (fix(provision): platform-managed workspace must fail-closed when // CP proxy env absent) made a platform_managed workspace ABORT AT BOOT @@ -287,8 +311,10 @@ export default async function globalSetup(_config: FullConfig): Promise { // the node + tabs render, proceed. We do NOT mask a real boot regression: // any `failed` carrying a last_sample_error, OR a non-zero uptime (the // agent started then crashed — image pull, panic, PYTHONPATH, etc.), - // still hard-throws. Genuine *infra* provision failure is already caught - // loud one step earlier at the org level (instance_status === "failed"). + // still hard-throws immediately so triage gets boot_stage / last_error / + // image fields without waiting for a polling timeout. + // Genuine *infra* provision failure is already caught loud one step + // earlier at the org level (instance_status === "failed"). await waitFor( async () => { const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, { @@ -315,13 +341,7 @@ export default async function globalSetup(_config: FullConfig): Promise { ); return true; } - // last_sample_error is often empty when the failure happens before - // the agent emits a sample (e.g. boot crash, image pull error, - // missing PYTHONPATH, OpenAI quota at startup). Dumping the full - // body gives triage the boot_stage / last_error / image fields it - // needs without a second probe. Otherwise this propagates as a - // bare "Workspace failed: " — the exact useless message that - // sent #2632 to the issue tracker. + // Real boot regression — hard-throw immediately with full detail. const detail = sampleErr ? sampleErr : `(no last_sample_error) full body: ${JSON.stringify(r.body)}`; @@ -333,7 +353,7 @@ export default async function globalSetup(_config: FullConfig): Promise { 10_000, "workspace online", ); - console.log(`[staging-setup] Workspace renderable`); + console.log(`[staging-setup] Workspace online`); // 7. Hand state off to tests + teardown — overwrite the slug-only // bootstrap state with the full state spec tests need. diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index aae43309d..93535e384 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -173,20 +173,8 @@ func (h *DelegationHandler) Delegate(c *gin.Context) { // check_task_status returned status='queued' forever even after a // real reply landed). messageId mirrors delegation_id so the // platform's idempotency-key extraction also keys off the same id. - a2aBody, marshalErr := json.Marshal(map[string]interface{}{ - "method": "message/send", - "params": map[string]interface{}{ - "message": map[string]interface{}{ - "role": "user", - "messageId": delegationID, - // A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) — - // a `type`-keyed Part is dropped by the receiver's v0.3 - // validator, silently losing the delegated task. - "parts": []map[string]interface{}{{"kind": "text", "text": body.Task}}, - "metadata": map[string]interface{}{"delegation_id": delegationID}, - }, - }, - }) + // Build A2A payload via helper so contract tests can assert the envelope shape. + a2aBody, marshalErr := buildDelegateA2ABody(delegationID, body.Task) if marshalErr != nil { log.Printf("Delegation %s: json.Marshal a2aBody failed: %v", delegationID, marshalErr) } @@ -374,6 +362,27 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b return insertTrackingUnavailable } +// buildDelegateA2ABody constructs the A2A JSON-RPC envelope for a delegation. +// The returned shape is a schema-valid SendMessageRequest with role="user", +// messageId, parts, and delegation metadata. Extracted to a pure function so +// unit tests can assert the envelope contract without standing up HTTP or DB. +func buildDelegateA2ABody(delegationID, task string) ([]byte, error) { + return json.Marshal(map[string]interface{}{ + "method": "message/send", + "params": map[string]interface{}{ + "message": map[string]interface{}{ + "role": "user", + "messageId": delegationID, + // A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) — + // a `type`-keyed Part is dropped by the receiver's v0.3 + // validator, silently losing the delegated task. + "parts": []map[string]interface{}{{"kind": "text", "text": task}}, + "metadata": map[string]interface{}{"delegation_id": delegationID}, + }, + }, + }) +} + // executeDelegation runs in a goroutine — sends A2A and stores the result. // Updates delegation status through: pending → dispatched → received → completed/failed // delegationRetryDelay is the pause between the first failed proxy attempt diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index c71454639..518c548d1 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -1762,3 +1762,74 @@ func TestListDelegations_LedgerFailedIncludesErrorDetail(t *testing.T) { t.Errorf("unmet sqlmock expectations: %v", err) } } + +// ---------- buildDelegateA2ABody: schema-valid SendMessageRequest ---------- + +// TestBuildDelegateA2ABody_SchemaValidSendMessageRequest pins the contract +// requested by issue #2251: delegate_task must produce a schema-valid A2A +// SendMessageRequest with role="user", messageId, parts, and metadata. +func TestBuildDelegateA2ABody_SchemaValidSendMessageRequest(t *testing.T) { + delegationID := "del-2251-test" + task := "write a contract test" + + body, err := buildDelegateA2ABody(delegationID, task) + if err != nil { + t.Fatalf("buildDelegateA2ABody failed: %v", err) + } + + var envelope map[string]interface{} + if err := json.Unmarshal(body, &envelope); err != nil { + t.Fatalf("body is not valid JSON: %v", err) + } + + // Top-level envelope shape + if envelope["method"] != "message/send" { + t.Errorf("method = %v, want message/send", envelope["method"]) + } + + params, ok := envelope["params"].(map[string]interface{}) + if !ok { + t.Fatalf("params missing or not a map: %T", envelope["params"]) + } + + msg, ok := params["message"].(map[string]interface{}) + if !ok { + t.Fatalf("message missing or not a map: %T", params["message"]) + } + + // Issue #2251: role is required + if msg["role"] != "user" { + t.Errorf("message.role = %v, want \"user\"", msg["role"]) + } + + // messageId must be present and match delegationID + if msg["messageId"] != delegationID { + t.Errorf("message.messageId = %v, want %s", msg["messageId"], delegationID) + } + + // parts must be a non-empty list with a text part + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) == 0 { + t.Fatalf("message.parts missing or empty: %T", msg["parts"]) + } + firstPart, ok := parts[0].(map[string]interface{}) + if !ok { + t.Fatalf("first part is not a map: %T", parts[0]) + } + // A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) + if firstPart["kind"] != "text" { + t.Errorf("first part kind = %v, want text", firstPart["kind"]) + } + if firstPart["text"] != task { + t.Errorf("first part text = %v, want %q", firstPart["text"], task) + } + + // metadata.delegation_id must match + meta, ok := msg["metadata"].(map[string]interface{}) + if !ok { + t.Fatalf("metadata missing or not a map: %T", msg["metadata"]) + } + if meta["delegation_id"] != delegationID { + t.Errorf("metadata.delegation_id = %v, want %s", meta["delegation_id"], delegationID) + } +} diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index dad564e81..9813334a1 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -54,6 +54,55 @@ func mcpPost(t *testing.T, h *MCPHandler, workspaceID string, body interface{}) return w } +// assertA2ASendMessageSchema validates that body is a schema-valid A2A +// SendMessageRequest with role="user", messageId, and non-empty parts. +// Issue #2251 contract test: delegate_task must always produce this shape. +func assertA2ASendMessageSchema(t *testing.T, body []byte, wantTask string) { + t.Helper() + var envelope map[string]interface{} + if err := json.Unmarshal(body, &envelope); err != nil { + t.Fatalf("A2A body is not valid JSON: %v", err) + } + if envelope["jsonrpc"] != "2.0" { + t.Errorf("jsonrpc = %v, want 2.0", envelope["jsonrpc"]) + } + if envelope["method"] != "message/send" { + t.Errorf("method = %v, want message/send", envelope["method"]) + } + + params, ok := envelope["params"].(map[string]interface{}) + if !ok { + t.Fatalf("params missing or not a map: %T", envelope["params"]) + } + msg, ok := params["message"].(map[string]interface{}) + if !ok { + t.Fatalf("message missing or not a map: %T", params["message"]) + } + + if msg["role"] != "user" { + t.Errorf("message.role = %v, want \"user\"", msg["role"]) + } + if msg["messageId"] == "" { + t.Error("message.messageId is empty") + } + + parts, ok := msg["parts"].([]interface{}) + if !ok || len(parts) == 0 { + t.Fatalf("message.parts missing or empty: %T", msg["parts"]) + } + firstPart, ok := parts[0].(map[string]interface{}) + if !ok { + t.Fatalf("first part is not a map: %T", parts[0]) + } + // A2A v0.3 Part discriminator is `kind`, NOT `type` (#2251) + if firstPart["kind"] != "text" { + t.Errorf("first part kind = %v, want text", firstPart["kind"]) + } + if firstPart["text"] != wantTask { + t.Errorf("first part text = %v, want %q", firstPart["text"], wantTask) + } +} + func expectCanCommunicateSiblings(mock sqlmock.Sqlmock, callerID, targetID, parentID string) { mock.ExpectQuery(`SELECT id, parent_id FROM workspaces WHERE id = \$1`). WithArgs(callerID). @@ -209,9 +258,7 @@ func TestMCPHandler_DelegateTask_RoutesThroughPlatformA2AProxy(t *testing.T) { if !logActivity { t.Fatal("delegate_task should log through platform A2A proxy") } - if !strings.Contains(string(body), "do work") { - t.Fatalf("A2A body missing task text: %s", string(body)) - } + assertA2ASendMessageSchema(t, body, "do work") return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil } @@ -252,9 +299,7 @@ func TestMCPHandler_DelegateTaskAsync_RoutesThroughPlatformA2AProxy(t *testing.T if workspaceID != targetID || proxyCallerID != callerID { t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID) } - if !strings.Contains(string(body), "async work") { - t.Fatalf("A2A body missing task text: %s", string(body)) - } + assertA2ASendMessageSchema(t, body, "async work") called <- struct{}{} return 200, []byte(`{"result":{"message":{"parts":[{"text":"accepted"}]}}}`), nil } @@ -304,10 +349,8 @@ func TestMCPHandler_DelegateTask_WithAttachments(t *testing.T) { if workspaceID != targetID || proxyCallerID != callerID { t.Fatalf("unexpected proxy route target=%q caller=%q", workspaceID, proxyCallerID) } + assertA2ASendMessageSchema(t, body, "review this video") bodyStr := string(body) - if !strings.Contains(bodyStr, `"text":"review this video"`) { - t.Fatalf("A2A body missing task text: %s", bodyStr) - } if !strings.Contains(bodyStr, `"kind":"video"`) { t.Fatalf("A2A body missing video attachment kind: %s", bodyStr) } @@ -386,6 +429,7 @@ func TestMCPHandler_DelegateTaskAsync_WithAttachments(t *testing.T) { waitGlobalAsyncForTest() select { case body := <-called: + assertA2ASendMessageSchema(t, body, "async work with image") bodyStr := string(body) if !strings.Contains(bodyStr, `"kind":"image"`) { t.Fatalf("A2A body missing image attachment kind: %s", bodyStr)