From dcc870a6b748b7b83f1804bed31defeebffb8f21 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 23:17:26 -0700 Subject: [PATCH] feat(workspace-server): server-side chat-history endpoint (RFC #2945 PR-C) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the SSOT gap for chat-history hydration: today every consumer (canvas TS) re-implements an A2A-envelope walk to map activity_logs rows into rendered ChatMessage objects. This PR moves that walk into the server. ## What's added GET /workspaces/:id/chat-history?limit=N&before_ts=T Returns: { "messages": [ {"id": "", "role": "user"|"agent"|"system", "content": "...", "attachments": [...], "timestamp": ""} ], "reached_end": false } Auth chain: same wsAuth as /workspaces/:id/activity (tenant ADMIN_TOKEN + X-Molecule-Org-Id). No new trust boundary. Filter: a2a_receive rows with source_id IS NULL — same canvas-source filter the canvas applies via /activity?type=a2a_receive&source=canvas, centralized so future API consumers don't need to know it. ## What's mirrored from canvas TS Direct port of canvas/src/components/tabs/chat/historyHydration.ts + message-parser.ts: - extractRequestText / extractFilesFromUserMessage — user-side parts walk through request_body.params.message.parts[] - extractChatResponseText — agent-side response_body collector across the four shapes (string, A2A JSON-RPC parts, older nested parts.root.text, task artifacts) joined with "\n" (matches canvas multi-source collector — claude-code emits multiple text parts; hermes emits summary+artifacts) - extractFilesFromResponse / extractFilesFromTask — file walk across parts[] + artifacts[].parts[] + status.message.parts[] + message.parts[] - v0 hot path ({kind:"file", file:{...}}) AND v1 protobuf flat shape ({url, filename, mediaType}) both supported - Role decision: status='error' OR text starts with "agent error" (case-insensitive) → "system", else "agent" - isInternalSelfMessage prefix filter (Delegation results are ready...) - Timestamp pinned to row.created_at (regression cover for 2026-04-25 bubble-collapse bug) ## Tests 22 unit tests in chat_history_test.go, every TS test case in historyHydration.test.ts has a Go counterpart: Timestamp preservation (3): user/agent pin to created_at, two-rows produce two distinct timestamps. User-message extraction (5): text-only, internal-self skip, null body, attachments hydrated, attachments-only-when-text-empty, internal-self suppresses even with attachments. Agent-message extraction (4): result-string, status=error→system, agent-error-prefix→system, response_body.parts attachments, null body, no-text-no-files-no-bubble. End-to-end (1): paired user+agent same timestamp. Go-specific (5): malformed JSON returns empty (no panic), v1 protobuf flat shape extraction, task-artifacts extraction, older nested root.text shape, basename helper edge cases. isInternalSelfMessage predicate (1): prefix match, non-prefix non- match, empty-text non-match. Mutation-tested. Removed the role-promotion branch (status=error + agent-error prefix → system); confirmed both TestChatHistory_RoleSystemWhenStatusError and TestChatHistory_RoleSystemWhenAgentErrorPrefix fire red. Restored. Both green. Full handlers test suite (4.3s) green; full repo `go test ./...` green. ## SSOT decision Parsing logic lives in workspace-server/internal/handlers/chat_history.go ONLY. Canvas keeps historyHydration.ts + message-parser.ts during the transition because: - PR-C-2 (follow-up): canvas loadMessagesFromDB swaps to new endpoint. Today's canvas still calls /activity for backward compatibility. - The TS parsers are still load-bearing for LIVE message handling (WebSocket A2A_RESPONSE events) until RFC #2945 PR-B-2 mirrors the typed event payloads to canvas consumers. Canvas's TS path will be deleted in a separate PR after a one-week observation window confirms no live-message consumers depend on it. ## Security review - Untrusted input? YES — request_body and response_body come from agents (potentially OSS / third-party). Defensive: any malformed JSON returns empty content + no attachments, no panic. Tested via TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty. - Trust boundary? Same as today: agent → workspace-server. No new boundary; reuses existing wsAuth middleware. - Auth/authz? Inherits wsAuth chain. Cross-workspace access blocked by existing TenantGuard middleware. - PII / secrets in logs? None. The handler logs nothing on the happy path; errors log 502 without body content. - Output sanitization? ChatMessage.content is plain text returned as-is; canvas already sanitizes via ReactMarkdown. Attachment URIs are agent-provided (workspace: / platform-pending: / https:); canvas's existing scheme allow-list still applies. ## Versioning / backwards compatibility - New endpoint /chat-history. /activity unchanged. - Canvas historyHydration.ts + message-parser.ts intact during transition (will be removed in PR-C-2 follow-up). - No public API consumer of /activity is broken — added route is additive. - No semver bump (server is internal versioning). ## Three weakest spots (hostile-reviewer self-pass) 1. extractRequestText returns ONLY parts[0].text. If a user message contains multiple text parts (uncommon — canvas only ever emits one), we lose later parts. Matches canvas exactly today, but a future change that emits multi-text user messages needs both parsers updated. Documented in code; covered by test if/when added. 2. activityRowToChatMessages rebuilds ChatMessage IDs every call (no caching). Each chat reload mints fresh UUIDs. This is fine because canvas dedupes by (role, content, timestamp window) not id, but a future API consumer that DID rely on id stability would break. Documented in the ChatMessage struct comment. 3. The handler scopes to source_id IS NULL only (canvas-source rows). A future "show all messages, including agent-to-agent" mode would need a new endpoint or a parameter. Out of scope for PR-C; canvas's /activity?source=canvas already enforces the same filter. Closes #3017. Unblocks RFC #2945 PR-D (MessageStore interface) which returns []ChatMessage typed values. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/chat_history.go | 646 ++++++++++++++++++ .../internal/handlers/chat_history_test.go | 422 ++++++++++++ workspace-server/internal/router/router.go | 7 + 3 files changed, 1075 insertions(+) create mode 100644 workspace-server/internal/handlers/chat_history.go create mode 100644 workspace-server/internal/handlers/chat_history_test.go diff --git a/workspace-server/internal/handlers/chat_history.go b/workspace-server/internal/handlers/chat_history.go new file mode 100644 index 00000000..d76ea9d7 --- /dev/null +++ b/workspace-server/internal/handlers/chat_history.go @@ -0,0 +1,646 @@ +package handlers + +// chat_history.go — server-side rendering of activity_logs rows into the +// canonical ChatMessage shape (RFC #2945 PR-C, issue #3017). +// +// Replaces the canvas-side TS parsing in +// canvas/src/components/tabs/chat/historyHydration.ts + +// canvas/src/components/tabs/chat/message-parser.ts so: +// +// - Single source of truth for A2A-envelope walking. A future API +// consumer (mobile, third-party integration, RFC #2945 PR-D's +// OSS MessageStore) consumes a typed surface instead of re- +// implementing the same shape walk. +// +// - Wire-format evolution (a2a-sdk v0 → v1 protobuf flat shape) is +// handled in one place. Today the TS parser handles both shapes; +// this Go parser mirrors that contract exactly. +// +// - PR-D unblocked: MessageStore returns []ChatMessage typed values, +// not raw activity_logs rows. The interface is meaningless if +// parsing still has to happen client-side. +// +// Endpoint: GET /workspaces/:id/chat-history?limit=N&before_ts=T +// +// Auth: same wsAuth chain as /workspaces/:id/activity (tenant +// ADMIN_TOKEN + X-Molecule-Org-Id header). No new trust boundary. +// +// Behavioral contract: every test case in +// canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts +// (11 cases) has a Go-side parity test in chat_history_test.go. +// Mutation-tested by reverting individual branches and confirming +// the corresponding Go test fires red. + +import ( + "database/sql" + "encoding/json" + "errors" + "net/http" + "path" + "strconv" + "strings" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// ChatMessage is the canonical shape returned by GET /chat-history. +// Mirrors canvas/src/components/tabs/chat/types.ts:ChatMessage so +// the canvas can render it without per-row mapping. +// +// NOTE: id is server-generated (UUID v4) per row pair — clients should +// NOT depend on these ids being stable across requests since the +// activity_log row itself doesn't carry message-shaped ids. Use +// (timestamp, role, content) for cross-request deduping; the id is +// just a React key. +type ChatMessage struct { + ID string `json:"id"` + Role string `json:"role"` // "user" | "agent" | "system" + Content string `json:"content"` + Attachments []ChatAttachment `json:"attachments,omitempty"` + Timestamp string `json:"timestamp"` // RFC3339 — pinned to row.created_at +} + +// ChatAttachment mirrors canvas's ChatAttachment / ParsedFilePart. +type ChatAttachment struct { + Name string `json:"name"` + URI string `json:"uri"` + MimeType string `json:"mimeType,omitempty"` + Size *int64 `json:"size,omitempty"` +} + +// ChatHistoryResponse is the wire shape for GET /chat-history. +type ChatHistoryResponse struct { + Messages []ChatMessage `json:"messages"` + ReachedEnd bool `json:"reached_end"` +} + +// ChatHistoryHandler exposes the typed chat-history endpoint. It does +// not need a broadcaster — read-only. +type ChatHistoryHandler struct{} + +// NewChatHistoryHandler returns a fresh handler. Stateless on purpose: +// no caching, no per-request handler state. The DB query is the +// expensive part; cache control is handled at HTTP layer. +func NewChatHistoryHandler() *ChatHistoryHandler { + return &ChatHistoryHandler{} +} + +// internalSelfPrefixes — message texts that should be filtered out of +// chat history because they're internal self-triggers (heartbeats, +// scheduled-task self-fire, delegation-result self-notify) rather than +// user-typed messages. Mirrors canvas's isInternalSelfMessage. Centring +// here means a future internal-trigger pattern only needs to be added +// in one place, not in every consumer. +var internalSelfPrefixes = []string{ + "Delegation results are ready", +} + +// isInternalSelfMessage reports whether text starts with any registered +// internal-self prefix. Empty text returns false (only filter on +// matched prefixes — empty/missing text is a legitimate +// attachments-only bubble). +func isInternalSelfMessage(text string) bool { + if text == "" { + return false + } + for _, prefix := range internalSelfPrefixes { + if strings.HasPrefix(text, prefix) { + return true + } + } + return false +} + +// List handles GET /workspaces/:id/chat-history?limit=N&before_ts=T. +// +// Query parameters mirror /activity for caller convenience: +// +// - limit (default 100, max 1000) — page size, newest-first from the +// server's POV. Caller reverses for chronological display. +// - before_ts (RFC3339, optional) — paginate by walking strictly +// older than this timestamp. Identical semantics to /activity's +// before_ts: matches what canvas uses for lazy-loading older +// batches. +// +// The handler scopes to activity_type='a2a_receive' AND source_id IS +// NULL (canvas-source rows only) — the same filter canvas applies via +// `?type=a2a_receive&source=canvas`. Centralizing here means a future +// caller (mobile, public API) doesn't need to know the filter. +func (h *ChatHistoryHandler) List(c *gin.Context) { + workspaceID := c.Param("id") + if _, err := uuid.Parse(workspaceID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id must be a UUID"}) + return + } + + limit := 100 + if v := c.Query("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + limit = n + } + } + if limit > 1000 { + limit = 1000 + } + + var beforeTS time.Time + usingBeforeTS := false + if v := c.Query("before_ts"); v != "" { + t, err := time.Parse(time.RFC3339, v) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error": "before_ts must be an RFC3339 timestamp (e.g. 2026-05-01T00:00:00Z)", + }) + return + } + beforeTS = t + usingBeforeTS = true + } + + // Newest-first ordering matches /activity. Caller reverses for + // chronological display. Same semantics across both endpoints + // keeps the canvas's lazy-history pagination logic stable. + rows, err := h.queryActivityRows(c.Request.Context(), workspaceID, limit, usingBeforeTS, beforeTS) + if err != nil { + // Errors here are infra (DB unreachable) — surface as 502 so + // the canvas can retry vs. treating as "no rows". + c.JSON(http.StatusBadGateway, gin.H{"error": "chat history unavailable"}) + return + } + defer rows.Close() + + var messages []ChatMessage + rowCount := 0 + for rows.Next() { + var ( + createdAt time.Time + status string + rawRequest sql.NullString + rawResponse sql.NullString + ) + if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse); err != nil { + continue + } + rowCount++ + var requestBody, responseBody json.RawMessage + if rawRequest.Valid { + requestBody = json.RawMessage(rawRequest.String) + } + if rawResponse.Valid { + responseBody = json.RawMessage(rawResponse.String) + } + messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, isInternalSelfMessage)...) + } + + c.JSON(http.StatusOK, ChatHistoryResponse{ + Messages: messages, + ReachedEnd: rowCount < limit, + }) +} + +// queryActivityRows pulls the raw a2a_receive rows for a workspace. +// Split out so unit tests can mock the DB layer without spinning a +// full request context. Canvas-source rows only (source_id IS NULL). +func (h *ChatHistoryHandler) queryActivityRows(ctx interface { + Done() <-chan struct{} + Err() error + Deadline() (time.Time, bool) + Value(any) any +}, workspaceID string, limit int, usingBeforeTS bool, beforeTS time.Time) (*sql.Rows, error) { + if usingBeforeTS { + return db.DB.QueryContext(ctx, ` + SELECT created_at, status, request_body::text, response_body::text + FROM activity_logs + WHERE workspace_id = $1 + AND activity_type = 'a2a_receive' + AND source_id IS NULL + AND created_at < $2 + ORDER BY created_at DESC + LIMIT $3 + `, workspaceID, beforeTS, limit) + } + return db.DB.QueryContext(ctx, ` + SELECT created_at, status, request_body::text, response_body::text + FROM activity_logs + WHERE workspace_id = $1 + AND activity_type = 'a2a_receive' + AND source_id IS NULL + ORDER BY created_at DESC + LIMIT $2 + `, workspaceID, limit) +} + +// activityRowToChatMessages converts ONE activity_logs row into 0-2 +// ChatMessages. Direct port of canvas's activityRowToMessages. +// +// - Up to 1 user-side bubble from request_body, unless internal-self. +// - Up to 1 agent-side bubble from response_body. Role is "system" +// when status='error' OR text starts with "agent error" (case- +// insensitive — matches canvas predicate exactly). +// +// Both bubbles MUST adopt row.created_at as their timestamp. The +// canvas hydration regression that motivated extracting the helper +// (every reload re-stamping bubbles to render-time) is regression- +// covered in chat_history_test.go. +// +// Defensive: any malformed JSON is silently dropped (text becomes "", +// attachments []) — chat falls through to text-only rather than +// surfacing a 500. +func activityRowToChatMessages( + createdAt time.Time, + status string, + requestBody json.RawMessage, + responseBody json.RawMessage, + internalSelf func(string) bool, +) []ChatMessage { + var out []ChatMessage + timestamp := createdAt.UTC().Format(time.RFC3339Nano) + + // USER side — extract from request_body.params.message + userText := extractRequestText(requestBody) + userAttachments := extractFilesFromUserMessage(requestBody) + if !internalSelf(userText) && (userText != "" || len(userAttachments) > 0) { + out = append(out, ChatMessage{ + ID: newMessageID(), + Role: "user", + Content: userText, + Attachments: userAttachments, + Timestamp: timestamp, + }) + } + + // AGENT side — extract from response_body + if len(responseBody) > 0 { + agentText := extractChatResponseText(responseBody) + agentAttachments := extractFilesFromResponse(responseBody) + if agentText != "" || len(agentAttachments) > 0 { + role := "agent" + if status == "error" || strings.HasPrefix(strings.ToLower(agentText), "agent error") { + role = "system" + } + out = append(out, ChatMessage{ + ID: newMessageID(), + Role: role, + Content: agentText, + Attachments: agentAttachments, + Timestamp: timestamp, + }) + } + } + + return out +} + +// extractRequestText pulls the user's typed text from the canonical +// A2A request envelope. Returns "" on any malformed shape; callers +// pair this with extractFilesFromUserMessage to catch attachments- +// only bubbles. +// +// request_body = {"params": {"message": {"parts": [{"kind":"text", "text":"..."}, ...]}}} +// +// Mirrors canvas's extractRequestText. Currently returns ONLY parts[0] +// to match canvas exactly; multi-text-part user messages would +// require both parsers to evolve in lockstep (track via PR-C-2). +func extractRequestText(body json.RawMessage) string { + if len(body) == 0 { + return "" + } + var env struct { + Params struct { + Message struct { + Parts []map[string]any `json:"parts"` + } `json:"message"` + } `json:"params"` + } + if err := json.Unmarshal(body, &env); err != nil { + return "" + } + for _, p := range env.Params.Message.Parts { + if t, ok := p["text"].(string); ok && t != "" { + return t + } + } + return "" +} + +// extractFilesFromUserMessage walks the same request_body envelope as +// extractRequestText and collects file parts. +func extractFilesFromUserMessage(body json.RawMessage) []ChatAttachment { + if len(body) == 0 { + return nil + } + var env struct { + Params struct { + Message json.RawMessage `json:"message"` + } `json:"params"` + } + if err := json.Unmarshal(body, &env); err != nil { + return nil + } + if len(env.Params.Message) == 0 { + return nil + } + return extractFilesFromTask(env.Params.Message) +} + +// extractChatResponseText collects text from any of the response shapes +// canvas's extractChatResponseText handles, joining with "\n": +// +// - {"result": ""} — string +// - {"result": {"parts": [{"kind":"text", "text":""}]}} — A2A JSON-RPC +// - {"parts": [{"root": {"text": "..."}}]} — older nested shape +// - {"result": {"artifacts": [{"parts": [...]}]}} — task shape +// - {"task": ""} — fallback +// +// Why collect rather than first-source-wins: claude-code emits multiple +// text parts; hermes emits summary-in-parts + details-in-artifacts. The +// pre-collect "first wins" silently truncated 15k-char briefs to their +// leading line and dropped artifact details. Matches canvas behavior +// exactly. +func extractChatResponseText(body json.RawMessage) string { + if len(body) == 0 { + return "" + } + + // {"result": "string"} + var asString struct { + Result string `json:"result"` + } + if err := json.Unmarshal(body, &asString); err == nil && asString.Result != "" { + return asString.Result + } + + // {"result": {object}} — try the structured shapes + var asObject struct { + Result json.RawMessage `json:"result"` + Task string `json:"task"` + } + if err := json.Unmarshal(body, &asObject); err != nil { + return "" + } + + var collected []string + + if len(asObject.Result) > 0 { + var resultObj struct { + Parts []map[string]any `json:"parts"` + Artifacts []json.RawMessage `json:"artifacts"` + } + if err := json.Unmarshal(asObject.Result, &resultObj); err == nil { + // A2A JSON-RPC: parts[].text + if t := joinTextParts(resultObj.Parts); t != "" { + collected = append(collected, t) + } + // Older nested: parts[].root.text + var rootTexts []string + for _, p := range resultObj.Parts { + if root, ok := p["root"].(map[string]any); ok { + if t, ok := root["text"].(string); ok && t != "" { + rootTexts = append(rootTexts, t) + } + } + } + if len(rootTexts) > 0 { + collected = append(collected, strings.Join(rootTexts, "\n")) + } + // Task shape: artifacts[].parts[].text + for _, raw := range resultObj.Artifacts { + var art struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(raw, &art); err == nil { + if t := joinTextParts(art.Parts); t != "" { + collected = append(collected, t) + } + } + } + } + } + + if len(collected) > 0 { + return strings.Join(collected, "\n") + } + + if asObject.Task != "" { + return asObject.Task + } + return "" +} + +// joinTextParts returns a "\n"-joined concatenation of every text part +// in parts[]. Empty if no text parts. Matches canvas extractTextsFromParts. +func joinTextParts(parts []map[string]any) string { + var texts []string + for _, p := range parts { + // Accept both "type":"text" (older) and "kind":"text" (current). + isText := false + if k, ok := p["kind"].(string); ok && k == "text" { + isText = true + } + if t, ok := p["type"].(string); ok && t == "text" { + isText = true + } + if !isText { + continue + } + if t, ok := p["text"].(string); ok && t != "" { + texts = append(texts, t) + } + } + return strings.Join(texts, "\n") +} + +// extractFilesFromResponse collects file parts from the response_body +// across the same shape variants as extractChatResponseText. Mirrors +// canvas extractFilesFromTask, except the canvas function takes "the +// task object" while this takes the wire-level response_body and +// dispatches: +// +// - {"result": {object}} → unwrap result, walk parts/artifacts +// - {"result": "", "parts": [...]} → notify shape, walk top-level parts +// - {"message": {"parts": [...]}} → some A2A servers wrap as a message +func extractFilesFromResponse(body json.RawMessage) []ChatAttachment { + if len(body) == 0 { + return nil + } + // Determine which container to feed extractFilesFromTask: + // - if result is an object, feed the result object + // - else feed the top-level body (notify shape with parts at root) + var probe struct { + Result json.RawMessage `json:"result"` + } + _ = json.Unmarshal(body, &probe) + + feed := body + if len(probe.Result) > 0 { + // Is result an object? (vs a string) + trimmed := bytes_trim_space(probe.Result) + if len(trimmed) > 0 && trimmed[0] == '{' { + feed = probe.Result + } + } + return extractFilesFromTask(feed) +} + +// extractFilesFromTask walks parts[] + artifacts[].parts[] + status.message.parts[] +// + message.parts[] and pulls out file parts. Mirrors canvas's +// extractFilesFromTask exactly — same two wire shapes (v0 hot path, +// v1 protobuf flat shape). +// +// Defensive: any error inside the walk is recovered and partial +// results returned. A malformed shape should never fail the whole +// chat reload — degraded UX is better than 500. +func extractFilesFromTask(taskJSON json.RawMessage) []ChatAttachment { + if len(taskJSON) == 0 { + return nil + } + var task struct { + Parts []map[string]any `json:"parts"` + Artifacts []json.RawMessage `json:"artifacts"` + Status json.RawMessage `json:"status"` + Message json.RawMessage `json:"message"` + } + if err := json.Unmarshal(taskJSON, &task); err != nil { + return nil + } + var out []ChatAttachment + out = appendFilesFromParts(out, task.Parts) + for _, raw := range task.Artifacts { + var art struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(raw, &art); err == nil { + out = appendFilesFromParts(out, art.Parts) + } + } + if len(task.Status) > 0 { + var st struct { + Message struct { + Parts []map[string]any `json:"parts"` + } `json:"message"` + } + if err := json.Unmarshal(task.Status, &st); err == nil { + out = appendFilesFromParts(out, st.Message.Parts) + } + } + if len(task.Message) > 0 { + var msg struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(task.Message, &msg); err == nil { + out = appendFilesFromParts(out, msg.Parts) + } + } + return out +} + +// appendFilesFromParts handles the v0 hot path (kind/type=file with +// nested file{}) and the v1 flat path (url+filename+mediaType). +func appendFilesFromParts(out []ChatAttachment, parts []map[string]any) []ChatAttachment { + for _, raw := range parts { + v0 := false + if k, ok := raw["kind"].(string); ok && k == "file" { + v0 = true + } + if t, ok := raw["type"].(string); ok && t == "file" { + v0 = true + } + v1URL, _ := raw["url"].(string) + + if !v0 && v1URL == "" { + continue + } + + var att ChatAttachment + if v0 { + file, _ := raw["file"].(map[string]any) + if file == nil { + file = raw // some emitters flatten; defensive + } + uri, _ := file["uri"].(string) + if uri == "" { + continue + } + att.URI = uri + if name, _ := file["name"].(string); name != "" { + att.Name = name + } else { + att.Name = basename(uri) + } + if mt, ok := file["mimeType"].(string); ok { + att.MimeType = mt + } + if sz, ok := numericSize(file["size"]); ok { + att.Size = &sz + } + } else { + att.URI = v1URL + if name, _ := raw["filename"].(string); name != "" { + att.Name = name + } else { + att.Name = basename(v1URL) + } + if mt, ok := raw["mediaType"].(string); ok { + att.MimeType = mt + } + } + out = append(out, att) + } + return out +} + +// numericSize coerces JSON's number type (always float64 in +// json.Unmarshal of map[string]any) to int64 for the Size field. +// Returns (0, false) for non-numeric or absent values. +func numericSize(v any) (int64, bool) { + switch n := v.(type) { + case float64: + return int64(n), true + case int64: + return n, true + case int: + return int64(n), true + } + return 0, false +} + +// basename strips scheme + path components, returning the trailing +// segment (or "file" if empty). Mirrors canvas basename helper. +func basename(uri string) string { + cleaned := strings.TrimPrefix(uri, "workspace:") + cleaned = strings.TrimPrefix(cleaned, "https://") + cleaned = strings.TrimPrefix(cleaned, "http://") + if cleaned == "" { + return "file" + } + return path.Base(cleaned) +} + +// bytes_trim_space — minimal whitespace stripper for json.RawMessage +// peeking. Avoids importing bytes for one tiny helper. Internal-only. +func bytes_trim_space(b json.RawMessage) json.RawMessage { + for len(b) > 0 && (b[0] == ' ' || b[0] == '\t' || b[0] == '\n' || b[0] == '\r') { + b = b[1:] + } + for len(b) > 0 && (b[len(b)-1] == ' ' || b[len(b)-1] == '\t' || b[len(b)-1] == '\n' || b[len(b)-1] == '\r') { + b = b[:len(b)-1] + } + return b +} + +// newMessageID generates a fresh UUID per ChatMessage. Server-minted +// because activity_logs rows don't carry message-shaped ids, and the +// canvas only needs a React-key-stable id (it dedupes by content+role+ +// timestamp window, not by id). +func newMessageID() string { + return uuid.New().String() +} + +// ensureNoUnusedImports avoids lint complaining about `errors` if a +// future refactor removes the only consumer. errors is reserved for +// the inevitable wrap-aware DB-error handling once we add a +// distinguishable "DB outage vs no rows" path. +var _ = errors.Is diff --git a/workspace-server/internal/handlers/chat_history_test.go b/workspace-server/internal/handlers/chat_history_test.go new file mode 100644 index 00000000..50ad69fc --- /dev/null +++ b/workspace-server/internal/handlers/chat_history_test.go @@ -0,0 +1,422 @@ +package handlers + +// chat_history_test.go — Go-side parity tests for the canvas TS test +// fixtures in canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts. +// +// Every test case in the TS file has a Go counterpart here, named +// after the TS describe/it block. A future change that diverges the +// two implementations should fail the corresponding test here BEFORE +// the canvas's stale TS path silently returns wrong messages. +// +// Mutation guidance: when adding behavior, add the case to BOTH +// historyHydration.test.ts AND this file. RFC #2945 PR-C ships server- +// owned parsing — the canvas TS is the legacy source the server now +// replaces, so divergence == regression. + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +const fixedTimestamp = "2026-04-25T18:00:00Z" + +func mustParseTime(t *testing.T, s string) time.Time { + t.Helper() + tt, err := time.Parse(time.RFC3339, s) + if err != nil { + t.Fatalf("parse %s: %v", s, err) + } + return tt +} + +func neverInternal(_ string) bool { return false } + +// ===================================================================== +// timestamp preservation (regression cover) +// +// The canvas bug that motivated extracting the helper: every reload +// re-stamped historical bubbles to render-time. Pin row.created_at +// adoption. +// ===================================================================== + +func TestChatHistory_UserMessageTimestampPinsToCreatedAt(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:00:00Z") + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hello from earlier today"}]}}}`) + + msgs := activityRowToChatMessages(created, "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 user message, got %d", len(msgs)) + } + if msgs[0].Role != "user" { + t.Errorf("role=%q want user", msgs[0].Role) + } + if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:00:00") { + t.Errorf("user message timestamp %q does NOT pin to row.created_at — regression of the 2026-04-25 bubble-collapse bug", msgs[0].Timestamp) + } +} + +func TestChatHistory_AgentMessageTimestampPinsToCreatedAt(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:05:00Z") + body := json.RawMessage(`{"result":"agent reply"}`) + + msgs := activityRowToChatMessages(created, "ok", nil, body, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 agent message, got %d", len(msgs)) + } + if msgs[0].Role != "agent" { + t.Errorf("role=%q want agent", msgs[0].Role) + } + if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:05:00") { + t.Errorf("agent message timestamp %q does NOT pin to row.created_at", msgs[0].Timestamp) + } +} + +func TestChatHistory_TwoRowsDistinctTimestamps(t *testing.T) { + bodyA := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"first"}]}}}`) + bodyB := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"second"}]}}}`) + a := activityRowToChatMessages(mustParseTime(t, "2026-04-25T14:00:00Z"), "ok", bodyA, nil, neverInternal) + b := activityRowToChatMessages(mustParseTime(t, "2026-04-25T21:01:58Z"), "ok", bodyB, nil, neverInternal) + + if len(a) != 1 || len(b) != 1 { + t.Fatalf("expected 1 message each; got %d and %d", len(a), len(b)) + } + if a[0].Timestamp == b[0].Timestamp { + t.Errorf("two distinct created_at values produced same timestamp: %q", a[0].Timestamp) + } + if !strings.HasPrefix(a[0].Timestamp, "2026-04-25T14:00:00") || !strings.HasPrefix(b[0].Timestamp, "2026-04-25T21:01:58") { + t.Errorf("timestamps drifted: a=%q b=%q", a[0].Timestamp, b[0].Timestamp) + } +} + +// ===================================================================== +// user-message extraction +// ===================================================================== + +func TestChatHistory_EmitsUserMessageWhenRequestHasText(t *testing.T) { + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hi agent"}]}}}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "hi agent" { + t.Errorf("role=%q content=%q want user/hi agent", msgs[0].Role, msgs[0].Content) + } +} + +func TestChatHistory_DropsInternalSelfMessages(t *testing.T) { + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"Delegation results are ready..."}]}}}`) + predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("internal-self message rendered as user bubble: %q", m.Content) + } + } +} + +func TestChatHistory_NoUserMessageWhenRequestBodyNull(t *testing.T) { + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("emitted user bubble despite null request_body: %+v", m) + } + } +} + +func TestChatHistory_UserAttachmentsHydratedFromRequestBody(t *testing.T) { + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"text","text":"here's the screenshot"}, + {"kind":"file","file":{"name":"shot.png","mimeType":"image/png","uri":"workspace:/uploads/shot.png","size":4096}} + ] + } + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + var user *ChatMessage + for i := range msgs { + if msgs[i].Role == "user" { + user = &msgs[i] + break + } + } + if user == nil { + t.Fatalf("no user bubble produced") + } + if user.Content != "here's the screenshot" { + t.Errorf("content=%q", user.Content) + } + if len(user.Attachments) != 1 { + t.Fatalf("attachments=%d want 1", len(user.Attachments)) + } + att := user.Attachments[0] + if att.Name != "shot.png" || att.URI != "workspace:/uploads/shot.png" || att.MimeType != "image/png" { + t.Errorf("attachment shape wrong: %+v", att) + } + if att.Size == nil || *att.Size != 4096 { + t.Errorf("size=%v want 4096", att.Size) + } +} + +func TestChatHistory_AttachmentsOnlyUserBubbleWhenTextEmpty(t *testing.T) { + // Drag-drop a file with no caption — bubble should still render. + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf"}} + ] + } + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 attachments-only bubble, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "" || len(msgs[0].Attachments) != 1 { + t.Errorf("unexpected: role=%q content=%q attachments=%d", msgs[0].Role, msgs[0].Content, len(msgs[0].Attachments)) + } + if msgs[0].Attachments[0].Name != "report.pdf" { + t.Errorf("attachment name=%q want report.pdf", msgs[0].Attachments[0].Name) + } +} + +func TestChatHistory_InternalSelfPredicateSuppressesEvenWithAttachments(t *testing.T) { + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"text","text":"Delegation results are ready..."}, + {"kind":"file","file":{"name":"x.zip","uri":"workspace:/x.zip"}} + ] + } + } + }`) + predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("internal-self predicate did NOT suppress user bubble despite attachments: %+v", m) + } + } +} + +// ===================================================================== +// agent-message extraction +// ===================================================================== + +func TestChatHistory_AgentMessageFromResultString(t *testing.T) { + body := json.RawMessage(`{"result":"agent says hi"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "agent" || msgs[0].Content != "agent says hi" { + t.Errorf("got %+v", msgs) + } +} + +func TestChatHistory_RoleSystemWhenStatusError(t *testing.T) { + body := json.RawMessage(`{"result":"delegation failed"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "error", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "system" { + t.Errorf("status=error did NOT promote role to system: %+v", msgs) + } +} + +func TestChatHistory_RoleSystemWhenAgentErrorPrefix(t *testing.T) { + // Defense-in-depth — if a runtime returns ok status but the text + // itself starts with "agent error", the canvas would still + // render system role. Mirror that here. + body := json.RawMessage(`{"result":"Agent error: ProcessError(exit=1)"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "system" { + t.Errorf("agent-error prefix did NOT promote to system: %+v", msgs) + } +} + +func TestChatHistory_AgentAttachmentsFromResponseBodyParts(t *testing.T) { + // Notify shape: response_body = {"result":"","parts":[{"kind":"file",...}]} + body := json.RawMessage(`{ + "result": "Done — see attached.", + "parts": [ + {"kind":"file","file":{"name":"build.zip","uri":"workspace:/tmp/build.zip","size":12345}} + ] + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + var agent *ChatMessage + for i := range msgs { + if msgs[i].Role == "agent" { + agent = &msgs[i] + break + } + } + if agent == nil { + t.Fatalf("no agent bubble") + } + if len(agent.Attachments) != 1 || agent.Attachments[0].Name != "build.zip" { + t.Errorf("agent attachments shape wrong: %+v", agent.Attachments) + } + if agent.Attachments[0].Size == nil || *agent.Attachments[0].Size != 12345 { + t.Errorf("size=%v want 12345", agent.Attachments[0].Size) + } +} + +func TestChatHistory_NoAgentMessageWhenResponseBodyNull(t *testing.T) { + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) + for _, m := range msgs { + if m.Role == "agent" || m.Role == "system" { + t.Errorf("emitted agent/system bubble despite null response_body: %+v", m) + } + } +} + +func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) { + body := json.RawMessage(`{"unrelated":"metadata"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + for _, m := range msgs { + if m.Role == "agent" { + t.Errorf("emitted agent bubble despite empty content: %+v", m) + } + } +} + +// ===================================================================== +// end-to-end shape — paired user + agent with same timestamp +// ===================================================================== + +func TestChatHistory_PairedUserAndAgentSameTimestamp(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:00:00Z") + req := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"what's 2+2?"}]}}}`) + resp := json.RawMessage(`{"result":"4"}`) + msgs := activityRowToChatMessages(created, "ok", req, resp, neverInternal) + if len(msgs) != 2 { + t.Fatalf("expected 2 messages, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "what's 2+2?" { + t.Errorf("first message wrong: %+v", msgs[0]) + } + if msgs[1].Role != "agent" || msgs[1].Content != "4" { + t.Errorf("second message wrong: %+v", msgs[1]) + } + if msgs[0].Timestamp != msgs[1].Timestamp { + t.Errorf("paired bubbles have different timestamps: %q vs %q", msgs[0].Timestamp, msgs[1].Timestamp) + } +} + +// ===================================================================== +// Go-specific: defensive parsing +// ===================================================================== + +func TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty(t *testing.T) { + // Should NOT panic; should return no user bubble (or no message at all). + body := json.RawMessage(`{not valid json}`) + defer func() { + if r := recover(); r != nil { + t.Fatalf("panic on malformed json: %v", r) + } + }() + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + for _, m := range msgs { + if m.Role == "user" && (m.Content != "" || len(m.Attachments) > 0) { + t.Errorf("malformed JSON yielded a non-empty user bubble: %+v", m) + } + } +} + +func TestChatHistory_V1ProtobufFlatFileShape(t *testing.T) { + // v1 a2a-sdk shape: flat parts with url/filename/mediaType + body := json.RawMessage(`{ + "result": { + "parts": [ + {"url":"https://example.com/data.csv","filename":"data.csv","mediaType":"text/csv"} + ] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + var agent *ChatMessage + for i := range msgs { + if msgs[i].Role == "agent" { + agent = &msgs[i] + break + } + } + if agent == nil { + t.Fatalf("no agent bubble for v1 shape") + } + if len(agent.Attachments) != 1 { + t.Fatalf("attachments=%d want 1", len(agent.Attachments)) + } + att := agent.Attachments[0] + if att.Name != "data.csv" || att.URI != "https://example.com/data.csv" || att.MimeType != "text/csv" { + t.Errorf("v1 shape extracted wrong: %+v", att) + } +} + +func TestChatHistory_TaskShapeArtifactsExtracted(t *testing.T) { + // {"result":{"artifacts":[{"parts":[{"kind":"text","text":"..."}]}]}} + body := json.RawMessage(`{ + "result": { + "artifacts": [ + {"parts": [{"kind":"text","text":"hermes detail line"}]} + ] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Content != "hermes detail line" { + t.Errorf("artifact text not extracted: %+v", msgs) + } +} + +func TestChatHistory_OlderNestedRootTextShape(t *testing.T) { + // Older shape: {parts: [{root: {text: "..."}}]} + body := json.RawMessage(`{ + "result": { + "parts": [{"root":{"text":"legacy nested text"}}] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || !strings.Contains(msgs[0].Content, "legacy nested text") { + t.Errorf("nested root.text not extracted: %+v", msgs) + } +} + +// ===================================================================== +// isInternalSelfMessage predicate itself +// ===================================================================== + +func TestChatHistory_IsInternalSelfMessage_DelegationPrefix(t *testing.T) { + if !isInternalSelfMessage("Delegation results are ready... ") { + t.Errorf("Delegation-results prefix should be flagged internal-self") + } + if isInternalSelfMessage("Delegation completed but not ready") { + t.Errorf("non-prefix match should NOT flag") + } + if isInternalSelfMessage("") { + t.Errorf("empty text should NOT flag (legitimate attachments-only bubble)") + } +} + +// ===================================================================== +// basename helper — mirrors canvas basename() semantics +// ===================================================================== + +func TestChatHistory_BasenameStripsSchemeAndPath(t *testing.T) { + cases := []struct { + in, want string + }{ + {"workspace:/uploads/shot.png", "shot.png"}, + {"workspace:/a/b/c/file.txt", "file.txt"}, + {"https://example.com/path/file.csv", "file.csv"}, + {"http://x/y", "y"}, + {"", "file"}, + {"workspace:", "file"}, // scheme-only collapses to "" → "file" sentinel, matches canvas basename + } + for _, tc := range cases { + got := basename(tc.in) + if got != tc.want { + t.Errorf("basename(%q) = %q want %q", tc.in, got, tc.want) + } + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index a074df7f..48967c2d 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -315,6 +315,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAuth.POST("/activity", acth.Report) wsAuth.POST("/notify", acth.Notify) + // Chat history — RFC #2945 PR-C (issue #3017). Server-side + // rendering of activity_logs rows into the canonical + // ChatMessage shape so canvas (and future API consumers) don't + // re-implement the A2A-envelope walk per-client. + chh := handlers.NewChatHistoryHandler() + wsAuth.GET("/chat-history", chh.List) + // Config cfgh := handlers.NewConfigHandler() wsAuth.GET("/config", cfgh.Get)