fix(canvas): polite tasks/cancel before /workspaces/:id/restart for Stop All (task #377 companion) #1619

Merged
core-devops merged 1 commits from fix/377-canvas-polite-cancel-before-restart into main 2026-05-20 22:58:59 +00:00
2 changed files with 265 additions and 6 deletions
+91 -2
View File
@@ -68,14 +68,103 @@ export function Toolbar() {
return c;
}, [nodes]);
/**
* Stop All - task #377 fix.
*
* BEFORE this PR: directly POSTed `/workspaces/:id/restart`, which tears
* the container down and back up. That kills in-flight tool subprocesses
* (e.g. `bash -c 'sleep 600'`) but is heavy and discards any in-progress
* agent state. It also bypasses the runtime-side fast cancel path (task
* #377 PR#40 in template-claude-code) - meaning flipping
* `MOLECULE_STOP_PROPAGATE=true` would produce zero canary signal because
* nothing ever invokes `executor.cancel()` in production.
*
* AFTER this PR (two-phase polite cancel):
*
* 1. POST `tasks/cancel` (A2A JSON-RPC) to each active workspace's
* `/workspaces/:id/a2a` proxy. The platform proxies the envelope to
* the workspace runtime; the a2a-sdk framework dispatches `tasks/cancel`
* to `AgentExecutor.cancel()` (a2a-sdk 1.0.3
* `a2a/compat/v0_3/types.py` line 1125 pins the wire literal as
* `Literal["tasks/cancel"]`; A2A protocol spec section 9.4.5 maps the
* abstract `CancelTask` operation to that wire string). The runtime's
* executor cancel path signals the CLI subprocess group with
* SIGTERM/grace/SIGKILL (template-claude-code PR#40 `stop_propagate.py`).
*
* 2. Poll the canvas store (the platform pushes `TASK_UPDATED` over WS
* on `active_tasks` changes - `canvas-events.ts` line 400) for up to
* `STOP_ALL_DRAIN_TIMEOUT_MS`. A workspace whose `activeTasks` drops
* to 0 is considered drained and is NOT restarted.
*
* 3. For any workspace that DID NOT drain inside the timeout - runtime
* is on an old image without the cancel path, or the cancel
* propagation is stuck - fall back to the original heavy
* `/workspaces/:id/restart`. The original behavior is preserved as a
* floor so a stuck workspace still gets stopped; the polite path is
* a fast top-up that lets well-behaved workspaces cancel without
* losing context.
*
* The polite-cancel envelope mirrors `ScheduleTab.handleRunNow` (line 168)
* which is the only other place in canvas that POSTs `/workspaces/:id/a2a`
* directly. Method string `tasks/cancel` and empty `params` match the
* a2a-sdk shape verified above. The proxy adds `jsonrpc:"2.0"` and `id`
* via `normalizeA2APayload` server-side, so the canvas envelope omits them.
*/
const stopAll = useCallback(async () => {
setStopping(true);
const active = nodes.filter((n) => (n.data.activeTasks as number) > 0);
const activeIds = active.map((n) => n.id);
// Phase 1 - polite cancel on every active workspace in parallel.
// Errors are swallowed (same shape as the pre-fix /restart
// Promise.all): a 4xx/5xx on tasks/cancel just means we fall through
// to /restart for that workspace below.
await Promise.all(
active.map((n) =>
api.post(`/workspaces/${n.id}/restart`).catch(() => {})
activeIds.map((id) =>
api
.post(`/workspaces/${id}/a2a`, {
method: "tasks/cancel",
params: {},
})
.catch(() => {})
)
);
// Phase 2 - poll the store for activeTasks reaching 0, with a hard
// timeout. STOP_ALL_DRAIN_TIMEOUT_MS is sized to cover the runtime's
// own SIGTERM-grace (5s in template-claude-code stop_propagate.py
// `_SIGTERM_GRACE_S`) plus a small WS round-trip buffer for the
// TASK_UPDATED push. STOP_ALL_POLL_INTERVAL_MS keeps the poll cheap
// (no animation jitter, no busy-wait).
const STOP_ALL_DRAIN_TIMEOUT_MS = 8000;
const STOP_ALL_POLL_INTERVAL_MS = 250;
const deadline = Date.now() + STOP_ALL_DRAIN_TIMEOUT_MS;
let undrained = new Set(activeIds);
while (undrained.size > 0 && Date.now() < deadline) {
await new Promise((r) => setTimeout(r, STOP_ALL_POLL_INTERVAL_MS));
const fresh = useCanvasStore.getState().nodes;
const stillActive = new Set<string>();
for (const id of undrained) {
const n = fresh.find((x) => x.id === id);
// Missing node (workspace deleted mid-cancel) is treated as
// drained - there's nothing left to restart and reporting it as
// "still running" would be a lie.
if (n && (n.data.activeTasks as number) > 0) stillActive.add(id);
}
undrained = stillActive;
}
// Phase 3 - hard-restart anything that did not drain. This is the
// same call shape as the pre-fix Stop All, so behavior is strictly a
// superset: undrained workspaces still get the heavy stop, drained
// ones are spared.
if (undrained.size > 0) {
await Promise.all(
Array.from(undrained).map((id) =>
api.post(`/workspaces/${id}/restart`).catch(() => {})
)
);
}
setStopping(false);
}, [nodes]);
@@ -131,14 +131,30 @@ const defaultStore = {
batchDelete: vi.fn(() => Promise.resolve()),
};
vi.mock("@/store/canvas", () => ({
useCanvasStore: vi.fn((selector: (s: typeof defaultStore) => unknown) =>
vi.mock("@/store/canvas", () => {
// useCanvasStore is used in two shapes:
// 1. As a hook: `useCanvasStore((s) => s.x)` — selector path.
// 2. As a static accessor: `useCanvasStore.getState().nodes` —
// used by stopAll's drain-poll loop (task #377 Toolbar fix) and
// restartAll's success-clear loop. Both read the LIVE
// defaultStore object so tests that mutate `defaultStore.nodes`
// mid-flight (e.g. simulating a TASK_UPDATED that drops
// activeTasks to 0) see the update on the next poll tick.
const hook = vi.fn((selector: (s: typeof defaultStore) => unknown) =>
selector(defaultStore)
),
}));
) as unknown as ((selector: (s: typeof defaultStore) => unknown) => unknown) & {
getState: () => typeof defaultStore;
};
hook.getState = () => defaultStore;
return { useCanvasStore: hook };
});
// ── Component under test ───────────────────────────────────────────────────────
import { Toolbar } from "../Toolbar";
// Imported AFTER vi.mock("@/lib/api", ...) above (hoisted) so this
// resolves to the mock module; gives the new task #377 tests a typed
// handle on api.post without a CJS require() (Vitest runs ESM).
import { api as mockedApi } from "@/lib/api";
// ── Tests ─────────────────────────────────────────────────────────────────────
@@ -315,3 +331,157 @@ describe("Toolbar — ? shortcut opens shortcuts dialog", () => {
expect(screen.queryByTestId("shortcuts-dialog")).toBeNull();
});
});
// ── Toolbar — Stop All polite-cancel flow (task #377) ───────────────────────
describe("Toolbar — Stop All polite cancel before restart (#377)", () => {
// `api` resolves to the top-level vi.mock factory's mocked `post`.
// We type-cast so TS allows mockReset/mockResolvedValue/mockImplementation
// calls without leaking the mock surface into the production type.
const api = mockedApi as unknown as { post: ReturnType<typeof vi.fn> };
/**
* Build a working set of two active workspaces so the assertions can
* distinguish per-id behavior (drained vs undrained) within one test.
*/
const seedTwoActive = () => {
defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [2, 2]));
};
/**
* Drive an async useCallback handler to completion. Vitest's fake
* timers don't see microtasks unless we yield between advances; the
* helper interleaves `vi.advanceTimersByTimeAsync` with macrotask
* yields so pending fetch resolutions and setTimeout callbacks both
* settle before the assertion runs.
*/
const advanceUntilSettled = async (ms: number) => {
await vi.advanceTimersByTimeAsync(ms);
// One extra tick lets any chained .then() after a setTimeout
// resolution fire before the test moves on.
await Promise.resolve();
};
beforeEach(() => {
vi.useFakeTimers();
api.post.mockReset();
});
afterEach(() => {
vi.useRealTimers();
});
it("phase 1: issues tasks/cancel via /workspaces/:id/a2a BEFORE any /restart", async () => {
seedTwoActive();
// Hold both tasks/cancel responses open so the click handler is
// observably paused at phase 1. We don't actually need to resolve
// them for the order assertion — just inspect the call log.
let resolveCancels!: () => void;
const cancelGate = new Promise<void>((r) => { resolveCancels = r; });
api.post.mockImplementation(async (path: string) => {
if (path.endsWith("/a2a")) {
await cancelGate;
}
return undefined;
});
render(<Toolbar />);
const btn = screen.getByRole("button", { name: /stop all running tasks/i });
fireEvent.click(btn);
// Yield once so the click handler enters phase 1 and dispatches the
// two /a2a POSTs.
await Promise.resolve();
await Promise.resolve();
const a2aCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/a2a"));
const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart"));
expect(a2aCalls.length).toBe(2);
expect(restartCalls.length).toBe(0);
// Each /a2a POST carries the canonical tasks/cancel envelope.
for (const call of a2aCalls) {
expect(call[1]).toEqual({ method: "tasks/cancel", params: {} });
}
// Release the gate so the test cleanup doesn't dangle.
resolveCancels();
await advanceUntilSettled(10_000);
});
it("phase 2: when activeTasks drains to 0 during the poll window, /restart is NOT called", async () => {
seedTwoActive();
api.post.mockResolvedValue(undefined);
render(<Toolbar />);
fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i }));
// Let phase 1 fire (the two tasks/cancel calls).
await Promise.resolve();
await Promise.resolve();
// Simulate the platform pushing TASK_UPDATED with active_tasks=0
// on both workspaces — emulate by mutating the store directly,
// which is what canvas-events.ts does in production.
defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 0]));
// Advance past the first poll interval (250ms) so the loop sees
// the drained store and exits early.
await advanceUntilSettled(400);
// Drain any remaining timers so the handler returns cleanly.
await advanceUntilSettled(10_000);
const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart"));
expect(restartCalls.length).toBe(0);
});
it("phase 3: when activeTasks does NOT drain inside the timeout, falls through to /restart for each stuck workspace", async () => {
seedTwoActive();
api.post.mockResolvedValue(undefined);
render(<Toolbar />);
fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i }));
// Phase 1 dispatch.
await Promise.resolve();
await Promise.resolve();
// Do NOT drain — activeTasks stays at 2 for both. Advance past the
// 8000ms drain timeout plus a buffer so phase 3's /restart POSTs fire.
await advanceUntilSettled(9_000);
await advanceUntilSettled(1_000);
const a2aCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/a2a"));
const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart"));
expect(a2aCalls.length).toBe(2);
expect(restartCalls.length).toBe(2);
// Order check: every /a2a call comes before every /restart call.
const lastA2AIdx = Math.max(
...api.post.mock.calls.map((c, i) => (String(c[0]).endsWith("/a2a") ? i : -1))
);
const firstRestartIdx = Math.min(
...api.post.mock.calls.map((c, i) => (String(c[0]).endsWith("/restart") ? i : Infinity))
);
expect(lastA2AIdx).toBeLessThan(firstRestartIdx);
});
it("phase 3 selective: drains only one of two workspaces — /restart is called only for the stuck one", async () => {
seedTwoActive();
api.post.mockResolvedValue(undefined);
render(<Toolbar />);
fireEvent.click(screen.getByRole("button", { name: /stop all running tasks/i }));
await Promise.resolve();
await Promise.resolve();
// ws-0 drains immediately, ws-1 stays stuck for the full timeout.
defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 2]));
await advanceUntilSettled(9_500);
const restartCalls = api.post.mock.calls.filter((c) => String(c[0]).endsWith("/restart"));
expect(restartCalls.length).toBe(1);
expect(restartCalls[0][0]).toBe("/workspaces/ws-1/restart");
});
});