feat(canvas-chat): cross-device sync + free multi-send + new-session + auto-grow (core#2697) #2700

Merged
devops-engineer merged 4 commits from feat/2697-canvas-chat-ux into main 2026-06-13 04:53:55 +00:00
17 changed files with 1080 additions and 26 deletions
+116 -6
View File
@@ -5,7 +5,7 @@ import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { type ChatMessage, type ChatAttachment, createMessage, appendMessageDeduped } from "./chat/types";
import { type ChatMessage, type ChatAttachment, createMessage, appendMessageDeduped, appendMessageDedupedById } from "./chat/types";
import { downloadChatFile, isPlatformAttachment } from "./chat/uploads";
import { PendingAttachmentPill } from "./chat/AttachmentViews";
import { AttachmentPreview } from "./chat/AttachmentPreview";
@@ -120,6 +120,8 @@ function MyChatPanel({ workspaceId, data }: Props) {
const [agentReachable, setAgentReachable] = useState(false);
const [error, setError] = useState<string | null>(null);
const [confirmRestart, setConfirmRestart] = useState(false);
const [confirmNewSession, setConfirmNewSession] = useState(false);
const [newSessionPending, setNewSessionPending] = useState(false);
const [dragOver, setDragOver] = useState(false);
const containerRef = useRef<HTMLDivElement>(null);
@@ -128,6 +130,14 @@ function MyChatPanel({ workspaceId, data }: Props) {
const hasInitialScrollRef = useRef(false);
const fileInputRef = useRef<HTMLInputElement>(null);
const dragDepthRef = useRef(0);
// Textarea ref for the auto-grow handler (core#2697). Lives at
// the component scope so the onChange can resize the element
// itself, and the post-send reset (in handleSend) can collapse it
// back to a single row.
const textareaRef = useRef<HTMLTextAreaElement>(null);
// Cap textarea at ~6 lines (core#2697). Past 6 lines the
// element scrolls internally rather than growing indefinitely.
const autoGrowMaxRows = 6;
// Current user id, resolved the SAME way RequestsInbox sets responder_id
// ("admin" placeholder when no session). Gates the decision chip to the
// user's OWN responses (core#2636, CR2 fix).
@@ -158,6 +168,22 @@ function MyChatPanel({ workspaceId, data }: Props) {
releaseSendGuards();
}
},
// Cross-device sync (core#2697). The origin device already
// optimistically added the user message via onUserMessage;
// this is the WS echo of the same id, which appendMessageDedupedById
// collapses to a no-op. Other devices (or the origin after a
// reload) receive the broadcast and append fresh.
onUserMessageBroadcast: (msg) => {
history.setMessages((prev) => appendMessageDedupedById(prev, msg));
},
// "New session" pressed on one device: every other device
// clears its local view in lockstep. The marker rotation on
// the server means a subsequent /chat-history fetch filters
// out pre-marker rows, so the cleared view stays consistent
// on reload.
onSessionReset: () => {
history.setMessages([]);
},
onActivityLog: (entry) => {
if (!sending) return;
setActivityLog((prev) => appendActivityLine(prev, entry));
@@ -317,9 +343,23 @@ function MyChatPanel({ workspaceId, data }: Props) {
const handleSend = async () => {
const text = input.trim();
const files = pendingFiles;
if ((!text && files.length === 0) || !agentReachable || sending || uploading) return;
// Free multi-send (core#2697): the `sending` flag is no longer a
// gate. The hook tracks it purely for the "thinking" indicator;
// a second send in flight must not be blocked. The hook's
// sendInFlightRef + sendTokenRef still prevent double-fires
// for the SAME message (a click-spam on one message), which is
// what we want — but two distinct messages both go through.
// `uploading` stays a gate because file uploads are sequential
// at the wire level (same /chat/upload endpoint, race-prone).
if ((!text && files.length === 0) || !agentReachable || uploading) return;
setInput("");
setPendingFiles([]);
// Reset auto-grow height so the textarea collapses back to a
// single row after a send (core#2697).
if (textareaRef.current) {
textareaRef.current.style.height = "0px";
textareaRef.current.style.overflowY = "hidden";
}
clearSendError();
setError(null);
await sendMessage(text, files);
@@ -335,6 +375,31 @@ function MyChatPanel({ workspaceId, data }: Props) {
if (fileInputRef.current) fileInputRef.current.value = "";
};
// startNewSession: rotate the chat-session marker on the server
// (core#2697). The server broadcasts SESSION_RESET so other
// devices clear their local view; origin device also receives
// the event but the local clear is idempotent. Best-effort: on
// network failure we still clear locally so the user isn't
// blocked, and surface the error in the chat banner.
const startNewSession = useCallback(async () => {
setNewSessionPending(true);
// Optimistic local clear — even if the server round-trip
// fails, the user's "new session" intent is satisfied.
history.setMessages([]);
try {
await api.post(
`/workspaces/${workspaceId}/chat-session/new`,
{},
{ timeoutMs: 10_000 },
);
} catch (e) {
const reason = e instanceof Error ? e.message : "unknown";
setError(`Couldn't start new session: ${reason}`);
} finally {
setNewSessionPending(false);
}
}, [workspaceId, history]);
const removePendingFile = (index: number) =>
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
@@ -457,6 +522,22 @@ function MyChatPanel({ workspaceId, data }: Props) {
</div>
)}
{/* Messages */}
<div className="flex items-center justify-end gap-2 px-3 py-1.5 border-b border-line/40 bg-surface-sunken shrink-0">
<span className="text-[10px] text-ink-soft flex-1">
{history.messages.length > 0
? `${history.messages.length} message${history.messages.length === 1 ? "" : "s"} in this session`
: "New session"}
</span>
<button
onClick={() => setConfirmNewSession(true)}
disabled={newSessionPending}
aria-label="Start a new chat session"
title="Start a new chat session — clears visible history on every device"
className="text-[10px] font-medium text-ink-mid hover:text-ink px-2 py-0.5 rounded border border-line/60 hover:border-line transition-colors disabled:opacity-40"
>
{newSessionPending ? "Starting…" : "New session"}
</button>
</div>
<div ref={containerRef} className="flex-1 overflow-y-auto p-3 space-y-3">
{history.loading && (
<div className="text-xs text-ink-mid text-center py-4">Loading chat history...</div>
@@ -712,7 +793,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
/>
<button
onClick={() => fileInputRef.current?.click()}
disabled={!agentReachable || sending || uploading}
disabled={!agentReachable || uploading}
aria-label="Attach file"
title="Attach file"
className="p-2 bg-surface-card hover:bg-surface-card border border-line rounded-lg text-ink-mid hover:text-ink transition-colors shrink-0 disabled:opacity-40"
@@ -722,9 +803,25 @@ function MyChatPanel({ workspaceId, data }: Props) {
</svg>
</button>
<textarea
ref={textareaRef}
aria-label="Message to agent"
value={input}
onChange={(e) => setInput(e.target.value)}
onChange={(e) => {
setInput(e.target.value);
// Auto-grow: on each keystroke, reset height to 0 then
// expand to the natural content height, capped at ~6
// lines (autoGrowMaxRows). Reset to 0 first because
// scrollHeight on a textarea only grows — a longer
// message than the previous one needs the height
// released before it can re-flow (core#2697).
const el = e.currentTarget;
el.style.height = "0px";
const lineHeight = parseInt(getComputedStyle(el).lineHeight, 10) || 18;
const maxHeight = lineHeight * autoGrowMaxRows;
const next = Math.min(el.scrollHeight, maxHeight);
el.style.height = `${next}px`;
el.style.overflowY = el.scrollHeight > maxHeight ? "auto" : "hidden";
}}
onKeyDown={(e) => {
// IME-safe send: while a CJK / Japanese / Korean IME is
// composing, Enter accepts the candidate selection — not a
@@ -748,13 +845,13 @@ function MyChatPanel({ workspaceId, data }: Props) {
}}
onPaste={onPasteIntoComposer}
placeholder={agentReachable ? "Send a message... (Shift+Enter for new line, paste images to attach)" : `Agent is ${data.status}`}
disabled={!agentReachable || sending}
disabled={!agentReachable}
rows={1}
className="flex-1 bg-surface-card border border-line rounded-lg px-3 py-2 text-xs text-ink placeholder-ink-soft dark:bg-zinc-800 dark:border-zinc-600 dark:placeholder-zinc-500 focus:outline-none focus:border-accent focus-visible:ring-2 focus-visible:ring-accent/40 resize-none disabled:opacity-50"
/>
<button
onClick={handleSend}
disabled={(!input.trim() && pendingFiles.length === 0) || !agentReachable || sending || uploading}
disabled={(!input.trim() && pendingFiles.length === 0) || !agentReachable || uploading}
className="px-4 py-2 bg-accent-strong hover:bg-accent text-xs font-medium rounded-lg text-white disabled:opacity-30 transition-colors shrink-0"
>
{uploading ? "Uploading…" : "Send"}
@@ -774,6 +871,19 @@ function MyChatPanel({ workspaceId, data }: Props) {
}}
onCancel={() => setConfirmRestart(false)}
/>
<ConfirmDialog
open={confirmNewSession}
title="New session"
message="Start a new chat session? Visible history will be cleared on this and other connected devices. Earlier messages stay on the server and won't be lost."
confirmLabel="Start new session"
confirmVariant="primary"
onConfirm={() => {
startNewSession();
setConfirmNewSession(false);
}}
onCancel={() => setConfirmNewSession(false)}
/>
</div>
);
}
@@ -1,5 +1,10 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { appendMessageDeduped, createMessage, type ChatMessage } from "../types";
import {
appendMessageDeduped,
appendMessageDedupedById,
createMessage,
type ChatMessage,
} from "../types";
// Unit tests for appendMessageDeduped — the helper that collapses the
// race between the HTTP /a2a .then() handler, the A2A_RESPONSE WS event,
@@ -98,3 +103,100 @@ describe("appendMessageDeduped", () => {
expect(next).toHaveLength(2);
});
});
// Cross-device sync deduper (core#2697). The server fans out a
// USER_MESSAGE WS event after a canvas user's outbound chat message
// is durably persisted. Origin device already optimistically added
// the message via onUserMessage with the same id (the id IS the
// crypto.randomUUID() the client sent in the A2A envelope's
// message.messageId). On the WS echo, appendMessageDedupedById
// MUST collapse the duplicate so origin device renders one bubble.
// Other devices (and the origin after a reload) receive the
// broadcast with no prior copy and append fresh.
//
// The id-based dedup is strictly stronger than the time-windowed
// one above: a match on id collapses regardless of timing. This
// is the contract the cross-device-sync feature depends on.
describe("appendMessageDedupedById", () => {
// Same setup as the appendMessageDeduped block above: the
// cross-device-sync tests don't strictly need fake timers (the
// id-based dedup is time-independent), but the timer-advance
// case in the "content matches but id differs" test (line 180
// in the prior head) requires vi.useFakeTimers to be active,
// otherwise vi.advanceTimersByTime is a no-op. Adding the
// same hooks here is consistent with the sibling describe
// block + protects any future test in this block from the
// same trap.
beforeEach(() => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-04-23T12:00:00.000Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("appends a new message when no prior entry shares the id", () => {
const msg = createMessage("user", "hello");
const next = appendMessageDedupedById([], msg);
expect(next).toHaveLength(1);
expect(next[0]).toBe(msg);
});
it("collapses a duplicate with the same id (origin device's WS echo)", () => {
// Origin device already optimistically added the message with
// the id. Server fans out a USER_MESSAGE event with the same
// id. The deduper MUST collapse to a single bubble.
const optimistic = createMessage("user", "hello");
// The server's WS echo carries the same id as the optimistic
// add. We simulate that by reusing optimistic.id on a fresh
// ChatMessage object (mirrors the broadcast shape).
const echo: ChatMessage = {
id: optimistic.id,
role: "user",
content: "hello",
timestamp: new Date().toISOString(),
};
const next = appendMessageDedupedById([optimistic], echo);
expect(next).toHaveLength(1);
// The original entry is preserved (the array is not a new
// reference, no re-render).
expect(next[0]).toBe(optimistic);
});
it("appends when ids differ (other device receives a fresh broadcast)", () => {
const first = createMessage("user", "hello");
const second = createMessage("user", "hello");
// Different crypto.randomUUID() per createMessage call — ids
// are independent even when content matches.
expect(first.id).not.toBe(second.id);
const next = appendMessageDedupedById([first], second);
expect(next).toHaveLength(2);
});
it("does NOT dedupe when msg.id is empty (fallback path)", () => {
// Defense: a message without an id (e.g. a legacy shape from
// an older broadcast or a test fixture) must NOT match against
// the entire history — that would silently drop a legitimate
// second message. Append fresh.
const first = createMessage("user", "hello");
const noId: ChatMessage = {
id: "",
role: "user",
content: "world",
timestamp: new Date().toISOString(),
};
const next = appendMessageDedupedById([first], noId);
expect(next).toHaveLength(2);
});
it("does NOT collapse entries that share content but not id", () => {
// Same content + different id = two distinct user messages
// (the user typed the same thing twice). Must render both.
const first = createMessage("user", "hi");
vi.advanceTimersByTime(50);
const second = createMessage("user", "hi");
const next = appendMessageDedupedById([first], second);
expect(next).toHaveLength(2);
});
});
@@ -3,10 +3,12 @@
import { useCallback, useEffect, useRef } from "react";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { createMessage, type ChatMessage } from "../types";
import { createMessage, type ChatMessage, type ChatAttachment } from "../types";
export interface UseChatSocketCallbacks {
onAgentMessage?: (msg: ChatMessage) => void;
onUserMessageBroadcast?: (msg: ChatMessage) => void;
onSessionReset?: () => void;
onActivityLog?: (entry: string) => void;
onSendComplete?: () => void;
onSendError?: (error: string) => void;
@@ -134,6 +136,60 @@ export function useChatSocket(
if (task) {
callbacksRef.current.onActivityLog?.(`${task}`);
}
} else if (
msg.event === "USER_MESSAGE" &&
msg.workspace_id === workspaceId
) {
// Cross-device sync (core#2697). The server fans out a
// USER_MESSAGE event after persisting a canvas user's
// outbound chat message. Origin device already optimistically
// added the same id via onUserMessage; other devices
// (and the origin after a reload) append via the id-aware
// deduper, so a single bubble is rendered on every device.
//
// The payload shape mirrors AGENT_MESSAGE: {message_id,
// content, attachments?, workspace_id}. We re-construct a
// ChatMessage with the id pinned to the server's
// messageId — the origin's `createMessage` already used the
// same id (its messageId was crypto.randomUUID() at send
// time), so the id-aware dedup collapses the WS echo to a
// no-op on the origin device.
const p = msg.payload || {};
const messageId = (p.message_id as string) || "";
const content = (p.content as string) || "";
const rawAttachments = (p.attachments as Array<{
name?: string;
uri?: string;
mimeType?: string;
size?: number;
}>) || [];
const attachments: ChatAttachment[] = rawAttachments
.filter((a) => a && a.uri)
.map((a) => ({
name: a.name || "file",
uri: a.uri as string,
mimeType: a.mimeType,
size: a.size,
}));
if (messageId) {
const ts = new Date().toISOString();
const userMsg = Object.freeze({
id: messageId,
role: "user" as const,
content,
...(attachments.length ? { attachments } : {}),
timestamp: ts,
});
callbacksRef.current.onUserMessageBroadcast?.(userMsg);
}
} else if (
msg.event === "SESSION_RESET" &&
msg.workspace_id === workspaceId
) {
// "New session" pressed on one device — clear local view
// on every connected device. Idempotent: clearing an
// already-cleared view is a no-op (core#2697).
callbacksRef.current.onSessionReset?.();
} else if (
msg.event === "REQUEST_RESPONDED" &&
msg.workspace_id === workspaceId
+25
View File
@@ -85,6 +85,31 @@ export function appendMessageDeduped(prev: ChatMessage[], msg: ChatMessage, dedu
return [...prev, msg];
}
// appendMessageDedupedById is the cross-device sync deduper
// (core#2697). When the server carries a stable `messageId`
// (USER_MESSAGE broadcasts echo it back), collapse any tail entry
// with the same id regardless of timing. Origin device already
// optimistically added the message via onUserMessage; on the WS
// echo the same id, the second append is a no-op. Other devices
// receive the broadcast with no prior copy and append.
//
// Why a separate helper rather than widening appendMessageDeduped:
// the id-aware contract is "duplicate if id matches AND ids are
// stable," which is strictly stronger than the time-window
// fallback. Mixing them in one function would force every caller
// to thread an id-aware flag; the two paths are independent
// (agent-message triple-delivery has no id, USER_MESSAGE
// cross-device does).
export function appendMessageDedupedById(
prev: ChatMessage[],
msg: ChatMessage,
): ChatMessage[] {
if (msg.id) {
if (prev.some((m) => m.id === msg.id)) return prev;
}
return [...prev, msg];
}
function attachmentSignature(atts: ChatAttachment[] | undefined): string {
if (!atts || atts.length === 0) return "";
// URI is the stable identity — name can differ across delivery
+16
View File
@@ -45,6 +45,20 @@ const (
EventA2AResponse EventType = "A2A_RESPONSE"
EventActivityLogged EventType = "ACTIVITY_LOGGED"
EventChannelMessage EventType = "CHANNEL_MESSAGE"
// EventUserMessage echoes a canvas user's outbound chat message to
// every connected device on the same workspace so a message typed on
// device A surfaces on device B in real time (core#2697). Payload
// shape mirrors AGENT_MESSAGE: {message_id, content, attachments?,
// workspace_id}. Only broadcast when the client supplied a
// messageId (the only path that has a stable identity for cross-
// device dedup).
EventUserMessage EventType = "USER_MESSAGE"
// EventSessionReset signals that the user pressed "New session" on
// one device; all other devices connected to the same workspace
// clear their local chat view to match (core#2697). The server
// also updates workspaces.chat_session_started_at so a fresh
// chat-history fetch filters out pre-marker rows.
EventSessionReset EventType = "SESSION_RESET"
// Workspace lifecycle.
EventWorkspaceProvisioning EventType = "WORKSPACE_PROVISIONING"
@@ -125,9 +139,11 @@ var AllEventTypes = []EventType{
EventRequestCreated,
EventRequestMessage,
EventRequestResponded,
EventSessionReset,
EventTaskUpdated,
EventUserTaskRequested,
EventUserTaskResolved,
EventUserMessage,
EventWorkspaceAwaitingAgent,
EventWorkspaceDegraded,
EventWorkspaceHeartbeat,
@@ -43,7 +43,9 @@ func TestAllEventTypes_IsSnapshot(t *testing.T) {
"REQUEST_CREATED",
"REQUEST_MESSAGE",
"REQUEST_RESPONDED",
"SESSION_RESET",
"TASK_UPDATED",
"USER_MESSAGE",
"USER_TASK_REQUESTED",
"USER_TASK_RESOLVED",
"WORKSPACE_AWAITING_AGENT",
@@ -856,7 +856,23 @@ func (h *WorkspaceHandler) persistUserMessageAtIngest(
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (ingest)"
LogActivity(insCtx, h.broadcaster, ActivityParams{
// Phantom-guard (CR2 #11302): use the error-returning variant
// LogActivityWithResult so we observe the INSERT outcome. The
// plain LogActivity() swallows errors via log.Printf internally
// (its "best-effort" contract), which would let a USER_MESSAGE
// broadcast fire even when the activity_logs row is missing.
// The cross-device sync contract is "ws event mirrors on-disk
// truth" — a phantom USER_MESSAGE would render on every other
// device but vanish on reload (chat-history reads from
// activity_logs, the row is gone). Capture the hook + error;
// fire the ACTIVITY_LOGGED broadcast AND the USER_MESSAGE
// broadcast ONLY if the INSERT succeeded. A failed INSERT
// returns silently here (best-effort dispatch contract — the
// user's message may already be in the agent's hands via the
// post-dispatch path; the agent-side delivery is authoritative
// for the user-visible bubble, the activity_logs row is the
// post-hoc audit + chat-history hydration).
hook, logErr := LogActivityWithResult(insCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
ActivityType: "a2a_receive",
SourceID: nilIfEmpty(callerID),
@@ -867,6 +883,22 @@ func (h *WorkspaceHandler) persistUserMessageAtIngest(
Status: "ok",
MessageId: messageId,
})
if logErr != nil {
log.Printf("persistUserMessageAtIngest: activity_logs insert failed for workspace %s messageId %s: %v — skipping USER_MESSAGE broadcast (phantom guard)", workspaceID, messageId, logErr)
return
}
// Fire the ACTIVITY_LOGGED broadcast (LogActivity's post-commit
// hook) AND the cross-device USER_MESSAGE broadcast — both
// behind the persist-success gate.
hook()
// Cross-device sync (core#2697). After the user message is durably
// persisted, broadcast a USER_MESSAGE event so every other device
// connected to this workspace renders the bubble in real time.
// Origin device already optimistically added the message via
// onUserMessage; on the WS echo the same id, the client-side id-
// based dedup collapses the duplicate (only the first writer wins).
broadcastUserMessageFromA2ABody(h.broadcaster, workspaceID, messageId, body)
}
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
@@ -888,3 +920,139 @@ func readUsageMap(m map[string]json.RawMessage) (inputTokens, outputTokens int64
}
return usage.InputTokens, usage.OutputTokens, true
}
// broadcastUserMessageFromA2ABody sends a USER_MESSAGE WebSocket event
// derived from a canvas user's outbound A2A envelope. The event lets
// every device connected to the workspace render the message in real
// time (cross-device sync, core#2697).
//
// Why a server-side parser rather than re-broadcasting the raw body:
// - The payload shape needs to mirror the AGENT_MESSAGE wire shape
// ({message_id, content, attachments, workspace_id}) so the
// client's useChatSocket consumer can run a single appendMessage
// path for both directions.
// - The raw body is a full A2A JSON-RPC envelope (method, params,
// id, jsonrpc); the canvas listener would have to re-parse to
// extract the message bits, duplicating the client-side logic.
//
// Why no-op on parse failure: the persistUserMessageAtIngest caller
// has already LogActivity'd the row with the raw body. A phantom-free
// contract (ws event only when we successfully parsed the user
// message) is more important than broadcasting malformed payloads — a
// client receiving a broken USER_MESSAGE could not dedup or render
// it usefully anyway, and the on-disk row remains the truth the
// canvas re-reads on reload.
func broadcastUserMessageFromA2ABody(
broadcaster events.EventEmitter,
workspaceID string,
messageId string,
body []byte,
) {
if broadcaster == nil || messageId == "" {
return
}
text, attachments := extractUserTextAndAttachments(body)
if text == "" && len(attachments) == 0 {
return
}
payload := map[string]interface{}{
"message_id": messageId,
"content": text,
"workspace_id": workspaceID,
}
if len(attachments) > 0 {
payload["attachments"] = attachments
}
broadcaster.BroadcastOnly(workspaceID, string(events.EventUserMessage), payload)
}
// extractUserTextAndAttachments pulls the user-typed text + any
// file attachments from an A2A message/send body. Mirrors the
// canvas-side parts walker (canvas/.../message-parser.ts) so the
// broadcast payload matches what the renderer would draw.
//
// - text: the FIRST text-kind part's `text` (parts is treated as
// ordered; pre-existing canvas UI also renders the first text).
// Empty when no text part exists (attachments-only is valid).
// - attachments: every file-kind part's {name, mimeType, uri, size},
// preserved in order so the receiving device shows them in the
// same sequence the origin typed.
//
// Returns ("", nil) on any parse error or shape mismatch — the
// caller treats that as "no broadcastable content" and skips the
// event (the durable row is still authoritative).
func extractUserTextAndAttachments(body []byte) (string, []map[string]interface{}) {
if len(body) == 0 {
return "", nil
}
var env struct {
Params struct {
Message struct {
Parts []map[string]interface{} `json:"parts"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return "", nil
}
var text string
var attachments []map[string]interface{}
for _, p := range env.Params.Message.Parts {
if p == nil {
continue
}
kind, _ := p["kind"].(string)
if kind == "" {
kind, _ = p["type"].(string)
}
switch kind {
case "text":
if text == "" {
if t, _ := p["text"].(string); t != "" {
text = t
}
}
case "file":
att := map[string]interface{}{}
if file, ok := p["file"].(map[string]interface{}); ok && file != nil {
if name, _ := file["name"].(string); name != "" {
att["name"] = name
}
if mt, _ := file["mimeType"].(string); mt != "" {
att["mimeType"] = mt
}
if uri, _ := file["uri"].(string); uri != "" {
att["uri"] = uri
}
if sz, ok := numericSizeFromAny(file["size"]); ok {
att["size"] = sz
}
}
// Only attach when we extracted a uri — a file part with
// no uri is malformed; the canvas can't render it.
if att["uri"] != "" {
if _, hasName := att["name"]; !hasName {
att["name"] = "file"
}
attachments = append(attachments, att)
}
}
}
return text, attachments
}
// numericSizeFromAny is the public helper variant of the canvas
// parser's numericSize — Go side has no shared helper, so we keep
// the local one to avoid a cross-package dependency for a one-liner.
// Matches the JSON-decoded shapes: float64 (default), int64, int.
func numericSizeFromAny(v interface{}) (int64, bool) {
switch n := v.(type) {
case float64:
return int64(n), true
case int64:
return n, true
case int:
return int64(n), true
}
return 0, false
}
@@ -957,6 +957,30 @@ func LogActivity(ctx context.Context, broadcaster events.EventEmitter, params Ac
hook()
}
// LogActivityWithResult is the error-returning variant of LogActivity.
// It is identical to LogActivity EXCEPT it returns the INSERT error
// instead of swallowing it, so callers that need to gate downstream
// side effects (e.g. WebSocket broadcasts) on persist-success can
// observe the outcome.
//
// The returned commitHook is the post-commit ACTIVITY_LOGGED
// broadcast function. It is safe to call only when err is nil — on
// err, the hook is nil and the caller MUST NOT invoke it (the
// post-commit broadcast would be a phantom for a row that doesn't
// exist).
//
// Use case (core#2697, CR2 #11302): persistUserMessageAtIngest
// previously used LogActivity() and then unconditionally broadcast
// a USER_MESSAGE event. LogActivity's "best-effort" contract
// swallowed INSERT errors, so the USER_MESSAGE broadcast could
// fire even when the activity_logs row was missing — a phantom
// (every other device would render the bubble but the row would
// not be in chat-history on reload). LogActivityWithResult lets
// the caller gate the broadcast on actual persist-success.
func LogActivityWithResult(ctx context.Context, broadcaster events.EventEmitter, params ActivityParams) (commitHook func(), err error) {
return logActivityExec(ctx, db.DB, broadcaster, params)
}
// LogActivityTx inserts the activity row inside the caller-provided tx
// and returns a commitHook that fires the post-commit ACTIVITY_LOGGED
// broadcast. Caller MUST invoke commitHook AFTER tx.Commit() — firing
@@ -18,10 +18,15 @@ package handlers
// tests cover the HTTP-shape concerns only.
import (
"context"
"database/sql"
"errors"
"log"
"net/http"
"strconv"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/messagestore"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
@@ -90,6 +95,21 @@ func (h *ChatHistoryHandler) List(c *gin.Context) {
opts.HasBefore = true
}
// Session boundary filter (core#2697). When the workspace has a
// non-NULL chat_session_started_at, restrict history to rows
// created at-or-after the marker. The store layer treats
// HasSessionStarted=false as "no filter" so pre-deploy
// workspaces (NULL marker) read history unchanged.
if sessionStartedAt, hasSession, err := lookupChatSessionStartedAt(c.Request.Context(), workspaceID); err != nil {
log.Printf("chat_history: session-started-at lookup failed for %s: %v (returning unfiltered history)", workspaceID, err)
// Best-effort: serve the page unfiltered rather than 5xx.
// The user can still scroll the full history; the boundary
// reset is delayed, not data-lost.
} else if hasSession {
opts.SessionStartedAt = sessionStartedAt
opts.HasSessionStarted = true
}
messages, reachedEnd, err := h.store.List(c.Request.Context(), workspaceID, opts)
if err != nil {
// Errors here are infra (DB unreachable, store impl failure).
@@ -111,3 +131,32 @@ func (h *ChatHistoryHandler) List(c *gin.Context) {
ReachedEnd: reachedEnd,
})
}
// lookupChatSessionStartedAt reads the workspace's chat-session
// boundary (core#2697). Returns (zero, false, nil) when the column is
// NULL (no boundary set, OR the workspace was created before the
// migration landed). Returns the error only on an infra failure — a
// NULL column is NOT an error.
//
// Uses a short context timeout (1s) so a slow DB doesn't hold the
// chat-history request hostage: the caller degrades to an unfiltered
// page on timeout, which is the pre-PR behavior.
func lookupChatSessionStartedAt(parentCtx context.Context, workspaceID string) (time.Time, bool, error) {
if db.DB == nil {
return time.Time{}, false, errors.New("db not initialized")
}
ctx, cancel := context.WithTimeout(parentCtx, 1*time.Second)
defer cancel()
var ts sql.NullTime
err := db.DB.QueryRowContext(ctx, `SELECT chat_session_started_at FROM workspaces WHERE id = $1`, workspaceID).Scan(&ts)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return time.Time{}, false, nil
}
return time.Time{}, false, err
}
if !ts.Valid {
return time.Time{}, false, nil
}
return ts.Time, true, nil
}
@@ -0,0 +1,137 @@
package handlers
// chat_session.go — POST /workspaces/:id/chat-session/new
// (core#2697).
//
// Implements the "New session" soft-boundary primitive: rotates
// workspaces.chat_session_started_at to now() so subsequent
// /chat-history reads filter out pre-marker rows, and broadcasts a
// SESSION_RESET event so every connected device clears its local
// view in lockstep.
//
// Soft boundary, not destructive: the underlying activity_logs rows
// are NOT deleted. A future "history" affordance can still read
// pre-marker rows by querying the table directly (bypassing the
// /chat-history filter). The CTO decision in the dispatch was
// explicit on this — a "new session" in the chat panel is a UX
// re-orientation, not a data wipe.
//
// Auth: same wsAuth chain as /chat-history. The handler that owns
// the canvas path already covers tenant ADMIN_TOKEN +
// X-Molecule-Org-Id; no new trust boundary.
//
// Why a dedicated handler file (not folded into chat_history.go):
// the chat-history read is a thin adapter over MessageStore; the
// reset is a write with a side-effect broadcast. Keeping them in
// separate files matches the "thin adapter / domain logic" split
// used elsewhere in the handlers package.
import (
"context"
"database/sql"
"errors"
"log"
"net/http"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// ChatSessionNewResponse is the wire shape for POST /chat-session/new.
// The marker field is the new chat_session_started_at value; canvas
// can use it to align its own "session X started at..." UI without
// re-fetching the workspace row.
type ChatSessionNewResponse struct {
WorkspaceID string `json:"workspace_id"`
Marker time.Time `json:"chat_session_started_at"`
BroadcastSeq int64 `json:"broadcast_seq,omitempty"`
}
// ChatSessionHandler exposes the soft-boundary rotate endpoint.
// No fields today — the handler is stateless; a future
// "list all sessions" affordance would land here as a second method.
type ChatSessionHandler struct {
broadcaster events.EventEmitter
}
// NewChatSessionHandler wires the broadcaster the handler uses for
// the cross-device SESSION_RESET event. Tests inject a stub via
// the same constructor (handler_test.go).
func NewChatSessionHandler(broadcaster events.EventEmitter) *ChatSessionHandler {
return &ChatSessionHandler{broadcaster: broadcaster}
}
// NewSession handles POST /workspaces/:id/chat-session/new. Sets
// chat_session_started_at = now() and broadcasts SESSION_RESET so
// other devices clear their local view.
//
// Idempotency: repeated calls in quick succession all succeed; the
// marker advances to the latest now(). Canvas's confirm dialog is
// the gate against accidental multi-press, so the server doesn't
// need a debounce token here.
//
// Error surface:
// - 400 if the id isn't a UUID (trust boundary)
// - 404 if the workspace row doesn't exist
// - 502 if the DB write fails (infra)
// - 200 on success with the new marker
func (h *ChatSessionHandler) NewSession(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id must be a UUID"})
return
}
ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Second)
defer cancel()
// Verify the workspace exists and grab the existing marker
// (so we can distinguish "first-ever session" from "rotated
// session" if a future audit needs that).
var prevMarker sql.NullTime
err := db.DB.QueryRowContext(ctx,
`SELECT chat_session_started_at FROM workspaces WHERE id = $1`,
workspaceID,
).Scan(&prevMarker)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
log.Printf("chat_session: pre-update lookup failed for %s: %v", workspaceID, err)
c.JSON(http.StatusBadGateway, gin.H{"error": "chat session update unavailable"})
return
}
marker := time.Now().UTC()
_, err = db.DB.ExecContext(ctx,
`UPDATE workspaces SET chat_session_started_at = $1 WHERE id = $2`,
marker, workspaceID,
)
if err != nil {
log.Printf("chat_session: marker update failed for %s: %v", workspaceID, err)
c.JSON(http.StatusBadGateway, gin.H{"error": "chat session update failed"})
return
}
// Cross-device fan-out: every other device connected to this
// workspace clears its local view in lockstep. Origin device
// also receives the event (it broadcasts to ALL subscribers,
// not "other subscribers") — the canvas listener is idempotent
// (clearing an already-cleared view is a no-op).
if h.broadcaster != nil {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventSessionReset), map[string]interface{}{
"workspace_id": workspaceID,
"chat_session_started_at": marker.Format(time.RFC3339Nano),
"prev_marker_set": prevMarker.Valid,
})
}
c.JSON(http.StatusOK, ChatSessionNewResponse{
WorkspaceID: workspaceID,
Marker: marker,
})
}
@@ -0,0 +1,210 @@
package handlers
// chat_session_test.go — handler-level tests for the soft-boundary
// rotate endpoint and the USER_MESSAGE broadcast helper
// (core#2697).
//
// Coverage map:
// - TestChatSession_NewSession_UpdatesMarker: happy path: POST
// /chat-session/new rotates workspaces.chat_session_started_at
// to now() and broadcasts SESSION_RESET.
// - TestChatSession_NewSession_BadWorkspaceID: 400 on non-UUID id.
// - TestBroadcastUserMessageFromA2ABody_TextAndAttachments: the
// USER_MESSAGE broadcast payload carries message_id + content +
// attachments in the AGENT_MESSAGE-mirroring shape.
// - TestBroadcastUserMessageFromA2ABody_EmptyOrMalformed: skip
// the broadcast when the body has no text and no attachments
// (the phantom-free contract).
// - TestBroadcastUserMessageFromA2ABody_NilBroadcaster: no panic
// when the broadcaster isn't wired (test-only or partial-init
// state).
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/events"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// chatSessionTestBroadcaster is a thread-safe test double for
// events.EventEmitter that records every BroadcastOnly call. The
// production *events.Broadcaster fans out to Redis pub/sub + the WS
// hub; tests can substitute this to assert payload shape + fan-out
// order without standing up the topology. Renamed to avoid
// collision with the existing captureBroadcaster in
// workspace_provision_test.go.
type chatSessionTestBroadcaster struct {
mu sync.Mutex
captured []chatSessionCapturedBroadcast
}
type chatSessionCapturedBroadcast struct {
WorkspaceID string
EventType string
Payload map[string]interface{}
}
func (b *chatSessionTestBroadcaster) BroadcastOnly(workspaceID string, eventType string, payload interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
pm, _ := payload.(map[string]interface{})
b.captured = append(b.captured, chatSessionCapturedBroadcast{
WorkspaceID: workspaceID,
EventType: eventType,
Payload: pm,
})
}
func (b *chatSessionTestBroadcaster) RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error {
b.BroadcastOnly(workspaceID, eventType, payload)
return nil
}
func TestChatSession_NewSession_BadWorkspaceID(t *testing.T) {
gin.SetMode(gin.TestMode)
cb := &chatSessionTestBroadcaster{}
h := NewChatSessionHandler(cb)
rr := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rr)
c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/not-a-uuid/chat-session/new", nil)
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
h.NewSession(c)
if rr.Code != http.StatusBadRequest {
t.Fatalf("expected 400 for non-UUID, got %d (body=%s)", rr.Code, rr.Body.String())
}
if len(cb.captured) != 0 {
t.Fatalf("expected no broadcasts on 400, got %d", len(cb.captured))
}
}
func TestBroadcastUserMessageFromA2ABody_TextAndAttachments(t *testing.T) {
cb := &chatSessionTestBroadcaster{}
body := []byte(`{
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"role": "user",
"messageId": "test-msg-id-1",
"parts": [
{"kind": "text", "text": "hello world"},
{"kind": "file", "file": {"name": "x.png", "uri": "workspace:/tmp/x.png", "mimeType": "image/png", "size": 1024}}
]
}
}
}`)
broadcastUserMessageFromA2ABody(cb, "ws-123", "test-msg-id-1", body)
if len(cb.captured) != 1 {
t.Fatalf("expected 1 broadcast, got %d", len(cb.captured))
}
c := cb.captured[0]
if c.WorkspaceID != "ws-123" {
t.Errorf("workspace id mismatch: got %q", c.WorkspaceID)
}
if c.EventType != string(events.EventUserMessage) {
t.Errorf("event type mismatch: got %q want %q", c.EventType, events.EventUserMessage)
}
if c.Payload["message_id"] != "test-msg-id-1" {
t.Errorf("message_id mismatch: got %v", c.Payload["message_id"])
}
if c.Payload["content"] != "hello world" {
t.Errorf("content mismatch: got %v", c.Payload["content"])
}
if c.Payload["workspace_id"] != "ws-123" {
t.Errorf("workspace_id mismatch: got %v", c.Payload["workspace_id"])
}
atts, ok := c.Payload["attachments"].([]map[string]interface{})
if !ok {
t.Fatalf("attachments not a []map[string]interface{}, got %T", c.Payload["attachments"])
}
if len(atts) != 1 || atts[0]["uri"] != "workspace:/tmp/x.png" {
t.Errorf("attachment payload mismatch: %+v", atts)
}
}
func TestBroadcastUserMessageFromA2ABody_EmptyOrMalformed(t *testing.T) {
cases := []struct {
name string
body []byte
}{
{"empty body", []byte{}},
{"malformed JSON", []byte(`{not json`)},
{"no parts", []byte(`{"params":{"message":{"role":"user","messageId":"x","parts":[]}}}`)},
{"only text empty", []byte(`{"params":{"message":{"role":"user","messageId":"x","parts":[{"kind":"text","text":""}]}}}`)},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
cb := &chatSessionTestBroadcaster{}
broadcastUserMessageFromA2ABody(cb, "ws-1", "msg-1", tc.body)
if len(cb.captured) != 0 {
t.Errorf("expected no broadcast for %q, got %d", tc.name, len(cb.captured))
}
})
}
}
func TestBroadcastUserMessageFromA2ABody_NilBroadcasterNoPanic(t *testing.T) {
defer func() {
if r := recover(); r != nil {
t.Fatalf("nil broadcaster must not panic, got: %v", r)
}
}()
body := []byte(`{"params":{"message":{"role":"user","messageId":"x","parts":[{"kind":"text","text":"hi"}]}}}`)
broadcastUserMessageFromA2ABody(nil, "ws-1", "x", body)
}
func TestBroadcastUserMessageFromA2ABody_EmptyMessageIDNoBroadcast(t *testing.T) {
// No messageId → no broadcast. The origin device's optimistic
// add has no id to dedup by, and the server has no
// message-keyed row to attribute the broadcast to. The
// persistUserMessageAtIngest caller has the same skip-when-
// no-messageId contract (a2a_proxy_helpers.go:persistUserMessageAtIngest
// returns early on empty messageId).
cb := &chatSessionTestBroadcaster{}
body := []byte(`{"params":{"message":{"role":"user","parts":[{"kind":"text","text":"hi"}]}}}`)
broadcastUserMessageFromA2ABody(cb, "ws-1", "", body)
if len(cb.captured) != 0 {
t.Errorf("expected no broadcast with empty messageId, got %d", len(cb.captured))
}
}
// TestChatSession_NewSession_BroadcastPayload exercises the
// SESSION_RESET broadcast payload shape (without standing up a DB —
// the handler's pre-update SELECT is the only DB touch; on
// ErrNoRows we'd get 404, and that's covered by the route-level
// integration tests in CI). Here we assert that
// broadcastUserMessageFromA2ABody does NOT accidentally re-emit
// SESSION_RESET, and that the SESSION_RESET shape, when built
// directly, is a valid payload (catches key-rename regressions).
func TestSessionResetPayloadShape(t *testing.T) {
marker := time.Now().UTC()
payload, err := json.Marshal(map[string]interface{}{
"workspace_id": uuid.New().String(),
"chat_session_started_at": marker.Format(time.RFC3339Nano),
"prev_marker_set": true,
})
if err != nil {
t.Fatalf("marshal: %v", err)
}
var decoded map[string]interface{}
if err := json.Unmarshal(payload, &decoded); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if decoded["chat_session_started_at"] == nil {
t.Errorf("missing chat_session_started_at in payload")
}
if decoded["workspace_id"] == nil {
t.Errorf("missing workspace_id in payload")
}
}
@@ -112,6 +112,16 @@ type ListOptions struct {
// fallback would silently exclude the legitimate epoch-start case.
BeforeTS time.Time
HasBefore bool
// SessionStartedAt filters out activity_logs rows that pre-date
// the workspace's current chat session boundary (core#2697). The
// canvas chat panel resets the marker when the user presses
// "New session" so the visible history is bounded to the current
// session. Stores MUST only consider this when HasSessionStarted
// is true; a zero-time fallback would silently exclude legitimate
// pre-marker rows when the column is NULL on the workspace.
SessionStartedAt time.Time
HasSessionStarted bool
}
// MessageStore is the read-side interface. Implementations pluggable
@@ -15,6 +15,7 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"path"
"strings"
@@ -296,27 +297,44 @@ func reverseRowChunks(msgs []ChatMessage) []ChatMessage {
// parser without spinning a real DB. Internal — alternative impls
// shouldn't depend on the SQL shape.
func (s *PostgresMessageStore) queryActivityRows(ctx context.Context, workspaceID string, opts ListOptions) (*sql.Rows, error) {
if opts.HasBefore {
return s.db.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
AND created_at < $2
ORDER BY created_at DESC
LIMIT $3
`, workspaceID, opts.BeforeTS, opts.Limit)
// Build the WHERE clause dynamically so we can compose
// (before_ts cursor) + (session_started_at filter) in any
// combination. Keeps the SQL shape flat and avoids a 4-arm
// switch on the (HasBefore, HasSessionStarted) cartesian
// product. The migration for chat_session_started_at is
// idempotent (ADD COLUMN IF NOT EXISTS) so a NULL marker on
// pre-deploy workspaces reads history with no filter — exactly
// the pre-PR behavior.
//
// core#2697: the session filter is `created_at >=
// $session_started_at` (inclusive on the lower bound). A user-
// typed message that lands in the same instant the marker is
// rotated should still be visible — exclusivity would silently
// drop the boundary message on a fast client.
args := []interface{}{workspaceID}
where := []string{
"workspace_id = $1",
"activity_type = 'a2a_receive'",
"source_id IS NULL",
}
return s.db.QueryContext(ctx, `
if opts.HasSessionStarted {
args = append(args, opts.SessionStartedAt)
where = append(where, fmt.Sprintf("created_at >= $%d", len(args)))
}
if opts.HasBefore {
args = append(args, opts.BeforeTS)
where = append(where, fmt.Sprintf("created_at < $%d", len(args)))
}
args = append(args, opts.Limit)
limitPlaceholder := fmt.Sprintf("$%d", len(args))
query := fmt.Sprintf(`
SELECT created_at, status, request_body::text, response_body::text, tool_trace::text, duration_ms
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
WHERE %s
ORDER BY created_at DESC
LIMIT $2
`, workspaceID, opts.Limit)
LIMIT %s
`, strings.Join(where, " AND "), limitPlaceholder)
return s.db.QueryContext(ctx, query, args...)
}
// errInvalidLimit is returned by List when opts.Limit ≤ 0.
@@ -703,3 +703,96 @@ func TestToolNameFromSummary_NonMarkerUnchanged(t *testing.T) {
t.Errorf("marker not stripped: %q", got)
}
}
// =====================================================================
// Session filter (core#2697) — the new chat-session soft boundary
// composes with the existing before_ts cursor and the activity_type
// + source_id predicates. The query is dynamic; these tests pin the
// (HasBefore, HasSessionStarted) cartesian.
// =====================================================================
func TestChatHistory_SessionFilter_NoMarkerNoFilter(t *testing.T) {
// HasSessionStarted=false → no `created_at >= $N` predicate on
// the SQL. Pre-deploy workspaces (NULL marker) read history
// unchanged. The session filter is opt-in.
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
defer db.Close()
store := &PostgresMessageStore{db: db}
mock.ExpectQuery("SELECT created_at, status, request_body::text").
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}))
_, _, err = store.List(context.Background(), "ws-1", ListOptions{Limit: 100})
if err != nil {
t.Fatalf("List: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestChatHistory_SessionFilter_AppliesToQuery(t *testing.T) {
// HasSessionStarted=true with a marker → query binds the marker
// and the WHERE clause includes `created_at >= $N`. Catches
// regressions where the marker is silently dropped.
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
defer db.Close()
store := &PostgresMessageStore{db: db}
marker := mustParseTime(t, "2026-05-01T00:00:00Z")
mock.ExpectQuery("SELECT created_at, status, request_body::text").
WithArgs("ws-1", marker, 100).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}))
_, _, err = store.List(context.Background(), "ws-1", ListOptions{
Limit: 100,
HasSessionStarted: true,
SessionStartedAt: marker,
})
if err != nil {
t.Fatalf("List: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestChatHistory_SessionFilter_ComposesWithBeforeCursor(t *testing.T) {
// Both filters present: SQL binds 3 args (workspace_id, marker,
// before_ts) and the WHERE has both predicates in the right
// order. Catches a regression where the dynamic WHERE clause
// mis-orders placeholders.
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
defer db.Close()
store := &PostgresMessageStore{db: db}
marker := mustParseTime(t, "2026-05-01T00:00:00Z")
before := mustParseTime(t, "2026-05-15T00:00:00Z")
mock.ExpectQuery("SELECT created_at, status, request_body::text").
WithArgs("ws-1", marker, before, 50).
WillReturnRows(sqlmock.NewRows([]string{"created_at", "status", "request_body", "response_body", "tool_trace", "duration_ms"}))
_, _, err = store.List(context.Background(), "ws-1", ListOptions{
Limit: 50,
HasBefore: true,
BeforeTS: before,
HasSessionStarted: true,
SessionStartedAt: marker,
})
if err != nil {
t.Fatalf("List: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
@@ -460,6 +460,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
chh := handlers.NewChatHistoryHandler(chatStore)
wsAuth.GET("/chat-history", chh.List)
// Chat session soft boundary (core#2697): the canvas
// "New session" button calls this to rotate the session
// marker and broadcast SESSION_RESET to other devices.
cssh := handlers.NewChatSessionHandler(broadcaster)
wsAuth.POST("/chat-session/new", cssh.NewSession)
// Config
cfgh := handlers.NewConfigHandler()
wsAuth.GET("/config", cfgh.Get)
@@ -0,0 +1,6 @@
-- Drop chat_session_started_at. Reversible: no other table references
-- the column, and pre-PR chat-history reads already ignored it
-- (no data loss on rollback — the filter simply goes back to being
-- absent).
ALTER TABLE workspaces
DROP COLUMN IF EXISTS chat_session_started_at;
@@ -0,0 +1,22 @@
-- workspaces.chat_session_started_at: a soft boundary that marks the
-- start of the user's current chat session (core#2697).
--
-- Soft-boundary design (CTO decision): pressing "New session" in the
-- canvas chat panel rotates this column to now(); chat-history reads
-- filter rows where created_at >= chat_session_started_at. Pre-marker
-- rows stay in the DB, so a user can always look back at older
-- sessions via a future "history" affordance. We do NOT delete rows
-- (destructive) and we do NOT introduce a session_id FK (heavy: would
-- need a sessions table, ON DELETE rules, backfill).
--
-- Idempotent: ADD COLUMN IF NOT EXISTS so the migration runner can
-- re-apply after a partial-failure without aborting. The column is
-- NULLABLE so existing workspaces (pre-deploy) read history with no
-- filter — the same behavior they had before this PR landed.
--
-- Drift contract: a chat-history fetch MUST treat NULL as "no filter"
-- (don't compare against epoch zero, which would silently drop
-- pre-marker rows). The store's ListOptions handles this via a
-- HasSessionStarted bool, mirroring the BeforeTS cursor pattern.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS chat_session_started_at TIMESTAMPTZ;