fix(canvas): keep "agent is working" indicator alive for external/poll-mode turns #1437

Merged
hongming merged 1 commits from fix/external-workspace-progress-feedback into staging 2026-05-18 19:36:08 +00:00
2 changed files with 294 additions and 3 deletions
@@ -0,0 +1,209 @@
// @vitest-environment jsdom
/**
* Tests for useChatSend — the canvas user→agent send hook.
*
* Behavioural focus: the poll-mode ("queued") path. When the target
* workspace is an external / MCP-registered agent (delivery_mode=poll,
* e.g. an operator laptop running the molecule MCP channel), the
* platform's POST /workspaces/:id/a2a returns a synthetic
* {status:"queued", delivery_mode:"poll"} envelope IMMEDIATELY with no
* reply — the real reply arrives later over the AGENT_MESSAGE
* WebSocket push.
*
* Pre-fix the hook treated that synthetic envelope as a terminal
* response and called releaseSendGuards() → `sending` went false the
* instant the POST returned → the "agent is working" indicator
* vanished and the external turn looked dead. This suite pins the
* fixed contract:
*
* - a real reply still clears `sending` (regression guard)
* - a poll "queued" envelope KEEPS `sending` true (no terminal
* clear) so the existing thinking indicator persists
* - the eventual reply path (releaseSendGuards, the same call the
* AGENT_MESSAGE WS push makes via useChatSocket) clears it
* - an offline poll agent that never replies eventually surfaces an
* honest error instead of an infinite spinner
*
* Plus pure-function coverage for the poll-envelope detector.
*
* Root cause: workspace-server a2a_proxy.go:402 poll-mode
* short-circuit returns {status:"queued"} synchronously.
*/
import {
describe,
it,
expect,
vi,
beforeEach,
afterEach,
type Mock,
} from "vitest";
import { act, renderHook, cleanup } from "@testing-library/react";
const { mockApiPost } = vi.hoisted(() => ({ mockApiPost: vi.fn() }));
vi.mock("@/lib/api", () => ({
api: { post: mockApiPost },
}));
vi.mock("../uploads", () => ({
uploadChatFiles: vi.fn(),
}));
// Import AFTER mocks.
import {
useChatSend,
isPollQueuedResponse,
extractReplyText,
POLL_QUEUED_REPLY_TIMEOUT_MS,
} from "../useChatSend";
const flush = () => act(async () => { await Promise.resolve(); });
describe("isPollQueuedResponse", () => {
it("is true only for the synthetic poll-mode queued envelope", () => {
expect(isPollQueuedResponse({ status: "queued", delivery_mode: "poll" })).toBe(true);
});
it("is false for a real agent reply", () => {
expect(
isPollQueuedResponse({ result: { parts: [{ kind: "text", text: "hi" }] } }),
).toBe(false);
});
it("is false for null / undefined / partial shapes", () => {
expect(isPollQueuedResponse(null)).toBe(false);
expect(isPollQueuedResponse(undefined)).toBe(false);
// status=queued without delivery_mode=poll is NOT the poll envelope
// — don't accidentally swallow a real reply that happens to carry
// an unrelated status field.
expect(isPollQueuedResponse({ status: "queued" })).toBe(false);
expect(isPollQueuedResponse({ delivery_mode: "poll" })).toBe(false);
});
});
describe("extractReplyText (regression guard — unchanged by fix)", () => {
it("collects text parts from result", () => {
expect(
extractReplyText({ result: { parts: [{ kind: "text", text: "hello" }] } }),
).toBe("hello");
});
it("returns empty for the poll-queued envelope", () => {
expect(extractReplyText({ status: "queued", delivery_mode: "poll" })).toBe("");
});
});
describe("useChatSend — poll-mode in-progress state", () => {
beforeEach(() => {
vi.useFakeTimers();
mockApiPost.mockReset();
});
afterEach(() => {
vi.runOnlyPendingTimers();
vi.useRealTimers();
cleanup();
});
const setup = () => {
const onUserMessage = vi.fn();
const onAgentMessage = vi.fn();
const { result } = renderHook(() =>
useChatSend("ws-ext-1", {
getHistoryMessages: () => [],
onUserMessage,
onAgentMessage,
}),
);
return { result, onUserMessage, onAgentMessage };
};
it("a real reply clears `sending` (regression guard)", async () => {
mockApiPost.mockResolvedValue({
result: { parts: [{ kind: "text", text: "real reply" }] },
});
const { result, onAgentMessage } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(onAgentMessage).toHaveBeenCalledTimes(1);
expect(result.current.sending).toBe(false);
});
it("keeps `sending` true on a poll 'queued' envelope (no terminal clear)", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result, onAgentMessage } = setup();
await act(async () => {
void result.current.sendMessage("hi external agent");
});
await flush();
// The POST resolved, but it was only a queued ack — the indicator
// must stay up and no agent bubble should be rendered yet.
expect(result.current.sending).toBe(true);
expect(onAgentMessage).not.toHaveBeenCalled();
expect(result.current.error).toBeNull();
});
it("releaseSendGuards (the AGENT_MESSAGE-push path) clears the poll in-progress state", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(result.current.sending).toBe(true);
// Simulate the terminal AGENT_MESSAGE WebSocket push arriving:
// useChatSocket's onAgentMessage / onSendComplete call
// releaseSendGuards. That must clear the in-progress state AND the
// safety timer (asserted by the next test).
act(() => {
result.current.releaseSendGuards();
});
expect(result.current.sending).toBe(false);
});
it("surfaces an honest error if a poll agent never replies (safety timeout)", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
expect(result.current.sending).toBe(true);
act(() => {
vi.advanceTimersByTime(POLL_QUEUED_REPLY_TIMEOUT_MS + 1000);
});
expect(result.current.sending).toBe(false);
expect(result.current.error).toMatch(/queued/i);
});
it("does NOT fire the safety error when the reply arrives before timeout", async () => {
mockApiPost.mockResolvedValue({ status: "queued", delivery_mode: "poll" });
const { result } = setup();
await act(async () => {
void result.current.sendMessage("hi");
});
await flush();
// Reply arrives (releaseSendGuards) well before the timeout.
act(() => {
result.current.releaseSendGuards();
});
act(() => {
vi.advanceTimersByTime(POLL_QUEUED_REPLY_TIMEOUT_MS + 1000);
});
expect(result.current.error).toBeNull();
expect(result.current.sending).toBe(false);
});
});
@@ -1,6 +1,6 @@
"use client";
import { useCallback, useRef, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { api } from "@/lib/api";
import { uploadChatFiles } from "../uploads";
import { createMessage, type ChatMessage, type ChatAttachment } from "../types";
@@ -22,8 +22,42 @@ interface A2AResponse {
parts?: A2APart[];
artifacts?: Array<{ parts: A2APart[] }>;
};
/** Synthetic poll-mode envelope. The platform returns this
* immediately (HTTP 200) when the target workspace is registered
* delivery_mode=poll — an external / MCP-registered agent with no
* public URL (e.g. an operator's laptop running the molecule MCP
* channel). The request has only been QUEUED into activity_logs;
* the agent will pick it up on its next poll and the real reply
* arrives asynchronously over the AGENT_MESSAGE WebSocket push
* (consumed by useChatSocket). See workspace-server
* a2a_proxy.go:402 (poll-mode short-circuit) and
* a2a_proxy_helpers.go:516 (logA2AReceiveQueued). */
status?: string;
delivery_mode?: string;
}
/** True when `resp` is the platform's synthetic poll-mode "queued"
* envelope rather than a real agent reply. For these the send is
* acknowledged-but-pending: the user's message landed and the agent
* is working, but there is no reply yet — the terminal AGENT_MESSAGE
* push will arrive later over the WebSocket. Treating this as a
* terminal response (the pre-fix behaviour) cleared the "agent is
* working" indicator the instant the POST returned, so an external
* workspace turn looked dead even though work had not started. */
export function isPollQueuedResponse(resp: A2AResponse | null | undefined): boolean {
return !!resp && resp.status === "queued" && resp.delivery_mode === "poll";
}
/** Hard ceiling on how long the "agent is working" indicator stays up
* for a poll-mode turn with no reply. The terminal AGENT_MESSAGE push
* normally clears it well before this. The cap exists so a poll-mode
* workspace that is offline / never consumes its queue doesn't pin a
* spinner forever — at which point we surface an honest, actionable
* error instead of an opaque dead spinner. Generous because poll
* agents (an operator laptop) can legitimately take minutes to wake,
* poll, and respond; the goal is "eventually honest", not fail-fast. */
export const POLL_QUEUED_REPLY_TIMEOUT_MS = 15 * 60 * 1000;
export function extractReplyText(resp: A2AResponse): string {
const collect = (parts: A2APart[] | undefined): string => {
if (!parts) return "";
@@ -59,14 +93,29 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
const sendInFlightRef = useRef(false);
const sendingFromAPIRef = useRef(false);
const sendTokenRef = useRef(0);
// Safety-net timer armed only for poll-mode ("queued") turns: the
// POST returns immediately with no reply, so the normal
// POST-resolves-→-clear-spinner path can't drive the indicator. The
// terminal AGENT_MESSAGE WebSocket push clears it via
// releaseSendGuards (which also clears this timer); the timer is the
// backstop for an offline poll agent that never consumes its queue.
const pollTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const optionsRef = useRef(options);
optionsRef.current = options;
const clearPollTimeout = useCallback(() => {
if (pollTimeoutRef.current !== null) {
clearTimeout(pollTimeoutRef.current);
pollTimeoutRef.current = null;
}
}, []);
const releaseSendGuards = useCallback(() => {
clearPollTimeout();
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
}, []);
}, [clearPollTimeout]);
const clearError = useCallback(() => setError(null), []);
@@ -146,6 +195,33 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
sendInFlightRef.current = false;
return;
}
// Poll-mode ("queued") turn: the message landed and the
// external/MCP agent will pick it up on its next poll, but
// there is NO reply in this response. Pre-fix this fell
// through to releaseSendGuards() below and the "agent is
// working" indicator vanished the instant the POST returned —
// an external-workspace turn looked dead even though work had
// not started. Instead, keep `sending` true so the existing
// thinking indicator (the same one internal agents use)
// persists as a "received — agent is working" state; the
// terminal AGENT_MESSAGE WebSocket push (consumed by
// useChatSocket → onAgentMessage / onSendComplete →
// releaseSendGuards) clears it when the real reply arrives,
// exactly the path an internal async reply already uses.
if (isPollQueuedResponse(resp)) {
clearPollTimeout();
pollTimeoutRef.current = setTimeout(() => {
if (sendTokenRef.current !== myToken) return;
if (!sendingFromAPIRef.current) return;
releaseSendGuards();
setError(
"No response yet from this agent — it may be offline or " +
"busy. Your message was delivered and is queued; the " +
"reply will appear here if the agent picks it up.",
);
}, POLL_QUEUED_REPLY_TIMEOUT_MS);
return;
}
const replyText = extractReplyText(resp);
const replyFiles = extractFilesFromTask(
(resp?.result ?? {}) as Record<string, unknown>,
@@ -167,9 +243,15 @@ export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
setError("Failed to send message — agent may be unreachable");
});
},
[workspaceId, sending, uploading],
[workspaceId, sending, uploading, clearPollTimeout],
);
// Drop the poll-mode safety timer on unmount / workspace switch so a
// stale timeout can't fire setError against a panel the user has
// already navigated away from. sendTokenRef guards correctness if it
// ever did fire; this just avoids the wasted timer + setState churn.
useEffect(() => clearPollTimeout, [clearPollTimeout]);
return {
sending,
uploading,