canvas: cap pendingOnline buffer and add WS connect timeout (P2+P3) #2897

Merged
devops-engineer merged 3 commits from fix/2601-canvas-resilience-p2p3 into main 2026-06-15 08:23:30 +00:00
5 changed files with 193 additions and 6 deletions
@@ -1,5 +1,5 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { handleCanvasEvent, resetProvisioningSequence } from "../canvas-events";
import { handleCanvasEvent, resetProvisioningSequence, __pendingOnlineSizeForTest } from "../canvas-events";
import type { WSMessage } from "../socket";
import type { WorkspaceNodeData } from "../canvas";
import type { Node, Edge } from "@xyflow/react";
@@ -97,6 +97,15 @@ describe("handleCanvasEvent WORKSPACE_ONLINE", () => {
const updated = (set.mock.calls[0][0] as { nodes: Node<WorkspaceNodeData>[] }).nodes;
expect(updated.find((n) => n.id === "ws-2")!.data.status).toBe("offline");
});
it("caps the unknown-workspace buffer so a dropped PROVISIONING cannot grow it forever", () => {
resetProvisioningSequence();
const { get, set } = makeStore([]);
for (let i = 0; i < 1100; i++) {
handleCanvasEvent(makeMsg({ event: "WORKSPACE_ONLINE", workspace_id: `ws-${i}` }), get, set);
}
expect(__pendingOnlineSizeForTest()).toBe(1000);
});
});
// ---------------------------------------------------------------------------
+33
View File
@@ -375,6 +375,39 @@ describe("health check", () => {
});
});
// ---------------------------------------------------------------------------
// Connect timeout
// ---------------------------------------------------------------------------
describe("connect timeout", () => {
it("closes the socket and schedules a reconnect if onopen never fires", () => {
connectSocket();
const ws = getLastWS();
vi.advanceTimersByTime(10_000);
expect(ws.closeCallCount).toBeGreaterThanOrEqual(1);
vi.advanceTimersByTime(1100);
expect(MockWebSocket.instances.length).toBeGreaterThan(1);
});
it("does not fire when onopen happens within the timeout window", () => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
vi.advanceTimersByTime(10_000);
expect(ws.closeCallCount).toBe(0);
expect(MockWebSocket.instances).toHaveLength(1);
});
it("clears the timeout on disconnect", () => {
const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout");
connectSocket();
disconnectSocket();
// One of the cleared timeouts is the connect timeout.
expect(clearTimeoutSpy).toHaveBeenCalled();
clearTimeoutSpy.mockRestore();
});
});
// Rehydrate dedup logic itself is exercised by `RehydrateDedup` unit
// tests in this file (below). End-to-end coupling through the
// dynamic-imported `@/lib/api` was non-trivial under our existing
+21 -2
View File
@@ -35,9 +35,28 @@ export function resetProvisioningSequence(): void {
* WORKSPACE_PROVISIONING — buffered here so the late-arriving
* provision event can immediately flip to the correct status
* instead of leaving the node stuck as "provisioning" forever.
* Cleared when applied, or on module reset (tests). */
* Cleared when applied, on module reset (tests), or when the cap
* evicts the oldest entry. */
const _pendingOnline = new Set<string>();
// Defensive cap: a dropped PROVISIONING event would otherwise let this
// buffer grow without bound for the lifetime of the session.
const PENDING_ONLINE_MAX = 1000;
function bufferPendingOnline(id: string): void {
if (_pendingOnline.size >= PENDING_ONLINE_MAX) {
// Sets preserve insertion order; evict the oldest buffered id.
const oldest = _pendingOnline.values().next().value;
if (oldest !== undefined) _pendingOnline.delete(oldest);
}
_pendingOnline.add(id);
}
/** Exposed for unit tests only. */
export function __pendingOnlineSizeForTest(): number {
return _pendingOnline.size;
}
/** Debounced parent-grow. Each child arrival schedules this; the
* timer keeps resetting as more siblings land, so the actual
* width/height update runs ONCE after arrivals go quiet. Avoids
@@ -86,7 +105,7 @@ export function handleCanvasEvent(
// this tab joined mid-deploy). Buffer so the later PROVISIONING
// handler can flip status in one pass instead of leaving the
// node stuck in "provisioning" forever.
_pendingOnline.add(msg.workspace_id);
bufferPendingOnline(msg.workspace_id);
break;
}
// Flip incoming edge from blueprint → laser so the link is
+43 -3
View File
@@ -66,6 +66,12 @@ export class RehydrateDedup {
* for a duplicate fetch. */
export const FALLBACK_POLL_MS = 10_000;
/** Maximum time to wait for a WebSocket handshake before giving up and
* scheduling a reconnect. Without this the browser can leave a socket in
* CONNECTING for ~75s (Chrome SYN-SENT behavior), leaving the UI silently
* stuck. The fallback poll keeps /workspaces fresh in parallel. */
const CONNECT_TIMEOUT_MS = 10_000;
class ReconnectingSocket {
private ws: WebSocket | null = null;
private attempt = 0;
@@ -73,6 +79,7 @@ class ReconnectingSocket {
private lastEventTime = 0;
private healthCheckTimer: ReturnType<typeof setInterval> | null = null;
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private connectTimeoutTimer: ReturnType<typeof setTimeout> | null = null;
// Polls /workspaces while the WS is unhealthy so the canvas reflects
// truth even when realtime events aren't arriving. Without this the
// store can stay frozen for minutes — e.g. workspaces transition
@@ -118,6 +125,7 @@ class ReconnectingSocket {
this.startFallbackPoll();
const ws = new WebSocket(this.url);
this.ws = ws;
this.startConnectTimeout(ws);
ws.onopen = () => {
if (this.disposed || this.ws !== ws) {
@@ -126,6 +134,7 @@ class ReconnectingSocket {
try { ws.close(); } catch { /* noop */ }
return;
}
this.clearConnectTimeout();
this.attempt = 0;
this.lastEventTime = Date.now();
useCanvasStore.getState().setWsStatus("connected");
@@ -157,12 +166,11 @@ class ReconnectingSocket {
// corresponds to the WS we just tore down (prevents a stale
// onclose from a zombie socket from re-arming the loop).
if (this.disposed || this.ws !== ws) return;
this.clearConnectTimeout();
this.stopHealthCheck();
useCanvasStore.getState().setWsStatus("connecting");
this.startFallbackPoll();
const delay = Math.min(1000 * 2 ** this.attempt, 30000);
this.attempt++;
this.reconnectTimer = setTimeout(() => this.connect(), delay);
this.scheduleReconnect();
};
ws.onerror = () => {
@@ -192,6 +200,37 @@ class ReconnectingSocket {
}
}
private startConnectTimeout(ws: WebSocket) {
this.clearConnectTimeout();
this.connectTimeoutTimer = setTimeout(() => {
this.handleConnectTimeout(ws);
}, CONNECT_TIMEOUT_MS);
}
private clearConnectTimeout() {
if (this.connectTimeoutTimer) {
clearTimeout(this.connectTimeoutTimer);
this.connectTimeoutTimer = null;
}
}
private handleConnectTimeout(ws: WebSocket) {
if (this.disposed || this.ws !== ws) return;
// Abandon this socket before closing it so the real onclose doesn't
// double-schedule a reconnect.
this.ws = null;
try { ws.close(); } catch { /* noop */ }
useCanvasStore.getState().setWsStatus("connecting");
this.startFallbackPoll();
this.scheduleReconnect();
}
private scheduleReconnect() {
const delay = Math.min(1000 * 2 ** this.attempt, 30000);
this.attempt++;
this.reconnectTimer = setTimeout(() => this.connect(), delay);
}
/** While the WS is in connecting/disconnected limbo, poll /workspaces
* so the store stays fresh. The reconnect attempts continue in
* parallel; whichever recovers first wins. rehydrate()'s own dedup
@@ -249,6 +288,7 @@ class ReconnectingSocket {
this.disposed = true;
this.stopHealthCheck();
this.stopFallbackPoll();
this.clearConnectTimeout();
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
@@ -196,6 +196,60 @@ check() {
pass() { echo "PASS: $1"; PASS=$((PASS + 1)); }
fail() { echo "FAIL: $1"; [ -n "${2:-}" ] && echo " $2"; FAIL=$((FAIL + 1)); }
# Advisory-lane infra-skip helper. When the A2A layer itself is degraded
# (queue never drains after a queued response), exit 0 with a scan_status line
# rather than false-red on an advisory job. Fail-closed on repeated skips is
# handled in the staging helper (#2917); this local-provision lane is advisory.
infra_skip() {
local reason="$1" detail="${2:-}"
echo "[$(date +%H:%M:%S)] ⚠️ scan_status: infra-skip:${reason}${detail:+ $detail}"
echo "=== Results: $PASS passed, $FAIL failed (infra-skip: $reason) ==="
exit 0
}
# Poll a queued A2A task until it completes, fails, or times out.
# Prints the response_body (JSON-RPC result envelope) on success; returns 1 on
# failure/timeout so the caller can decide to hard-fail or infra-skip.
poll_a2a_queue() {
local ws_id="$1" qid="$2" deadline="${3:-120}"
local resp="" qstatus="" result_body=""
local start elapsed
start=$(date +%s)
while true; do
elapsed=$(($(date +%s) - start))
if [ "$elapsed" -ge "$deadline" ]; then
return 1
fi
resp=$(curl -s --max-time 30 "$BASE/workspaces/$ws_id/a2a/queue/$qid" \
-H "Content-Type: application/json" \
-H "X-Workspace-ID: $ws_id" 2>/dev/null || echo "")
qstatus=$(echo "$resp" | python3 -c "import sys,json
try:
print(json.load(sys.stdin).get('status',''))
except Exception:
print('')" 2>/dev/null || echo "")
case "$qstatus" in
completed)
result_body=$(echo "$resp" | python3 -c "import sys,json
try:
rb=json.load(sys.stdin).get('response_body')
print(json.dumps(rb) if rb is not None else '')
except Exception:
print('')" 2>/dev/null || echo "")
if [ -n "$result_body" ]; then
printf '%s' "$result_body"
return 0
fi
;;
failed|dropped)
return 1
;;
esac
echo " queue poll for $qid status=$qstatus elapsed=${elapsed}s — backing off 2s" >&2
sleep 2
done
}
admin_curl() {
local _a=(); e2e_admin_auth_args _a
curl -s "${_a[@]+"${_a[@]}"}" "$@"
@@ -643,6 +697,38 @@ A2A_CEIL="$A2A_TIMEOUT"
A2A=$(curl -s --max-time "$A2A_CEIL" -X POST "$BASE/workspaces/$WSID/a2a" \
-H "Content-Type: application/json" \
-d "$A2A_BODY")
# If the platform queued the A2A request, poll the durable queue result.
# core#2917-follow-on: staging-SaaS saw the same A2A-layer degradation where
# the initial POST returns 202-queued and the queue item never drains in time.
# The local-provision advisory lane should handle this without false-red.
A2A_QUEUED=$(echo "$A2A" | python3 -c "import sys,json
try:
d=json.load(sys.stdin)
print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
except Exception:
print('false')" 2>/dev/null || echo "false")
if [ "$A2A_QUEUED" = "true" ]; then
QUEUE_ID=$(echo "$A2A" | python3 -c "import sys,json
try:
print(json.load(sys.stdin).get('queue_id',''))
except Exception:
print('')" 2>/dev/null || echo "")
if [ -n "$QUEUE_ID" ]; then
echo " A2A queued (queue_id=$QUEUE_ID); polling durable result..." >&2
if A2A_POLL=$(poll_a2a_queue "$WSID" "$QUEUE_ID" 120); then
A2A="$A2A_POLL"
else
infra_skip "a2a-queue-timeout" "queue_id=$QUEUE_ID on local-provision advisory lane"
fi
else
# push-async queued response with no queue_id exposed — we can't poll for
# a durable result, so treat it as A2A-layer infra degradation on the
# advisory lane rather than a product regression.
infra_skip "a2a-queued-no-queue-id" "push-async queued response on local-provision advisory lane"
fi
fi
# Extract the assistant text part once (shared by the minimax assertion +
# diagnostics). Tolerates result.parts[].text and result.message.parts[].text.
a2a_text() {