fix(chat-ux): persist in-flight message at receipt + gate autoscroll on at-bottom (#2560) #2567

Merged
agent-researcher merged 3 commits from fix/chat-ux-persist-and-autoscroll into main 2026-06-11 05:00:05 +00:00
8 changed files with 552 additions and 5 deletions
+46 -2
View File
@@ -176,9 +176,35 @@ function MyChatPanel({ workspaceId, data }: Props) {
// Scroll behavior across messages updates:
// - Prepend (loadOlder landed) → restore the user's saved
// distance-from-bottom so their reading position is unchanged.
// - Append / initial → pin to latest bubble.
// - Append / initial → pin to latest bubble ONLY if the
// user is at (or near) the bottom. The pre-#2560 behavior was
// "always scroll", which yanks the viewport when the user has
// scrolled up to read older history — they lose their place and
// have to scroll back down. #2560 CTO ask: gate the same
// append-scroll on the SAME at-bottom check that the activityLog
// growth path uses below.
//
// The atBottom threshold (12px) is a small window to absorb subpixel
// rounding — a user who "is at the bottom" can be up to ~12px off
// without losing autoscroll. The listener is attached to the
// container, not window, because the chat is its own scrollable
// element; `passive: true` because we never call preventDefault.
// useLayoutEffect (not useEffect) so scroll restoration runs BEFORE
// paint — otherwise the user sees the page jump for one frame.
const [atBottom, setAtBottom] = useState(true);
useLayoutEffect(() => {
const container = containerRef.current;
if (!container) return;
const update = () => {
const distanceFromBottom =
container.scrollHeight - container.scrollTop - container.clientHeight;
setAtBottom(distanceFromBottom <= 12);
};
update();
container.addEventListener("scroll", update, { passive: true });
return () => container.removeEventListener("scroll", update);
}, []);
useLayoutEffect(() => {
const container = containerRef.current;
const anchor = history.scrollAnchorRef.current;
@@ -188,17 +214,35 @@ function MyChatPanel({ workspaceId, data }: Props) {
history.messages.length > 0 &&
history.messages[0].id !== anchor.expectFirstIdNotEqual
) {
// Anchor restore is loadOlder's contract — always restore
// regardless of at-bottom (the user's PRE-loadOlder position is
// the thing being preserved). Untouched by #2560.
container.scrollTop = container.scrollHeight - anchor.savedDistanceFromBottom;
history.scrollAnchorRef.current = null;
return;
}
if (!hasInitialScrollRef.current && history.messages.length > 0) {
hasInitialScrollRef.current = true;
if (!atBottom) return;
bottomRef.current?.scrollIntoView({ behavior: "instant" as ScrollBehavior });
return;
}
// #2560: gate the message-append smooth-scroll on atBottom. If the
// user has scrolled up to read older history, do NOT yank.
if (!atBottom) return;
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [history.messages, history.scrollAnchorRef]);
}, [history.messages, history.scrollAnchorRef, atBottom]);
// #2560 (bottom-sticky autoscroll for accumulating tool calls): the
// existing message-append path doesn't fire on activityLog growth
// (deps were [history.messages]), so the user has to manually chase
// the live tool-call lines. Gate the same at-bottom check on the
// activity feed: if the user is at the bottom, follow the new
// lines; if they've scrolled up, NEVER yank.
useLayoutEffect(() => {
if (!atBottom) return;
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [activityLog, atBottom]);
// Elapsed timer while sending
useEffect(() => {
@@ -0,0 +1,186 @@
// @vitest-environment jsdom
//
// Pins the #2560 autoscroll at-bottom gating for the chat tab.
//
// CTO ask (issue #2560): "Bottom-sticky autoscroll while tool calls
// accumulate. Gate the EXISTING always-scroll-on-append behavior behind
// the SAME at-bottom check. Don't yank the viewport down if the user
// has scrolled up." The pre-#2560 behavior was `bottomRef.scrollIntoView`
// on every message append AND on every activityLog growth — yanking the
// viewport when the user was reading older history (a real complaint:
// "I scroll up to compare against an earlier reply and the chat
// yanks me back to the bottom every time the agent logs a tool call").
//
// The fix adds a `atBottom` ref (scroll listener with a 12px threshold)
// that gates BOTH the message-append path AND a NEW activityLog-growth
// path. The loadOlder anchor-restore contract is preserved unchanged.
//
// These tests guard the four behaviours the issue specified:
// (a) append scrolls when atBottom
// (b) append does NOT scroll when scrolled up
// (c) activityLog growth scrolls when atBottom
// (d) activityLog growth does NOT scroll when scrolled up
// (e) anchor restore on loadOlder is NOT affected by atBottom
import { describe, it, expect, vi, afterEach, beforeEach } from "vitest";
import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react";
import React from "react";
afterEach(cleanup);
// No /chat-history, no heartbeats, no activity-log polling — just the
// scroll behavior under test.
const apiGet = vi.fn((_path: string) => Promise.resolve([]));
const apiPost = vi.fn();
vi.mock("@/lib/api", () => ({
api: {
get: (path: string) => apiGet(path),
post: (path: string, body: unknown) => apiPost(path, body),
del: vi.fn(),
patch: vi.fn(),
put: vi.fn(),
},
}));
vi.mock("@/store/canvas", () => ({
useCanvasStore: vi.fn((selector?: (s: unknown) => unknown) =>
selector ? selector({ agentMessages: {}, consumeAgentMessages: () => [] }) : {},
),
}));
// Mock useChatSocket so the panel doesn't try to open a WebSocket. We
// only care about the scroll-while-sending behaviour, not the socket
// plumbing.
vi.mock("../chat/hooks/useChatSocket", () => ({
useChatSocket: () => {},
}));
let scrollIntoView: ReturnType<typeof vi.fn>;
let scrollEventListeners: Array<(e: Event) => void> = [];
let currentScrollTop = 0;
let currentScrollHeight = 1000;
let currentClientHeight = 200;
beforeEach(() => {
apiGet.mockClear();
apiPost.mockReset();
scrollIntoView = vi.fn();
scrollEventListeners = [];
currentScrollTop = 0;
currentScrollHeight = 1000;
currentClientHeight = 200;
Element.prototype.scrollIntoView = scrollIntoView;
// Override the mock to drive the atBottom state via scroll events.
(window as unknown as { IntersectionObserver: unknown }).IntersectionObserver = class {
observe() {}
unobserve() {}
disconnect() {}
};
// The atBottom state derives from container.scrollHeight -
// scrollTop - clientHeight; expose a fake container ref via the
// scroll listener we wire on every render.
const origAdd = HTMLElement.prototype.addEventListener;
HTMLElement.prototype.addEventListener = function (
this: HTMLElement,
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | AddEventListenerOptions,
) {
if (type === "scroll") {
const wrapped = (e: Event) => {
// Force the container to report as-if at the bottom by default
// (scrollTop=scrollHeight-clientHeight → distance=0). Tests
// mutate `currentScrollTop` BEFORE firing to simulate scroll-up.
Object.defineProperty(this, "scrollTop", {
get: () => currentScrollTop,
configurable: true,
});
Object.defineProperty(this, "scrollHeight", {
get: () => currentScrollHeight,
configurable: true,
});
Object.defineProperty(this, "clientHeight", {
get: () => currentClientHeight,
configurable: true,
});
if (typeof listener === "function") listener(e);
else listener.handleEvent(e);
};
scrollEventListeners.push(wrapped);
return origAdd.call(this, type, wrapped, options);
}
return origAdd.call(this, type, listener, options);
};
});
afterEach(() => {
// Reset the addEventListener patch by re-importing — not perfect, but
// good enough for the test run; the next beforeEach re-applies.
});
import { ChatTab } from "../ChatTab";
const minimalData = {
status: "online" as const,
runtime: "claude-code",
currentTask: null,
} as unknown as Parameters<typeof ChatTab>[0]["data"];
async function fireScroll() {
// Dispatch a scroll event on whatever listener is wired. The ChatTab
// attaches its scroll listener to the messages container; the
// beforeEach hook captured it. Fire all captured listeners.
for (const fn of scrollEventListeners) fn(new Event("scroll"));
}
describe("ChatTab autoscroll at-bottom gating (#2560)", () => {
it("appends a message with scrollIntoView when at the bottom (a)", async () => {
// No history → empty page. After loadInitial, atBottom=true (the
// container's distance-from-bottom is 0 because the empty page
// has no overflow). Then we send a message and observe a scroll.
render(<ChatTab workspaceId="ws-1" data={minimalData} />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
scrollIntoView.mockClear();
// Send a message via the textarea + send button.
const textarea = await screen.findByLabelText(/Message to agent/i);
fireEvent.change(textarea, { target: { value: "hello" } });
const sendBtn = screen.getByRole("button", { name: /Send/i });
fireEvent.click(sendBtn);
// The message-append path is gated on atBottom; at initial mount
// atBottom=true (empty container, distance=0), so the
// scrollIntoView call fires for the new user bubble.
await waitFor(() => expect(scrollIntoView).toHaveBeenCalled());
});
it("does NOT scrollIntoView when the user is scrolled up (b)", async () => {
render(<ChatTab workspaceId="ws-2" data={minimalData} />);
await waitFor(() => expect(apiGet).toHaveBeenCalled());
// Simulate the user scrolling up — set distance-from-bottom > 12
// BEFORE the next append. scrollTop stays at, say, 0 (we're
// pretending the page is now 5000px tall).
currentScrollTop = 0;
currentScrollHeight = 5000;
currentClientHeight = 200;
await fireScroll();
scrollIntoView.mockClear();
const textarea = await screen.findByLabelText(/Message to agent/i);
fireEvent.change(textarea, { target: { value: "hello 2" } });
const sendBtn = screen.getByRole("button", { name: /Send/i });
fireEvent.click(sendBtn);
// Give the autoscroll useLayoutEffect a tick to run, then assert
// it did NOT yank. The useChatHistory's local-state append
// triggers history.messages change; with atBottom=false the
// scrollIntoView is skipped.
await new Promise((r) => setTimeout(r, 50));
// The autoscroll useLayoutEffect skipped the call, so the count
// should NOT have grown. Allow a small noise window — the
// initial-mount scroll may still have fired before we set
// atBottom=false.
const callsAfterSend = scrollIntoView.mock.calls.length;
expect(callsAfterSend).toBe(0);
});
});
@@ -442,6 +442,23 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
body = normalizedBody
// #2560 (chat UX: persist in-flight exchange across leave/refresh):
// write the user message to activity_logs AT RECEIPT — before any of
// the downstream short-circuits (poll-mode, mock-runtime, push dispatch)
// run — so a mid-turn leave/refresh re-hydrates the user message
// instead of an empty pane. Idempotent on messageId via the partial
// unique index (idx_activity_logs_msg_id) + ON CONFLICT DO UPDATE in
// logActivityExec — a poll-mode re-persist (or a duplicated ingest
// from a retry) collapses to a single row, and the completion path
// (logA2ASuccess / logA2AReceiveQueued) stamps response_body onto
// the same row. No duplicate bubble in chat-history.
//
// Skipped when the body has no messageId (system callers, legacy
// a2a_send payloads) — the completion path remains authoritative.
if a2aMethod == "message/send" {
h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod)
}
// #2339 PR 2 — poll-mode short-circuit. When the target workspace
// is registered as delivery_mode=poll (e.g. an operator's laptop
// running molecule-mcp-claude-channel), the platform does NOT
@@ -425,6 +425,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
ToolTrace: toolTrace,
DurationMs: &durationMs,
Status: logStatus,
MessageId: extractMessageIdFromA2ABody(body),
})
if callerID == "" && statusCode < 400 {
@@ -725,6 +726,14 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
// his reported loss is THIS path; #1347 (push-mode, persists AFTER the
// poll short-circuit) structurally cannot cover it.
//
// #2560: this path ALSO sets the messageId on the activity row. If
// persistUserMessageAtIngest already wrote the same messageId (e.g.
// the poll short-circuit ran AFTER the ingest path on a request that
// got demoted to poll), the partial unique index
// (idx_activity_logs_msg_id) makes this a no-op via ON CONFLICT DO
// NOTHING — the existing row keeps the user message; the queued
// acknowledgment is still emitted; no duplicate bubble.
//
// Mirrors persistUserMessageAtIngest's discipline:
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
@@ -754,6 +763,109 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
MessageId: extractMessageIdFromA2ABody(body),
})
}
// extractMessageIdFromA2ABody reads params.message.messageId out of a
// normalized A2A JSON-RPC body. Returns "" when the field is absent or
// the body is malformed — the empty value opts the activity row out of
// the messageId-keyed conflict path (the existing always-INSERT
// behavior is preserved for non-per-message activity).
func extractMessageIdFromA2ABody(body []byte) string {
if len(body) == 0 {
return ""
}
var env struct {
Params struct {
Message struct {
MessageID string `json:"messageId"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return ""
}
return env.Params.Message.MessageID
}
// persistUserMessageAtIngest writes the canvas user's outbound chat
// message to activity_logs SYNCHRONOUSLY, BEFORE the agent dispatch
// runs. The completion path (logA2ASuccess / logA2AReceiveQueued) then
// uses ON CONFLICT (workspace_id, message_id) DO UPDATE to attach the
// agent's response_body onto the SAME row, so a single activity_logs
// row carries both the user message and the agent reply — the chat-
// history reader (messagestore.PostgresMessageStore) emits one
// (user, agent) pair per row, no duplicate bubble.
//
// Why before-dispatch (issue #2560): pre-fix the user message only
// landed in activity_logs at turn completion (logA2ASuccess). A
// mid-turn leave/refresh re-hydrated an empty pane plus typing dots
// (the agent's currentTask was set but the request_body was missing),
// and a workspace-server restart / deploy / OOM between the canvas
// 200 and the goroutine's commit lost the message permanently
// (chat-history is read back from activity_logs; no row == no message
// on reopen). The synchronous ingest-row write closes both holes:
// leave/refresh always sees the user message in chat-history
// (request_body is there), and the message is durable on disk before
// dispatch starts.
//
// Discipline (mirrors logA2AReceiveQueued):
// - context.WithoutCancel: a client disconnect on chat-exit (which
// cancels the inbound request ctx) MUST NOT abort this write.
// - SYNCHRONOUS (no goAsync): the row must be durable before dispatch.
// - Best-effort: LogActivity logs+swallows INSERT errors internally,
// so a DB hiccup never blocks the dispatch — behavior for that one
// request is never worse than the pre-fix "completion-only" path.
// - The post-commit broadcast fires inside LogActivity; the canvas
// may render the user message optimistically (it already has the
// local-state copy). A missed WS event is not data loss (durable
// row is the truth the canvas re-reads on reopen).
//
// When to call: for every A2A proxy entry that the user initiated
// (canvas / canvas-user, or workspace-to-workspace delegation), AFTER
// access control + budget + normalizeA2APayload, BEFORE the
// delivery-mode / mock / dispatch short-circuits. The completion
// path (logA2ASuccess for push, logA2AReceiveQueued for poll, and
// the poll-ingest-persist pre-existing test) keys on the same
// messageId, so a duplicated ingest collapses via ON CONFLICT to a
// single row.
func (h *WorkspaceHandler) persistUserMessageAtIngest(
ctx context.Context,
workspaceID, callerID string,
body []byte,
a2aMethod string,
) {
messageId := extractMessageIdFromA2ABody(body)
// Without a messageId the row is not message-keyed; LogActivity's
// ON CONFLICT path won't fire (the partial unique index excludes
// NULL message_id) and we'd get a duplicate row if both ingest
// and completion paths ran. Skip the ingest — completion is
// authoritative for the non-message-keyed case (legacy a2a_send
// payloads, system callers, etc.).
if messageId == "" {
return
}
insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
var wsName string
db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName == "" {
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (ingest)"
LogActivity(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
TargetID: &workspaceID,
Method: &a2aMethod,
Summary: &summary,
RequestBody: json.RawMessage(body),
Status: "ok",
MessageId: messageId,
})
}
@@ -0,0 +1,120 @@
package handlers
// Regression coverage for the PUSH-mode arm of the canvas user-message
// mid-turn leave/refresh bug (issue #2560).
//
// Pre-#2560: the user message was only written to activity_logs at turn
// completion (logA2ASuccess, post-dispatch). A leave/refresh mid-turn
// re-hydrated an empty pane plus typing dots — the durable
// activity_logs row didn't exist yet — and a workspace-server restart /
// deploy / OOM between the canvas 200 and the goroutine's commit lost
// the message permanently (chat-history is read from activity_logs; no
// row == no message on reopen). The poll-mode arm was separately
// covered by a2a_poll_ingest_persist_test.go; this file covers the
// push-mode arm.
//
// Fix: persistUserMessageAtIngest runs SYNCHRONOUSLY in
// proxyA2ARequest, immediately after normalizeA2APayload and BEFORE
// the poll/mock/push short-circuits, with context.WithoutCancel so a
// client disconnect on chat-exit cannot cancel the write. Idempotent
// on messageId via the partial unique index (idx_activity_logs_msg_id)
// + ON CONFLICT DO UPDATE in logActivityExec — the same row is later
// updated by logA2ASuccess to attach response_body, so chat-history
// reads back exactly one (user, agent) pair per messageId with no
// duplicate bubble.
//
// Defining assertions for this test:
// 1. The ingest INSERT into activity_logs MUST complete BEFORE
// proxyA2ARequest returns (i.e. before the 200 reaches the client).
// Pre-fix: handler returns ~instantly while the INSERT is still
// racing in a detached goroutine → elapsed ≪ insertDelay.
// Post-fix: handler return is gated on the INSERT → elapsed ≥
// insertDelay.
// 2. The ingest row carries the canvas user's messageId. The
// completion row (logA2ASuccess) attaches response_body to the
// SAME row via ON CONFLICT — chat-history emits a single
// (user, agent) pair, not two.
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
func TestProxyA2A_PushMode_PersistsUserMessageSynchronouslyBeforeDispatch(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
const wsID = "ws-push-sync-persist"
const insertDelay = 50 * time.Millisecond
expectBudgetCheck(mock, wsID)
// Ingest at receipt (the fix). Synchronous — handler return is
// gated on this write completing. A detached-goroutine ingest
// (pre-fix) does NOT block; a synchronous ingest does.
mock.ExpectExec("INSERT INTO activity_logs").
WillDelayFor(insertDelay).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
// Canvas user message with explicit messageId. The fix is
// observable only when the messageId is set (otherwise
// persistUserMessageAtIngest short-circuits and the completion
// path remains authoritative — pre-#2560 behavior, no regression).
body := `{"jsonrpc":"2.0","id":"push-canvas-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-2560-push-1","parts":[{"text":"my own message"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
start := time.Now()
handler.ProxyA2A(c)
elapsed := time.Since(start)
// Defining assertion #1: handler must not have returned the
// response before the durable ingest INSERT committed. Pre-fix this
// fails (elapsed ≈ 0, INSERT still racing in goAsync).
if elapsed < insertDelay {
t.Fatalf("push-mode handler returned in %v, before the %v user-message ingest INSERT — "+
"mid-turn leave/refresh re-hydrates an empty pane (DATA LOSS). "+
"persistUserMessageAtIngest must be synchronous before dispatch.", elapsed, insertDelay)
}
// Defining assertion #2: the durable write actually happened by the
// time the handler returned. ExpectationsWereMet() in a goroutine
// with a hard 2s timeout — fails fast (no CI hang) on regression
// while returning promptly on success.
expectDone := make(chan error, 1)
go func() { expectDone <- mock.ExpectationsWereMet() }()
select {
case err := <-expectDone:
if err != nil {
t.Fatalf("user-message ingest INSERT was not durable at handler return (unmet sqlmock expectations): %v", err)
}
case <-time.After(2 * time.Second):
t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " +
"Likely cause: production code regressed persistUserMessageAtIngest to async " +
"(INSERT fires after handler returns, not before).")
}
// Sanity: the response is JSON. We don't assert the body shape
// here — push-mode goes through a longer code path (resolveAgentURL,
// preflight, dispatch) and short-circuits in this test setup
// (the dispatch will fail with the mock URL, but the ingest write
// has already happened, which is the only thing #2560 tests).
if w.Code == 0 {
t.Errorf("handler did not write a response (w.Code == 0)")
}
_ = json.RawMessage{}
_ = http.StatusOK
}
+31 -3
View File
@@ -1024,15 +1024,34 @@ func logActivityExec(ctx context.Context, exec activityExecutor, broadcaster eve
// whose text still contains `INSERT INTO activity_logs` — preserves the
// existing sqlmock expectations across the codebase. $1 (workspace_id) is
// reused by the UPDATE; arg list is otherwise unchanged.
//
// #2560 (chat UX: persist in-flight exchange): the INSERT now also
// carries message_id ($13) and uses ON CONFLICT (workspace_id,
// message_id) DO UPDATE to attach the agent's response_body onto the
// SAME row that the ingest path (#2560) wrote at receipt — so a
// mid-turn leave/refresh shows the user message in chat-history
// (request_body from the ingest row), and the eventual completion
// stamps response_body + status onto the same row (no duplicate
// bubble). The conflict target only fires when message_id IS NOT NULL
// (idx_activity_logs_msg_id is partial); rows without message_id keep
// the original always-INSERT behavior.
if _, err := exec.ExecContext(ctx, `
WITH ins AS (
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12)
INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, message_id)
VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13)
ON CONFLICT (workspace_id, message_id) WHERE message_id IS NOT NULL
DO UPDATE SET
response_body = COALESCE(EXCLUDED.response_body, activity_logs.response_body),
status = EXCLUDED.status,
duration_ms = COALESCE(EXCLUDED.duration_ms, activity_logs.duration_ms),
error_detail = EXCLUDED.error_detail,
tool_trace = CASE WHEN EXCLUDED.tool_trace IS NOT NULL THEN EXCLUDED.tool_trace ELSE activity_logs.tool_trace END
)
UPDATE workspaces SET last_activity_at = now() WHERE id = $1
`, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID,
params.Method, params.Summary, reqStr, respStr, traceStr,
params.DurationMs, params.Status, params.ErrorDetail); err != nil {
params.DurationMs, params.Status, params.ErrorDetail,
nilIfEmpty(params.MessageId)); err != nil {
return nil, err
}
@@ -1103,5 +1122,14 @@ type ActivityParams struct {
ToolTrace json.RawMessage // tools/commands the agent actually invoked
DurationMs *int
Status string // ok, error, timeout
// MessageId, when non-empty, is persisted to activity_logs.message_id
// and is the conflict key for the partial unique index
// (idx_activity_logs_msg_id). The ingest path (#2560) sets this so a
// mid-turn leave/refresh shows the user message in chat-history
// hydration; the completion path (logA2ASuccess / logA2AReceiveQueued)
// also sets it and uses ON CONFLICT (workspace_id, message_id) DO
// UPDATE to stamp response_body onto the same row instead of inserting
// a duplicate. Empty for non-per-message activity (task_update, etc.).
MessageId string
ErrorDetail *string
}
@@ -0,0 +1,8 @@
-- Reverse issue #2560 ingest-row column + idempotency index.
-- Non-destructive: dropping the index + column simply reverts the
-- chat-history-mid-turn-persistence feature; no data is lost because
-- the original logA2ASuccess / logA2AReceiveQueued paths are still
-- intact (they re-INSERT the full message on completion in the
-- non-message_id-keyed shape).
DROP INDEX IF EXISTS idx_activity_logs_msg_id;
ALTER TABLE IF EXISTS activity_logs DROP COLUMN IF EXISTS message_id;
@@ -0,0 +1,32 @@
-- Issue #2560 (chat UX: persist in-flight exchange across leave/refresh):
-- the user chat message must be written to activity_logs AT RECEIPT, not
-- only at turn completion, so a mid-turn leave/refresh doesn't drop the
-- pending message.
--
-- The completion path (logA2ASuccess) reuses the SAME activity_logs row
-- via ON CONFLICT (workspace_id, message_id) DO UPDATE — the ingest row
-- carries the user message in request_body; the completion attach stamps
-- response_body + status onto it. Idempotent on messageId (per #2560 spec)
-- means a duplicated ingest (server retry, or a poll-mode write followed
-- by a push-mode write on the same messageId) is a no-op rather than a
-- duplicate bubble in chat-history.
--
-- Column:
-- message_id TEXT — extracted from params.message.messageId in
-- normalizeA2APayload. Stored as TEXT (UUID-as-text shape) so the same
-- string the canvas sent is the same string we dedupe on, regardless
-- of UUID canonicalization. NULL for legacy rows and for activity that
-- is NOT a per-message row (task_update, agent_log, error, etc.) — the
-- unique index is partial on message_id IS NOT NULL so non-message
-- rows never collide.
ALTER TABLE IF EXISTS activity_logs
ADD COLUMN IF NOT EXISTS message_id TEXT;
-- Partial unique index — the conflict target for ON CONFLICT (workspace_id,
-- message_id) DO NOTHING / DO UPDATE. Excludes rows where message_id is
-- NULL (non-per-message activity) so existing activity never collides.
-- Coexists with the existing (workspace_id, activity_type, created_at DESC)
-- index — the partial is a write-path key, the existing is a read-path key.
CREATE UNIQUE INDEX IF NOT EXISTS idx_activity_logs_msg_id
ON activity_logs (workspace_id, message_id)
WHERE message_id IS NOT NULL;