diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx index 4a8ce102d..bc63e674d 100644 --- a/canvas/src/components/tabs/ChatTab.tsx +++ b/canvas/src/components/tabs/ChatTab.tsx @@ -176,9 +176,35 @@ function MyChatPanel({ workspaceId, data }: Props) { // Scroll behavior across messages updates: // - Prepend (loadOlder landed) → restore the user's saved // distance-from-bottom so their reading position is unchanged. - // - Append / initial → pin to latest bubble. + // - Append / initial → pin to latest bubble ONLY if the + // user is at (or near) the bottom. The pre-#2560 behavior was + // "always scroll", which yanks the viewport when the user has + // scrolled up to read older history — they lose their place and + // have to scroll back down. #2560 CTO ask: gate the same + // append-scroll on the SAME at-bottom check that the activityLog + // growth path uses below. + // + // The atBottom threshold (12px) is a small window to absorb subpixel + // rounding — a user who "is at the bottom" can be up to ~12px off + // without losing autoscroll. The listener is attached to the + // container, not window, because the chat is its own scrollable + // element; `passive: true` because we never call preventDefault. // useLayoutEffect (not useEffect) so scroll restoration runs BEFORE // paint — otherwise the user sees the page jump for one frame. 
+ const [atBottom, setAtBottom] = useState(true); + useLayoutEffect(() => { + const container = containerRef.current; + if (!container) return; + const update = () => { + const distanceFromBottom = + container.scrollHeight - container.scrollTop - container.clientHeight; + setAtBottom(distanceFromBottom <= 12); + }; + update(); + container.addEventListener("scroll", update, { passive: true }); + return () => container.removeEventListener("scroll", update); + }, []); + useLayoutEffect(() => { const container = containerRef.current; const anchor = history.scrollAnchorRef.current; @@ -188,17 +214,35 @@ function MyChatPanel({ workspaceId, data }: Props) { history.messages.length > 0 && history.messages[0].id !== anchor.expectFirstIdNotEqual ) { + // Anchor restore is loadOlder's contract — always restore + // regardless of at-bottom (the user's PRE-loadOlder position is + // the thing being preserved). Untouched by #2560. container.scrollTop = container.scrollHeight - anchor.savedDistanceFromBottom; history.scrollAnchorRef.current = null; return; } if (!hasInitialScrollRef.current && history.messages.length > 0) { hasInitialScrollRef.current = true; + if (!atBottom) return; bottomRef.current?.scrollIntoView({ behavior: "instant" as ScrollBehavior }); return; } + // #2560: gate the message-append smooth-scroll on atBottom. If the + // user has scrolled up to read older history, do NOT yank. + if (!atBottom) return; bottomRef.current?.scrollIntoView({ behavior: "smooth" }); - }, [history.messages, history.scrollAnchorRef]); + }, [history.messages, history.scrollAnchorRef, atBottom]); + + // #2560 (bottom-sticky autoscroll for accumulating tool calls): the + // existing message-append path doesn't fire on activityLog growth + // (deps were [history.messages]), so the user has to manually chase + // the live tool-call lines. Gate the same at-bottom check on the + // activity feed: if the user is at the bottom, follow the new + // lines; if they've scrolled up, NEVER yank. 
+ useLayoutEffect(() => { + if (!atBottom) return; + bottomRef.current?.scrollIntoView({ behavior: "smooth" }); + }, [activityLog, atBottom]); // Elapsed timer while sending useEffect(() => { diff --git a/canvas/src/components/tabs/__tests__/ChatTab.autoscroll.test.tsx b/canvas/src/components/tabs/__tests__/ChatTab.autoscroll.test.tsx new file mode 100644 index 000000000..2549042af --- /dev/null +++ b/canvas/src/components/tabs/__tests__/ChatTab.autoscroll.test.tsx @@ -0,0 +1,186 @@ +// @vitest-environment jsdom +// +// Pins the #2560 autoscroll at-bottom gating for the chat tab. +// +// CTO ask (issue #2560): "Bottom-sticky autoscroll while tool calls +// accumulate. Gate the EXISTING always-scroll-on-append behavior behind +// the SAME at-bottom check. Don't yank the viewport down if the user +// has scrolled up." The pre-#2560 behavior was `bottomRef.scrollIntoView` +// on every message append (activityLog growth did not scroll) — yanking the +// viewport when the user was reading older history (a real complaint: +// "I scroll up to compare against an earlier reply and the chat +// yanks me back to the bottom every time the agent logs a tool call"). +// +// The fix adds an `atBottom` state flag (scroll listener, 12px threshold) +// that gates BOTH the message-append path AND a NEW activityLog-growth +// path. The loadOlder anchor-restore contract is preserved unchanged. 
+// +// The issue specified five behaviours; this file covers (a) and (b) so far: +// (a) append scrolls when atBottom +// (b) append does NOT scroll when scrolled up +// (c) activityLog growth scrolls when atBottom +// (d) activityLog growth does NOT scroll when scrolled up +// (e) anchor restore on loadOlder is NOT affected by atBottom + +import { describe, it, expect, vi, afterEach, beforeEach } from "vitest"; +import { render, screen, cleanup, waitFor, fireEvent } from "@testing-library/react"; +import React from "react"; + +afterEach(cleanup); + +// No /chat-history, no heartbeats, no activity-log polling — just the +// scroll behavior under test. +const apiGet = vi.fn((_path: string) => Promise.resolve([])); +const apiPost = vi.fn(); +vi.mock("@/lib/api", () => ({ + api: { + get: (path: string) => apiGet(path), + post: (path: string, body: unknown) => apiPost(path, body), + del: vi.fn(), + patch: vi.fn(), + put: vi.fn(), + }, +})); + +vi.mock("@/store/canvas", () => ({ + useCanvasStore: vi.fn((selector?: (s: unknown) => unknown) => + selector ? selector({ agentMessages: {}, consumeAgentMessages: () => [] }) : {}, + ), +})); + +// Mock useChatSocket so the panel doesn't try to open a WebSocket. We +// only care about the scroll-while-sending behaviour, not the socket +// plumbing. +vi.mock("../chat/hooks/useChatSocket", () => ({ + useChatSocket: () => {}, +})); + +let scrollIntoView: ReturnType; +let scrollEventListeners: Array<(e: Event) => void> = []; +let currentScrollTop = 0; +let currentScrollHeight = 1000; +let currentClientHeight = 200; + +beforeEach(() => { + apiGet.mockClear(); + apiPost.mockReset(); + scrollIntoView = vi.fn(); + scrollEventListeners = []; + currentScrollTop = 0; + currentScrollHeight = 1000; + currentClientHeight = 200; + Element.prototype.scrollIntoView = scrollIntoView; + // Stub IntersectionObserver with a no-op implementation. 
+ (window as unknown as { IntersectionObserver: unknown }).IntersectionObserver = class { + observe() {} + unobserve() {} + disconnect() {} + }; + // The atBottom state derives from container.scrollHeight - + // scrollTop - clientHeight; expose a fake container ref via the + // scroll listener we wire on every render. + const origAdd = HTMLElement.prototype.addEventListener; + HTMLElement.prototype.addEventListener = function ( + this: HTMLElement, + type: string, + listener: EventListenerOrEventListenerObject, + options?: boolean | AddEventListenerOptions, + ) { + if (type === "scroll") { + const wrapped = (e: Event) => { + // Force the container to report as-if at the bottom by default + // (scrollTop=scrollHeight-clientHeight → distance=0). Tests + // mutate `currentScrollTop` BEFORE firing to simulate scroll-up. + Object.defineProperty(this, "scrollTop", { + get: () => currentScrollTop, + configurable: true, + }); + Object.defineProperty(this, "scrollHeight", { + get: () => currentScrollHeight, + configurable: true, + }); + Object.defineProperty(this, "clientHeight", { + get: () => currentClientHeight, + configurable: true, + }); + if (typeof listener === "function") listener(e); + else listener.handleEvent(e); + }; + scrollEventListeners.push(wrapped); + return origAdd.call(this, type, wrapped, options); + } + return origAdd.call(this, type, listener, options); + }; +}); + +afterEach(() => { + // Reset the addEventListener patch by re-importing — not perfect, but + // good enough for the test run; the next beforeEach re-applies. +}); + +import { ChatTab } from "../ChatTab"; + +const minimalData = { + status: "online" as const, + runtime: "claude-code", + currentTask: null, +} as unknown as Parameters[0]["data"]; + +async function fireScroll() { + // Dispatch a scroll event on whatever listener is wired. The ChatTab + // attaches its scroll listener to the messages container; the + // beforeEach hook captured it. Fire all captured listeners. 
+ for (const fn of scrollEventListeners) fn(new Event("scroll")); +} + +describe("ChatTab autoscroll at-bottom gating (#2560)", () => { + it("appends a message with scrollIntoView when at the bottom (a)", async () => { + // No history → empty page. After loadInitial, atBottom=true (the + // container's distance-from-bottom is 0 because the empty page + // has no overflow). Then we send a message and observe a scroll. + render(); + await waitFor(() => expect(apiGet).toHaveBeenCalled()); + scrollIntoView.mockClear(); + // Send a message via the textarea + send button. + const textarea = await screen.findByLabelText(/Message to agent/i); + fireEvent.change(textarea, { target: { value: "hello" } }); + const sendBtn = screen.getByRole("button", { name: /Send/i }); + fireEvent.click(sendBtn); + // The message-append path is gated on atBottom; at initial mount + // atBottom=true (empty container, distance=0), so the + // scrollIntoView call fires for the new user bubble. + await waitFor(() => expect(scrollIntoView).toHaveBeenCalled()); + }); + + it("does NOT scrollIntoView when the user is scrolled up (b)", async () => { + render(); + await waitFor(() => expect(apiGet).toHaveBeenCalled()); + + // Simulate the user scrolling up — set distance-from-bottom > 12 + // BEFORE the next append. scrollTop stays at, say, 0 (we're + // pretending the page is now 5000px tall). + currentScrollTop = 0; + currentScrollHeight = 5000; + currentClientHeight = 200; + await fireScroll(); + + scrollIntoView.mockClear(); + + const textarea = await screen.findByLabelText(/Message to agent/i); + fireEvent.change(textarea, { target: { value: "hello 2" } }); + const sendBtn = screen.getByRole("button", { name: /Send/i }); + fireEvent.click(sendBtn); + + // Give the autoscroll useLayoutEffect a tick to run, then assert + // it did NOT yank. The useChatHistory's local-state append + // triggers history.messages change; with atBottom=false the + // scrollIntoView is skipped. 
+ await new Promise((r) => setTimeout(r, 50)); + // The autoscroll useLayoutEffect skipped the call, so the count + // should NOT have grown. Allow a small noise window — the + // initial-mount scroll may still have fired before we set + // atBottom=false. + const callsAfterSend = scrollIntoView.mock.calls.length; + expect(callsAfterSend).toBe(0); + }); +}); diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 0f2a808ab..de6756fae 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -442,6 +442,23 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } body = normalizedBody + // #2560 (chat UX: persist in-flight exchange across leave/refresh): + // write the user message to activity_logs AT RECEIPT — before any of + // the downstream short-circuits (poll-mode, mock-runtime, push dispatch) + // run — so a mid-turn leave/refresh re-hydrates the user message + // instead of an empty pane. Idempotent on messageId via the partial + // unique index (idx_activity_logs_msg_id) + ON CONFLICT DO UPDATE in + // logActivityExec — a poll-mode re-persist (or a duplicated ingest + // from a retry) collapses to a single row, and the completion path + // (logA2ASuccess / logA2AReceiveQueued) stamps response_body onto + // the same row. No duplicate bubble in chat-history. + // + // Skipped when the body has no messageId (system callers, legacy + // a2a_send payloads) — the completion path remains authoritative. + if a2aMethod == "message/send" { + h.persistUserMessageAtIngest(ctx, workspaceID, callerID, body, a2aMethod) + } + // #2339 PR 2 — poll-mode short-circuit. When the target workspace // is registered as delivery_mode=poll (e.g. 
an operator's laptop // running molecule-mcp-claude-channel), the platform does NOT diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 06434ec60..d9d9c8ec1 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -425,6 +425,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle ToolTrace: toolTrace, DurationMs: &durationMs, Status: logStatus, + MessageId: extractMessageIdFromA2ABody(body), }) if callerID == "" && statusCode < 400 { @@ -725,6 +726,14 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, // his reported loss is THIS path; #1347 (push-mode, persists AFTER the // poll short-circuit) structurally cannot cover it. // + // #2560: this path ALSO sets the messageId on the activity row. If + // persistUserMessageAtIngest already wrote the same messageId (e.g. + // the poll short-circuit ran AFTER the ingest path on a request that + // got demoted to poll), the partial unique index + // (idx_activity_logs_msg_id) folds this write into that row via ON CONFLICT DO + // UPDATE — the existing row keeps its request_body (the user message); the queued + // acknowledgment is still emitted; no duplicate bubble. + // // Mirrors persistUserMessageAtIngest's discipline: // - context.WithoutCancel: a client disconnect on chat-exit (which // cancels the inbound request ctx) MUST NOT abort this write. @@ -754,6 +763,109 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID, Summary: &summary, RequestBody: json.RawMessage(body), Status: "ok", + MessageId: extractMessageIdFromA2ABody(body), + }) +} + +// extractMessageIdFromA2ABody reads params.message.messageId out of a +// normalized A2A JSON-RPC body. 
Returns "" when the field is absent or +// the body is malformed — the empty value opts the activity row out of +// the messageId-keyed conflict path (the existing always-INSERT +// behavior is preserved for non-per-message activity). +func extractMessageIdFromA2ABody(body []byte) string { + if len(body) == 0 { + return "" + } + var env struct { + Params struct { + Message struct { + MessageID string `json:"messageId"` + } `json:"message"` + } `json:"params"` + } + if err := json.Unmarshal(body, &env); err != nil { + return "" + } + return env.Params.Message.MessageID +} + +// persistUserMessageAtIngest writes the canvas user's outbound chat +// message to activity_logs SYNCHRONOUSLY, BEFORE the agent dispatch +// runs. The completion path (logA2ASuccess / logA2AReceiveQueued) then +// uses ON CONFLICT (workspace_id, message_id) DO UPDATE to attach the +// agent's response_body onto the SAME row, so a single activity_logs +// row carries both the user message and the agent reply — the chat- +// history reader (messagestore.PostgresMessageStore) emits one +// (user, agent) pair per row, no duplicate bubble. +// +// Why before-dispatch (issue #2560): pre-fix the user message only +// landed in activity_logs at turn completion (logA2ASuccess). A +// mid-turn leave/refresh re-hydrated an empty pane plus typing dots +// (the agent's currentTask was set but the request_body was missing), +// and a workspace-server restart / deploy / OOM between the canvas +// 200 and the goroutine's commit lost the message permanently +// (chat-history is read back from activity_logs; no row == no message +// on reopen). The synchronous ingest-row write closes both holes: +// leave/refresh always sees the user message in chat-history +// (request_body is there), and the message is durable on disk before +// dispatch starts. 
+// +// Discipline (mirrors logA2AReceiveQueued): +// - context.WithoutCancel: a client disconnect on chat-exit (which +// cancels the inbound request ctx) MUST NOT abort this write. +// - SYNCHRONOUS (no goAsync): the row must be durable before dispatch. +// - Best-effort: LogActivity logs+swallows INSERT errors internally, +// so a DB hiccup never blocks the dispatch — behavior for that one +// request is never worse than the pre-fix "completion-only" path. +// - The post-commit broadcast fires inside LogActivity; the canvas +// may render the user message optimistically (it already has the +// local-state copy). A missed WS event is not data loss (durable +// row is the truth the canvas re-reads on reopen). +// +// When to call: for every A2A proxy entry that the user initiated +// (canvas / canvas-user, or workspace-to-workspace delegation), AFTER +// access control + budget + normalizeA2APayload, BEFORE the +// delivery-mode / mock / dispatch short-circuits. The completion +// path (logA2ASuccess for push, logA2AReceiveQueued for poll, and +// the poll-ingest-persist pre-existing test) keys on the same +// messageId, so a duplicated ingest collapses via ON CONFLICT to a +// single row. +func (h *WorkspaceHandler) persistUserMessageAtIngest( + ctx context.Context, + workspaceID, callerID string, + body []byte, + a2aMethod string, +) { + messageId := extractMessageIdFromA2ABody(body) + // Without a messageId the row is not message-keyed; LogActivity's + // ON CONFLICT path won't fire (the partial unique index excludes + // NULL message_id) and we'd get a duplicate row if both ingest + // and completion paths ran. Skip the ingest — completion is + // authoritative for the non-message-keyed case (legacy a2a_send + // payloads, system callers, etc.). 
+ if messageId == "" { + return + } + + insCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second) + defer cancel() + + var wsName string + db.DB.QueryRowContext(insCtx, `SELECT name FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName) + if wsName == "" { + wsName = workspaceID + } + summary := a2aMethod + " → " + wsName + " (ingest)" + LogActivity(insCtx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + SourceID: nilIfEmpty(callerID), + TargetID: &workspaceID, + Method: &a2aMethod, + Summary: &summary, + RequestBody: json.RawMessage(body), + Status: "ok", + MessageId: messageId, }) } diff --git a/workspace-server/internal/handlers/a2a_push_ingest_persist_test.go b/workspace-server/internal/handlers/a2a_push_ingest_persist_test.go new file mode 100644 index 000000000..32b683738 --- /dev/null +++ b/workspace-server/internal/handlers/a2a_push_ingest_persist_test.go @@ -0,0 +1,120 @@ +package handlers + +// Regression coverage for the PUSH-mode arm of the canvas user-message +// mid-turn leave/refresh bug (issue #2560). +// +// Pre-#2560: the user message was only written to activity_logs at turn +// completion (logA2ASuccess, post-dispatch). A leave/refresh mid-turn +// re-hydrated an empty pane plus typing dots — the durable +// activity_logs row didn't exist yet — and a workspace-server restart / +// deploy / OOM between the canvas 200 and the goroutine's commit lost +// the message permanently (chat-history is read from activity_logs; no +// row == no message on reopen). The poll-mode arm was separately +// covered by a2a_poll_ingest_persist_test.go; this file covers the +// push-mode arm. +// +// Fix: persistUserMessageAtIngest runs SYNCHRONOUSLY in +// proxyA2ARequest, immediately after normalizeA2APayload and BEFORE +// the poll/mock/push short-circuits, with context.WithoutCancel so a +// client disconnect on chat-exit cannot cancel the write. 
Idempotent +// on messageId via the partial unique index (idx_activity_logs_msg_id) +// + ON CONFLICT DO UPDATE in logActivityExec — the same row is later +// updated by logA2ASuccess to attach response_body, so chat-history +// reads back exactly one (user, agent) pair per messageId with no +// duplicate bubble. +// +// Defining assertions for this test: +// 1. The ingest INSERT into activity_logs MUST complete BEFORE +// proxyA2ARequest returns (i.e. before the 200 reaches the client). +// Pre-fix: handler returns ~instantly while the INSERT is still +// racing in a detached goroutine → elapsed ≪ insertDelay. +// Post-fix: handler return is gated on the INSERT → elapsed ≥ +// insertDelay. +// 2. The ingest row carries the canvas user's messageId. The +// completion row (logA2ASuccess) attaches response_body to the +// SAME row via ON CONFLICT — chat-history emits a single +// (user, agent) pair, not two. + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +func TestProxyA2A_PushMode_PersistsUserMessageSynchronouslyBeforeDispatch(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + const wsID = "ws-push-sync-persist" + const insertDelay = 50 * time.Millisecond + + expectBudgetCheck(mock, wsID) + + // Ingest at receipt (the fix). Synchronous — handler return is + // gated on this write completing. A detached-goroutine ingest + // (pre-fix) does NOT block; a synchronous ingest does. + mock.ExpectExec("INSERT INTO activity_logs"). + WillDelayFor(insertDelay). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + + // Canvas user message with explicit messageId. 
The fix is + // observable only when the messageId is set (otherwise + // persistUserMessageAtIngest short-circuits and the completion + // path remains authoritative — pre-#2560 behavior, no regression). + body := `{"jsonrpc":"2.0","id":"push-canvas-1","method":"message/send","params":{"message":{"role":"user","messageId":"msg-2560-push-1","parts":[{"text":"my own message"}]}}}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + start := time.Now() + handler.ProxyA2A(c) + elapsed := time.Since(start) + + // Defining assertion #1: handler must not have returned the + // response before the durable ingest INSERT committed. Pre-fix this + // fails (elapsed ≈ 0, INSERT still racing in goAsync). + if elapsed < insertDelay { + t.Fatalf("push-mode handler returned in %v, before the %v user-message ingest INSERT — "+ + "mid-turn leave/refresh re-hydrates an empty pane (DATA LOSS). "+ + "persistUserMessageAtIngest must be synchronous before dispatch.", elapsed, insertDelay) + } + + // Defining assertion #2: the durable write actually happened by the + // time the handler returned. ExpectationsWereMet() in a goroutine + // with a hard 2s timeout — fails fast (no CI hang) on regression + // while returning promptly on success. + expectDone := make(chan error, 1) + go func() { expectDone <- mock.ExpectationsWereMet() }() + select { + case err := <-expectDone: + if err != nil { + t.Fatalf("user-message ingest INSERT was not durable at handler return (unmet sqlmock expectations): %v", err) + } + case <-time.After(2 * time.Second): + t.Fatalf("ExpectationsWereMet() hung for >2s — INSERT mock never fired. " + + "Likely cause: production code regressed persistUserMessageAtIngest to async " + + "(INSERT fires after handler returns, not before).") + } + + // Sanity: the response is JSON. 
We don't assert the body shape + // here — push-mode goes through a longer code path (resolveAgentURL, + // preflight, dispatch) and short-circuits in this test setup + // (the dispatch will fail with the mock URL, but the ingest write + // has already happened, which is the only thing #2560 tests). + if w.Code == 0 { + t.Errorf("handler did not write a response (w.Code == 0)") + } + _ = json.RawMessage{} + _ = http.StatusOK +} diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index deb453808..5c2857222 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -1024,15 +1024,34 @@ func logActivityExec(ctx context.Context, exec activityExecutor, broadcaster eve // whose text still contains `INSERT INTO activity_logs` — preserves the // existing sqlmock expectations across the codebase. $1 (workspace_id) is // reused by the UPDATE; arg list is otherwise unchanged. + // + // #2560 (chat UX: persist in-flight exchange): the INSERT now also + // carries message_id ($13) and uses ON CONFLICT (workspace_id, + // message_id) DO UPDATE to attach the agent's response_body onto the + // SAME row that the ingest path (#2560) wrote at receipt — so a + // mid-turn leave/refresh shows the user message in chat-history + // (request_body from the ingest row), and the eventual completion + // stamps response_body + status onto the same row (no duplicate + // bubble). The conflict target only fires when message_id IS NOT NULL + // (idx_activity_logs_msg_id is partial); rows without message_id keep + // the original always-INSERT behavior. 
if _, err := exec.ExecContext(ctx, ` WITH ins AS ( - INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12) + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, message_id) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12, $13) + ON CONFLICT (workspace_id, message_id) WHERE message_id IS NOT NULL + DO UPDATE SET + response_body = COALESCE(EXCLUDED.response_body, activity_logs.response_body), + status = EXCLUDED.status, + duration_ms = COALESCE(EXCLUDED.duration_ms, activity_logs.duration_ms), + error_detail = EXCLUDED.error_detail, + tool_trace = CASE WHEN EXCLUDED.tool_trace IS NOT NULL THEN EXCLUDED.tool_trace ELSE activity_logs.tool_trace END ) UPDATE workspaces SET last_activity_at = now() WHERE id = $1 `, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID, params.Method, params.Summary, reqStr, respStr, traceStr, - params.DurationMs, params.Status, params.ErrorDetail); err != nil { + params.DurationMs, params.Status, params.ErrorDetail, + nilIfEmpty(params.MessageId)); err != nil { return nil, err } @@ -1103,5 +1122,14 @@ type ActivityParams struct { ToolTrace json.RawMessage // tools/commands the agent actually invoked DurationMs *int Status string // ok, error, timeout + // MessageId, when non-empty, is persisted to activity_logs.message_id + // and is the conflict key for the partial unique index + // (idx_activity_logs_msg_id). 
The ingest path (#2560) sets this so a + // mid-turn leave/refresh shows the user message in chat-history + // hydration; the completion path (logA2ASuccess / logA2AReceiveQueued) + // also sets it and uses ON CONFLICT (workspace_id, message_id) DO + // UPDATE to stamp response_body onto the same row instead of inserting + // a duplicate. Empty for non-per-message activity (task_update, etc.). + MessageId string ErrorDetail *string } diff --git a/workspace-server/migrations/20260611000000_activity_logs_message_id.down.sql b/workspace-server/migrations/20260611000000_activity_logs_message_id.down.sql new file mode 100644 index 000000000..fcb48e400 --- /dev/null +++ b/workspace-server/migrations/20260611000000_activity_logs_message_id.down.sql @@ -0,0 +1,8 @@ +-- Reverse issue #2560 ingest-row column + idempotency index. +-- Non-destructive: dropping the index + column simply reverts the +-- chat-history-mid-turn-persistence feature; no data is lost because +-- the original logA2ASuccess / logA2AReceiveQueued paths are still +-- intact (they re-INSERT the full message on completion in the +-- non-message_id-keyed shape). +DROP INDEX IF EXISTS idx_activity_logs_msg_id; +ALTER TABLE IF EXISTS activity_logs DROP COLUMN IF EXISTS message_id; diff --git a/workspace-server/migrations/20260611000000_activity_logs_message_id.up.sql b/workspace-server/migrations/20260611000000_activity_logs_message_id.up.sql new file mode 100644 index 000000000..ec7e7fb7f --- /dev/null +++ b/workspace-server/migrations/20260611000000_activity_logs_message_id.up.sql @@ -0,0 +1,32 @@ +-- Issue #2560 (chat UX: persist in-flight exchange across leave/refresh): +-- the user chat message must be written to activity_logs AT RECEIPT, not +-- only at turn completion, so a mid-turn leave/refresh doesn't drop the +-- pending message. 
+-- +-- The completion path (logA2ASuccess) reuses the SAME activity_logs row +-- via ON CONFLICT (workspace_id, message_id) DO UPDATE — the ingest row +-- carries the user message in request_body; the completion attach stamps +-- response_body + status onto it. Idempotent on messageId (per #2560 spec) +-- means a duplicated ingest (server retry, or a poll-mode write followed +-- by a push-mode write on the same messageId) updates the existing row rather than adding a +-- duplicate bubble in chat-history. +-- +-- Column: +-- message_id TEXT — extracted from params.message.messageId of the +-- normalized A2A body. Stored as TEXT (UUID-as-text shape) so the same +-- string the canvas sent is the same string we dedupe on, regardless +-- of UUID canonicalization. NULL for legacy rows and for activity that +-- is NOT a per-message row (task_update, agent_log, error, etc.) — the +-- unique index is partial on message_id IS NOT NULL so non-message +-- rows never collide. +ALTER TABLE IF EXISTS activity_logs + ADD COLUMN IF NOT EXISTS message_id TEXT; + +-- Partial unique index — the conflict target for ON CONFLICT (workspace_id, +-- message_id) DO UPDATE in logActivityExec. Excludes rows where message_id is +-- NULL (non-per-message activity) so existing activity never collides. +-- Coexists with the existing (workspace_id, activity_type, created_at DESC) +-- index — the partial is a write-path key, the existing is a read-path key. +CREATE UNIQUE INDEX IF NOT EXISTS idx_activity_logs_msg_id + ON activity_logs (workspace_id, message_id) + WHERE message_id IS NOT NULL;