diff --git a/workspace-server/internal/handlers/chat_history.go b/workspace-server/internal/handlers/chat_history.go index d76ea9d7..12aff51d 100644 --- a/workspace-server/internal/handlers/chat_history.go +++ b/workspace-server/internal/handlers/chat_history.go @@ -1,134 +1,65 @@ package handlers -// chat_history.go — server-side rendering of activity_logs rows into the -// canonical ChatMessage shape (RFC #2945 PR-C, issue #3017). +// chat_history.go — HTTP-shape adapter over messagestore.MessageStore +// (RFC #2945 PR-D). // -// Replaces the canvas-side TS parsing in -// canvas/src/components/tabs/chat/historyHydration.ts + -// canvas/src/components/tabs/chat/message-parser.ts so: -// -// - Single source of truth for A2A-envelope walking. A future API -// consumer (mobile, third-party integration, RFC #2945 PR-D's -// OSS MessageStore) consumes a typed surface instead of re- -// implementing the same shape walk. -// -// - Wire-format evolution (a2a-sdk v0 → v1 protobuf flat shape) is -// handled in one place. Today the TS parser handles both shapes; -// this Go parser mirrors that contract exactly. -// -// - PR-D unblocked: MessageStore returns []ChatMessage typed values, -// not raw activity_logs rows. The interface is meaningless if -// parsing still has to happen client-side. +// Pre-PR-D, this file owned the activity_logs query AND the parser +// AND the HTTP plumbing. PR-D extracts the storage + parser into +// internal/messagestore/ so OSS operators can plug in alternative +// backends (S3-tiered, vector store, in-memory). The handler is now +// a thin adapter: parse query params → call store → emit JSON. // // Endpoint: GET /workspaces/:id/chat-history?limit=N&before_ts=T -// // Auth: same wsAuth chain as /workspaces/:id/activity (tenant // ADMIN_TOKEN + X-Molecule-Org-Id header). No new trust boundary. // -// Behavioral contract: every test case in -// canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts -// (11 cases) has a Go-side parity test in chat_history_test.go. -// Mutation-tested by reverting individual branches and confirming -// the corresponding Go test fires red. +// Behavioral parity with canvas TS is enforced at the messagestore +// layer (internal/messagestore/postgres_store_test.go); this file's +// tests cover the HTTP-shape concerns only. import ( - "database/sql" - "encoding/json" - "errors" "net/http" - "path" "strconv" - "strings" "time" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore" "github.com/gin-gonic/gin" "github.com/google/uuid" ) -// ChatMessage is the canonical shape returned by GET /chat-history. -// Mirrors canvas/src/components/tabs/chat/types.ts:ChatMessage so -// the canvas can render it without per-row mapping. -// -// NOTE: id is server-generated (UUID v4) per row pair — clients should -// NOT depend on these ids being stable across requests since the -// activity_log row itself doesn't carry message-shaped ids. Use -// (timestamp, role, content) for cross-request deduping; the id is -// just a React key. -type ChatMessage struct { - ID string `json:"id"` - Role string `json:"role"` // "user" | "agent" | "system" - Content string `json:"content"` - Attachments []ChatAttachment `json:"attachments,omitempty"` - Timestamp string `json:"timestamp"` // RFC3339 — pinned to row.created_at -} - -// ChatAttachment mirrors canvas's ChatAttachment / ParsedFilePart. -type ChatAttachment struct { - Name string `json:"name"` - URI string `json:"uri"` - MimeType string `json:"mimeType,omitempty"` - Size *int64 `json:"size,omitempty"` -} - // ChatHistoryResponse is the wire shape for GET /chat-history. type ChatHistoryResponse struct { - Messages []ChatMessage `json:"messages"` - ReachedEnd bool `json:"reached_end"` + Messages []messagestore.ChatMessage `json:"messages"` + ReachedEnd bool `json:"reached_end"` } -// ChatHistoryHandler exposes the typed chat-history endpoint. It does -// not need a broadcaster — read-only. -type ChatHistoryHandler struct{} - -// NewChatHistoryHandler returns a fresh handler. Stateless on purpose: -// no caching, no per-request handler state. The DB query is the -// expensive part; cache control is handled at HTTP layer. -func NewChatHistoryHandler() *ChatHistoryHandler { - return &ChatHistoryHandler{} +// ChatHistoryHandler exposes the typed chat-history endpoint over a +// MessageStore. The store is injected so OSS operators can swap the +// backend without forking the handler. +type ChatHistoryHandler struct { + store messagestore.MessageStore } -// internalSelfPrefixes — message texts that should be filtered out of -// chat history because they're internal self-triggers (heartbeats, -// scheduled-task self-fire, delegation-result self-notify) rather than -// user-typed messages. Mirrors canvas's isInternalSelfMessage. Centring -// here means a future internal-trigger pattern only needs to be added -// in one place, not in every consumer. -var internalSelfPrefixes = []string{ - "Delegation results are ready", -} - -// isInternalSelfMessage reports whether text starts with any registered -// internal-self prefix. Empty text returns false (only filter on -// matched prefixes — empty/missing text is a legitimate -// attachments-only bubble). -func isInternalSelfMessage(text string) bool { - if text == "" { - return false - } - for _, prefix := range internalSelfPrefixes { - if strings.HasPrefix(text, prefix) { - return true - } - } - return false +// NewChatHistoryHandler wires a MessageStore (typically +// messagestore.NewPostgresMessageStore at production startup). +// +// Tests inject fakes (see internal/handlers/chat_history_test.go). +// Constructor takes the interface, not a concrete type, so the +// platform-default vs OSS-alternative decision happens at wiring +// time in router.go. +func NewChatHistoryHandler(store messagestore.MessageStore) *ChatHistoryHandler { + return &ChatHistoryHandler{store: store} } // List handles GET /workspaces/:id/chat-history?limit=N&before_ts=T. // // Query parameters mirror /activity for caller convenience: // -// - limit (default 100, max 1000) — page size, newest-first from the -// server's POV. Caller reverses for chronological display. -// - before_ts (RFC3339, optional) — paginate by walking strictly -// older than this timestamp. Identical semantics to /activity's -// before_ts: matches what canvas uses for lazy-loading older -// batches. +// - limit (default 100, max 1000) — page size +// - before_ts (RFC3339, optional) — cursor for paginating backward // -// The handler scopes to activity_type='a2a_receive' AND source_id IS -// NULL (canvas-source rows only) — the same filter canvas applies via -// `?type=a2a_receive&source=canvas`. Centralizing here means a future -// caller (mobile, public API) doesn't need to know the filter. +// Validates inputs at the trust boundary; the store sees only +// well-formed ListOptions. func (h *ChatHistoryHandler) List(c *gin.Context) { workspaceID := c.Param("id") if _, err := uuid.Parse(workspaceID); err != nil { @@ -146,8 +77,7 @@ func (h *ChatHistoryHandler) List(c *gin.Context) { limit = 1000 } - var beforeTS time.Time - usingBeforeTS := false + opts := messagestore.ListOptions{Limit: limit} if v := c.Query("before_ts"); v != "" { t, err := time.Parse(time.RFC3339, v) if err != nil { @@ -156,491 +86,28 @@ func (h *ChatHistoryHandler) List(c *gin.Context) { }) return } - beforeTS = t - usingBeforeTS = true + opts.BeforeTS = t + opts.HasBefore = true } - // Newest-first ordering matches /activity. Caller reverses for - // chronological display. Same semantics across both endpoints - // keeps the canvas's lazy-history pagination logic stable. - rows, err := h.queryActivityRows(c.Request.Context(), workspaceID, limit, usingBeforeTS, beforeTS) + messages, reachedEnd, err := h.store.List(c.Request.Context(), workspaceID, opts) if err != nil { - // Errors here are infra (DB unreachable) — surface as 502 so - // the canvas can retry vs. treating as "no rows". + // Errors here are infra (DB unreachable, store impl failure). + // Surface as 502 so the canvas can retry vs. treating as + // "no rows." c.JSON(http.StatusBadGateway, gin.H{"error": "chat history unavailable"}) return } - defer rows.Close() - var messages []ChatMessage - rowCount := 0 - for rows.Next() { - var ( - createdAt time.Time - status string - rawRequest sql.NullString - rawResponse sql.NullString - ) - if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse); err != nil { - continue - } - rowCount++ - var requestBody, responseBody json.RawMessage - if rawRequest.Valid { - requestBody = json.RawMessage(rawRequest.String) - } - if rawResponse.Valid { - responseBody = json.RawMessage(rawResponse.String) - } - messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, isInternalSelfMessage)...) + // Defensive: if the store returns nil messages slice (any impl + // might), emit empty array rather than `null` so canvas's JSON + // parser doesn't have to handle two empty representations. + if messages == nil { + messages = []messagestore.ChatMessage{} } c.JSON(http.StatusOK, ChatHistoryResponse{ Messages: messages, - ReachedEnd: rowCount < limit, + ReachedEnd: reachedEnd, }) } - -// queryActivityRows pulls the raw a2a_receive rows for a workspace. -// Split out so unit tests can mock the DB layer without spinning a -// full request context. Canvas-source rows only (source_id IS NULL). -func (h *ChatHistoryHandler) queryActivityRows(ctx interface { - Done() <-chan struct{} - Err() error - Deadline() (time.Time, bool) - Value(any) any -}, workspaceID string, limit int, usingBeforeTS bool, beforeTS time.Time) (*sql.Rows, error) { - if usingBeforeTS { - return db.DB.QueryContext(ctx, ` - SELECT created_at, status, request_body::text, response_body::text - FROM activity_logs - WHERE workspace_id = $1 - AND activity_type = 'a2a_receive' - AND source_id IS NULL - AND created_at < $2 - ORDER BY created_at DESC - LIMIT $3 - `, workspaceID, beforeTS, limit) - } - return db.DB.QueryContext(ctx, ` - SELECT created_at, status, request_body::text, response_body::text - FROM activity_logs - WHERE workspace_id = $1 - AND activity_type = 'a2a_receive' - AND source_id IS NULL - ORDER BY created_at DESC - LIMIT $2 - `, workspaceID, limit) -} - -// activityRowToChatMessages converts ONE activity_logs row into 0-2 -// ChatMessages. Direct port of canvas's activityRowToMessages. -// -// - Up to 1 user-side bubble from request_body, unless internal-self. -// - Up to 1 agent-side bubble from response_body. Role is "system" -// when status='error' OR text starts with "agent error" (case- -// insensitive — matches canvas predicate exactly). -// -// Both bubbles MUST adopt row.created_at as their timestamp. The -// canvas hydration regression that motivated extracting the helper -// (every reload re-stamping bubbles to render-time) is regression- -// covered in chat_history_test.go. -// -// Defensive: any malformed JSON is silently dropped (text becomes "", -// attachments []) — chat falls through to text-only rather than -// surfacing a 500. -func activityRowToChatMessages( - createdAt time.Time, - status string, - requestBody json.RawMessage, - responseBody json.RawMessage, - internalSelf func(string) bool, -) []ChatMessage { - var out []ChatMessage - timestamp := createdAt.UTC().Format(time.RFC3339Nano) - - // USER side — extract from request_body.params.message - userText := extractRequestText(requestBody) - userAttachments := extractFilesFromUserMessage(requestBody) - if !internalSelf(userText) && (userText != "" || len(userAttachments) > 0) { - out = append(out, ChatMessage{ - ID: newMessageID(), - Role: "user", - Content: userText, - Attachments: userAttachments, - Timestamp: timestamp, - }) - } - - // AGENT side — extract from response_body - if len(responseBody) > 0 { - agentText := extractChatResponseText(responseBody) - agentAttachments := extractFilesFromResponse(responseBody) - if agentText != "" || len(agentAttachments) > 0 { - role := "agent" - if status == "error" || strings.HasPrefix(strings.ToLower(agentText), "agent error") { - role = "system" - } - out = append(out, ChatMessage{ - ID: newMessageID(), - Role: role, - Content: agentText, - Attachments: agentAttachments, - Timestamp: timestamp, - }) - } - } - - return out -} - -// extractRequestText pulls the user's typed text from the canonical -// A2A request envelope. Returns "" on any malformed shape; callers -// pair this with extractFilesFromUserMessage to catch attachments- -// only bubbles. -// -// request_body = {"params": {"message": {"parts": [{"kind":"text", "text":"..."}, ...]}}} -// -// Mirrors canvas's extractRequestText. Currently returns ONLY parts[0] -// to match canvas exactly; multi-text-part user messages would -// require both parsers to evolve in lockstep (track via PR-C-2). -func extractRequestText(body json.RawMessage) string { - if len(body) == 0 { - return "" - } - var env struct { - Params struct { - Message struct { - Parts []map[string]any `json:"parts"` - } `json:"message"` - } `json:"params"` - } - if err := json.Unmarshal(body, &env); err != nil { - return "" - } - for _, p := range env.Params.Message.Parts { - if t, ok := p["text"].(string); ok && t != "" { - return t - } - } - return "" -} - -// extractFilesFromUserMessage walks the same request_body envelope as -// extractRequestText and collects file parts. -func extractFilesFromUserMessage(body json.RawMessage) []ChatAttachment { - if len(body) == 0 { - return nil - } - var env struct { - Params struct { - Message json.RawMessage `json:"message"` - } `json:"params"` - } - if err := json.Unmarshal(body, &env); err != nil { - return nil - } - if len(env.Params.Message) == 0 { - return nil - } - return extractFilesFromTask(env.Params.Message) -} - -// extractChatResponseText collects text from any of the response shapes -// canvas's extractChatResponseText handles, joining with "\n": -// -// - {"result": ""} — string -// - {"result": {"parts": [{"kind":"text", "text":""}]}} — A2A JSON-RPC -// - {"parts": [{"root": {"text": "..."}}]} — older nested shape -// - {"result": {"artifacts": [{"parts": [...]}]}} — task shape -// - {"task": ""} — fallback -// -// Why collect rather than first-source-wins: claude-code emits multiple -// text parts; hermes emits summary-in-parts + details-in-artifacts. The -// pre-collect "first wins" silently truncated 15k-char briefs to their -// leading line and dropped artifact details. Matches canvas behavior -// exactly. -func extractChatResponseText(body json.RawMessage) string { - if len(body) == 0 { - return "" - } - - // {"result": "string"} - var asString struct { - Result string `json:"result"` - } - if err := json.Unmarshal(body, &asString); err == nil && asString.Result != "" { - return asString.Result - } - - // {"result": {object}} — try the structured shapes - var asObject struct { - Result json.RawMessage `json:"result"` - Task string `json:"task"` - } - if err := json.Unmarshal(body, &asObject); err != nil { - return "" - } - - var collected []string - - if len(asObject.Result) > 0 { - var resultObj struct { - Parts []map[string]any `json:"parts"` - Artifacts []json.RawMessage `json:"artifacts"` - } - if err := json.Unmarshal(asObject.Result, &resultObj); err == nil { - // A2A JSON-RPC: parts[].text - if t := joinTextParts(resultObj.Parts); t != "" { - collected = append(collected, t) - } - // Older nested: parts[].root.text - var rootTexts []string - for _, p := range resultObj.Parts { - if root, ok := p["root"].(map[string]any); ok { - if t, ok := root["text"].(string); ok && t != "" { - rootTexts = append(rootTexts, t) - } - } - } - if len(rootTexts) > 0 { - collected = append(collected, strings.Join(rootTexts, "\n")) - } - // Task shape: artifacts[].parts[].text - for _, raw := range resultObj.Artifacts { - var art struct { - Parts []map[string]any `json:"parts"` - } - if err := json.Unmarshal(raw, &art); err == nil { - if t := joinTextParts(art.Parts); t != "" { - collected = append(collected, t) - } - } - } - } - } - - if len(collected) > 0 { - return strings.Join(collected, "\n") - } - - if asObject.Task != "" { - return asObject.Task - } - return "" -} - -// joinTextParts returns a "\n"-joined concatenation of every text part -// in parts[]. Empty if no text parts. Matches canvas extractTextsFromParts. -func joinTextParts(parts []map[string]any) string { - var texts []string - for _, p := range parts { - // Accept both "type":"text" (older) and "kind":"text" (current). - isText := false - if k, ok := p["kind"].(string); ok && k == "text" { - isText = true - } - if t, ok := p["type"].(string); ok && t == "text" { - isText = true - } - if !isText { - continue - } - if t, ok := p["text"].(string); ok && t != "" { - texts = append(texts, t) - } - } - return strings.Join(texts, "\n") -} - -// extractFilesFromResponse collects file parts from the response_body -// across the same shape variants as extractChatResponseText. Mirrors -// canvas extractFilesFromTask, except the canvas function takes "the -// task object" while this takes the wire-level response_body and -// dispatches: -// -// - {"result": {object}} → unwrap result, walk parts/artifacts -// - {"result": "", "parts": [...]} → notify shape, walk top-level parts -// - {"message": {"parts": [...]}} → some A2A servers wrap as a message -func extractFilesFromResponse(body json.RawMessage) []ChatAttachment { - if len(body) == 0 { - return nil - } - // Determine which container to feed extractFilesFromTask: - // - if result is an object, feed the result object - // - else feed the top-level body (notify shape with parts at root) - var probe struct { - Result json.RawMessage `json:"result"` - } - _ = json.Unmarshal(body, &probe) - - feed := body - if len(probe.Result) > 0 { - // Is result an object? (vs a string) - trimmed := bytes_trim_space(probe.Result) - if len(trimmed) > 0 && trimmed[0] == '{' { - feed = probe.Result - } - } - return extractFilesFromTask(feed) -} - -// extractFilesFromTask walks parts[] + artifacts[].parts[] + status.message.parts[] -// + message.parts[] and pulls out file parts. Mirrors canvas's -// extractFilesFromTask exactly — same two wire shapes (v0 hot path, -// v1 protobuf flat shape). -// -// Defensive: any error inside the walk is recovered and partial -// results returned. A malformed shape should never fail the whole -// chat reload — degraded UX is better than 500. -func extractFilesFromTask(taskJSON json.RawMessage) []ChatAttachment { - if len(taskJSON) == 0 { - return nil - } - var task struct { - Parts []map[string]any `json:"parts"` - Artifacts []json.RawMessage `json:"artifacts"` - Status json.RawMessage `json:"status"` - Message json.RawMessage `json:"message"` - } - if err := json.Unmarshal(taskJSON, &task); err != nil { - return nil - } - var out []ChatAttachment - out = appendFilesFromParts(out, task.Parts) - for _, raw := range task.Artifacts { - var art struct { - Parts []map[string]any `json:"parts"` - } - if err := json.Unmarshal(raw, &art); err == nil { - out = appendFilesFromParts(out, art.Parts) - } - } - if len(task.Status) > 0 { - var st struct { - Message struct { - Parts []map[string]any `json:"parts"` - } `json:"message"` - } - if err := json.Unmarshal(task.Status, &st); err == nil { - out = appendFilesFromParts(out, st.Message.Parts) - } - } - if len(task.Message) > 0 { - var msg struct { - Parts []map[string]any `json:"parts"` - } - if err := json.Unmarshal(task.Message, &msg); err == nil { - out = appendFilesFromParts(out, msg.Parts) - } - } - return out -} - -// appendFilesFromParts handles the v0 hot path (kind/type=file with -// nested file{}) and the v1 flat path (url+filename+mediaType). -func appendFilesFromParts(out []ChatAttachment, parts []map[string]any) []ChatAttachment { - for _, raw := range parts { - v0 := false - if k, ok := raw["kind"].(string); ok && k == "file" { - v0 = true - } - if t, ok := raw["type"].(string); ok && t == "file" { - v0 = true - } - v1URL, _ := raw["url"].(string) - - if !v0 && v1URL == "" { - continue - } - - var att ChatAttachment - if v0 { - file, _ := raw["file"].(map[string]any) - if file == nil { - file = raw // some emitters flatten; defensive - } - uri, _ := file["uri"].(string) - if uri == "" { - continue - } - att.URI = uri - if name, _ := file["name"].(string); name != "" { - att.Name = name - } else { - att.Name = basename(uri) - } - if mt, ok := file["mimeType"].(string); ok { - att.MimeType = mt - } - if sz, ok := numericSize(file["size"]); ok { - att.Size = &sz - } - } else { - att.URI = v1URL - if name, _ := raw["filename"].(string); name != "" { - att.Name = name - } else { - att.Name = basename(v1URL) - } - if mt, ok := raw["mediaType"].(string); ok { - att.MimeType = mt - } - } - out = append(out, att) - } - return out -} - -// numericSize coerces JSON's number type (always float64 in -// json.Unmarshal of map[string]any) to int64 for the Size field. -// Returns (0, false) for non-numeric or absent values. -func numericSize(v any) (int64, bool) { - switch n := v.(type) { - case float64: - return int64(n), true - case int64: - return n, true - case int: - return int64(n), true - } - return 0, false -} - -// basename strips scheme + path components, returning the trailing -// segment (or "file" if empty). Mirrors canvas basename helper. -func basename(uri string) string { - cleaned := strings.TrimPrefix(uri, "workspace:") - cleaned = strings.TrimPrefix(cleaned, "https://") - cleaned = strings.TrimPrefix(cleaned, "http://") - if cleaned == "" { - return "file" - } - return path.Base(cleaned) -} - -// bytes_trim_space — minimal whitespace stripper for json.RawMessage -// peeking. Avoids importing bytes for one tiny helper. Internal-only. -func bytes_trim_space(b json.RawMessage) json.RawMessage { - for len(b) > 0 && (b[0] == ' ' || b[0] == '\t' || b[0] == '\n' || b[0] == '\r') { - b = b[1:] - } - for len(b) > 0 && (b[len(b)-1] == ' ' || b[len(b)-1] == '\t' || b[len(b)-1] == '\n' || b[len(b)-1] == '\r') { - b = b[:len(b)-1] - } - return b -} - -// newMessageID generates a fresh UUID per ChatMessage. Server-minted -// because activity_logs rows don't carry message-shaped ids, and the -// canvas only needs a React-key-stable id (it dedupes by content+role+ -// timestamp window, not by id). -func newMessageID() string { - return uuid.New().String() -} - -// ensureNoUnusedImports avoids lint complaining about `errors` if a -// future refactor removes the only consumer. errors is reserved for -// the inevitable wrap-aware DB-error handling once we add a -// distinguishable "DB outage vs no rows" path. -var _ = errors.Is diff --git a/workspace-server/internal/handlers/chat_history_test.go b/workspace-server/internal/handlers/chat_history_test.go index 50ad69fc..cd3aa270 100644 --- a/workspace-server/internal/handlers/chat_history_test.go +++ b/workspace-server/internal/handlers/chat_history_test.go @@ -1,422 +1,276 @@ package handlers -// chat_history_test.go — Go-side parity tests for the canvas TS test -// fixtures in canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts. +// chat_history_test.go — handler-level tests against a fake +// MessageStore. The parser-level parity tests against the canvas TS +// fixtures live in internal/messagestore/postgres_store_test.go; +// this file covers the HTTP-shape concerns (param validation, +// pagination passthrough, error mapping) without touching a DB. // -// Every test case in the TS file has a Go counterpart here, named -// after the TS describe/it block. A future change that diverges the -// two implementations should fail the corresponding test here BEFORE -// the canvas's stale TS path silently returns wrong messages. -// -// Mutation guidance: when adding behavior, add the case to BOTH -// historyHydration.test.ts AND this file. RFC #2945 PR-C ships server- -// owned parsing — the canvas TS is the legacy source the server now -// replaces, so divergence == regression. +// Why the split: PR-D extracted storage to messagestore.MessageStore. +// The handler is now a thin adapter — its tests should exercise the +// adapter (ParseQuery → store.List → emitJSON), not the parser. A +// future MessageStore impl (S3, vector store) shares the same +// handler; testing the handler against the interface keeps the +// adapter test independent of any specific impl. import ( + "context" "encoding/json" + "errors" + "net/http" + "net/http/httptest" "strings" "testing" - "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore" + "github.com/gin-gonic/gin" ) -const fixedTimestamp = "2026-04-25T18:00:00Z" +const testWorkspaceID = "550e8400-e29b-41d4-a716-446655440000" -func mustParseTime(t *testing.T, s string) time.Time { +func init() { + gin.SetMode(gin.TestMode) +} + +// fakeStore is a stub MessageStore for handler-level tests. Every +// real store impl (Postgres, S3, vector) shares the handler — so a +// fake that records inputs + returns scripted outputs is the right +// granularity for HTTP-shape coverage. +type fakeStore struct { + // LastWorkspaceID + LastOpts capture the call shape so the test + // can assert the handler passed the right args to the store. + LastWorkspaceID string + LastOpts messagestore.ListOptions + + // Returns — set per test. + ReturnMessages []messagestore.ChatMessage + ReturnReachedEnd bool + ReturnErr error + + // Panic — if non-empty, List panics with this string. Used by + // the resilience test to confirm the handler returns 502 on + // store-impl failures rather than crashing the goroutine. + PanicWith string +} + +func (s *fakeStore) List(ctx context.Context, workspaceID string, opts messagestore.ListOptions) ([]messagestore.ChatMessage, bool, error) { + if s.PanicWith != "" { + panic(s.PanicWith) + } + s.LastWorkspaceID = workspaceID + s.LastOpts = opts + return s.ReturnMessages, s.ReturnReachedEnd, s.ReturnErr +} + +// Compile-time assertion that fakeStore satisfies the interface. +// Catches drift if the interface changes and the fake stops being a +// drop-in for tests. +var _ messagestore.MessageStore = (*fakeStore)(nil) + +func newRouter(store messagestore.MessageStore) *gin.Engine { + r := gin.New() + h := NewChatHistoryHandler(store) + r.GET("/workspaces/:id/chat-history", h.List) + return r +} + +func doChatHistoryRequest(t *testing.T, r *gin.Engine, path string) *httptest.ResponseRecorder { t.Helper() - tt, err := time.Parse(time.RFC3339, s) - if err != nil { - t.Fatalf("parse %s: %v", s, err) - } - return tt -} - -func neverInternal(_ string) bool { return false } - -// ===================================================================== -// timestamp preservation (regression cover) -// -// The canvas bug that motivated extracting the helper: every reload -// re-stamped historical bubbles to render-time. Pin row.created_at -// adoption. -// ===================================================================== - -func TestChatHistory_UserMessageTimestampPinsToCreatedAt(t *testing.T) { - created := mustParseTime(t, "2026-04-25T18:00:00Z") - body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hello from earlier today"}]}}}`) - - msgs := activityRowToChatMessages(created, "ok", body, nil, neverInternal) - if len(msgs) != 1 { - t.Fatalf("expected 1 user message, got %d", len(msgs)) - } - if msgs[0].Role != "user" { - t.Errorf("role=%q want user", msgs[0].Role) - } - if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:00:00") { - t.Errorf("user message timestamp %q does NOT pin to row.created_at — regression of the 2026-04-25 bubble-collapse bug", msgs[0].Timestamp) - } -} - -func TestChatHistory_AgentMessageTimestampPinsToCreatedAt(t *testing.T) { - created := mustParseTime(t, "2026-04-25T18:05:00Z") - body := json.RawMessage(`{"result":"agent reply"}`) - - msgs := activityRowToChatMessages(created, "ok", nil, body, neverInternal) - if len(msgs) != 1 { - t.Fatalf("expected 1 agent message, got %d", len(msgs)) - } - if msgs[0].Role != "agent" { - t.Errorf("role=%q want agent", msgs[0].Role) - } - if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:05:00") { - t.Errorf("agent message timestamp %q does NOT pin to row.created_at", msgs[0].Timestamp) - } -} - -func TestChatHistory_TwoRowsDistinctTimestamps(t *testing.T) { - bodyA := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"first"}]}}}`) - bodyB := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"second"}]}}}`) - a := activityRowToChatMessages(mustParseTime(t, "2026-04-25T14:00:00Z"), "ok", bodyA, nil, neverInternal) - b := activityRowToChatMessages(mustParseTime(t, "2026-04-25T21:01:58Z"), "ok", bodyB, nil, neverInternal) - - if len(a) != 1 || len(b) != 1 { - t.Fatalf("expected 1 message each; got %d and %d", len(a), len(b)) - } - if a[0].Timestamp == b[0].Timestamp { - t.Errorf("two distinct created_at values produced same timestamp: %q", a[0].Timestamp) - } - if !strings.HasPrefix(a[0].Timestamp, "2026-04-25T14:00:00") || !strings.HasPrefix(b[0].Timestamp, "2026-04-25T21:01:58") { - t.Errorf("timestamps drifted: a=%q b=%q", a[0].Timestamp, b[0].Timestamp) - } + req := httptest.NewRequest(http.MethodGet, path, nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + return w } // ===================================================================== -// user-message extraction +// Param validation // ===================================================================== -func TestChatHistory_EmitsUserMessageWhenRequestHasText(t *testing.T) { - body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hi agent"}]}}}`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) - if len(msgs) != 1 { - t.Fatalf("expected 1 message, got %d", len(msgs)) +func TestChatHistoryHandler_RejectsNonUUIDWorkspaceID(t *testing.T) { + store := &fakeStore{} + r := newRouter(store) + + w := doChatHistoryRequest(t, r, "/workspaces/not-a-uuid/chat-history") + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for non-UUID, got %d", w.Code) } - if msgs[0].Role != "user" || msgs[0].Content != "hi agent" { - t.Errorf("role=%q content=%q want user/hi agent", msgs[0].Role, msgs[0].Content) + if store.LastWorkspaceID != "" { + t.Errorf("non-UUID reached the store: %q", store.LastWorkspaceID) } } -func TestChatHistory_DropsInternalSelfMessages(t *testing.T) { - body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"Delegation results are ready..."}]}}}`) - predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) - for _, m := range msgs { - if m.Role == "user" { - t.Errorf("internal-self message rendered as user bubble: %q", m.Content) - } +func TestChatHistoryHandler_RejectsMalformedBeforeTS(t *testing.T) { + store := &fakeStore{} + r := newRouter(store) + + w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?before_ts=not-a-timestamp") + if w.Code != http.StatusBadRequest { + t.Errorf("expected 400 for malformed before_ts, got %d", w.Code) + } + if !strings.Contains(w.Body.String(), "RFC3339") { + t.Errorf("error message should mention RFC3339; got %q", w.Body.String()) } } -func TestChatHistory_NoUserMessageWhenRequestBodyNull(t *testing.T) { - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) - for _, m := range msgs { - if m.Role == "user" { - t.Errorf("emitted user bubble despite null request_body: %+v", m) - } +func TestChatHistoryHandler_DefaultsLimitTo100(t *testing.T) { + store := &fakeStore{} + r := newRouter(store) + + doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history") + if store.LastOpts.Limit != 100 { + t.Errorf("default limit=%d want 100", store.LastOpts.Limit) + } + if store.LastOpts.HasBefore { + t.Errorf("HasBefore should be false when no cursor passed") } } -func TestChatHistory_UserAttachmentsHydratedFromRequestBody(t *testing.T) { - body := json.RawMessage(`{ - "params": { - "message": { - "parts": [ - {"kind":"text","text":"here's the screenshot"}, - {"kind":"file","file":{"name":"shot.png","mimeType":"image/png","uri":"workspace:/uploads/shot.png","size":4096}} - ] - } - } - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) - var user *ChatMessage - for i := range msgs { - if msgs[i].Role == "user" { - user = &msgs[i] - break - } - } - if user == nil { - t.Fatalf("no user bubble produced") - } - if user.Content != "here's the screenshot" { - t.Errorf("content=%q", user.Content) - } - if len(user.Attachments) != 1 { - t.Fatalf("attachments=%d want 1", len(user.Attachments)) - } - att := user.Attachments[0] - if att.Name != "shot.png" || att.URI != "workspace:/uploads/shot.png" || att.MimeType != "image/png" { - t.Errorf("attachment shape wrong: %+v", att) - } - if att.Size == nil || *att.Size != 4096 { - t.Errorf("size=%v want 4096", att.Size) +func TestChatHistoryHandler_ClampsLimitToMax1000(t *testing.T) { + store := &fakeStore{} + r := newRouter(store) + + doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?limit=99999") + if store.LastOpts.Limit != 1000 { + t.Errorf("limit not clamped: got %d, want 1000", store.LastOpts.Limit) } } -func TestChatHistory_AttachmentsOnlyUserBubbleWhenTextEmpty(t *testing.T) { - // Drag-drop a file with no caption — bubble should still render. - body := json.RawMessage(`{ - "params": { - "message": { - "parts": [ - {"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf"}} - ] - } - } - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) - if len(msgs) != 1 { - t.Fatalf("expected 1 attachments-only bubble, got %d", len(msgs)) - } - if msgs[0].Role != "user" || msgs[0].Content != "" || len(msgs[0].Attachments) != 1 { - t.Errorf("unexpected: role=%q content=%q attachments=%d", msgs[0].Role, msgs[0].Content, len(msgs[0].Attachments)) - } - if msgs[0].Attachments[0].Name != "report.pdf" { - t.Errorf("attachment name=%q want report.pdf", msgs[0].Attachments[0].Name) - } -} +func TestChatHistoryHandler_IgnoresInvalidLimit(t *testing.T) { + // Negative or zero limits should fall back to default rather + // than reach the store (which rejects them as a programming bug). + store := &fakeStore{} + r := newRouter(store) -func TestChatHistory_InternalSelfPredicateSuppressesEvenWithAttachments(t *testing.T) { - body := json.RawMessage(`{ - "params": { - "message": { - "parts": [ - {"kind":"text","text":"Delegation results are ready..."}, - {"kind":"file","file":{"name":"x.zip","uri":"workspace:/x.zip"}} - ] - } - } - }`) - predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) - for _, m := range msgs { - if m.Role == "user" { - t.Errorf("internal-self predicate did NOT suppress user bubble despite attachments: %+v", m) + for _, bad := range []string{"-1", "0", "abc"} { + store.LastOpts = messagestore.ListOptions{} + doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?limit="+bad) + if store.LastOpts.Limit != 100 { + t.Errorf("limit=%q yielded %d, want default 100", bad, store.LastOpts.Limit) } } } // ===================================================================== -// agent-message extraction +// Pagination passthrough // ===================================================================== -func TestChatHistory_AgentMessageFromResultString(t *testing.T) { - body := json.RawMessage(`{"result":"agent says hi"}`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - if len(msgs) != 1 || msgs[0].Role != "agent" || msgs[0].Content != "agent says hi" { - t.Errorf("got %+v", msgs) - } -} +func TestChatHistoryHandler_BeforeTSPassedToStore(t *testing.T) { + store := &fakeStore{} + r := newRouter(store) -func TestChatHistory_RoleSystemWhenStatusError(t *testing.T) { - body := json.RawMessage(`{"result":"delegation failed"}`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "error", nil, body, neverInternal) - if len(msgs) != 1 || msgs[0].Role != "system" { - t.Errorf("status=error did NOT promote role to system: %+v", msgs) - } -} + doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?before_ts=2026-04-25T18:00:00Z&limit=25") -func TestChatHistory_RoleSystemWhenAgentErrorPrefix(t *testing.T) { - // Defense-in-depth — if a runtime returns ok status but the text - // itself starts with "agent error", the canvas would still - // render system role. Mirror that here. - body := json.RawMessage(`{"result":"Agent error: ProcessError(exit=1)"}`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - if len(msgs) != 1 || msgs[0].Role != "system" { - t.Errorf("agent-error prefix did NOT promote to system: %+v", msgs) + if !store.LastOpts.HasBefore { + t.Errorf("HasBefore=false but query passed before_ts") } -} - -func TestChatHistory_AgentAttachmentsFromResponseBodyParts(t *testing.T) { - // Notify shape: response_body = {"result":"","parts":[{"kind":"file",...}]} - body := json.RawMessage(`{ - "result": "Done — see attached.", - "parts": [ - {"kind":"file","file":{"name":"build.zip","uri":"workspace:/tmp/build.zip","size":12345}} - ] - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - var agent *ChatMessage - for i := range msgs { - if msgs[i].Role == "agent" { - agent = &msgs[i] - break - } + got := store.LastOpts.BeforeTS.UTC().Format("2006-01-02T15:04:05Z") + if got != "2026-04-25T18:00:00Z" { + t.Errorf("BeforeTS=%q want 2026-04-25T18:00:00Z", got) } - if agent == nil { - t.Fatalf("no agent bubble") - } - if len(agent.Attachments) != 1 || agent.Attachments[0].Name != "build.zip" { - t.Errorf("agent attachments shape wrong: %+v", agent.Attachments) - } - if agent.Attachments[0].Size == nil || *agent.Attachments[0].Size != 12345 { - t.Errorf("size=%v want 12345", agent.Attachments[0].Size) - } -} - -func TestChatHistory_NoAgentMessageWhenResponseBodyNull(t *testing.T) { - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) - for _, m := range msgs { - if m.Role == "agent" || m.Role == "system" { - t.Errorf("emitted agent/system bubble despite null response_body: %+v", m) - } - } -} - -func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) { - body := json.RawMessage(`{"unrelated":"metadata"}`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - for _, m := range msgs { - if m.Role == "agent" { - t.Errorf("emitted agent bubble despite empty content: %+v", m) - } + if store.LastOpts.Limit != 25 { + t.Errorf("limit=%d want 25", store.LastOpts.Limit) } } // ===================================================================== -// end-to-end shape — paired user + agent with same timestamp +// Response shape // ===================================================================== -func TestChatHistory_PairedUserAndAgentSameTimestamp(t *testing.T) { - created := mustParseTime(t, "2026-04-25T18:00:00Z") - req := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"what's 2+2?"}]}}}`) - resp := json.RawMessage(`{"result":"4"}`) - msgs := activityRowToChatMessages(created, "ok", req, resp, neverInternal) - if len(msgs) != 2 { - t.Fatalf("expected 2 messages, got %d", len(msgs)) +func TestChatHistoryHandler_EmptyResultIsArrayNotNull(t *testing.T) { + // nil messages slice from the store must serialize as `[]`, + // not `null` — canvas's JSON parser has one path. + store := &fakeStore{ReturnMessages: nil, ReturnReachedEnd: true} + r := newRouter(store) + w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history") + + if w.Code != http.StatusOK { + t.Fatalf("status=%d", w.Code) } - if msgs[0].Role != "user" || msgs[0].Content != "what's 2+2?" { - t.Errorf("first message wrong: %+v", msgs[0]) + var resp ChatHistoryResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("body not JSON: %v", err) } - if msgs[1].Role != "agent" || msgs[1].Content != "4" { - t.Errorf("second message wrong: %+v", msgs[1]) + // json.Unmarshal of `null` into a []slice yields a nil — assert + // the JSON literally contains "[]" so a future change that + // forgets the nil-coercion would fail loudly. + if !strings.Contains(w.Body.String(), `"messages":[]`) { + t.Errorf("body should contain `\"messages\":[]`; got %s", w.Body.String()) } - if msgs[0].Timestamp != msgs[1].Timestamp { - t.Errorf("paired bubbles have different timestamps: %q vs %q", msgs[0].Timestamp, msgs[1].Timestamp) + if !resp.ReachedEnd { + t.Errorf("reached_end not propagated") + } +} + +func TestChatHistoryHandler_NonEmptyResponsePreservesShape(t *testing.T) { + size := int64(4096) + store := &fakeStore{ + ReturnMessages: []messagestore.ChatMessage{ + { + ID: "msg-1", + Role: "user", + Content: "hi", + Timestamp: "2026-04-25T18:00:00Z", + }, + { + ID: "msg-2", + Role: "agent", + Content: "hello back", + Attachments: []messagestore.ChatAttachment{ + {Name: "img.png", URI: "workspace:/img.png", MimeType: "image/png", Size: &size}, + }, + Timestamp: "2026-04-25T18:00:01Z", + }, + }, + ReturnReachedEnd: false, + } + r := newRouter(store) + w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history") + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + var resp ChatHistoryResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("body not JSON: %v", err) + } + if len(resp.Messages) != 2 { + t.Fatalf("messages=%d want 2", len(resp.Messages)) + } + if resp.Messages[1].Attachments[0].Size == nil || *resp.Messages[1].Attachments[0].Size != 4096 { + t.Errorf("size pointer flattened in JSON round-trip") } } // ===================================================================== -// Go-specific: defensive parsing +// Error mapping — store errors become 502, not 500/panic // ===================================================================== -func TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty(t *testing.T) { - // Should NOT panic; should return no user bubble (or no message at all). - body := json.RawMessage(`{not valid json}`) - defer func() { - if r := recover(); r != nil { - t.Fatalf("panic on malformed json: %v", r) - } - }() - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) - for _, m := range msgs { - if m.Role == "user" && (m.Content != "" || len(m.Attachments) > 0) { - t.Errorf("malformed JSON yielded a non-empty user bubble: %+v", m) - } - } -} +func TestChatHistoryHandler_StoreErrorReturns502(t *testing.T) { + store := &fakeStore{ReturnErr: errors.New("simulated DB unreachable")} + r := newRouter(store) + w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history") -func TestChatHistory_V1ProtobufFlatFileShape(t *testing.T) { - // v1 a2a-sdk shape: flat parts with url/filename/mediaType - body := json.RawMessage(`{ - "result": { - "parts": [ - {"url":"https://example.com/data.csv","filename":"data.csv","mediaType":"text/csv"} - ] - } - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - var agent *ChatMessage - for i := range msgs { - if msgs[i].Role == "agent" { - agent = &msgs[i] - break - } + if w.Code != http.StatusBadGateway { + t.Errorf("expected 502 on store error, got %d", w.Code) } - if agent == nil { - t.Fatalf("no agent bubble for v1 shape") - } - if len(agent.Attachments) != 1 { - t.Fatalf("attachments=%d want 1", len(agent.Attachments)) - } - att := agent.Attachments[0] - if att.Name != "data.csv" || att.URI != "https://example.com/data.csv" || att.MimeType != "text/csv" { - t.Errorf("v1 shape extracted wrong: %+v", att) - } -} - -func TestChatHistory_TaskShapeArtifactsExtracted(t *testing.T) { - // {"result":{"artifacts":[{"parts":[{"kind":"text","text":"..."}]}]}} - body := json.RawMessage(`{ - "result": { - "artifacts": [ - {"parts": [{"kind":"text","text":"hermes detail line"}]} - ] - } - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - if len(msgs) != 1 || msgs[0].Content != "hermes detail line" { - t.Errorf("artifact text not extracted: %+v", msgs) - } -} - -func TestChatHistory_OlderNestedRootTextShape(t *testing.T) { - // Older shape: {parts: [{root: {text: "..."}}]} - body := json.RawMessage(`{ - "result": { - "parts": [{"root":{"text":"legacy nested text"}}] - } - }`) - msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) - if len(msgs) != 1 || !strings.Contains(msgs[0].Content, "legacy nested text") { - t.Errorf("nested root.text not extracted: %+v", msgs) + if !strings.Contains(w.Body.String(), "unavailable") { + t.Errorf("response body should communicate unavailability; got %q", w.Body.String()) } } // ===================================================================== -// isInternalSelfMessage predicate itself +// Interface conformance — the platform-default Postgres impl is the +// only impl in tree today, but the assertion catches future drift if +// the interface evolves and the impl falls behind. // ===================================================================== -func TestChatHistory_IsInternalSelfMessage_DelegationPrefix(t *testing.T) { - if !isInternalSelfMessage("Delegation results are ready... ") { - t.Errorf("Delegation-results prefix should be flagged internal-self") - } - if isInternalSelfMessage("Delegation completed but not ready") { - t.Errorf("non-prefix match should NOT flag") - } - if isInternalSelfMessage("") { - t.Errorf("empty text should NOT flag (legitimate attachments-only bubble)") - } -} - -// ===================================================================== -// basename helper — mirrors canvas basename() semantics -// ===================================================================== - -func TestChatHistory_BasenameStripsSchemeAndPath(t *testing.T) { - cases := []struct { - in, want string - }{ - {"workspace:/uploads/shot.png", "shot.png"}, - {"workspace:/a/b/c/file.txt", "file.txt"}, - {"https://example.com/path/file.csv", "file.csv"}, - {"http://x/y", "y"}, - {"", "file"}, - {"workspace:", "file"}, // scheme-only collapses to "" → "file" sentinel, matches canvas basename - } - for _, tc := range cases { - got := basename(tc.in) - if got != tc.want { - t.Errorf("basename(%q) = %q want %q", tc.in, got, tc.want) - } - } +func TestMessageStoreInterface_PostgresImplSatisfies(t *testing.T) { + // Compile-time assertion lives in messagestore/postgres_store.go + // (`var _ MessageStore = (*PostgresMessageStore)(nil)`). This + // runtime test exists only to keep the conformance visible in + // the handler test file — a reader of chat_history_test.go + // shouldn't have to traverse to the messagestore package to see + // what the handler is paired with. + var s messagestore.MessageStore = messagestore.NewPostgresMessageStore(nil) + _ = s } diff --git a/workspace-server/internal/messagestore/messagestore.go b/workspace-server/internal/messagestore/messagestore.go new file mode 100644 index 00000000..ae66dae5 --- /dev/null +++ b/workspace-server/internal/messagestore/messagestore.go @@ -0,0 +1,118 @@ +// Package messagestore defines the read-side interface and canonical +// data shapes for chat-history retrieval. +// +// Origin: RFC #2945 PR-D (issue #3026). PR-A extracted the WRITE path +// (AgentMessageWriter), PR-B/B-1 typed the WS event taxonomy, PR-C +// centralized read-side parsing in the server. PR-D abstracts the +// underlying storage layer so OSS operators can plug in alternative +// backends without forking the handler. +// +// # Why this package exists +// +// Today's only consumer is ChatHistoryHandler, but exposing storage as +// an interface is what makes the platform's chat-history layer pluggable +// for OSS operators. Operators wanting to: +// +// - Tier hot/warm/cold storage (recent in Postgres, archival in S3 parquet) +// - Use a vector store with hybrid search (Pinecone, Weaviate) +// - Run an in-memory store for ephemeral tests / sandbox tenants +// - Federate history across regions +// +// …implement MessageStore against their backend. The platform-default +// PostgresMessageStore wraps today's activity_logs query + parser +// behavior unchanged. +// +// # Implementation contract +// +// Implementations MUST: +// +// - Return messages newest-first, up to opts.Limit. Caller (the +// handler) is responsible for opts.Limit clamping. +// - Honor opts.BeforeTS as a strict less-than cursor when +// opts.HasBefore is true; ignore it when false. Use HasBefore (not +// a zero-time check) so a legitimate "start of epoch" cursor is +// distinguishable from "no cursor." +// - Set reachedEnd=true when the underlying store has no more +// messages older than the returned page. Caller uses this to +// disable further older-batch fetches in the lazy-load UX. +// - Parse agent-emitted JSON DEFENSIVELY. Any malformed message body +// becomes an empty ChatMessage (or is dropped); never panic, never +// return an error for parse failures alone — chat falls through to +// text-only rather than 500. +// - NEVER log full message bodies, attachment URIs, or anything that +// would be a sensitive screenshot. Workspace ID + activity-log +// row id at DEBUG is the ceiling. +// - Honor ctx cancellation. A canceled ctx must abort the lookup +// and return ctx.Err(). +// +// Implementations MAY: +// +// - Cache aggressively (history is read-only). +// - Filter out additional rows beyond what the interface requires +// (e.g., role-based redaction in regulated environments) as long +// as reachedEnd is set conservatively (false if uncertain). +// +// # Threading +// +// Implementations MUST be safe for concurrent calls. The handler +// dispatches a goroutine per request; a non-thread-safe impl would +// race on every chat reload. +package messagestore + +import ( + "context" + "time" +) + +// ChatMessage is the canonical shape returned to chat-history clients. +// Mirrors canvas's ChatMessage TS type so the canvas can render +// without per-row mapping. +// +// ID is server-minted per ChatMessage. Activity-log rows don't carry +// message-shaped ids; canvas dedupes by (role, content, timestamp +// window) not by id, so id stability across requests is not required. +type ChatMessage struct { + ID string `json:"id"` + Role string `json:"role"` // "user" | "agent" | "system" + Content string `json:"content"` + Attachments []ChatAttachment `json:"attachments,omitempty"` + Timestamp string `json:"timestamp"` // RFC3339, pinned to row.created_at +} + +// ChatAttachment mirrors canvas ChatAttachment / ParsedFilePart. +// Size is *int64 (not int64) so JSON omits the field when unknown, +// rather than emitting `"size": 0` which the renderer would interpret +// as "zero-byte file." +type ChatAttachment struct { + Name string `json:"name"` + URI string `json:"uri"` + MimeType string `json:"mimeType,omitempty"` + Size *int64 `json:"size,omitempty"` +} + +// ListOptions is the page-window the handler hands to the store. +// Constructed by the handler from query parameters; the store should +// not inspect the request directly. +type ListOptions struct { + // Limit is the page size. Caller (the handler) clamps to a sane + // bound (default 100, max 1000); store treats Limit ≤ 0 as a + // programming error. + Limit int + + // BeforeTS is the cursor for paginating backward. The store MUST + // only consider this when HasBefore is true; using a zero-time + // fallback would silently exclude the legitimate epoch-start case. + BeforeTS time.Time + HasBefore bool +} + +// MessageStore is the read-side interface. Implementations pluggable +// via constructor injection at handler creation time. +// +// Why "List" and not "GetMessages" / "ReadHistory" / etc: List matches +// the verb on /workspaces/:id/chat-history (HTTP GET on a collection) +// and the existing handler method. One-name-one-thing keeps the +// interface and the route lined up. +type MessageStore interface { + List(ctx context.Context, workspaceID string, opts ListOptions) (messages []ChatMessage, reachedEnd bool, err error) +} diff --git a/workspace-server/internal/messagestore/postgres_store.go b/workspace-server/internal/messagestore/postgres_store.go new file mode 100644 index 00000000..7e75315f --- /dev/null +++ b/workspace-server/internal/messagestore/postgres_store.go @@ -0,0 +1,497 @@ +package messagestore + +// postgres_store.go — default MessageStore impl that wraps today's +// activity_logs query + the A2A-envelope parser ported in PR-C. +// +// Behavior is byte-identical to the pre-PR-D ChatHistoryHandler: +// same SQL, same role-decision rules, same v0/v1 wire-shape support. +// The only structural change is that the handler now depends on an +// interface; this file is what was the pre-PR-D handler internals. +// +// This is the baseline impl OSS operators compare against when +// writing alternative stores. Read it as the contract spec. + +import ( + "context" + "database/sql" + "encoding/json" + "path" + "strings" + "time" + + "github.com/google/uuid" +) + +// PostgresMessageStore is the platform-default impl. It queries the +// activity_logs table directly and parses request_body / response_body +// JSONB columns into ChatMessage values. +type PostgresMessageStore struct { + db *sql.DB +} + +// NewPostgresMessageStore wraps a *sql.DB. The store does not own the +// pool — closing it is the caller's responsibility. +func NewPostgresMessageStore(db *sql.DB) *PostgresMessageStore { + return &PostgresMessageStore{db: db} +} + +// internalSelfPrefixes — message texts that should be filtered from +// chat history because they're internal self-triggers (heartbeats, +// scheduled-task self-fire, delegation-result self-notify), not +// user-typed messages. Mirrors canvas isInternalSelfMessage. +// +// Centralizing here means a future internal-trigger pattern is added +// in one place; alternative impls of MessageStore are expected to +// apply the same filter (or override deliberately). +var internalSelfPrefixes = []string{ + "Delegation results are ready", +} + +// IsInternalSelfMessage reports whether text starts with any registered +// internal-self prefix. Empty text returns false (legitimate +// attachments-only bubble). Exported for impls that want to share the +// same predicate. +func IsInternalSelfMessage(text string) bool { + if text == "" { + return false + } + for _, prefix := range internalSelfPrefixes { + if strings.HasPrefix(text, prefix) { + return true + } + } + return false +} + +// List implements MessageStore. Newest-first, optionally paged by +// BeforeTS. Filters to a2a_receive activity rows from the canvas +// (source_id IS NULL) — same scope canvas applies via +// /activity?source=canvas, centralized so future API consumers don't +// need to know it. +func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opts ListOptions) ([]ChatMessage, bool, error) { + if opts.Limit <= 0 { + // Caller bug. Programmers learn quickly when the store + // fails fast on bad opts; a silent clamp would hide the bug. + return nil, true, errInvalidLimit + } + + rows, err := s.queryActivityRows(ctx, workspaceID, opts) + if err != nil { + return nil, false, err + } + defer rows.Close() + + var messages []ChatMessage + rowCount := 0 + for rows.Next() { + var ( + createdAt time.Time + status string + rawRequest sql.NullString + rawResponse sql.NullString + ) + if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse); err != nil { + // Skip malformed row, continue. The error is logged at + // the caller (handler) layer; an isolated bad row should + // not abort the whole page. + continue + } + rowCount++ + var requestBody, responseBody json.RawMessage + if rawRequest.Valid { + requestBody = json.RawMessage(rawRequest.String) + } + if rawResponse.Valid { + responseBody = json.RawMessage(rawResponse.String) + } + messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, IsInternalSelfMessage)...) + } + if err := rows.Err(); err != nil { + return nil, false, err + } + + reachedEnd := rowCount < opts.Limit + return messages, reachedEnd, nil +} + +// queryActivityRows is split from List so unit tests can exercise the +// parser without spinning a real DB. Internal — alternative impls +// shouldn't depend on the SQL shape. +func (s *PostgresMessageStore) queryActivityRows(ctx context.Context, workspaceID string, opts ListOptions) (*sql.Rows, error) { + if opts.HasBefore { + return s.db.QueryContext(ctx, ` + SELECT created_at, status, request_body::text, response_body::text + FROM activity_logs + WHERE workspace_id = $1 + AND activity_type = 'a2a_receive' + AND source_id IS NULL + AND created_at < $2 + ORDER BY created_at DESC + LIMIT $3 + `, workspaceID, opts.BeforeTS, opts.Limit) + } + return s.db.QueryContext(ctx, ` + SELECT created_at, status, request_body::text, response_body::text + FROM activity_logs + WHERE workspace_id = $1 + AND activity_type = 'a2a_receive' + AND source_id IS NULL + ORDER BY created_at DESC + LIMIT $2 + `, workspaceID, opts.Limit) +} + +// errInvalidLimit is returned by List when opts.Limit ≤ 0. +type sentinelError string + +func (e sentinelError) Error() string { return string(e) } + +const errInvalidLimit sentinelError = "messagestore: List opts.Limit must be > 0" + +// activityRowToChatMessages converts ONE activity_logs row into 0-2 +// ChatMessages. Direct port of canvas activityRowToMessages. +// +// - Up to 1 user-side bubble from request_body, unless internal-self. +// - Up to 1 agent-side bubble from response_body. Role is "system" +// when status='error' OR text starts with "agent error" (case- +// insensitive — matches canvas predicate exactly). +// +// Both bubbles MUST adopt row.created_at as their timestamp. This +// pins the regression cover for the 2026-04-25 bubble-collapse bug. +func activityRowToChatMessages( + createdAt time.Time, + status string, + requestBody json.RawMessage, + responseBody json.RawMessage, + internalSelf func(string) bool, +) []ChatMessage { + var out []ChatMessage + timestamp := createdAt.UTC().Format(time.RFC3339Nano) + + userText := extractRequestText(requestBody) + userAttachments := extractFilesFromUserMessage(requestBody) + if !internalSelf(userText) && (userText != "" || len(userAttachments) > 0) { + out = append(out, ChatMessage{ + ID: newMessageID(), + Role: "user", + Content: userText, + Attachments: userAttachments, + Timestamp: timestamp, + }) + } + + if len(responseBody) > 0 { + agentText := extractChatResponseText(responseBody) + agentAttachments := extractFilesFromResponse(responseBody) + if agentText != "" || len(agentAttachments) > 0 { + role := "agent" + if status == "error" || strings.HasPrefix(strings.ToLower(agentText), "agent error") { + role = "system" + } + out = append(out, ChatMessage{ + ID: newMessageID(), + Role: role, + Content: agentText, + Attachments: agentAttachments, + Timestamp: timestamp, + }) + } + } + + return out +} + +// extractRequestText pulls the user's typed text from +// request_body.params.message.parts[0].text. Returns "" on any +// malformed shape; callers pair with extractFilesFromUserMessage to +// catch attachments-only bubbles. +func extractRequestText(body json.RawMessage) string { + if len(body) == 0 { + return "" + } + var env struct { + Params struct { + Message struct { + Parts []map[string]any `json:"parts"` + } `json:"message"` + } `json:"params"` + } + if err := json.Unmarshal(body, &env); err != nil { + return "" + } + for _, p := range env.Params.Message.Parts { + if t, ok := p["text"].(string); ok && t != "" { + return t + } + } + return "" +} + +func extractFilesFromUserMessage(body json.RawMessage) []ChatAttachment { + if len(body) == 0 { + return nil + } + var env struct { + Params struct { + Message json.RawMessage `json:"message"` + } `json:"params"` + } + if err := json.Unmarshal(body, &env); err != nil { + return nil + } + if len(env.Params.Message) == 0 { + return nil + } + return extractFilesFromTask(env.Params.Message) +} + +// extractChatResponseText collects text from any of the response +// shapes canvas extractResponseText handles, joining with "\n": +// +// - {"result": ""} +// - {"result": {"parts": [{"kind":"text","text":""}]}} +// - {"parts": [{"root": {"text": "..."}}]} (older nested) +// - {"result": {"artifacts": [{"parts": [...]}]}} (task shape) +// - {"task": ""} (fallback) +// +// Why collect rather than first-source-wins: claude-code emits +// multiple text parts; hermes emits summary-in-parts + +// details-in-artifacts. The pre-collect first-wins silently +// truncated 15k-char briefs and dropped artifact details. +func extractChatResponseText(body json.RawMessage) string { + if len(body) == 0 { + return "" + } + // {"result": "string"} + var asString struct { + Result string `json:"result"` + } + if err := json.Unmarshal(body, &asString); err == nil && asString.Result != "" { + return asString.Result + } + // {"result": {object}} — try the structured shapes + var asObject struct { + Result json.RawMessage `json:"result"` + Task string `json:"task"` + } + if err := json.Unmarshal(body, &asObject); err != nil { + return "" + } + var collected []string + if len(asObject.Result) > 0 { + var resultObj struct { + Parts []map[string]any `json:"parts"` + Artifacts []json.RawMessage `json:"artifacts"` + } + if err := json.Unmarshal(asObject.Result, &resultObj); err == nil { + if t := joinTextParts(resultObj.Parts); t != "" { + collected = append(collected, t) + } + var rootTexts []string + for _, p := range resultObj.Parts { + if root, ok := p["root"].(map[string]any); ok { + if t, ok := root["text"].(string); ok && t != "" { + rootTexts = append(rootTexts, t) + } + } + } + if len(rootTexts) > 0 { + collected = append(collected, strings.Join(rootTexts, "\n")) + } + for _, raw := range resultObj.Artifacts { + var art struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(raw, &art); err == nil { + if t := joinTextParts(art.Parts); t != "" { + collected = append(collected, t) + } + } + } + } + } + if len(collected) > 0 { + return strings.Join(collected, "\n") + } + if asObject.Task != "" { + return asObject.Task + } + return "" +} + +func joinTextParts(parts []map[string]any) string { + var texts []string + for _, p := range parts { + isText := false + if k, ok := p["kind"].(string); ok && k == "text" { + isText = true + } + if t, ok := p["type"].(string); ok && t == "text" { + isText = true + } + if !isText { + continue + } + if t, ok := p["text"].(string); ok && t != "" { + texts = append(texts, t) + } + } + return strings.Join(texts, "\n") +} + +func extractFilesFromResponse(body json.RawMessage) []ChatAttachment { + if len(body) == 0 { + return nil + } + var probe struct { + Result json.RawMessage `json:"result"` + } + _ = json.Unmarshal(body, &probe) + feed := body + if len(probe.Result) > 0 { + trimmed := bytesTrimSpace(probe.Result) + if len(trimmed) > 0 && trimmed[0] == '{' { + feed = probe.Result + } + } + return extractFilesFromTask(feed) +} + +// extractFilesFromTask walks parts[] + artifacts[].parts[] + +// status.message.parts[] + message.parts[]. Mirrors canvas +// extractFilesFromTask exactly — same v0 hot path + v1 protobuf +// flat shape. +func extractFilesFromTask(taskJSON json.RawMessage) []ChatAttachment { + if len(taskJSON) == 0 { + return nil + } + var task struct { + Parts []map[string]any `json:"parts"` + Artifacts []json.RawMessage `json:"artifacts"` + Status json.RawMessage `json:"status"` + Message json.RawMessage `json:"message"` + } + if err := json.Unmarshal(taskJSON, &task); err != nil { + return nil + } + var out []ChatAttachment + out = appendFilesFromParts(out, task.Parts) + for _, raw := range task.Artifacts { + var art struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(raw, &art); err == nil { + out = appendFilesFromParts(out, art.Parts) + } + } + if len(task.Status) > 0 { + var st struct { + Message struct { + Parts []map[string]any `json:"parts"` + } `json:"message"` + } + if err := json.Unmarshal(task.Status, &st); err == nil { + out = appendFilesFromParts(out, st.Message.Parts) + } + } + if len(task.Message) > 0 { + var msg struct { + Parts []map[string]any `json:"parts"` + } + if err := json.Unmarshal(task.Message, &msg); err == nil { + out = appendFilesFromParts(out, msg.Parts) + } + } + return out +} + +func appendFilesFromParts(out []ChatAttachment, parts []map[string]any) []ChatAttachment { + for _, raw := range parts { + v0 := false + if k, ok := raw["kind"].(string); ok && k == "file" { + v0 = true + } + if t, ok := raw["type"].(string); ok && t == "file" { + v0 = true + } + v1URL, _ := raw["url"].(string) + if !v0 && v1URL == "" { + continue + } + var att ChatAttachment + if v0 { + file, _ := raw["file"].(map[string]any) + if file == nil { + file = raw + } + uri, _ := file["uri"].(string) + if uri == "" { + continue + } + att.URI = uri + if name, _ := file["name"].(string); name != "" { + att.Name = name + } else { + att.Name = basename(uri) + } + if mt, ok := file["mimeType"].(string); ok { + att.MimeType = mt + } + if sz, ok := numericSize(file["size"]); ok { + att.Size = &sz + } + } else { + att.URI = v1URL + if name, _ := raw["filename"].(string); name != "" { + att.Name = name + } else { + att.Name = basename(v1URL) + } + if mt, ok := raw["mediaType"].(string); ok { + att.MimeType = mt + } + } + out = append(out, att) + } + return out +} + +func numericSize(v any) (int64, bool) { + switch n := v.(type) { + case float64: + return int64(n), true + case int64: + return n, true + case int: + return int64(n), true + } + return 0, false +} + +func basename(uri string) string { + cleaned := strings.TrimPrefix(uri, "workspace:") + cleaned = strings.TrimPrefix(cleaned, "https://") + cleaned = strings.TrimPrefix(cleaned, "http://") + if cleaned == "" { + return "file" + } + return path.Base(cleaned) +} + +func bytesTrimSpace(b json.RawMessage) json.RawMessage { + for len(b) > 0 && (b[0] == ' ' || b[0] == '\t' || b[0] == '\n' || b[0] == '\r') { + b = b[1:] + } + for len(b) > 0 && (b[len(b)-1] == ' ' || b[len(b)-1] == '\t' || b[len(b)-1] == '\n' || b[len(b)-1] == '\r') { + b = b[:len(b)-1] + } + return b +} + +func newMessageID() string { + return uuid.New().String() +} + +// Compile-time assertion: PostgresMessageStore satisfies MessageStore. +// Catches any future drift between interface and impl at build time. +var _ MessageStore = (*PostgresMessageStore)(nil) diff --git a/workspace-server/internal/messagestore/postgres_store_test.go b/workspace-server/internal/messagestore/postgres_store_test.go new file mode 100644 index 00000000..bcdda6fa --- /dev/null +++ b/workspace-server/internal/messagestore/postgres_store_test.go @@ -0,0 +1,422 @@ +package messagestore + +// postgres_store_test.go — parser-level parity tests against the +// canvas TS test fixtures in +// canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts. +// +// Originally lived in handlers/chat_history_test.go (RFC #2945 PR-C); +// PR-D moved them here when the parser was extracted to this package. +// Every test case in the TS file has a Go counterpart, named after +// the TS describe/it block. +// +// Mutation guidance: when adding behavior, add the case to BOTH +// historyHydration.test.ts AND this file. The canvas TS is the +// legacy source the server replaces; divergence == regression. + +import ( + "encoding/json" + "strings" + "testing" + "time" +) + +const fixedTimestamp = "2026-04-25T18:00:00Z" + +func mustParseTime(t *testing.T, s string) time.Time { + t.Helper() + tt, err := time.Parse(time.RFC3339, s) + if err != nil { + t.Fatalf("parse %s: %v", s, err) + } + return tt +} + +func neverInternal(_ string) bool { return false } + +// ===================================================================== +// timestamp preservation (regression cover) +// +// The canvas bug that motivated extracting the helper: every reload +// re-stamped historical bubbles to render-time. Pin row.created_at +// adoption. +// ===================================================================== + +func TestChatHistory_UserMessageTimestampPinsToCreatedAt(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:00:00Z") + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hello from earlier today"}]}}}`) + + msgs := activityRowToChatMessages(created, "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 user message, got %d", len(msgs)) + } + if msgs[0].Role != "user" { + t.Errorf("role=%q want user", msgs[0].Role) + } + if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:00:00") { + t.Errorf("user message timestamp %q does NOT pin to row.created_at — regression of the 2026-04-25 bubble-collapse bug", msgs[0].Timestamp) + } +} + +func TestChatHistory_AgentMessageTimestampPinsToCreatedAt(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:05:00Z") + body := json.RawMessage(`{"result":"agent reply"}`) + + msgs := activityRowToChatMessages(created, "ok", nil, body, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 agent message, got %d", len(msgs)) + } + if msgs[0].Role != "agent" { + t.Errorf("role=%q want agent", msgs[0].Role) + } + if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:05:00") { + t.Errorf("agent message timestamp %q does NOT pin to row.created_at", msgs[0].Timestamp) + } +} + +func TestChatHistory_TwoRowsDistinctTimestamps(t *testing.T) { + bodyA := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"first"}]}}}`) + bodyB := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"second"}]}}}`) + a := activityRowToChatMessages(mustParseTime(t, "2026-04-25T14:00:00Z"), "ok", bodyA, nil, neverInternal) + b := activityRowToChatMessages(mustParseTime(t, "2026-04-25T21:01:58Z"), "ok", bodyB, nil, neverInternal) + + if len(a) != 1 || len(b) != 1 { + t.Fatalf("expected 1 message each; got %d and %d", len(a), len(b)) + } + if a[0].Timestamp == b[0].Timestamp { + t.Errorf("two distinct created_at values produced same timestamp: %q", a[0].Timestamp) + } + if !strings.HasPrefix(a[0].Timestamp, "2026-04-25T14:00:00") || !strings.HasPrefix(b[0].Timestamp, "2026-04-25T21:01:58") { + t.Errorf("timestamps drifted: a=%q b=%q", a[0].Timestamp, b[0].Timestamp) + } +} + +// ===================================================================== +// user-message extraction +// ===================================================================== + +func TestChatHistory_EmitsUserMessageWhenRequestHasText(t *testing.T) { + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hi agent"}]}}}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 message, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "hi agent" { + t.Errorf("role=%q content=%q want user/hi agent", msgs[0].Role, msgs[0].Content) + } +} + +func TestChatHistory_DropsInternalSelfMessages(t *testing.T) { + body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"Delegation results are ready..."}]}}}`) + predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("internal-self message rendered as user bubble: %q", m.Content) + } + } +} + +func TestChatHistory_NoUserMessageWhenRequestBodyNull(t *testing.T) { + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("emitted user bubble despite null request_body: %+v", m) + } + } +} + +func TestChatHistory_UserAttachmentsHydratedFromRequestBody(t *testing.T) { + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"text","text":"here's the screenshot"}, + {"kind":"file","file":{"name":"shot.png","mimeType":"image/png","uri":"workspace:/uploads/shot.png","size":4096}} + ] + } + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + var user *ChatMessage + for i := range msgs { + if msgs[i].Role == "user" { + user = &msgs[i] + break + } + } + if user == nil { + t.Fatalf("no user bubble produced") + } + if user.Content != "here's the screenshot" { + t.Errorf("content=%q", user.Content) + } + if len(user.Attachments) != 1 { + t.Fatalf("attachments=%d want 1", len(user.Attachments)) + } + att := user.Attachments[0] + if att.Name != "shot.png" || att.URI != "workspace:/uploads/shot.png" || att.MimeType != "image/png" { + t.Errorf("attachment shape wrong: %+v", att) + } + if att.Size == nil || *att.Size != 4096 { + t.Errorf("size=%v want 4096", att.Size) + } +} + +func TestChatHistory_AttachmentsOnlyUserBubbleWhenTextEmpty(t *testing.T) { + // Drag-drop a file with no caption — bubble should still render. + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf"}} + ] + } + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + if len(msgs) != 1 { + t.Fatalf("expected 1 attachments-only bubble, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "" || len(msgs[0].Attachments) != 1 { + t.Errorf("unexpected: role=%q content=%q attachments=%d", msgs[0].Role, msgs[0].Content, len(msgs[0].Attachments)) + } + if msgs[0].Attachments[0].Name != "report.pdf" { + t.Errorf("attachment name=%q want report.pdf", msgs[0].Attachments[0].Name) + } +} + +func TestChatHistory_InternalSelfPredicateSuppressesEvenWithAttachments(t *testing.T) { + body := json.RawMessage(`{ + "params": { + "message": { + "parts": [ + {"kind":"text","text":"Delegation results are ready..."}, + {"kind":"file","file":{"name":"x.zip","uri":"workspace:/x.zip"}} + ] + } + } + }`) + predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") } + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate) + for _, m := range msgs { + if m.Role == "user" { + t.Errorf("internal-self predicate did NOT suppress user bubble despite attachments: %+v", m) + } + } +} + +// ===================================================================== +// agent-message extraction +// ===================================================================== + +func TestChatHistory_AgentMessageFromResultString(t *testing.T) { + body := json.RawMessage(`{"result":"agent says hi"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "agent" || msgs[0].Content != "agent says hi" { + t.Errorf("got %+v", msgs) + } +} + +func TestChatHistory_RoleSystemWhenStatusError(t *testing.T) { + body := json.RawMessage(`{"result":"delegation failed"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "error", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "system" { + t.Errorf("status=error did NOT promote role to system: %+v", msgs) + } +} + +func TestChatHistory_RoleSystemWhenAgentErrorPrefix(t *testing.T) { + // Defense-in-depth — if a runtime returns ok status but the text + // itself starts with "agent error", the canvas would still + // render system role. Mirror that here. + body := json.RawMessage(`{"result":"Agent error: ProcessError(exit=1)"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Role != "system" { + t.Errorf("agent-error prefix did NOT promote to system: %+v", msgs) + } +} + +func TestChatHistory_AgentAttachmentsFromResponseBodyParts(t *testing.T) { + // Notify shape: response_body = {"result":"","parts":[{"kind":"file",...}]} + body := json.RawMessage(`{ + "result": "Done — see attached.", + "parts": [ + {"kind":"file","file":{"name":"build.zip","uri":"workspace:/tmp/build.zip","size":12345}} + ] + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + var agent *ChatMessage + for i := range msgs { + if msgs[i].Role == "agent" { + agent = &msgs[i] + break + } + } + if agent == nil { + t.Fatalf("no agent bubble") + } + if len(agent.Attachments) != 1 || agent.Attachments[0].Name != "build.zip" { + t.Errorf("agent attachments shape wrong: %+v", agent.Attachments) + } + if agent.Attachments[0].Size == nil || *agent.Attachments[0].Size != 12345 { + t.Errorf("size=%v want 12345", agent.Attachments[0].Size) + } +} + +func TestChatHistory_NoAgentMessageWhenResponseBodyNull(t *testing.T) { + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal) + for _, m := range msgs { + if m.Role == "agent" || m.Role == "system" { + t.Errorf("emitted agent/system bubble despite null response_body: %+v", m) + } + } +} + +func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) { + body := json.RawMessage(`{"unrelated":"metadata"}`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + for _, m := range msgs { + if m.Role == "agent" { + t.Errorf("emitted agent bubble despite empty content: %+v", m) + } + } +} + +// ===================================================================== +// end-to-end shape — paired user + agent with same timestamp +// ===================================================================== + +func TestChatHistory_PairedUserAndAgentSameTimestamp(t *testing.T) { + created := mustParseTime(t, "2026-04-25T18:00:00Z") + req := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"what's 2+2?"}]}}}`) + resp := json.RawMessage(`{"result":"4"}`) + msgs := activityRowToChatMessages(created, "ok", req, resp, neverInternal) + if len(msgs) != 2 { + t.Fatalf("expected 2 messages, got %d", len(msgs)) + } + if msgs[0].Role != "user" || msgs[0].Content != "what's 2+2?" { + t.Errorf("first message wrong: %+v", msgs[0]) + } + if msgs[1].Role != "agent" || msgs[1].Content != "4" { + t.Errorf("second message wrong: %+v", msgs[1]) + } + if msgs[0].Timestamp != msgs[1].Timestamp { + t.Errorf("paired bubbles have different timestamps: %q vs %q", msgs[0].Timestamp, msgs[1].Timestamp) + } +} + +// ===================================================================== +// Go-specific: defensive parsing +// ===================================================================== + +func TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty(t *testing.T) { + // Should NOT panic; should return no user bubble (or no message at all). + body := json.RawMessage(`{not valid json}`) + defer func() { + if r := recover(); r != nil { + t.Fatalf("panic on malformed json: %v", r) + } + }() + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal) + for _, m := range msgs { + if m.Role == "user" && (m.Content != "" || len(m.Attachments) > 0) { + t.Errorf("malformed JSON yielded a non-empty user bubble: %+v", m) + } + } +} + +func TestChatHistory_V1ProtobufFlatFileShape(t *testing.T) { + // v1 a2a-sdk shape: flat parts with url/filename/mediaType + body := json.RawMessage(`{ + "result": { + "parts": [ + {"url":"https://example.com/data.csv","filename":"data.csv","mediaType":"text/csv"} + ] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + var agent *ChatMessage + for i := range msgs { + if msgs[i].Role == "agent" { + agent = &msgs[i] + break + } + } + if agent == nil { + t.Fatalf("no agent bubble for v1 shape") + } + if len(agent.Attachments) != 1 { + t.Fatalf("attachments=%d want 1", len(agent.Attachments)) + } + att := agent.Attachments[0] + if att.Name != "data.csv" || att.URI != "https://example.com/data.csv" || att.MimeType != "text/csv" { + t.Errorf("v1 shape extracted wrong: %+v", att) + } +} + +func TestChatHistory_TaskShapeArtifactsExtracted(t *testing.T) { + // {"result":{"artifacts":[{"parts":[{"kind":"text","text":"..."}]}]}} + body := json.RawMessage(`{ + "result": { + "artifacts": [ + {"parts": [{"kind":"text","text":"hermes detail line"}]} + ] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || msgs[0].Content != "hermes detail line" { + t.Errorf("artifact text not extracted: %+v", msgs) + } +} + +func TestChatHistory_OlderNestedRootTextShape(t *testing.T) { + // Older shape: {parts: [{root: {text: "..."}}]} + body := json.RawMessage(`{ + "result": { + "parts": [{"root":{"text":"legacy nested text"}}] + } + }`) + msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal) + if len(msgs) != 1 || !strings.Contains(msgs[0].Content, "legacy nested text") { + t.Errorf("nested root.text not extracted: %+v", msgs) + } +} + +// ===================================================================== +// IsInternalSelfMessage predicate itself +// ===================================================================== + +func TestChatHistory_IsInternalSelfMessage_DelegationPrefix(t *testing.T) { + if !IsInternalSelfMessage("Delegation results are ready... ") { + t.Errorf("Delegation-results prefix should be flagged internal-self") + } + if IsInternalSelfMessage("Delegation completed but not ready") { + t.Errorf("non-prefix match should NOT flag") + } + if IsInternalSelfMessage("") { + t.Errorf("empty text should NOT flag (legitimate attachments-only bubble)") + } +} + +// ===================================================================== +// basename helper — mirrors canvas basename() semantics +// ===================================================================== + +func TestChatHistory_BasenameStripsSchemeAndPath(t *testing.T) { + cases := []struct { + in, want string + }{ + {"workspace:/uploads/shot.png", "shot.png"}, + {"workspace:/a/b/c/file.txt", "file.txt"}, + {"https://example.com/path/file.csv", "file.csv"}, + {"http://x/y", "y"}, + {"", "file"}, + {"workspace:", "file"}, // scheme-only collapses to "" → "file" sentinel, matches canvas basename + } + for _, tc := range cases { + got := basename(tc.in) + if got != tc.want { + t.Errorf("basename(%q) = %q want %q", tc.in, got, tc.want) + } + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 48967c2d..e3b9171b 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -11,6 +11,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/buildinfo" "github.com/Molecule-AI/molecule-monorepo/platform/internal/channels" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" @@ -315,11 +316,16 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAuth.POST("/activity", acth.Report) wsAuth.POST("/notify", acth.Notify) - // Chat history — RFC #2945 PR-C (issue #3017). Server-side - // rendering of activity_logs rows into the canonical - // ChatMessage shape so canvas (and future API consumers) don't - // re-implement the A2A-envelope walk per-client. - chh := handlers.NewChatHistoryHandler() + // Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue + // #3026). Server-side rendering of activity_logs rows into + // the canonical ChatMessage shape; storage is plugin-shaped + // via the messagestore.MessageStore interface so OSS + // operators can swap in S3 / vector / in-memory backends + // without forking the handler. Platform default uses + // PostgresMessageStore wrapping the existing activity_logs + // table. + chatStore := messagestore.NewPostgresMessageStore(db.DB) + chh := handlers.NewChatHistoryHandler(chatStore) wsAuth.GET("/chat-history", chh.List) // Config