canvas: cap pendingOnline buffer and add WS connect timeout (P2+P3) #2897
@@ -1,5 +1,5 @@
|
||||
import { describe, it, expect, beforeEach, vi } from "vitest";
|
||||
import { handleCanvasEvent, resetProvisioningSequence } from "../canvas-events";
|
||||
import { handleCanvasEvent, resetProvisioningSequence, __pendingOnlineSizeForTest } from "../canvas-events";
|
||||
import type { WSMessage } from "../socket";
|
||||
import type { WorkspaceNodeData } from "../canvas";
|
||||
import type { Node, Edge } from "@xyflow/react";
|
||||
@@ -97,6 +97,15 @@ describe("handleCanvasEvent – WORKSPACE_ONLINE", () => {
|
||||
const updated = (set.mock.calls[0][0] as { nodes: Node<WorkspaceNodeData>[] }).nodes;
|
||||
expect(updated.find((n) => n.id === "ws-2")!.data.status).toBe("offline");
|
||||
});
|
||||
|
||||
it("caps the unknown-workspace buffer so a dropped PROVISIONING cannot grow it forever", () => {
  // Size the buffer is expected to settle at, and how many extra events we push past it.
  const expectedCap = 1000;
  const overflow = 100;

  resetProvisioningSequence();
  const { get, set } = makeStore([]);

  // The store starts with no nodes, so none of these ids match an existing workspace.
  let sent = 0;
  while (sent < expectedCap + overflow) {
    const onlineMsg = makeMsg({ event: "WORKSPACE_ONLINE", workspace_id: `ws-${sent}` });
    handleCanvasEvent(onlineMsg, get, set);
    sent += 1;
  }

  expect(__pendingOnlineSizeForTest()).toBe(expectedCap);
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -375,6 +375,39 @@ describe("health check", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Connect timeout
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("connect timeout", () => {
  it("closes the socket and schedules a reconnect if onopen never fires", () => {
    connectSocket();
    const ws = getLastWS();
    // The handshake never completes: let the 10s connect timeout elapse.
    vi.advanceTimersByTime(10_000);
    expect(ws.closeCallCount).toBeGreaterThanOrEqual(1);
    // The first backoff step is 1s, so 1100ms is enough for the reconnect to open a new socket.
    vi.advanceTimersByTime(1100);
    expect(MockWebSocket.instances.length).toBeGreaterThan(1);
  });

  it("does not fire when onopen happens within the timeout window", () => {
    connectSocket();
    const ws = getLastWS();
    ws.triggerOpen();
    // The timeout window passes after a successful open: nothing may be torn down.
    vi.advanceTimersByTime(10_000);
    expect(ws.closeCallCount).toBe(0);
    expect(MockWebSocket.instances).toHaveLength(1);
  });

  it("clears the timeout on disconnect", () => {
    const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout");
    try {
      connectSocket();
      disconnectSocket();
      // One of the cleared timeouts is the connect timeout.
      expect(clearTimeoutSpy).toHaveBeenCalled();
    } finally {
      // Restore even when the assertion throws so the spy cannot leak into later tests.
      clearTimeoutSpy.mockRestore();
    }
  });
});
|
||||
|
||||
// Rehydrate dedup logic itself is exercised by `RehydrateDedup` unit
|
||||
// tests in this file (below). End-to-end coupling through the
|
||||
// dynamic-imported `@/lib/api` was non-trivial under our existing
|
||||
|
||||
@@ -35,9 +35,28 @@ export function resetProvisioningSequence(): void {
|
||||
* WORKSPACE_PROVISIONING — buffered here so the late-arriving
|
||||
* provision event can immediately flip to the correct status
|
||||
* instead of leaving the node stuck as "provisioning" forever.
|
||||
* Cleared when applied, or on module reset (tests). */
|
||||
* Cleared when applied, on module reset (tests), or when the cap
|
||||
* evicts the oldest entry. */
|
||||
const _pendingOnline = new Set<string>();

// Defensive cap: a dropped PROVISIONING event would otherwise let this
// buffer grow without bound for the lifetime of the session.
const PENDING_ONLINE_MAX = 1000;

/**
 * Remembers that `id` was reported online before its workspace was known.
 *
 * The buffer holds at most `PENDING_ONLINE_MAX` ids. When a new id arrives
 * at the cap, the oldest buffered id is evicted first (FIFO). An id that is
 * already buffered is left where it is and never triggers an eviction.
 *
 * @param id Workspace id taken from the WORKSPACE_ONLINE event.
 */
function bufferPendingOnline(id: string): void {
  // Re-adding a buffered id is a no-op for the Set, so evicting beforehand
  // would only throw away some other workspace's pending ONLINE.
  if (_pendingOnline.has(id)) return;

  if (_pendingOnline.size >= PENDING_ONLINE_MAX) {
    // Sets preserve insertion order; evict the oldest buffered id.
    const oldest = _pendingOnline.values().next().value;
    if (oldest !== undefined) _pendingOnline.delete(oldest);
  }
  _pendingOnline.add(id);
}

/** Exposed for unit tests only. */
export function __pendingOnlineSizeForTest(): number {
  return _pendingOnline.size;
}
|
||||
|
||||
/** Debounced parent-grow. Each child arrival schedules this; the
|
||||
* timer keeps resetting as more siblings land, so the actual
|
||||
* width/height update runs ONCE after arrivals go quiet. Avoids
|
||||
@@ -86,7 +105,7 @@ export function handleCanvasEvent(
|
||||
// this tab joined mid-deploy). Buffer so the later PROVISIONING
|
||||
// handler can flip status in one pass instead of leaving the
|
||||
// node stuck in "provisioning" forever.
|
||||
_pendingOnline.add(msg.workspace_id);
|
||||
bufferPendingOnline(msg.workspace_id);
|
||||
break;
|
||||
}
|
||||
// Flip incoming edge from blueprint → laser so the link is
|
||||
|
||||
@@ -66,6 +66,12 @@ export class RehydrateDedup {
|
||||
* for a duplicate fetch. */
|
||||
export const FALLBACK_POLL_MS = 10_000;
|
||||
|
||||
/** Maximum time to wait for a WebSocket handshake before giving up and
|
||||
* scheduling a reconnect. Without this the browser can leave a socket in
|
||||
* CONNECTING for ~75s (Chrome SYN-SENT behavior), leaving the UI silently
|
||||
* stuck. The fallback poll keeps /workspaces fresh in parallel. */
|
||||
const CONNECT_TIMEOUT_MS = 10_000;
|
||||
|
||||
class ReconnectingSocket {
|
||||
private ws: WebSocket | null = null;
|
||||
private attempt = 0;
|
||||
@@ -73,6 +79,7 @@ class ReconnectingSocket {
|
||||
private lastEventTime = 0;
|
||||
private healthCheckTimer: ReturnType<typeof setInterval> | null = null;
|
||||
private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
private connectTimeoutTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
// Polls /workspaces while the WS is unhealthy so the canvas reflects
|
||||
// truth even when realtime events aren't arriving. Without this the
|
||||
// store can stay frozen for minutes — e.g. workspaces transition
|
||||
@@ -118,6 +125,7 @@ class ReconnectingSocket {
|
||||
this.startFallbackPoll();
|
||||
const ws = new WebSocket(this.url);
|
||||
this.ws = ws;
|
||||
this.startConnectTimeout(ws);
|
||||
|
||||
ws.onopen = () => {
|
||||
if (this.disposed || this.ws !== ws) {
|
||||
@@ -126,6 +134,7 @@ class ReconnectingSocket {
|
||||
try { ws.close(); } catch { /* noop */ }
|
||||
return;
|
||||
}
|
||||
this.clearConnectTimeout();
|
||||
this.attempt = 0;
|
||||
this.lastEventTime = Date.now();
|
||||
useCanvasStore.getState().setWsStatus("connected");
|
||||
@@ -157,12 +166,11 @@ class ReconnectingSocket {
|
||||
// corresponds to the WS we just tore down (prevents a stale
|
||||
// onclose from a zombie socket from re-arming the loop).
|
||||
if (this.disposed || this.ws !== ws) return;
|
||||
this.clearConnectTimeout();
|
||||
this.stopHealthCheck();
|
||||
useCanvasStore.getState().setWsStatus("connecting");
|
||||
this.startFallbackPoll();
|
||||
const delay = Math.min(1000 * 2 ** this.attempt, 30000);
|
||||
this.attempt++;
|
||||
this.reconnectTimer = setTimeout(() => this.connect(), delay);
|
||||
this.scheduleReconnect();
|
||||
};
|
||||
|
||||
ws.onerror = () => {
|
||||
@@ -192,6 +200,37 @@ class ReconnectingSocket {
|
||||
}
|
||||
}
|
||||
|
||||
/** Arms the handshake timer for `ws`, first cancelling any connect timer that is still pending. */
private startConnectTimeout(ws: WebSocket) {
  this.clearConnectTimeout();
  const onExpired = () => this.handleConnectTimeout(ws);
  this.connectTimeoutTimer = setTimeout(onExpired, CONNECT_TIMEOUT_MS);
}
|
||||
|
||||
/** Cancels the pending connect timer, if there is one, and forgets its handle. */
private clearConnectTimeout() {
  const pending = this.connectTimeoutTimer;
  if (!pending) return;
  clearTimeout(pending);
  this.connectTimeoutTimer = null;
}
|
||||
|
||||
/**
 * Runs when the connect timer for `ws` expires: drops the socket, reports
 * "connecting", keeps the fallback poll running and schedules a reconnect.
 * Does nothing if this instance is disposed or `ws` is no longer the
 * current socket.
 */
private handleConnectTimeout(ws: WebSocket) {
  const stillCurrent = !this.disposed && this.ws === ws;
  if (!stillCurrent) return;

  // Detach first: once `this.ws` no longer points at `ws`, the socket's own
  // onclose handler bails out instead of scheduling a second reconnect.
  this.ws = null;
  try {
    ws.close();
  } catch {
    /* noop */
  }

  useCanvasStore.getState().setWsStatus("connecting");
  this.startFallbackPoll();
  this.scheduleReconnect();
}
|
||||
|
||||
/**
 * Schedules the next `connect()` attempt with exponential backoff:
 * 1s, 2s, 4s, … capped at 30s, driven by `this.attempt`, which is
 * incremented on every call.
 */
private scheduleReconnect() {
  // Cancel a still-pending reconnect before overwriting the handle; otherwise
  // the earlier timer would be orphaned and could no longer be cancelled.
  if (this.reconnectTimer) {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
  }
  const delay = Math.min(1000 * 2 ** this.attempt, 30000);
  this.attempt++;
  this.reconnectTimer = setTimeout(() => this.connect(), delay);
}
|
||||
|
||||
/** While the WS is in connecting/disconnected limbo, poll /workspaces
|
||||
* so the store stays fresh. The reconnect attempts continue in
|
||||
* parallel; whichever recovers first wins. rehydrate()'s own dedup
|
||||
@@ -249,6 +288,7 @@ class ReconnectingSocket {
|
||||
this.disposed = true;
|
||||
this.stopHealthCheck();
|
||||
this.stopFallbackPoll();
|
||||
this.clearConnectTimeout();
|
||||
if (this.reconnectTimer) {
|
||||
clearTimeout(this.reconnectTimer);
|
||||
this.reconnectTimer = null;
|
||||
|
||||
@@ -196,6 +196,60 @@ check() {
|
||||
# Report a passing check labelled $1 and bump the PASS counter.
pass() {
  echo "PASS: $1"
  PASS=$((PASS + 1))
}
|
||||
# Report a failing check labelled $1, print the optional detail $2, and bump the FAIL counter.
fail() {
  echo "FAIL: $1"
  if [ -n "${2:-}" ]; then
    echo " $2"
  fi
  FAIL=$((FAIL + 1))
}
|
||||
|
||||
# Advisory-lane infra-skip helper. When the A2A layer itself is degraded
|
||||
# (queue never drains after a queued response), exit 0 with a scan_status line
|
||||
# rather than false-red on an advisory job. Fail-closed on repeated skips is
|
||||
# handled in the staging helper (#2917); this local-provision lane is advisory.
|
||||
infra_skip() {
  # $1: short machine-readable reason, appended to the scan_status line.
  # $2: optional free-form detail.
  # Prints the scan_status line and the results summary, then exits the script.
  local reason="$1" detail="${2:-}"
  echo "[$(date +%H:%M:%S)] ⚠️ scan_status: infra-skip:${reason}${detail:+ $detail}"
  echo "=== Results: $PASS passed, $FAIL failed (infra-skip: $reason) ==="
  # The skip excuses only the degraded step that triggered it. Checks that
  # already failed are real results and must not be reported as green.
  if [ "${FAIL:-0}" -gt 0 ]; then
    exit 1
  fi
  exit 0
}
|
||||
|
||||
# Poll a queued A2A task until it completes, fails, or times out.
|
||||
# Prints the response_body (JSON-RPC result envelope) on success; returns 1 on
|
||||
# failure/timeout so the caller can decide to hard-fail or infra-skip.
|
||||
poll_a2a_queue() {
  # $1: workspace id, $2: queue item id, $3: overall deadline in seconds (default 120).
  # Polls the queue item every 2s. On status "completed" with a non-null
  # response_body, prints that body as JSON on stdout and returns 0.
  # Returns 1 on status "failed"/"dropped" or once the deadline has passed.
  local ws_id="$1" qid="$2" deadline="${3:-120}"
  local resp="" qstatus="" result_body=""
  local start elapsed remaining curl_max
  start=$(date +%s)
  while true; do
    elapsed=$(($(date +%s) - start))
    if [ "$elapsed" -ge "$deadline" ]; then
      return 1
    fi
    # Never let one request outlive the remaining budget: a fixed 30s cap
    # could overrun the deadline by up to 30s on a hanging endpoint.
    remaining=$((deadline - elapsed))
    curl_max=30
    if [ "$remaining" -lt "$curl_max" ]; then
      curl_max="$remaining"
    fi
    resp=$(curl -s --max-time "$curl_max" "$BASE/workspaces/$ws_id/a2a/queue/$qid" \
      -H "Content-Type: application/json" \
      -H "X-Workspace-ID: $ws_id" 2>/dev/null || echo "")
    # Any transport or parse failure yields an empty status, which falls
    # through to the back-off below and is retried.
    qstatus=$(echo "$resp" | python3 -c "import sys,json
try:
    print(json.load(sys.stdin).get('status',''))
except Exception:
    print('')" 2>/dev/null || echo "")
    case "$qstatus" in
      completed)
        result_body=$(echo "$resp" | python3 -c "import sys,json
try:
    rb=json.load(sys.stdin).get('response_body')
    print(json.dumps(rb) if rb is not None else '')
except Exception:
    print('')" 2>/dev/null || echo "")
        if [ -n "$result_body" ]; then
          printf '%s' "$result_body"
          return 0
        fi
        # completed but no body yet: keep polling until the deadline.
        ;;
      failed|dropped)
        return 1
        ;;
    esac
    echo " queue poll for $qid status=$qstatus elapsed=${elapsed}s — backing off 2s" >&2
    sleep 2
  done
}
|
||||
|
||||
admin_curl() {
|
||||
local _a=(); e2e_admin_auth_args _a
|
||||
curl -s "${_a[@]+"${_a[@]}"}" "$@"
|
||||
@@ -643,6 +697,38 @@ A2A_CEIL="$A2A_TIMEOUT"
|
||||
A2A=$(curl -s --max-time "$A2A_CEIL" -X POST "$BASE/workspaces/$WSID/a2a" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "$A2A_BODY")
|
||||
|
||||
# If the platform queued the A2A request, poll the durable queue result.
|
||||
# core#2917-follow-on: staging-SaaS saw the same A2A-layer degradation where
|
||||
# the initial POST returns 202-queued and the queue item never drains in time.
|
||||
# The local-provision advisory lane should handle this without false-red.
|
||||
# Detect a queued response: either "queued": true or a status of "queued"
# (case-insensitive). Anything unparseable counts as not queued.
A2A_QUEUED=$(echo "$A2A" | python3 -c "import sys,json
try:
    d=json.load(sys.stdin)
    print('true' if d.get('queued') is True or (d.get('status') or '').lower() == 'queued' else 'false')
except Exception:
    print('false')" 2>/dev/null || echo "false")
if [ "$A2A_QUEUED" = "true" ]; then
  # "or ''" maps a JSON null queue_id to empty; without it Python prints
  # "None" and the script would poll /queue/None instead of taking the
  # no-queue-id branch below.
  QUEUE_ID=$(echo "$A2A" | python3 -c "import sys,json
try:
    print(json.load(sys.stdin).get('queue_id') or '')
except Exception:
    print('')" 2>/dev/null || echo "")
  if [ -n "$QUEUE_ID" ]; then
    echo " A2A queued (queue_id=$QUEUE_ID); polling durable result..." >&2
    if A2A_POLL=$(poll_a2a_queue "$WSID" "$QUEUE_ID" 120); then
      # Continue with the durable result in place of the queued envelope.
      A2A="$A2A_POLL"
    else
      infra_skip "a2a-queue-timeout" "queue_id=$QUEUE_ID on local-provision advisory lane"
    fi
  else
    # push-async queued response with no queue_id exposed — we can't poll for
    # a durable result, so treat it as A2A-layer infra degradation on the
    # advisory lane rather than a product regression.
    infra_skip "a2a-queued-no-queue-id" "push-async queued response on local-provision advisory lane"
  fi
fi
|
||||
|
||||
# Extract the assistant text part once (shared by the minimax assertion +
|
||||
# diagnostics). Tolerates result.parts[].text and result.message.parts[].text.
|
||||
a2a_text() {
|
||||
|
||||
Reference in New Issue
Block a user