fix(canvas): mobile chat realtime — WS wake-recovery + resume back-fill #1435

Merged
agent-dev-a merged 1 commits from fix/canvas-mobile-ws-wake-resume into staging 2026-05-26 10:15:28 +00:00
4 changed files with 384 additions and 1 deletions
@@ -2,6 +2,7 @@
import { useCallback, useEffect, useRef, useState } from "react";
import { api } from "@/lib/api";
import { subscribeSocketResume } from "@/store/socket-events";
import { type ChatMessage, appendMessageDeduped as appendMessageDedupedFn } from "../types";
const INITIAL_HISTORY_LIMIT = 10;
@@ -82,6 +83,23 @@ export function useChatHistory(
loadInitial();
}, [loadInitial]);
// Back-fill on socket resume. The singleton WS emits this when it
// recovers from a down period (ordinary drop, or — the case this
// fixes — a mobile-browser background-suspend that silently killed
// the socket while the page was frozen). While the socket was dead
// every AGENT_MESSAGE / A2A_RESPONSE for this thread was missed, and
// the store's rehydrate() only re-pulls /workspaces status, not chat.
// Re-running loadInitial() re-fetches the latest persisted history —
// exactly what a navigate-away-and-back (remount) does today, but
// without the user having to do it. Shared by desktop ChatTab and
// MobileChat (both consume this hook), so the realtime path stays
// unified across surfaces rather than forked for mobile.
useEffect(() => {
return subscribeSocketResume(() => {
loadInitial();
});
}, [loadInitial]);
const loadOlder = useCallback(async () => {
if (inflightRef.current || !hasMoreRef.current) return;
const oldest = oldestMessageRef.current;
+199
View File
@@ -21,12 +21,22 @@ vi.mock("../canvas", () => ({
class MockWebSocket {
static instances: MockWebSocket[] = [];
// Mirror the real WebSocket readyState constants — socket.ts's wake
// path reads WebSocket.OPEN / WebSocket.CONNECTING and this.ws.readyState.
static readonly CONNECTING = 0;
static readonly OPEN = 1;
static readonly CLOSING = 2;
static readonly CLOSED = 3;
url: string;
onopen: (() => void) | null = null;
onmessage: ((event: { data: string }) => void) | null = null;
onclose: (() => void) | null = null;
onerror: (() => void) | null = null;
closeCallCount = 0;
// Starts OPEN once triggerOpen runs; tests flip this to simulate a
// mobile background-suspend that left a dead/half-open socket.
readyState = MockWebSocket.CONNECTING;
constructor(url: string) {
this.url = url;
@@ -35,10 +45,12 @@ class MockWebSocket {
close() {
this.closeCallCount++;
this.readyState = MockWebSocket.CLOSED;
}
// Helpers to trigger events in tests
triggerOpen() {
this.readyState = MockWebSocket.OPEN;
this.onopen?.();
}
@@ -59,6 +71,46 @@ class MockWebSocket {
}
}
// ---------------------------------------------------------------------------
// Minimal DOM stub (vitest environment is 'node' — no window/document).
// socket.ts's wake-recovery attaches visibilitychange/pageshow/online/
// focus listeners; under node it self-no-ops via a typeof guard, so to
// exercise the path we inject just enough of window/document here, the
// same way WebSocket is stubbed above. Kept tiny on purpose — a single
// listener registry keyed by event name, plus a settable
// visibilityState.
// ---------------------------------------------------------------------------
interface FakeTarget {
_l: Record<string, Array<() => void>>;
addEventListener: (type: string, fn: () => void) => void;
removeEventListener: (type: string, fn: () => void) => void;
dispatch: (type: string) => void;
}
function makeFakeTarget(): FakeTarget {
const l: Record<string, Array<() => void>> = {};
return {
_l: l,
addEventListener(type, fn) {
(l[type] ||= []).push(fn);
},
removeEventListener(type, fn) {
l[type] = (l[type] || []).filter((f) => f !== fn);
},
dispatch(type) {
for (const fn of l[type] || []) fn();
},
};
}
const fakeWindow = makeFakeTarget();
const fakeDocument = Object.assign(makeFakeTarget(), {
visibilityState: "visible" as string,
});
(globalThis as unknown as Record<string, unknown>).window = fakeWindow;
(globalThis as unknown as Record<string, unknown>).document = fakeDocument;
// Install mock WebSocket globally before importing socket module
(globalThis as unknown as Record<string, unknown>).WebSocket = MockWebSocket;
@@ -328,6 +380,153 @@ describe("WebSocket onerror", () => {
});
});
// ---------------------------------------------------------------------------
// Wake recovery — mobile background-suspend regression (mobile chat not
// updating in real time until refresh). Simulates: connect → open →
// the OS freezes the page and silently kills the WS WITHOUT firing
// onclose → user returns (visibilitychange / pageshow / online /
// focus) → assert the dead socket is replaced AND, on the new socket's
// open, the resume signal fires so chat history back-fills the missed
// AGENT_MESSAGE / A2A_RESPONSE events.
// ---------------------------------------------------------------------------
import {
subscribeSocketResume,
_resetSocketResumeListenersForTests,
} from "../socket-events";
describe("wake recovery (mobile background-suspend)", () => {
beforeEach(() => {
_resetSocketResumeListenersForTests();
fakeDocument.visibilityState = "visible";
});
function suspendKill(ws: MockWebSocket) {
// Mobile background-suspend: the OS tore the transport down but the
// page was frozen so onclose never ran. The socket object survives
// with a CLOSED readyState and no reconnect was scheduled.
ws.readyState = MockWebSocket.CLOSED;
}
it("reconnects on visibilitychange when the socket was silently killed", () => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
expect(MockWebSocket.instances).toHaveLength(1);
suspendKill(ws);
fakeDocument.dispatch("visibilitychange");
// A fresh socket must have been created — the stale one is not
// reused.
expect(MockWebSocket.instances.length).toBeGreaterThan(1);
});
it("does NOT reconnect on visibilitychange while the socket is still healthy", () => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
expect(MockWebSocket.instances).toHaveLength(1);
// Healthy OPEN socket + a spurious visibilitychange (e.g. quick tab
// peek that never actually suspended) → no churn.
fakeDocument.dispatch("visibilitychange");
expect(MockWebSocket.instances).toHaveLength(1);
});
it("ignores visibilitychange when the page is hidden (the hide transition)", () => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
suspendKill(ws);
fakeDocument.visibilityState = "hidden";
fakeDocument.dispatch("visibilitychange");
// Hidden → must not reconnect (would defeat the purpose; we only
// re-arm when the user is actually looking at the page again).
expect(MockWebSocket.instances).toHaveLength(1);
});
it.each(["pageshow", "online", "focus"])(
"reconnects on window '%s' after a silent kill",
(evt) => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
suspendKill(ws);
fakeWindow.dispatch(evt);
expect(MockWebSocket.instances.length).toBeGreaterThan(1);
},
);
it("emits the resume signal once the recovered socket re-opens (so chat back-fills missed messages)", () => {
const onResume = vi.fn();
const unsub = subscribeSocketResume(onResume);
connectSocket();
const ws1 = getLastWS();
ws1.triggerOpen();
// First open must NOT fire resume — the mount-time chat-history
// fetch already covers the initial load.
expect(onResume).not.toHaveBeenCalled();
// Background-suspend silently kills the socket, then the user
// returns.
suspendKill(ws1);
fakeDocument.dispatch("visibilitychange");
// The wake handler force-reconnected; the new socket completing its
// handshake is what signals "we recovered from a gap — re-fetch".
const ws2 = getLastWS();
expect(ws2).not.toBe(ws1);
ws2.triggerOpen();
expect(onResume).toHaveBeenCalledTimes(1);
unsub();
});
it("does not emit resume on the very first connect", () => {
const onResume = vi.fn();
const unsub = subscribeSocketResume(onResume);
connectSocket();
getLastWS().triggerOpen();
expect(onResume).not.toHaveBeenCalled();
unsub();
});
it("emits resume after an ordinary onclose-driven reconnect too (desktop path unchanged)", () => {
const onResume = vi.fn();
const unsub = subscribeSocketResume(onResume);
connectSocket();
const ws1 = getLastWS();
ws1.triggerOpen();
// Ordinary network drop — onclose fires normally.
ws1.triggerClose();
vi.advanceTimersByTime(1100); // past the 1s backoff
const ws2 = getLastWS();
expect(ws2).not.toBe(ws1);
ws2.triggerOpen();
expect(onResume).toHaveBeenCalledTimes(1);
unsub();
});
it("detaches wake listeners on disconnect (no reconnect after teardown)", () => {
connectSocket();
const ws = getLastWS();
ws.triggerOpen();
disconnectSocket();
const countAfterDisconnect = MockWebSocket.instances.length;
// A wake event after teardown must be inert.
fakeDocument.dispatch("visibilitychange");
fakeWindow.dispatch("focus");
expect(MockWebSocket.instances.length).toBe(countAfterDisconnect);
});
});
// ---------------------------------------------------------------------------
// Health check (startHealthCheck / stopHealthCheck via onopen / disconnect)
// ---------------------------------------------------------------------------
+50
View File
@@ -61,3 +61,53 @@ export function subscribeSocketEvents(listener: Listener): () => void {
export function _resetSocketEventListenersForTests(): void {
listeners.clear();
}
// ---------------------------------------------------------------------------
// Socket-resume signal
// ---------------------------------------------------------------------------
//
// Fired by the ReconnectingSocket when the WS comes back up AFTER having
// been down (drop, or a mobile-browser background-suspend that silently
// killed the socket while the page was frozen). Distinct from the raw
// event bus above: while the socket was dead the page missed every
// AGENT_MESSAGE / A2A_RESPONSE, and the store's rehydrate() only re-pulls
// /workspaces status — it does NOT back-fill chat messages. Components
// that render a live message thread (desktop ChatTab + MobileChat, both
// via useChatHistory) subscribe here to re-fetch their history on resume
// so missed agent replies appear without the user having to navigate
// away+back or hard-refresh. Shared by desktop and mobile — the recovery
// is in the singleton socket, not forked per-surface.
type ResumeListener = () => void;
const resumeListeners = new Set<ResumeListener>();
/** Notify every resume subscriber that the socket just recovered from a
* down period. Called by ReconnectingSocket.onopen, but only when the
* open follows a prior loss (not the very first connect — the initial
* mount-time history fetch already covers that). */
export function emitSocketResume(): void {
for (const listener of resumeListeners) {
try {
listener();
} catch (err) {
if (typeof console !== "undefined") {
console.error("socket-resume listener threw:", err);
}
}
}
}
/** Register a resume subscriber. Returns an unsubscribe function the
* caller must invoke from its effect cleanup. */
export function subscribeSocketResume(listener: ResumeListener): () => void {
resumeListeners.add(listener);
return () => {
resumeListeners.delete(listener);
};
}
/** Test-only: drop all resume subscribers. */
export function _resetSocketResumeListenersForTests(): void {
resumeListeners.clear();
}
+117 -1
View File
@@ -1,6 +1,6 @@
import { useCanvasStore } from "./canvas";
import { deriveWsBaseUrl } from "@/lib/ws-url";
import { emitSocketEvent } from "./socket-events";
import { emitSocketEvent, emitSocketResume } from "./socket-events";
// If explicit WS_URL is set, use it as-is (may include custom path).
// Otherwise derive base + append /ws.
@@ -98,9 +98,107 @@ class ReconnectingSocket {
// caller can fire-and-forget without coordinating.
private rehydrateInFlight: Promise<void> | null = null;
private rehydrateDedup = new RehydrateDedup(REHYDRATE_DEDUP_WINDOW_MS);
// True once any onopen has fired. Gates the resume signal so the very
// first connect doesn't fire it (the mount-time chat-history fetch
// already covers the initial load — a resume here would be a wasted
// duplicate). Set on the first successful open and stays true.
private everConnected = false;
// True between a loss (onclose / wake-detected stale socket) and the
// next successful onopen. Only when this is set does onopen emit the
// resume signal — i.e. we recovered from a real gap during which
// AGENT_MESSAGE / A2A_RESPONSE events may have been missed.
private wasDown = false;
// Bound wake handler. iOS Safari / Chrome-mobile freeze the page and
// its timers when the tab is backgrounded or the device locks, and
// tear the WS down WITHOUT reliably firing onclose before the freeze.
// On thaw nothing re-arms: onclose never ran so no reconnect was
// scheduled, and the health-check / fallback-poll intervals were
// suspended. The socket is silently dead until a manual refresh. This
// handler force-reconnects on any wake signal when the socket isn't
// healthy. Stored so disconnect() can detach the listeners.
private onWake: (() => void) | null = null;
constructor(url: string) {
this.url = url;
this.installWakeListeners();
}
/** Attach page-lifecycle listeners that force a reconnect when the
* page returns to the foreground / regains connectivity and the
* socket is not OPEN. Shared by desktop and mobile — desktop rarely
* hits the stale-socket path (its onclose fires promptly) so this is
* effectively a no-op there, while mobile depends on it because the
* background-suspend kills the socket without an onclose. */
private installWakeListeners() {
if (typeof window === "undefined" || typeof document === "undefined") {
return;
}
const wake = () => {
if (this.disposed) return;
// Only act on a visible page — visibilitychange also fires on the
// hide transition, which we must ignore (closing here would defeat
// the point).
if (
typeof document.visibilityState === "string" &&
document.visibilityState !== "visible"
) {
return;
}
// Healthy socket → nothing to do. A stale/half-open socket on
// mobile reports CLOSED or CLOSING (the OS tore the transport
// down); CONNECTING is also unhealthy from the user's POV but a
// reconnect attempt is already in flight, so leave it.
const live =
this.ws !== null &&
(this.ws.readyState === WebSocket.OPEN ||
this.ws.readyState === WebSocket.CONNECTING);
if (live) return;
// Tear down any zombie and reconnect immediately. Mark wasDown so
// the subsequent onopen emits the resume signal and chat threads
// back-fill the messages missed while frozen.
this.wasDown = true;
this.forceReconnect();
};
this.onWake = wake;
document.addEventListener("visibilitychange", wake);
window.addEventListener("pageshow", wake);
window.addEventListener("online", wake);
window.addEventListener("focus", wake);
}
private removeWakeListeners() {
if (!this.onWake) return;
if (typeof window !== "undefined" && typeof document !== "undefined") {
document.removeEventListener("visibilitychange", this.onWake);
window.removeEventListener("pageshow", this.onWake);
window.removeEventListener("online", this.onWake);
window.removeEventListener("focus", this.onWake);
}
this.onWake = null;
}
/** Detach the current (presumed dead/stale) socket without routing
* through its onclose, cancel any pending backoff timer, and
* reconnect now. Used by the wake path: the browser already killed
* the transport, so the exponential backoff that onclose would have
* scheduled is both absent and undesirable — the user is looking at
* the page and wants it live immediately. */
private forceReconnect() {
if (this.disposed) return;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.ws) {
this.ws.onopen = null;
this.ws.onmessage = null;
this.ws.onclose = null;
this.ws.onerror = null;
try { this.ws.close(); } catch { /* noop */ }
this.ws = null;
}
this.attempt = 0;
this.connect();
}
connect() {
@@ -132,6 +230,18 @@ class ReconnectingSocket {
this.stopFallbackPoll();
this.rehydrate();
this.startHealthCheck();
// If this open follows a real loss (drop, or a mobile background-
// suspend that the wake handler recovered from), signal resume so
// live message threads re-fetch the AGENT_MESSAGE / A2A_RESPONSE
// history they missed while the socket was dead — rehydrate()
// above only refreshes /workspaces status, not chat. Gate on
// everConnected so the very first open (covered by the mount-time
// history fetch) doesn't fire a redundant resume.
if (this.everConnected && this.wasDown) {
emitSocketResume();
}
this.everConnected = true;
this.wasDown = false;
};
ws.onmessage = (event) => {
@@ -157,6 +267,11 @@ class ReconnectingSocket {
// corresponds to the WS we just tore down (prevents a stale
// onclose from a zombie socket from re-arming the loop).
if (this.disposed || this.ws !== ws) return;
// We had a live socket and lost it — mark down so the next onopen
// emits the resume signal and chat threads back-fill missed
// messages. (The wake path also sets this; setting it here covers
// the ordinary network-drop case.)
this.wasDown = true;
this.stopHealthCheck();
useCanvasStore.getState().setWsStatus("connecting");
this.startFallbackPoll();
@@ -247,6 +362,7 @@ class ReconnectingSocket {
disconnect() {
this.disposed = true;
this.removeWakeListeners();
this.stopHealthCheck();
this.stopFallbackPoll();
if (this.reconnectTimer) {