fix(chat-ux): persist in-flight message at receipt + gate autoscroll on at-bottom (#2560) #2567
@@ -176,9 +176,35 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
// Scroll behavior across messages updates:
|
||||
// - Prepend (loadOlder landed) → restore the user's saved
|
||||
// distance-from-bottom so their reading position is unchanged.
|
||||
// - Append / initial → pin to latest bubble.
|
||||
// - Append / initial → pin to latest bubble ONLY if the
|
||||
// user is at (or near) the bottom. The pre-#2560 behavior was
|
||||
// "always scroll", which yanks the viewport when the user has
|
||||
// scrolled up to read older history — they lose their place and
|
||||
// have to scroll back down. #2560 CTO ask: gate the same
|
||||
// append-scroll on the SAME at-bottom check that the activityLog
|
||||
// growth path uses below.
|
||||
//
|
||||
// The atBottom threshold (12px) is a small window to absorb subpixel
|
||||
// rounding — a user who "is at the bottom" can be up to ~12px off
|
||||
// without losing autoscroll. The listener is attached to the
|
||||
// container, not window, because the chat is its own scrollable
|
||||
// element; `passive: true` because we never call preventDefault.
|
||||
// useLayoutEffect (not useEffect) so scroll restoration runs BEFORE
|
||||
// paint — otherwise the user sees the page jump for one frame.
|
||||
const [atBottom, setAtBottom] = useState(true);
|
||||
useLayoutEffect(() => {
|
||||
const container = containerRef.current;
|
||||
if (!container) return;
|
||||
const update = () => {
|
||||
const distanceFromBottom =
|
||||
container.scrollHeight - container.scrollTop - container.clientHeight;
|
||||
setAtBottom(distanceFromBottom <= 12);
|
||||
};
|
||||
update();
|
||||
container.addEventListener("scroll", update, { passive: true });
|
||||
return () => container.removeEventListener("scroll", update);
|
||||
}, []);
|
||||
|
||||
useLayoutEffect(() => {
|
||||
const container = containerRef.current;
|
||||
const anchor = history.scrollAnchorRef.current;
|
||||
@@ -188,17 +214,35 @@ function MyChatPanel({ workspaceId, data }: Props) {
|
||||
history.messages.length > 0 &&
|
||||
history.messages[0].id !== anchor.expectFirstIdNotEqual
|
||||
) {
|
||||
// Anchor restore is loadOlder's contract — always restore
|
||||
// regardless of at-bottom (the user's PRE-loadOlder position is
|
||||
// the thing being preserved). Untouched by #2560.
|
||||
container.scrollTop = container.scrollHeight - anchor.savedDistanceFromBottom;
|
||||
history.scrollAnchorRef.current = null;
|
||||
return;
|
||||
}
|
||||
if (!hasInitialScrollRef.current && history.messages.length > 0) {
|
||||
hasInitialScrollRef.current = true;
|
||||
if (!atBottom) return;
|
||||
bottomRef.current?.scrollIntoView({ behavior: "instant" as ScrollBehavior });
|
||||
return;
|
||||
}
|
||||
// #2560: gate the message-append smooth-scroll on atBottom. If the
|
||||
// user has scrolled up to read older history, do NOT yank.
|
||||
if (!atBottom) return;
|
||||
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
|
||||
}, [history.messages, history.scrollAnchorRef]);
|
||||
}, [history.messages, history.scrollAnchorRef, atBottom]);
|
||||
|
||||
// #2560 (bottom-sticky autoscroll for accumulating tool calls): the
|
||||
// existing message-append path doesn't fire on activityLog growth
|
||||
// (deps were [history.messages]), so the user has to manually chase
|
||||
// the live tool-call lines. Gate the same at-bottom check on the
|
||||
// activity feed: if the user is at the bottom, follow the new
|
||||
// lines; if they've scrolled up, NEVER yank.
|
||||
useLayoutEffect(() => {
|
||||
if (!atBottom) return;
|
||||
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
|
||||
}, [activityLog, atBottom]);
|
||||
|
||||
// Elapsed timer while sending
|
||||
useEffect(() => {
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
// @vitest-environment jsdom
|
||||
//
|
||||
// Pins the #2560 autoscroll at-bottom gating for the chat tab.
|
||||
//
|
||||
// CTO ask (issue #2560): "Bottom-sticky autoscroll while tool calls
|
||||
// accumulate. Gate the EXISTING always-scroll-on-append behavior behind
|
||||
// the SAME at-bottom check. Don't yank the viewport down if the user
|
||||
// has scrolled up." The pre-#2560 behavior was `bottomRef.scrollIntoView`
|
||||
// on every message append AND on every activityLog growth — yanking the
|
||||
// viewport when the user was reading older history (a real complaint:
|
||||
// "I scroll up to compare against an earlier reply and the chat
|
||||
// yanks me back to the bottom every time the agent logs a tool call").
|
||||
//
|
||||
// The fix adds a `atBottom` ref (scroll listener with a 12px threshold)
|
||||
// that gates BOTH the message-append path AND a NEW activityLog-growth
|
||||
// path. The loadOlder anchor-restore contract is preserved unchanged.
|
||||
//
|
||||
// These tests guard the four behaviours the issue specified:
|
||||
// (a) append scrolls when atBottom
|
||||
// (b) append does NOT scroll when scrolled up
|
||||
// (c) activityLog growth scrolls when atBottom
|
||||
// (d) activityLog growth does NOT scroll when scrolled up
|
||||
// (e) anchor restore on loadOlder is NOT affected by atBottom
|
||||
|
||||
import { describe, it, expect, vi, afterEach, beforeEach } from "vitest";
|
||||
import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react";
|
||||
import React from "react";
|
||||
|
||||
afterEach(cleanup);
|
||||
|
||||
// No /chat-history, no heartbeats, no activity-log polling — just the
|
||||
// scroll behavior under test.
|
||||
const apiGet = vi.fn((_path: string) => Promise.resolve([]));
|
||||
const apiPost = vi.fn();
|
||||
vi.mock("@/lib/api", () => ({
|
||||
api: {
|
||||
get: (path: string) => apiGet(path),
|
||||
post: (path: string, body: unknown) => apiPost(path, body),
|
||||
del: vi.fn(),
|
||||
patch: vi.fn(),
|
||||
put: vi.fn(),
|
||||
},
|
||||
}));
|
||||
|
||||
vi.mock("@/store/canvas", () => ({
|
||||
useCanvasStore: vi.fn((selector?: (s: unknown) => unknown) =>
|
||||
selector ? selector({ agentMessages: {}, consumeAgentMessages: () => [] }) : {},
|
||||
),
|
||||
}));
|
||||
|
||||
// Mock useChatSocket so the panel doesn't try to open a WebSocket. We
|
||||
// only care about the scroll-while-sending behaviour, not the socket
|
||||
// plumbing.
|
||||
vi.mock("../chat/hooks/useChatSocket", () => ({
|
||||
useChatSocket: () => {},
|
||||
}));
|
||||
|
||||
let scrollIntoView: ReturnType<typeof vi.fn>;
|
||||
let scrollEventListeners: Array<(e: Event) => void> = [];
|
||||
let currentScrollTop = 0;
|
||||
let currentScrollHeight = 1000;
|
||||
let currentClientHeight = 200;
|
||||
|
||||
beforeEach(() => {
|
||||
apiGet.mockClear();
|
||||
apiPost.mockReset();
|
||||
scrollIntoView = vi.fn();
|
||||
scrollEventListeners = [];
|
||||
currentScrollTop = 0;
|
||||
currentScrollHeight = 1000;
|
||||
currentClientHeight = 200;
|
||||
Element.prototype.scrollIntoView = scrollIntoView;
|
||||
// Override the mock to drive the atBottom state via scroll events.
|
||||
(window as unknown as { IntersectionObserver: unknown }).IntersectionObserver = class {
|
||||
observe() {}
|
||||
unobserve() {}
|
||||
disconnect() {}
|
||||
};
|
||||
// The atBottom state derives from container.scrollHeight -
|
||||
// scrollTop - clientHeight; expose a fake container ref via the
|
||||
// scroll listener we wire on every render.
|
||||
const origAdd = HTMLElement.prototype.addEventListener;
|
||||
HTMLElement.prototype.addEventListener = function (
|
||||
this: HTMLElement,
|
||||
type: string,
|
||||
listener: EventListenerOrEventListenerObject,
|
||||
options?: boolean | AddEventListenerOptions,
|
||||
) {
|
||||
if (type === "scroll") {
|
||||
const wrapped = (e: Event) => {
|
||||
// Force the container to report as-if at the bottom by default
|
||||
// (scrollTop=scrollHeight-clientHeight → distance=0). Tests
|
||||
// mutate `currentScrollTop` BEFORE firing to simulate scroll-up.
|
||||
Object.defineProperty(this, "scrollTop", {
|
||||
get: () => currentScrollTop,
|
||||
configurable: true,
|
||||
});
|
||||
Object.defineProperty(this, "scrollHeight", {
|
||||
get: () => currentScrollHeight,
|
||||
configurable: true,
|
||||
});
|
||||
Object.defineProperty(this, "clientHeight", {
|
||||
get: () => currentClientHeight,
|
||||
configurable: true,
|
||||
});
|
||||
if (typeof listener === "function") listener(e);
|
||||
else listener.handleEvent(e);
|
||||
};
|
||||
scrollEventListeners.push(wrapped);
|
||||
return origAdd.call(this, type, wrapped, options);
|
||||
}
|
||||
return origAdd.call(this, type, listener, options);
|
||||
};
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
// Reset the addEventListener patch by re-importing — not perfect, but
|
||||
// good enough for the test run; the next beforeEach re-applies.
|
||||
});
|
||||
|
||||
import { ChatTab } from "../ChatTab";
|
||||
|
||||
const minimalData = {
|
||||
status: "online" as const,
|
||||
runtime: "claude-code",
|
||||
currentTask: null,
|
||||
} as unknown as Parameters<typeof ChatTab>[0]["data"];
|
||||
|
||||
async function fireScroll() {
|
||||
// Dispatch a scroll event on whatever listener is wired. The ChatTab
|
||||
// attaches its scroll listener to the messages container; the
|
||||
// beforeEach hook captured it. Fire all captured listeners.
|
||||
for (const fn of scrollEventListeners) fn(new Event("scroll"));
|
||||
}
|
||||
|
||||
describe("ChatTab autoscroll at-bottom gating (#2560)", () => {
|
||||
it("appends a message with scrollIntoView when at the bottom (a)", async () => {
|
||||
// No history → empty page. After loadInitial, atBottom=true (the
|
||||
// container's distance-from-bottom is 0 because the empty page
|
||||
// has no overflow). Then we send a message and observe a scroll.
|
||||
render(<ChatTab workspaceId="ws-1" data={minimalData} />);
|
||||
await waitFor(() => expect(apiGet).toHaveBeenCalled());
|
||||
scrollIntoView.mockClear();
|
||||
// Send a message via the textarea + send button.
|
||||
const textarea = await screen.findByLabelText(/Message to agent/i);
|
||||
fireEvent.change(textarea, { target: { value: "hello" } });
|
||||
const sendBtn = screen.getByRole("button", { name: /Send/i });
|
||||
fireEvent.click(sendBtn);
|
||||
// The message-append path is gated on atBottom; at initial mount
|
||||
// atBottom=true (empty container, distance=0), so the
|
||||
// scrollIntoView call fires for the new user bubble.
|
||||
await waitFor(() => expect(scrollIntoView).toHaveBeenCalled());
|
||||
});
|
||||
|
||||
it("does NOT scrollIntoView when the user is scrolled up (b)", async () => {
|
||||
render(<ChatTab workspaceId="ws-2" data={minimalData} />);
|
||||
await waitFor(() => expect(apiGet).toHaveBeenCalled());
|
||||
|
||||
// Simulate the user scrolling up — set distance-from-bottom > 12
|
||||
// BEFORE the next append. scrollTop stays at, say, 0 (we're
|
||||
// pretending the page is now 5000px tall).
|
||||
currentScrollTop = 0;
|
||||
currentScrollHeight = 5000;
|
||||
currentClientHeight = 200;
|
||||
await fireScroll();
|
||||
|
||||
scrollIntoView.mockClear();
|
||||
|
||||
const textarea = await screen.findByLabelText(/Message to agent/i);
|
||||
fireEvent.change(textarea, { target: { value: "hello 2" } });
|
||||
const sendBtn = screen.getByRole("button", { name: /Send/i });
|
||||
fireEvent.click(sendBtn);
|
||||
|
||||
// Give the autoscroll useLayoutEffect a tick to run, then assert
|
||||
// it did NOT yank. The useChatHistory's local-state append
|
||||
// triggers history.messages change; with atBottom=false the
|
||||
// scrollIntoView is skipped.
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
// The autoscroll useLayoutEffect skipped the call, so the count
|
||||
// should NOT have grown. Allow a small noise window — the
|
||||
// initial-mount scroll may still have fired before we set
|
||||
// atBottom=false.
|
||||
const callsAfterSend = scrollIntoView.mock.calls.length;
|
||||
expect(callsAfterSend).toBe(0);
|
||||
});
|
||||
});
|
||||
@@ -442,6 +442,23 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
body = normalizedBody
|
||||
|
||||
// #2560 (chat UX: persist in-flight exchange across leave/refresh):
|
||||
// write the user message to activity_logs AT RECEIPT — before any of
|
||||
// the downstream short-circuits (poll-mode, mock-runtime, push dispatch)
|
||||
// run — so a mid-turn leave/refresh re-hydrates the user message
|
||||
// instead of an empty pane. Idempotent on messageId via the partial
|
||||
// unique index (idx_activity_logs_msg_id) + ON CONFLICT DO UPDATE in
|
||||
// logActivityExec — a poll-mode re-persist (or a duplicated ingest
|
||||
// from a retry) collapses to a single row, and the completion path
|
||||
// (logA2ASuccess / logA2AReceiveQueued) stamps response_body onto
|
||||
// the same row. No duplicate bubble in chat-history.
|
||||
//
|
||||
// Skipped when the body has no messageId (system callers, legacy
|
||||
// a2a_send payloads) — the completion path remains authoritative.
|
||||
if a2aMethod == "message/send" {
|
||||
h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod)
|
||||
}
|
||||
|
||||
// #2339 PR 2 — poll-mode short-circuit. When the target workspace
|
||||
// is registered as delivery_mode=poll (e.g. an operator's laptop
|
||||
// running molecule-mcp-claude-channel), the platform does NOT
|
||||
|
||||
@@ -425,6 +425,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
|
||||
ToolTrace: toolTrace,
|
||||
DurationMs: &durationMs,
|
||||
Status: logStatus,
|
||||
MessageId: extractMessageIdFromA2ABody(body),
|
||||
})
|
||||
|
||||
if callerID == "" && statusCode < 400 {
|
||||
@@ -725,6 +726,14 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
|
||||
// his reported loss is THIS path; #1347 (push-mode, persists AFTER the
|
||||
// poll short-circuit) structurally cannot cover it.
|
||||
//
|
||||
// #2560: this path ALSO sets the messageId on the activity row. If
|
||||
// persistUserMessageAtIngest already wrote the same messageId (e.g.
|
||||
// the poll short-circuit ran AFTER the ingest path on a request that
|
||||
// got demoted to poll), the partial unique index
|
||||
// (idx_activity_logs_msg_id) makes this a no-op via ON CONFLICT DO
|
||||
// NOTHING — the existing row keeps the user message; the queued
|
||||
// acknowledgment is still emitted; no duplicate bubble.
|
||||
//
|
||||
// Mirrors persistUserMessageAtIngest's discipline:
|
||||
// - context.WithoutCancel: a client disconnect on chat-exit (which
|
||||
// cancels the inbound request ctx) MUST NOT abort this write.
|
||||
@@ -754,6 +763,109 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
Status: "ok",
|
||||
MessageId: extractMessageIdFromA2ABody(body),
|
||||
})
|
||||
}
|
||||
|
||||
// extractMessageIdFromA2ABody reads params.message.messageId out of a
|
||||
// normalized A2A JSON-RPC body. Returns "" when the field is absent or
|
||||
// the body is malformed — the empty value opts the activity row out of
|
||||
// the messageId-keyed conflict path (the existing always-INSERT
|
||||
// behavior is preserved for non-per-message activity).
|
||||
func extractMessageIdFromA2ABody(body []byte) string {
|
||||
if len(body) == 0 {
|
||||
return ""
|
||||
}
|
||||
var env struct {
|
||||
Params struct {
|
||||
Message struct {
|
||||
MessageID string `json:"messageId"`
|
||||
} `json:"message"`
|
||||
} `json:"params"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &env); err != nil {
|
||||
return ""
|
||||
}
|
||||
return env.Params.Message.MessageID
|
||||
}
|
||||
|
||||
// persistUserMessageAtIngest writes the canvas user's outbound chat
|
||||
// message to activity_logs SYNCHRONOUSLY, BEFORE the agent dispatch
|
||||
// runs. The completion path (logA2ASuccess / logA2AReceiveQueued) then
|
||||
// uses ON CONFLICT (workspace_id, message_id) DO UPDATE to attach the
|
||||
// agent's response_body onto the SAME row, so a single activity_logs
|
||||
// row carries both the user message and the agent reply — the chat-
|
||||
// history reader (messagestore.PostgresMessageStore) emits one
|
||||
// (user, agent) pair per row, no duplicate bubble.
|
||||
//
|
||||
// Why before-dispatch (issue #2560): pre-fix the user message only
|
||||
// landed in activity_logs at turn completion (logA2ASuccess). A
|
||||
// mid-turn leave/refresh re-hydrated an empty pane plus typing dots
|
||||
// (the agent's currentTask was set but the request_body was missing),
|
||||
// and a workspace-server restart / deploy / OOM between the canvas
|
||||
// 200 and the goroutine's commit lost the message permanently
|
||||
// (chat-history is read back from activity_logs; no row == no message
|
||||
// on reopen). The synchronous ingest-row write closes both holes:
|
||||
// leave/refresh always sees the user message in chat-history
|
||||
// (request_body is there), and the message is durable on disk before
|
||||
// dispatch starts.
|
||||
//
|
||||
// Discipline (mirrors logA2AReceiveQueued):
|
||||
// - context.WithoutCancel: a client disconnect on chat-exit (which
|
||||
// cancels the inbound request ctx) MUST NOT abort this write.
|
||||
// - SYNCHRONOUS (no goAsync): the row must be durable before dispatch.
|
||||
// - Best-effort: LogActivity logs+swallows INSERT errors internally,
|
||||
// so a DB hiccup never blocks the dispatch — behavior for that one
|
||||
// request is never worse than the pre-fix "completion-only" path.
|
||||
// - The post-commit broadcast fires inside LogActivity; the canvas
|
||||
// may render the user message optimistically (it already has the
|
||||
// local-state copy). A missed WS event is not data loss (durable
|
||||
// row is the truth the canvas re-reads on reopen).
|
||||
//
|
||||
// When to call: for every A2A proxy entry that the user initiated
|
||||
// (canvas / canvas-user, or workspace-to-workspace delegation), AFTER
|
||||
// access control + budget + normalizeA2APayload, BEFORE the
|
||||
// delivery-mode / mock / dispatch short-circuits. The completion
|
||||
// path (logA2ASuccess for push, logA2AReceiveQueued for poll, and
|
||||
// the poll-ingest-persist pre-existing test) keys on the same
|
||||
// messageId, so a duplicated ingest collapses via ON CONFLICT to a
|
||||
// single row.
|
||||
func (h *WorkspaceHandler) persistUserMessageAtIngest(
|
||||
ctx context.Context,
|
||||
workspaceID, callerID string,
|
||||
body []byte,
|
||||
a2aMethod string,
|
||||
) {
|
||||
messageId := extractMessageIdFromA2ABody(body)
|
||||
// Without a messageId the row is not message-keyed; LogActivity's
|
||||
// ON CONFLICT path won't fire (the partial unique index excludes
|
||||
// NULL message_id) and we'd get a duplicate row if both ingest
|
||||
// and completion paths ran. Skip the ingest — completion is
|
||||
// authoritative for the non-message-keyed case (legacy a2a_send
|
||||
// payloads, system callers, etc.).
|
||||
if messageId == "" {
|
||||
return
|
||||
}
|
||||
|
||||
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var wsName string
|
||||
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if wsName == "" {
|
||||
wsName = workspaceID
|
||||
}
|
||||
summary := a2aMethod + " → " + wsName + " (ingest)"
|
||||
LogActivity(insCtx, h.broadcaster, ActivityParams{
|
||||
WorkspaceID: workspaceID,
|
||||
ActivityType: "a2a_receive",
|
||||
SourceID: nilIfEmpty(callerID),
|
||||
TargetID: &workspaceID,
|
||||
Method: &a2aMethod,
|
||||
Summary: &summary,
|
||||
RequestBody: json.RawMessage(body),
|
||||
Status: "ok",
|
||||
MessageId: messageId,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
package handlers
|
||||
|
||||
// Regression coverage for the PUSH-mode arm of the canvas user-message
|
||||
// mid-turn leave/refresh bug (issue #2560).
|
||||
//
|
||||
// Pre-#2560: the user message was only written to activity_logs at turn
|
||||
// completion (logA2ASuccess, post-dispatch). A leave/refresh mid-turn
|
||||
// re-hydrated an empty pane plus typing dots — the durable
|
||||
// activity_logs row didn't exist yet — and a workspace-server restart /
|
||||
// deploy / OOM between the canvas 200 and the goroutine's commit lost
|
||||
// the message permanently (chat-history is read from activity_logs; no
|
||||
// row == no message on reopen). The poll-mode arm was separately
|
||||
// covered by a2a_poll_ingest_persist_test.go; this file covers the
|
||||
// push-mode arm.
|
||||
//
|
||||
// Fix: persistUserMessageAtIngest runs SYNCHRONOUSLY in
|
||||
// proxyA2ARequest, immediately after normalizeA2APayload and BEFORE
|
||||
// the poll/mock/push short-circuits, with context.WithoutCancel so a
|
||||
// client disconnect on chat-exit cannot cancel the write. Idempotent
|
||||
// on messageId via the partial unique index (idx_activity_logs_msg_id)
|
||||
// + ON CONFLICT DO UPDATE in logActivityExec — the same row is later
|
||||
// updated by logA2ASuccess to attach response_body, so chat-history
|
||||
// reads back exactly one (user, agent) pair per messageId with no
|
||||
// duplicate bubble.
|
||||
//
|
||||
// Defining assertions for this test:
|
||||
// 1. The ingest INSERT into activity_logs MUST complete BEFORE
|
||||
// proxyA2ARequest returns (i.e. before the 200 reaches the client).
|
||||
// Pre-fix: handler returns ~instantly while the INSERT is still
|
||||
// racing in a detached goroutine → elapsed ≪ insertDelay.
|
||||
// Post-fix: handler return is gated on the INSERT → elapsed ≥
|
||||
// insertDelay.
|
||||
// 2. The ingest row carries the canvas user's messageId. The
|
||||
// completion row (logA2ASuccess) attaches response_body to the
|
||||
// SAME row via ON CONFLICT — chat-history emits a single
|
||||
// (user, agent) pair, not two.
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestProxyA2A_PushMode_PersistsUserMessageSynchronouslyBeforeDispatch(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
|
||||
const wsID = "ws-push-sync-persist"
|
||||
const insertDelay = 50 * time.Millisecond
|
||||
|
||||
expectBudgetCheck(mock, wsID)
|
||||
|
||||
// Ingest at receipt (the fix). Synchronous — handler return is
|
||||
// gated on this write completing. A detached-goroutine ingest
|
||||
// (pre-fix) does NOT block; a synchronous ingest does.
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillDelayFor(insertDelay).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: wsID}}
|
||||
|
||||
// Canvas user message with explicit messageId. The fix is
|
||||
// observable only when the messageId is set (otherwise
|
||||
// persistUserMessageAtIngest short-circuits and the completion
|
||||
// path remains authoritative — pre-#2560 behavior, no regression).
|
||||
body := `{"jsonrpc":"2.0","id":"push-canvas-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-2560-push-1","parts":[{"text":"my own message"}]}}}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
start := time.Now()
|
||||
handler.ProxyA2A(c)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
// Defining assertion #1: handler must not have returned the
|
||||
// response before the durable ingest INSERT committed. Pre-fix this
|
||||
// fails (elapsed ≈ 0, INSERT still racing in goAsync).
|
||||
if elapsed < insertDelay {
|
||||
t.Fatalf("push-mode handler returned in %v, before the %v user-message ingest INSERT — "+
|
||||
"mid-turn leave/refresh re-hydrates an empty pane (DATA LOSS). "+
|
||||
"persistUserMessageAtIngest must be synchronous before dispatch.", elapsed, insertDelay)
|
||||
}
|
||||
|
||||
// Defining assertion #2: the durable write actually happened by the
|
||||
// time the handler returned. ExpectationsWereMet() in a goroutine
|
||||
// with a hard 2s timeout — fails fast (no CI hang) on regression
|
||||
// while returning promptly on success.
|
||||
expectDone := make(chan error, 1)
|
||||
go func() { expectDone <- mock.ExpectationsWereMet() }()
|
||||
select {
|
||||
case err := <-expectDone:
|
||||
if err != nil {
|
||||
t.Fatalf("user-message ingest INSERT was not durable at handler return (unmet sqlmock expectations): %v", err)
|
||||
}
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " +
|
||||
"Likely cause: production code regressed persistUserMessageAtIngest to async " +
|
||||
"(INSERT fires after handler returns, not before).")
|
||||
}
|
||||
|
||||
// Sanity: the response is JSON. We don't assert the body shape
|
||||
// here — push-mode goes through a longer code path (resolveAgentURL,
|
||||
// preflight, dispatch) and short-circuits in this test setup
|
||||
// (the dispatch will fail with the mock URL, but the ingest write
|
||||
// has already happened, which is the only thing #2560 tests).
|
||||
if w.Code == 0 {
|
||||
t.Errorf("handler did not write a response (w.Code == 0)")
|
||||
}
|
||||
_ = json.RawMessage{}
|
||||
_ = http.StatusOK
|
||||
}
|
||||
@@ -1024,15 +1024,34 @@ func logActivityExec(ctx context.Context, exec activityExecutor, broadcaster eve
|
||||
// whose text still contains `INSERT INTO activity_logs` — preserves the
|
||||
// existing sqlmock expectations across the codebase. $1 (workspace_id) is
|
||||
// reused by the UPDATE; arg list is otherwise unchanged.
|
||||
//
|
||||
// #2560 (chat UX: persist in-flight exchange): the INSERT now also
|
||||
// carries message_id ($13) and uses ON CONFLICT (workspace_id,
|
||||
// message_id) DO UPDATE to attach the agent's response_body onto the
|
||||
// SAME row that the ingest path (#2560) wrote at receipt — so a
|
||||
// mid-turn leave/refresh shows the user message in chat-history
|
||||
// (request_body from the ingest row), and the eventual completion
|
||||
// stamps response_body + status onto the same row (no duplicate
|
||||
// bubble). The conflict target only fires when message_id IS NOT NULL
|
||||
// (idx_activity_logs_msg_id is partial); rows without message_id keep
|
||||
// the original always-INSERT behavior.
|
||||
if _, err := exec.ExecContext(ctx, `
|
||||
WITH ins AS (
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12)
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, message_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13)
|
||||
ON CONFLICT (workspace_id, message_id) WHERE message_id IS NOT NULL
|
||||
DO UPDATE SET
|
||||
response_body = COALESCE(EXCLUDED.response_body, activity_logs.response_body),
|
||||
status = EXCLUDED.status,
|
||||
duration_ms = COALESCE(EXCLUDED.duration_ms, activity_logs.duration_ms),
|
||||
error_detail = EXCLUDED.error_detail,
|
||||
tool_trace = CASE WHEN EXCLUDED.tool_trace IS NOT NULL THEN EXCLUDED.tool_trace ELSE activity_logs.tool_trace END
|
||||
)
|
||||
UPDATE workspaces SET last_activity_at = now() WHERE id = $1
|
||||
`, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID,
|
||||
params.Method, params.Summary, reqStr, respStr, traceStr,
|
||||
params.DurationMs, params.Status, params.ErrorDetail); err != nil {
|
||||
params.DurationMs, params.Status, params.ErrorDetail,
|
||||
nilIfEmpty(params.MessageId)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -1103,5 +1122,14 @@ type ActivityParams struct {
|
||||
ToolTrace json.RawMessage // tools/commands the agent actually invoked
|
||||
DurationMs *int
|
||||
Status string // ok, error, timeout
|
||||
// MessageId, when non-empty, is persisted to activity_logs.message_id
|
||||
// and is the conflict key for the partial unique index
|
||||
// (idx_activity_logs_msg_id). The ingest path (#2560) sets this so a
|
||||
// mid-turn leave/refresh shows the user message in chat-history
|
||||
// hydration; the completion path (logA2ASuccess / logA2AReceiveQueued)
|
||||
// also sets it and uses ON CONFLICT (workspace_id, message_id) DO
|
||||
// UPDATE to stamp response_body onto the same row instead of inserting
|
||||
// a duplicate. Empty for non-per-message activity (task_update, etc.).
|
||||
MessageId string
|
||||
ErrorDetail *string
|
||||
}
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
-- Reverse issue #2560 ingest-row column + idempotency index.
|
||||
-- Non-destructive: dropping the index + column simply reverts the
|
||||
-- chat-history-mid-turn-persistence feature; no data is lost because
|
||||
-- the original logA2ASuccess / logA2AReceiveQueued paths are still
|
||||
-- intact (they re-INSERT the full message on completion in the
|
||||
-- non-message_id-keyed shape).
|
||||
DROP INDEX IF EXISTS idx_activity_logs_msg_id;
|
||||
ALTER TABLE IF EXISTS activity_logs DROP COLUMN IF EXISTS message_id;
|
||||
@@ -0,0 +1,32 @@
|
||||
-- Issue #2560 (chat UX: persist in-flight exchange across leave/refresh):
|
||||
-- the user chat message must be written to activity_logs AT RECEIPT, not
|
||||
-- only at turn completion, so a mid-turn leave/refresh doesn't drop the
|
||||
-- pending message.
|
||||
--
|
||||
-- The completion path (logA2ASuccess) reuses the SAME activity_logs row
|
||||
-- via ON CONFLICT (workspace_id, message_id) DO UPDATE — the ingest row
|
||||
-- carries the user message in request_body; the completion attach stamps
|
||||
-- response_body + status onto it. Idempotent on messageId (per #2560 spec)
|
||||
-- means a duplicated ingest (server retry, or a poll-mode write followed
|
||||
-- by a push-mode write on the same messageId) is a no-op rather than a
|
||||
-- duplicate bubble in chat-history.
|
||||
--
|
||||
-- Column:
|
||||
-- message_id TEXT — extracted from params.message.messageId in
|
||||
-- normalizeA2APayload. Stored as TEXT (UUID-as-text shape) so the same
|
||||
-- string the canvas sent is the same string we dedupe on, regardless
|
||||
-- of UUID canonicalization. NULL for legacy rows and for activity that
|
||||
-- is NOT a per-message row (task_update, agent_log, error, etc.) — the
|
||||
-- unique index is partial on message_id IS NOT NULL so non-message
|
||||
-- rows never collide.
|
||||
ALTER TABLE IF EXISTS activity_logs
|
||||
ADD COLUMN IF NOT EXISTS message_id TEXT;
|
||||
|
||||
-- Partial unique index — the conflict target for ON CONFLICT (workspace_id,
|
||||
-- message_id) DO NOTHING / DO UPDATE. Excludes rows where message_id is
|
||||
-- NULL (non-per-message activity) so existing activity never collides.
|
||||
-- Coexists with the existing (workspace_id, activity_type, created_at DESC)
|
||||
-- index — the partial is a write-path key, the existing is a read-path key.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_activity_logs_msg_id
|
||||
ON activity_logs (workspace_id, message_id)
|
||||
WHERE message_id IS NOT NULL;
|
||||
Reference in New Issue
Block a user