From 52304d99a21be5b863b0a6cb44df22327a31d136 Mon Sep 17 00:00:00 2001 From: core-fe Date: Wed, 20 May 2026 21:24:32 +0000 Subject: [PATCH] fix(canvas): polite tasks/cancel before /workspaces/:id/restart for Stop All (task #377 companion) Companion to template-claude-code PR#40 (fix/377-stop-all-propagation, core-be). PR#40 adds a fast-cancel path on the runtime side (executor.cancel -> killpg the CLI subprocess group), gated on MOLECULE_STOP_PROPAGATE=true. But until canvas issues the A2A tasks/cancel JSON-RPC at the workspace before the heavy /restart, that runtime path is INERT in production - nothing ever reaches executor.cancel(). Flipping the env var would produce zero canary signal. Fix - three-phase Stop All in Toolbar.tsx: Phase 1: POST /workspaces/:id/a2a with method "tasks/cancel" and empty params for every workspace that has activeTasks>0, in parallel. The workspace-server a2a_proxy.go forwards the envelope verbatim to the runtime; a2a-sdk dispatches tasks/cancel to AgentExecutor.cancel() on the runtime side (claude_sdk_executor.py cancel() at line 853). Phase 2: poll the canvas Zustand store (TASK_UPDATED pushes drive the active_tasks field via canvas-events.ts:400) every 250ms for up to 8000ms. Drained workspaces (activeTasks=0) are removed from the to-restart set. Phase 3: for any workspace that did NOT drain inside the timeout - runtime on an old image without the cancel hook, or cancel propagation stuck - fall through to the original heavy /workspaces/:id/restart. Behavior is a strict superset of pre-fix Stop All: stuck workspaces still get the hammer, well-behaved workspaces are spared. 
Upstream wire-shape citations (per feedback_upstream_docs_first_before_hypothesizing): - A2A protocol spec section 9.4.5 "CancelTask" - JSON-RPC binding for the abstract Cancel Task operation (https://a2a-protocol.org/latest/specification/) - a2a-sdk 1.0.3 a2a/compat/v0_3/types.py line 1125 pins the wire method literal: `method: Literal['tasks/cancel'] = 'tasks/cancel'`. Matches the slash-notation our codebase already uses for "message/send" (workspace-server/internal/handlers/delegation.go:155, canvas/src/components/tabs/ScheduleTab.tsx:168). Tests - 4 new specs in Toolbar.test.tsx covering each phase: 1. phase 1 dispatches tasks/cancel via /a2a for every active workspace BEFORE any /restart (order assertion + envelope shape). 2. when activeTasks drains to 0 during the poll window, /restart is NOT called. 3. when activeTasks does NOT drain inside the 8s timeout, /restart is called for each stuck workspace (with phase-1-before-phase-3 order assertion). 4. selective drain - one workspace drains, the other doesn't; /restart is called only for the stuck one. Full canvas vitest suite: 3360 passed, 1 skipped, 0 failed. Toolbar file alone: 25 tests (21 prior + 4 new). Refs: task #377, template-claude-code PR#40 --- canvas/src/components/Toolbar.tsx | 93 ++++++++- .../src/components/__tests__/Toolbar.test.tsx | 178 +++++++++++++++++- 2 files changed, 265 insertions(+), 6 deletions(-) diff --git a/canvas/src/components/Toolbar.tsx b/canvas/src/components/Toolbar.tsx index b2563aeec..63f4b6680 100644 --- a/canvas/src/components/Toolbar.tsx +++ b/canvas/src/components/Toolbar.tsx @@ -68,14 +68,103 @@ export function Toolbar() { return c; }, [nodes]); + /** + * Stop All - task #377 fix. + * + * BEFORE this PR: directly POSTed `/workspaces/:id/restart`, which tears + * the container down and back up. That kills in-flight tool subprocesses + * (e.g. `bash -c 'sleep 600'`) but is heavy and discards any in-progress + * agent state. 
It also bypasses the runtime-side fast cancel path (task + * #377 PR#40 in template-claude-code) - meaning flipping + * `MOLECULE_STOP_PROPAGATE=true` would produce zero canary signal because + * nothing ever invokes `executor.cancel()` in production. + * + * AFTER this PR (three-phase polite cancel): + * + * 1. POST `tasks/cancel` (A2A JSON-RPC) to each active workspace's + * `/workspaces/:id/a2a` proxy. The platform proxies the envelope to + * the workspace runtime; the a2a-sdk framework dispatches `tasks/cancel` + * to `AgentExecutor.cancel()` (a2a-sdk 1.0.3 + * `a2a/compat/v0_3/types.py` line 1125 pins the wire literal as + * `Literal["tasks/cancel"]`; A2A protocol spec section 9.4.5 maps the + * abstract `CancelTask` operation to that wire string). The runtime's + * executor cancel path signals the CLI subprocess group with + * SIGTERM/grace/SIGKILL (template-claude-code PR#40 `stop_propagate.py`). + * + * 2. Poll the canvas store (the platform pushes `TASK_UPDATED` over WS + * on `active_tasks` changes - `canvas-events.ts` line 400) for up to + * `STOP_ALL_DRAIN_TIMEOUT_MS`. A workspace whose `activeTasks` drops + * to 0 is considered drained and is NOT restarted. + * + * 3. For any workspace that DID NOT drain inside the timeout - runtime + * is on an old image without the cancel path, or the cancel + * propagation is stuck - fall back to the original heavy + * `/workspaces/:id/restart`. The original behavior is preserved as a + * floor so a stuck workspace still gets stopped; the polite path is + * a fast top-up that lets well-behaved workspaces cancel without + * losing context. + * + * The polite-cancel envelope mirrors `ScheduleTab.handleRunNow` (line 168) + * which is the only other place in canvas that POSTs `/workspaces/:id/a2a` + * directly. Method string `tasks/cancel` and empty `params` match the + * a2a-sdk shape verified above. The proxy adds `jsonrpc:"2.0"` and `id` + * via `normalizeA2APayload` server-side, so the canvas envelope omits them. 
+ */ const stopAll = useCallback(async () => { setStopping(true); const active = nodes.filter((n) => (n.data.activeTasks as number) > 0); + const activeIds = active.map((n) => n.id); + + // Phase 1 - polite cancel on every active workspace in parallel. + // Errors are swallowed (same shape as the pre-fix /restart + // Promise.all): a 4xx/5xx on tasks/cancel just means we fall through + // to /restart for that workspace below. await Promise.all( - active.map((n) => - api.post(`/workspaces/${n.id}/restart`).catch(() => {}) + activeIds.map((id) => + api + .post(`/workspaces/${id}/a2a`, { + method: "tasks/cancel", + params: {}, + }) + .catch(() => {}) ) ); + + // Phase 2 - poll the store for activeTasks reaching 0, with a hard + // timeout. STOP_ALL_DRAIN_TIMEOUT_MS is sized to cover the runtime's + // own SIGTERM-grace (5s in template-claude-code stop_propagate.py + // `_SIGTERM_GRACE_S`) plus a small WS round-trip buffer for the + // TASK_UPDATED push. STOP_ALL_POLL_INTERVAL_MS keeps the poll cheap + // (no animation jitter, no busy-wait). + const STOP_ALL_DRAIN_TIMEOUT_MS = 8000; + const STOP_ALL_POLL_INTERVAL_MS = 250; + const deadline = Date.now() + STOP_ALL_DRAIN_TIMEOUT_MS; + let undrained = new Set(activeIds); + while (undrained.size > 0 && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, STOP_ALL_POLL_INTERVAL_MS)); + const fresh = useCanvasStore.getState().nodes; + const stillActive = new Set(); + for (const id of undrained) { + const n = fresh.find((x) => x.id === id); + // Missing node (workspace deleted mid-cancel) is treated as + // drained - there's nothing left to restart and reporting it as + // "still running" would be a lie. + if (n && (n.data.activeTasks as number) > 0) stillActive.add(id); + } + undrained = stillActive; + } + + // Phase 3 - hard-restart anything that did not drain. 
This is the + // same call shape as the pre-fix Stop All, so behavior is strictly a + // superset: undrained workspaces still get the heavy stop, drained + // ones are spared. + if (undrained.size > 0) { + await Promise.all( + Array.from(undrained).map((id) => + api.post(`/workspaces/${id}/restart`).catch(() => {}) + ) + ); + } setStopping(false); }, [nodes]); diff --git a/canvas/src/components/__tests__/Toolbar.test.tsx b/canvas/src/components/__tests__/Toolbar.test.tsx index 81642e253..447c61197 100644 --- a/canvas/src/components/__tests__/Toolbar.test.tsx +++ b/canvas/src/components/__tests__/Toolbar.test.tsx @@ -131,14 +131,30 @@ const defaultStore = { batchDelete: vi.fn(() => Promise.resolve()), }; -vi.mock("@/store/canvas", () => ({ - useCanvasStore: vi.fn((selector: (s: typeof defaultStore) => unknown) => +vi.mock("@/store/canvas", () => { + // useCanvasStore is used in two shapes: + // 1. As a hook: `useCanvasStore((s) => s.x)` — selector path. + // 2. As a static accessor: `useCanvasStore.getState().nodes` — + // used by stopAll's drain-poll loop (task #377 Toolbar fix) and + // restartAll's success-clear loop. Both read the LIVE + // defaultStore object so tests that mutate `defaultStore.nodes` + // mid-flight (e.g. simulating a TASK_UPDATED that drops + // activeTasks to 0) see the update on the next poll tick. + const hook = vi.fn((selector: (s: typeof defaultStore) => unknown) => selector(defaultStore) - ), -})); + ) as unknown as ((selector: (s: typeof defaultStore) => unknown) => unknown) & { + getState: () => typeof defaultStore; + }; + hook.getState = () => defaultStore; + return { useCanvasStore: hook }; +}); // ── Component under test ─────────────────────────────────────────────────────── import { Toolbar } from "../Toolbar"; +// Imported AFTER vi.mock("@/lib/api", ...) above (hoisted) so this +// resolves to the mock module; gives the new task #377 tests a typed +// handle on api.post without a CJS require() (Vitest runs ESM). 
+import { api as mockedApi } from "@/lib/api"; // ── Tests ───────────────────────────────────────────────────────────────────── @@ -315,3 +331,157 @@ describe("Toolbar — ? shortcut opens shortcuts dialog", () => { expect(screen.queryByTestId("shortcuts-dialog")).toBeNull(); }); }); + +// ── Toolbar — Stop All polite-cancel flow (task #377) ─────────────────────── + +describe("Toolbar — Stop All polite cancel before restart (#377)", () => { + // `api` resolves to the top-level vi.mock factory's mocked `post`. + // We type-cast so TS allows mockReset/mockResolvedValue/mockImplementation + // calls without leaking the mock surface into the production type. + const api = mockedApi as unknown as { post: ReturnType }; + + /** + * Build a working set of two active workspaces so the assertions can + * distinguish per-id behavior (drained vs undrained) within one test. + */ + const seedTwoActive = () => { + defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [2, 2])); + }; + + /** + * Drive an async useCallback handler to completion. Vitest's fake + * timers don't see microtasks unless we yield between advances; the + * helper interleaves `vi.advanceTimersByTimeAsync` with macrotask + * yields so pending fetch resolutions and setTimeout callbacks both + * settle before the assertion runs. + */ + const advanceUntilSettled = async (ms: number) => { + await vi.advanceTimersByTimeAsync(ms); + // One extra tick lets any chained .then() after a setTimeout + // resolution fire before the test moves on. + await Promise.resolve(); + }; + + beforeEach(() => { + vi.useFakeTimers(); + api.post.mockReset(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("phase 1: issues tasks/cancel via /workspaces/:id/a2a BEFORE any /restart", async () => { + seedTwoActive(); + // Hold both tasks/cancel responses open so the click handler is + // observably paused at phase 1. We don't actually need to resolve + // them for the order assertion — just inspect the call log. 
+ let resolveCancels!: () => void; + const cancelGate = new Promise((r) => { resolveCancels = r; }); + api.post.mockImplementation(async (path: string) => { + if (path.endsWith("/a2a")) { + await cancelGate; + } + return undefined; + }); + + render(); + const btn = screen.getByRole("button", { name: /stop all running tasks/i }); + fireEvent.click(btn); + + // Yield once so the click handler enters phase 1 and dispatches the + // two /a2a POSTs. + await Promise.resolve(); + await Promise.resolve(); + + const a2aCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/a2a")); + const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart")); + expect(a2aCalls.length).toBe(2); + expect(restartCalls.length).toBe(0); + + // Each /a2a POST carries the canonical tasks/cancel envelope. + for (const call of a2aCalls) { + expect(call[1]).toEqual({ method: "tasks/cancel", params: {} }); + } + + // Release the gate so the test cleanup doesn't dangle. + resolveCancels(); + await advanceUntilSettled(10_000); + }); + + it("phase 2: when activeTasks drains to 0 during the poll window, /restart is NOT called", async () => { + seedTwoActive(); + api.post.mockResolvedValue(undefined); + + render(); + fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i })); + + // Let phase 1 fire (the two tasks/cancel calls). + await Promise.resolve(); + await Promise.resolve(); + + // Simulate the platform pushing TASK_UPDATED with active_tasks=0 + // on both workspaces — emulate by mutating the store directly, + // which is what canvas-events.ts does in production. + defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 0])); + + // Advance past the first poll interval (250ms) so the loop sees + // the drained store and exits early. + await advanceUntilSettled(400); + // Drain any remaining timers so the handler returns cleanly. 
+ await advanceUntilSettled(10_000); + + const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart")); + expect(restartCalls.length).toBe(0); + }); + + it("phase 3: when activeTasks does NOT drain inside the timeout, falls through to /restart for each stuck workspace", async () => { + seedTwoActive(); + api.post.mockResolvedValue(undefined); + + render(); + fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i })); + + // Phase 1 dispatch. + await Promise.resolve(); + await Promise.resolve(); + + // Do NOT drain — activeTasks stays at 2 for both. Advance past the + // 8000ms drain timeout plus a buffer so phase 3's /restart POSTs fire. + await advanceUntilSettled(9_000); + await advanceUntilSettled(1_000); + + const a2aCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/a2a")); + const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart")); + expect(a2aCalls.length).toBe(2); + expect(restartCalls.length).toBe(2); + + // Order check: every /a2a call comes before every /restart call. + const lastA2AIdx = Math.max( + ...api.post.mock.calls.map((c, i) => (String(c[0]).endsWith("/a2a") ? i : -1)) + ); + const firstRestartIdx = Math.min( + ...api.post.mock.calls.map((c, i) => (String(c[0]).endsWith("/restart") ? i : Infinity)) + ); + expect(lastA2AIdx).toBeLessThan(firstRestartIdx); + }); + + it("phase 3 selective: drains only one of two workspaces — /restart is called only for the stuck one", async () => { + seedTwoActive(); + api.post.mockResolvedValue(undefined); + + render(); + fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i })); + + await Promise.resolve(); + await Promise.resolve(); + + // ws-0 drains immediately, ws-1 stays stuck for the full timeout. 
+ defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 2])); + await advanceUntilSettled(9_500); + + const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart")); + expect(restartCalls.length).toBe(1); + expect(restartCalls[0][0]).toBe("/workspaces/ws-1/restart"); + }); +}); -- 2.52.0