fix(canvas): polite tasks/cancel before /workspaces/:id/restart for Stop All (task #377 companion) #1619
@@ -68,14 +68,103 @@ export function Toolbar() {
|
||||
return c;
|
||||
}, [nodes]);
|
||||
|
||||
/**
 * Stop All - task #377 fix.
 *
 * BEFORE this PR: directly POSTed `/workspaces/:id/restart`, which tears
 * the container down and back up. That kills in-flight tool subprocesses
 * (e.g. `bash -c 'sleep 600'`) but is heavy and discards any in-progress
 * agent state. It also bypasses the runtime-side fast cancel path (task
 * #377 PR#40 in template-claude-code) - meaning flipping
 * `MOLECULE_STOP_PROPAGATE=true` would produce zero canary signal because
 * nothing ever invokes `executor.cancel()` in production.
 *
 * AFTER this PR (two-phase polite cancel):
 *
 *   1. POST `tasks/cancel` (A2A JSON-RPC) to each active workspace's
 *      `/workspaces/:id/a2a` proxy. The platform proxies the envelope to
 *      the workspace runtime; the a2a-sdk framework dispatches `tasks/cancel`
 *      to `AgentExecutor.cancel()` (a2a-sdk 1.0.3
 *      `a2a/compat/v0_3/types.py` line 1125 pins the wire literal as
 *      `Literal["tasks/cancel"]`; A2A protocol spec section 9.4.5 maps the
 *      abstract `CancelTask` operation to that wire string). The runtime's
 *      executor cancel path signals the CLI subprocess group with
 *      SIGTERM/grace/SIGKILL (template-claude-code PR#40 `stop_propagate.py`).
 *
 *   2. Poll the canvas store (the platform pushes `TASK_UPDATED` over WS
 *      on `active_tasks` changes - `canvas-events.ts` line 400) for up to
 *      `STOP_ALL_DRAIN_TIMEOUT_MS`. A workspace whose `activeTasks` drops
 *      to 0 is considered drained and is NOT restarted.
 *
 *   3. For any workspace that DID NOT drain inside the timeout - runtime
 *      is on an old image without the cancel path, or the cancel
 *      propagation is stuck - fall back to the original heavy
 *      `/workspaces/:id/restart`. The original behavior is preserved as a
 *      floor so a stuck workspace still gets stopped; the polite path is
 *      a fast top-up that lets well-behaved workspaces cancel without
 *      losing context.
 *
 * The polite-cancel envelope mirrors `ScheduleTab.handleRunNow` (line 168)
 * which is the only other place in canvas that POSTs `/workspaces/:id/a2a`
 * directly. Method string `tasks/cancel` and empty `params` match the
 * a2a-sdk shape verified above. The proxy adds `jsonrpc:"2.0"` and `id`
 * via `normalizeA2APayload` server-side, so the canvas envelope omits them.
 *
 * The `stopping` flag is cleared in a `finally` so the button can never be
 * left stuck in the "stopping" state if any phase throws unexpectedly.
 */
const stopAll = useCallback(async () => {
  // STOP_ALL_DRAIN_TIMEOUT_MS is sized to cover the runtime's own
  // SIGTERM-grace (5s in template-claude-code stop_propagate.py
  // `_SIGTERM_GRACE_S`) plus a small WS round-trip buffer for the
  // TASK_UPDATED push. STOP_ALL_POLL_INTERVAL_MS keeps the poll cheap
  // (no animation jitter, no busy-wait).
  const STOP_ALL_DRAIN_TIMEOUT_MS = 8000;
  const STOP_ALL_POLL_INTERVAL_MS = 250;

  setStopping(true);
  try {
    const active = nodes.filter((n) => (n.data.activeTasks as number) > 0);
    const activeIds = active.map((n) => n.id);

    // Phase 1 - polite cancel on every active workspace in parallel.
    // Errors are swallowed (same shape as the pre-fix /restart
    // Promise.all): a 4xx/5xx on tasks/cancel just means we fall through
    // to /restart for that workspace below.
    //
    // NOTE(review): the A2A spec describes the `tasks/cancel` params as
    // carrying a task id. Confirm the proxy/runtime accepts an empty
    // `params` object; if it is rejected with a JSON-RPC error inside a
    // 200 response, every workspace silently waits out the drain timeout
    // and falls through to /restart.
    await Promise.all(
      activeIds.map((id) =>
        api
          .post(`/workspaces/${id}/a2a`, {
            method: "tasks/cancel",
            params: {},
          })
          .catch(() => {})
      )
    );

    // Phase 2 - poll the store for activeTasks reaching 0, with a hard
    // timeout. The deadline starts only after every cancel POST settled.
    const deadline = Date.now() + STOP_ALL_DRAIN_TIMEOUT_MS;
    let undrained = new Set(activeIds);
    while (undrained.size > 0 && Date.now() < deadline) {
      await new Promise<void>((resolve) =>
        setTimeout(resolve, STOP_ALL_POLL_INTERVAL_MS)
      );
      // Read the live store, not the `nodes` captured by this callback,
      // so updates pushed while we wait are visible.
      const fresh = useCanvasStore.getState().nodes;
      const stillActive = new Set<string>();
      for (const id of undrained) {
        const n = fresh.find((x) => x.id === id);
        // Missing node (workspace deleted mid-cancel) is treated as
        // drained - there's nothing left to restart and reporting it as
        // "still running" would be a lie.
        if (n && (n.data.activeTasks as number) > 0) stillActive.add(id);
      }
      undrained = stillActive;
    }

    // Phase 3 - hard-restart anything that did not drain. This is the
    // same call shape as the pre-fix Stop All, so behavior is strictly a
    // superset: undrained workspaces still get the heavy stop, drained
    // ones are spared.
    if (undrained.size > 0) {
      await Promise.all(
        Array.from(undrained).map((id) =>
          api.post(`/workspaces/${id}/restart`).catch(() => {})
        )
      );
    }
  } finally {
    setStopping(false);
  }
}, [nodes]);
|
||||
|
||||
|
||||
@@ -131,14 +131,30 @@ const defaultStore = {
|
||||
batchDelete: vi.fn(() => Promise.resolve()),
|
||||
};
|
||||
|
||||
vi.mock("@/store/canvas", () => {
  // useCanvasStore is used in two shapes:
  //   1. As a hook: `useCanvasStore((s) => s.x)` — selector path.
  //   2. As a static accessor: `useCanvasStore.getState().nodes` —
  //      used by stopAll's drain-poll loop (task #377 Toolbar fix) and
  //      restartAll's success-clear loop. Both read the LIVE
  //      defaultStore object so tests that mutate `defaultStore.nodes`
  //      mid-flight (e.g. simulating a TASK_UPDATED that drops
  //      activeTasks to 0) see the update on the next poll tick.
  //
  // Object.assign attaches `getState` to the mock function without a
  // double type assertion. `defaultStore` is only dereferenced inside the
  // two closures, i.e. at call time, never while this factory runs.
  const useCanvasStore = Object.assign(
    vi.fn((selector: (s: typeof defaultStore) => unknown) =>
      selector(defaultStore)
    ),
    { getState: () => defaultStore }
  );
  return { useCanvasStore };
});
|
||||
|
||||
// ── Component under test ───────────────────────────────────────────────────────
|
||||
import { Toolbar } from "../Toolbar";
|
||||
// Imported AFTER vi.mock("@/lib/api", ...) above (hoisted) so this
|
||||
// resolves to the mock module; gives the new task #377 tests a typed
|
||||
// handle on api.post without a CJS require() (Vitest runs ESM).
|
||||
import { api as mockedApi } from "@/lib/api";
|
||||
|
||||
// ── Tests ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -315,3 +331,157 @@ describe("Toolbar — ? shortcut opens shortcuts dialog", () => {
|
||||
expect(screen.queryByTestId("shortcuts-dialog")).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
// ── Toolbar — Stop All polite-cancel flow (task #377) ───────────────────────
|
||||
|
||||
describe("Toolbar — Stop All polite cancel before restart (#377)", () => {
  // `api` resolves to the top-level vi.mock factory's mocked `post`.
  // We type-cast so TS allows mockReset/mockResolvedValue/mockImplementation
  // calls without leaking the mock surface into the production type.
  const api = mockedApi as unknown as { post: ReturnType<typeof vi.fn> };

  // Snapshot of the shared store's nodes, restored after every test so the
  // fixtures seeded here never leak into other suites in this file.
  let savedNodes: typeof defaultStore.nodes;

  /**
   * Build a working set of two active workspaces so the assertions can
   * distinguish per-id behavior (drained vs undrained) within one test.
   */
  const seedTwoActive = () => {
    defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [2, 2]));
  };

  /**
   * Drive an async useCallback handler forward. Advances the fake clock
   * with `vi.advanceTimersByTimeAsync`, then yields one microtask so a
   * `.then()` chained after the last timer resolution can run before the
   * test moves on.
   */
  const advanceUntilSettled = async (ms: number) => {
    await vi.advanceTimersByTimeAsync(ms);
    await Promise.resolve();
  };

  /** Yield two microtasks so the click handler's phase 1 has dispatched. */
  const letPhaseOneDispatch = async () => {
    await Promise.resolve();
    await Promise.resolve();
  };

  /** Render the toolbar and click the Stop All button. */
  const renderAndClickStopAll = () => {
    render(<Toolbar />);
    const btn = screen.getByRole("button", { name: /stop all running tasks/i });
    fireEvent.click(btn);
  };

  /** All recorded `api.post` calls whose path ends with `suffix`. */
  const callsEndingWith = (suffix: string) =>
    api.post.mock.calls.filter((c) => String(c[0]).endsWith(suffix));

  beforeEach(() => {
    savedNodes = defaultStore.nodes;
    vi.useFakeTimers();
    api.post.mockReset();
  });

  afterEach(() => {
    vi.useRealTimers();
    defaultStore.nodes = savedNodes;
  });

  it("phase 1: issues tasks/cancel via /workspaces/:id/a2a BEFORE any /restart", async () => {
    seedTwoActive();
    // Hold both tasks/cancel responses open so the click handler is
    // observably paused at phase 1. We don't actually need to resolve
    // them for the order assertion — just inspect the call log.
    let resolveCancels: () => void = () => {};
    const cancelGate = new Promise<void>((resolve) => {
      resolveCancels = resolve;
    });
    api.post.mockImplementation(async (path: string) => {
      if (path.endsWith("/a2a")) {
        await cancelGate;
      }
      return undefined;
    });

    renderAndClickStopAll();

    // Yield so the click handler enters phase 1 and dispatches the
    // two /a2a POSTs.
    await letPhaseOneDispatch();

    const a2aCalls = callsEndingWith("/a2a");
    const restartCalls = callsEndingWith("/restart");
    expect(a2aCalls.length).toBe(2);
    expect(restartCalls.length).toBe(0);

    // Each /a2a POST carries the canonical tasks/cancel envelope.
    for (const call of a2aCalls) {
      expect(call[1]).toEqual({ method: "tasks/cancel", params: {} });
    }

    // Release the gate so the test cleanup doesn't dangle.
    resolveCancels();
    await advanceUntilSettled(10_000);
  });

  it("phase 2: when activeTasks drains to 0 during the poll window, /restart is NOT called", async () => {
    seedTwoActive();
    api.post.mockResolvedValue(undefined);

    renderAndClickStopAll();

    // Let phase 1 fire (the two tasks/cancel calls).
    await letPhaseOneDispatch();

    // Simulate the platform pushing TASK_UPDATED with active_tasks=0
    // on both workspaces — emulate by mutating the store directly,
    // which is what canvas-events.ts does in production.
    defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 0]));

    // Advance past the first poll interval (250ms) so the loop sees
    // the drained store and exits early.
    await advanceUntilSettled(400);
    // Drain any remaining timers so the handler returns cleanly.
    await advanceUntilSettled(10_000);

    expect(callsEndingWith("/restart").length).toBe(0);
  });

  it("phase 3: when activeTasks does NOT drain inside the timeout, falls through to /restart for each stuck workspace", async () => {
    seedTwoActive();
    api.post.mockResolvedValue(undefined);

    renderAndClickStopAll();

    // Phase 1 dispatch.
    await letPhaseOneDispatch();

    // Do NOT drain — activeTasks stays at 2 for both. Advance past the
    // 8000ms drain timeout plus a buffer so phase 3's /restart POSTs fire.
    await advanceUntilSettled(9_000);
    await advanceUntilSettled(1_000);

    expect(callsEndingWith("/a2a").length).toBe(2);
    expect(callsEndingWith("/restart").length).toBe(2);

    // Order check: every /a2a call comes before every /restart call.
    const paths = api.post.mock.calls.map((c) => String(c[0]));
    const lastA2AIdx = paths.map((p) => p.endsWith("/a2a")).lastIndexOf(true);
    const firstRestartIdx = paths.findIndex((p) => p.endsWith("/restart"));
    expect(lastA2AIdx).toBeLessThan(firstRestartIdx);
  });

  it("phase 3 selective: drains only one of two workspaces — /restart is called only for the stuck one", async () => {
    seedTwoActive();
    api.post.mockResolvedValue(undefined);

    renderAndClickStopAll();

    await letPhaseOneDispatch();

    // ws-0 drains immediately, ws-1 stays stuck for the full timeout.
    defaultStore.nodes = toStoreNodes(makeNodes(["online", "online"], [0, 2]));
    await advanceUntilSettled(9_500);

    const restartCalls = callsEndingWith("/restart");
    expect(restartCalls.length).toBe(1);
    expect(restartCalls[0][0]).toBe("/workspaces/ws-1/restart");
  });
});
|
||||
|
||||
Reference in New Issue
Block a user