Merge pull request #2185 from Molecule-AI/fix/canvas-send-button-stuck-after-ws-reply

fix(canvas): clear sendInFlightRef on WS-push reply path
This commit is contained in:
Hongming Wang 2026-04-27 20:16:39 +00:00 committed by GitHub
commit 18b21d420e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 295 additions and 51 deletions

View File

@ -33,6 +33,7 @@ import {
cleanup,
fireEvent,
} from "@testing-library/react";
import type { ComponentProps } from "react";
// ── Hoisted mocks ────────────────────────────────────────────────────────────
@ -78,7 +79,13 @@ import { A2AEdge } from "../A2AEdge";
// ── Helpers ──────────────────────────────────────────────────────────────────
function defaultEdgeProps(over: Record<string, unknown> = {}) {
type A2AEdgeProps = ComponentProps<typeof A2AEdge>;
function defaultEdgeProps(over: Record<string, unknown> = {}): A2AEdgeProps {
// EdgeProps is a discriminated union — the test fixture only supplies
// the geometry/identity fields the component actually reads, so a
// double-cast through unknown lets call sites spread without restating
// every optional field React Flow declares.
return {
id: "edge-1",
source: "ws-source",
@ -91,7 +98,7 @@ function defaultEdgeProps(over: Record<string, unknown> = {}) {
targetPosition: "left",
style: {},
...over,
} as never; // EdgeProps is a discriminated union; cast simplifies the test fixture
} as unknown as A2AEdgeProps;
}
beforeEach(() => {

View File

@ -5,8 +5,7 @@ import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { WS_URL } from "@/store/socket";
import { closeWebSocketGracefully } from "@/lib/ws-close";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { type ChatMessage, type ChatAttachment, createMessage, appendMessageDeduped } from "./chat/types";
import { uploadChatFiles, downloadChatFile } from "./chat/uploads";
import { AttachmentChip, PendingAttachmentPill } from "./chat/AttachmentViews";
@ -263,6 +262,32 @@ function MyChatPanel({ workspaceId, data }: Props) {
// from the closure and lets a second `sendMessage` enter. A ref
// observes the latest value synchronously.
const sendInFlightRef = useRef(false);
// Monotonic token bumped on every sendMessage entry. Each .then()/
// .catch() captures its own token in closure and bails if a newer
// send has superseded it — prevents a late HTTP response for an
// earlier message from clobbering the flags / appending text that
// belong to a newer in-flight send. Race scenario the token closes:
// (1) send msg #1 (2) WS push for msg #1 arrives, releases guards
// (3) user sends msg #2 (4) HTTP for msg #1 finally lands — without
// the token check, .then() sees sendingFromAPIRef=true (set by
// msg #2's send), enters the main body, and processes msg #1's body
// as if it were msg #2's reply.
const sendTokenRef = useRef(0);
// Release every in-flight send guard at once. Used by every site
// that ends a send: pendingAgentMsgs WS push, ACTIVITY_LOGGED
// a2a_receive ok/error WS event, HTTP .then() success, and HTTP
// .catch() success. Keep these in lockstep — a future contributor
// adding a new "I saw the reply" path that only clears `sending` +
// `sendingFromAPIRef` (the natural pair) silently re-introduces
// the post-WS Send-button freeze, because the disabled-button
// logic can't see `sendInFlightRef` and so the visible state diverges
// from the synchronous re-entry guard at line 464.
const releaseSendGuards = useCallback(() => {
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
}, []);
// Load chat history from database on mount
useEffect(() => {
@ -311,8 +336,11 @@ function MyChatPanel({ workspaceId, data }: Props) {
setMessages((prev) => appendMessageDeduped(prev, createMessage("agent", m.content, m.attachments)));
}
if (sendingFromAPIRef.current && msgs.length > 0) {
setSending(false);
sendingFromAPIRef.current = false;
// Reply arrived via WS push (e.g. claude-code SDK). Release all
// three guards together — without sendInFlightRef the next
// sendMessage() silently no-ops at the synchronous re-entry
// check.
releaseSendGuards();
}
}, [pendingAgentMsgs, workspaceId]);
@ -336,22 +364,25 @@ function MyChatPanel({ workspaceId, data }: Props) {
return () => clearInterval(timer);
}, [sending]);
// Live activity feed via WebSocket while sending
// Live activity feed seed — clears when not sending. The actual
// event subscription is unconditional below (useSocketEvent at the
// top level — hooks can't be conditional). The handler gates on
// `sending` itself so it's a no-op when idle.
useEffect(() => {
if (!sending) {
setActivityLog([]);
return;
}
setActivityLog([`Processing with ${runtimeDisplayName(data.runtime)}...`]);
}, [sending, data.runtime]);
const ws = new WebSocket(WS_URL);
ws.onerror = () => {
// Don't crash — activity feed is non-essential, just log
console.warn("ChatTab activity feed WS error");
};
ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
// Subscribe to global WS via the singleton ReconnectingSocket (no
// per-component WebSocket — the previous pattern dropped events
// silently on any reconnect because each panel's raw socket had no
// onclose handler).
useSocketEvent((msg) => {
if (!sending) return;
try {
if (msg.event === "ACTIVITY_LOGGED") {
// Filter to events for THIS workspace. The platform's
// BroadcastOnly fires to every connected client, and
@ -382,15 +413,13 @@ function MyChatPanel({ workspaceId, data }: Props) {
// via pendingAgentMsgs or the HTTP .then()).
const own = (targetId || msg.workspace_id) === workspaceId;
if (own && sendingFromAPIRef.current) {
setSending(false);
sendingFromAPIRef.current = false;
releaseSendGuards();
}
} else if (status === "error") {
line = `${targetName} error`;
const own = (targetId || msg.workspace_id) === workspaceId;
if (own && sendingFromAPIRef.current) {
setSending(false);
sendingFromAPIRef.current = false;
releaseSendGuards();
setError("Agent error (Exception) — see workspace logs for details.");
}
}
@ -420,13 +449,8 @@ function MyChatPanel({ workspaceId, data }: Props) {
// A2A_RESPONSE is already consumed by the store and its text is
// appended to messages via the pendingAgentMsgs effect above; we
// don't need to duplicate it here.
} catch { /* ignore */ }
};
return () => {
closeWebSocketGracefully(ws);
};
}, [sending, workspaceId, resolveWorkspaceName]);
} catch { /* ignore */ }
});
const sendMessage = async () => {
const text = input.trim();
@ -462,6 +486,10 @@ function MyChatPanel({ workspaceId, data }: Props) {
setSending(true);
sendingFromAPIRef.current = true;
setError(null);
// Capture this send's token so the .then()/.catch() callbacks can
// detect a newer send that may have superseded them. See the
// sendTokenRef declaration for the race scenario this closes.
const myToken = ++sendTokenRef.current;
// Build conversation history from prior messages (last 20)
const history = messages
@ -507,10 +535,18 @@ function MyChatPanel({ workspaceId, data }: Props) {
},
}, { timeoutMs: 120_000 })
.then((resp) => {
// Bail without touching any flags if a newer sendMessage has
// already run — its myToken bumped sendTokenRef, so this is
// a stale callback for an earlier message. The newer send
// owns the in-flight guards now.
if (sendTokenRef.current !== myToken) return;
// Skip if the WS A2A_RESPONSE event already handled this response.
// Both paths (WS + HTTP) check sendingFromAPIRef — whichever clears
// it first wins, the other becomes a no-op (no duplicate messages).
if (!sendingFromAPIRef.current) return;
if (!sendingFromAPIRef.current) {
sendInFlightRef.current = false;
return;
}
const replyText = extractReplyText(resp);
const replyFiles = extractFilesFromTask((resp?.result ?? {}) as Record<string, unknown>);
if (replyText || replyFiles.length > 0) {
@ -518,11 +554,11 @@ function MyChatPanel({ workspaceId, data }: Props) {
appendMessageDeduped(prev, createMessage("agent", replyText, replyFiles)),
);
}
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
releaseSendGuards();
})
.catch(() => {
// Stale-callback guard — same rationale as .then().
if (sendTokenRef.current !== myToken) return;
// Same dedup guard as .then(): if a WS path (pendingAgentMsgs
// or ACTIVITY_LOGGED a2a_receive ok) already delivered the
// reply, sendingFromAPIRef is already false and there's
@ -534,9 +570,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
sendInFlightRef.current = false;
return;
}
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
releaseSendGuards();
setError("Failed to send message — agent may be unreachable");
});
};

View File

@ -5,8 +5,7 @@ import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { WS_URL } from "@/store/socket";
import { closeWebSocketGracefully } from "@/lib/ws-close";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { showToast } from "../../Toaster";
import { extractResponseText, extractRequestText } from "./message-parser";
import { inferA2AErrorHint } from "./a2aErrorHint";
@ -239,16 +238,15 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
});
}, [workspaceId]);
// Live updates via WebSocket
useEffect(() => {
const ws = new WebSocket(WS_URL);
ws.onerror = () => {
console.warn("AgentCommsPanel WS error");
};
ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.workspace_id !== workspaceId) return;
// Live updates routed through the global ReconnectingSocket. The
// previous pattern of `new WebSocket(WS_URL)` per panel had no
// onclose / no reconnect, so any drop (idle timeout, browser
// background-tab throttle) silently stopped delivering events until
// the panel re-mounted. Routing through useSocketEvent inherits the
// store socket's reconnect, backoff, and HTTP fallback for free.
useSocketEvent((msg) => {
try {
if (msg.workspace_id !== workspaceId) return;
// Two live-update paths:
// 1. ACTIVITY_LOGGED — fired by the LogActivity helper for
@ -357,12 +355,8 @@ export function AgentCommsPanel({ workspaceId }: { workspaceId: string }) {
seenKeys.current.add(key);
setMessages((prev) => [...prev, m]);
}
} catch { /* ignore */ }
};
return () => {
closeWebSocketGracefully(ws);
};
}, [workspaceId]);
} catch { /* ignore */ }
});
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: "smooth" });

View File

@ -0,0 +1,36 @@
/** React hook to subscribe to global WS events without opening a new
* WebSocket connection. Subscribers are routed through the singleton
* ReconnectingSocket in store/socket.ts so they inherit its
* reconnect, backoff, and HTTP fallback for free.
*
* Usage:
*
* useSocketEvent((msg) => {
* if (msg.workspace_id !== workspaceId) return;
* if (msg.event !== "ACTIVITY_LOGGED") return;
* // ... handle ...
* });
*
* The handler is captured into a ref on every render so the latest
* closure (with its current state / props) is always invoked, while
* the actual subscription is registered exactly once per mount.
* Without the ref, an inline-defined handler would re-subscribe on
* every render, churning Set add/delete and risking missed events
* during the gap.
*
* The handler is responsible for its own filtering by event type,
* workspace_id, payload shape, etc. The bus is intentionally untyped
* beyond the WSMessage envelope; coupling each consumer to a typed
* per-event schema would defeat the "tiny pub/sub" goal. */
import { useEffect, useRef } from "react";
import type { WSMessage } from "@/store/socket";
import { subscribeSocketEvents } from "@/store/socket-events";
export function useSocketEvent(handler: (msg: WSMessage) => void): void {
const handlerRef = useRef(handler);
handlerRef.current = handler;
useEffect(() => {
return subscribeSocketEvents((msg) => handlerRef.current(msg));
}, []);
}

View File

@ -0,0 +1,103 @@
// @vitest-environment jsdom
/**
* Tests for the socket-events pub/sub bus that lets feature components
* subscribe to global WS messages without each opening their own
* WebSocket. The previous per-panel `new WebSocket(WS_URL)` pattern
* silently dropped events on any reconnect because each raw socket
* had no onclose handler.
*
* The bus contract:
* - Every emit fans out to every registered listener.
* - Subscribe returns an unsubscribe; calling it removes that listener.
* - A throwing listener does not prevent siblings from receiving the
* event (bug-tolerant fan-out).
* - The bus survives test cases _resetSocketEventListenersForTests
* gives unit tests a clean slate.
*/
import { describe, it, expect, beforeEach, vi } from "vitest";
import {
emitSocketEvent,
subscribeSocketEvents,
_resetSocketEventListenersForTests,
} from "../socket-events";
import type { WSMessage } from "../socket";
const sampleMsg: WSMessage = {
event: "ACTIVITY_LOGGED",
workspace_id: "ws-test",
timestamp: "2026-04-27T19:00:00Z",
payload: { activity_type: "a2a_send", source_id: "ws-test" },
};
beforeEach(() => {
_resetSocketEventListenersForTests();
});
describe("socket-events bus", () => {
it("delivers an emitted message to a single subscriber", () => {
const listener = vi.fn();
subscribeSocketEvents(listener);
emitSocketEvent(sampleMsg);
expect(listener).toHaveBeenCalledOnce();
expect(listener).toHaveBeenCalledWith(sampleMsg);
});
it("fans out to every subscriber in registration order", () => {
const order: number[] = [];
subscribeSocketEvents(() => order.push(1));
subscribeSocketEvents(() => order.push(2));
subscribeSocketEvents(() => order.push(3));
emitSocketEvent(sampleMsg);
expect(order).toEqual([1, 2, 3]);
});
it("returned unsubscribe stops further delivery to that listener", () => {
const a = vi.fn();
const b = vi.fn();
const unsubA = subscribeSocketEvents(a);
subscribeSocketEvents(b);
emitSocketEvent(sampleMsg);
expect(a).toHaveBeenCalledOnce();
expect(b).toHaveBeenCalledOnce();
unsubA();
emitSocketEvent(sampleMsg);
expect(a).toHaveBeenCalledOnce(); // still 1 — unsubscribed
expect(b).toHaveBeenCalledTimes(2);
});
it("a throwing listener does not break sibling listeners", () => {
// Suppress the expected console.error so test output stays clean.
const errSpy = vi.spyOn(console, "error").mockImplementation(() => {});
const sibling = vi.fn();
subscribeSocketEvents(() => {
throw new Error("buggy handler");
});
subscribeSocketEvents(sibling);
emitSocketEvent(sampleMsg);
expect(sibling).toHaveBeenCalledOnce();
expect(errSpy).toHaveBeenCalled();
errSpy.mockRestore();
});
it("emit is a no-op when there are no subscribers", () => {
// Just verifies it doesn't throw.
expect(() => emitSocketEvent(sampleMsg)).not.toThrow();
});
it("re-subscribing the same listener instance is a no-op (Set semantics)", () => {
const listener = vi.fn();
const unsubA = subscribeSocketEvents(listener);
subscribeSocketEvents(listener); // duplicate
emitSocketEvent(sampleMsg);
// Set dedupes — listener fires once per emit, not twice.
expect(listener).toHaveBeenCalledOnce();
// First unsubscribe removes it (Set delete is idempotent — second
// unsub from the duplicate subscribe call is also a no-op).
unsubA();
emitSocketEvent(sampleMsg);
expect(listener).toHaveBeenCalledOnce();
});
});

View File

@ -0,0 +1,63 @@
/** Tiny pub/sub on top of the global ReconnectingSocket so feature
* components can subscribe to raw WS messages without each opening
* their own WebSocket. The previous pattern (each panel calling
* `new WebSocket(WS_URL)` in a useEffect with no onclose / no
* reconnect) silently dropped events whenever the underlying socket
* blipped idle timeout, browser background-tab throttling, network
* jitter and forced a refresh to recover.
*
* The global ReconnectingSocket already owns reconnect, exponential
* backoff, health-check, and HTTP fallback poll. Routing component
* subscribers through it gives every consumer those guarantees for
* free, with one TCP connection instead of N.
*
* Wiring: the socket's `ws.onmessage` calls `emitSocketEvent(msg)`
* after `useCanvasStore.getState().applyEvent(msg)`. Subscribers see
* events in arrival order; emit is synchronous so React's batched
* setState in handlers behaves the same as before.
*
* Listeners are stored in a Set, so duplicate-subscribe is a no-op
* and unsubscribe is O(1). The bus survives the socket itself
* intentional, since reconnect creates a new ws but listeners stay
* bound to the bus, not to any one ws instance. */
import type { WSMessage } from "./socket";
type Listener = (msg: WSMessage) => void;
const listeners = new Set<Listener>();
/** Fan a single decoded WS message out to every subscriber. Called by
* the socket's onmessage immediately after the store's applyEvent so
* derived store state and component handlers stay in lockstep. */
export function emitSocketEvent(msg: WSMessage): void {
for (const listener of listeners) {
try {
listener(msg);
} catch (err) {
// One bad subscriber shouldn't break the others. Surface in dev,
// swallow in prod — a thrown handler is a component bug, not a
// socket bug.
if (typeof console !== "undefined") {
console.error("socket-events listener threw:", err);
}
}
}
}
/** Register a subscriber. Returns an unsubscribe function the caller
* must invoke (typically from a useEffect cleanup). The listener is
* called for EVERY event the caller is responsible for filtering by
* workspace_id, event type, etc. */
export function subscribeSocketEvents(listener: Listener): () => void {
listeners.add(listener);
return () => {
listeners.delete(listener);
};
}
/** Test-only: drop all subscribers. Lets unit tests reset state
* between cases without touching the singleton socket. */
export function _resetSocketEventListenersForTests(): void {
listeners.clear();
}

View File

@ -1,5 +1,6 @@
import { useCanvasStore } from "./canvas";
import { deriveWsBaseUrl } from "@/lib/ws-url";
import { emitSocketEvent } from "./socket-events";
// If explicit WS_URL is set, use it as-is (may include custom path).
// Otherwise derive base + append /ws.
@ -139,6 +140,12 @@ class ReconnectingSocket {
try {
const msg: WSMessage = JSON.parse(event.data);
useCanvasStore.getState().applyEvent(msg);
// Fan out to component-level subscribers so panels (Agent
// Comms, MyChat activity feed) don't have to open their own
// raw WebSocket — that pattern silently dropped events on
// any reconnect because the per-component sockets had no
// onclose / no backoff. See store/socket-events.ts.
emitSocketEvent(msg);
} catch {
// Malformed WS message — skip silently
}