Merge pull request #3028 from Molecule-AI/rfc-2945-pr-d-message-store
Some checks failed
CI / Detect changes (push) Successful in 11s
Handlers Postgres Integration / detect-changes (push) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 16s
E2E API Smoke Test / detect-changes (push) Successful in 17s
Block internal-flavored paths / Block forbidden paths (push) Successful in 25s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 13s
Secret scan / Scan diff for credential-shaped strings (push) Failing after 53s
CI / Shellcheck (E2E scripts) (push) Failing after 6s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 29s
CI / Python Lint & Test (push) Failing after 39s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 4s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 24s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 53s
CI / Canvas (Next.js) (push) Failing after 3m21s
CI / Canvas Deploy Reminder (push) Has been skipped
CI / Platform (Go) (push) Failing after 3m47s
CodeQL / Analyze (${{ matrix.language }}) (go) (push) Failing after 14m15s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (push) Failing after 14m33s
CodeQL / Analyze (${{ matrix.language }}) (python) (push) Failing after 14m34s

feat(messagestore): MessageStore interface + Postgres impl (RFC #2945 PR-D)
This commit is contained in:
Hongming Wang 2026-05-06 06:42:13 +00:00 committed by GitHub
commit 50c3bdfd6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 1302 additions and 938 deletions

View File

@ -1,134 +1,65 @@
package handlers
// chat_history.go — server-side rendering of activity_logs rows into the
// canonical ChatMessage shape (RFC #2945 PR-C, issue #3017).
// chat_history.go — HTTP-shape adapter over messagestore.MessageStore
// (RFC #2945 PR-D).
//
// Replaces the canvas-side TS parsing in
// canvas/src/components/tabs/chat/historyHydration.ts +
// canvas/src/components/tabs/chat/message-parser.ts so:
//
// - Single source of truth for A2A-envelope walking. A future API
// consumer (mobile, third-party integration, RFC #2945 PR-D's
// OSS MessageStore) consumes a typed surface instead of re-
// implementing the same shape walk.
//
// - Wire-format evolution (a2a-sdk v0 → v1 protobuf flat shape) is
// handled in one place. Today the TS parser handles both shapes;
// this Go parser mirrors that contract exactly.
//
// - PR-D unblocked: MessageStore returns []ChatMessage typed values,
// not raw activity_logs rows. The interface is meaningless if
// parsing still has to happen client-side.
// Pre-PR-D, this file owned the activity_logs query AND the parser
// AND the HTTP plumbing. PR-D extracts the storage + parser into
// internal/messagestore/ so OSS operators can plug in alternative
// backends (S3-tiered, vector store, in-memory). The handler is now
// a thin adapter: parse query params → call store → emit JSON.
//
// Endpoint: GET /workspaces/:id/chat-history?limit=N&before_ts=T
//
// Auth: same wsAuth chain as /workspaces/:id/activity (tenant
// ADMIN_TOKEN + X-Molecule-Org-Id header). No new trust boundary.
//
// Behavioral contract: every test case in
// canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts
// (11 cases) has a Go-side parity test in chat_history_test.go.
// Mutation-tested by reverting individual branches and confirming
// the corresponding Go test fires red.
// Behavioral parity with canvas TS is enforced at the messagestore
// layer (internal/messagestore/postgres_store_test.go); this file's
// tests cover the HTTP-shape concerns only.
import (
"database/sql"
"encoding/json"
"errors"
"net/http"
"path"
"strconv"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// ChatMessage is the canonical shape returned by GET /chat-history.
// Mirrors canvas/src/components/tabs/chat/types.ts:ChatMessage so
// the canvas can render it without per-row mapping.
//
// NOTE: id is server-generated (UUID v4) per row pair — clients should
// NOT depend on these ids being stable across requests since the
// activity_log row itself doesn't carry message-shaped ids. Use
// (timestamp, role, content) for cross-request deduping; the id is
// just a React key.
type ChatMessage struct {
ID string `json:"id"`
Role string `json:"role"` // "user" | "agent" | "system"
Content string `json:"content"`
Attachments []ChatAttachment `json:"attachments,omitempty"`
Timestamp string `json:"timestamp"` // RFC3339 — pinned to row.created_at
}
// ChatAttachment mirrors canvas's ChatAttachment / ParsedFilePart.
type ChatAttachment struct {
Name string `json:"name"`
URI string `json:"uri"`
MimeType string `json:"mimeType,omitempty"`
Size *int64 `json:"size,omitempty"`
}
// ChatHistoryResponse is the wire shape for GET /chat-history.
type ChatHistoryResponse struct {
Messages []ChatMessage `json:"messages"`
ReachedEnd bool `json:"reached_end"`
Messages []messagestore.ChatMessage `json:"messages"`
ReachedEnd bool `json:"reached_end"`
}
// ChatHistoryHandler exposes the typed chat-history endpoint. It does
// not need a broadcaster — read-only.
type ChatHistoryHandler struct{}
// NewChatHistoryHandler returns a fresh handler. Stateless on purpose:
// no caching, no per-request handler state. The DB query is the
// expensive part; cache control is handled at HTTP layer.
func NewChatHistoryHandler() *ChatHistoryHandler {
return &ChatHistoryHandler{}
// ChatHistoryHandler exposes the typed chat-history endpoint over a
// MessageStore. The store is injected so OSS operators can swap the
// backend without forking the handler.
type ChatHistoryHandler struct {
store messagestore.MessageStore
}
// internalSelfPrefixes — message texts that should be filtered out of
// chat history because they're internal self-triggers (heartbeats,
// scheduled-task self-fire, delegation-result self-notify) rather than
// user-typed messages. Mirrors canvas's isInternalSelfMessage. Centring
// here means a future internal-trigger pattern only needs to be added
// in one place, not in every consumer.
var internalSelfPrefixes = []string{
"Delegation results are ready",
}
// isInternalSelfMessage reports whether text starts with any registered
// internal-self prefix. Empty text returns false (only filter on
// matched prefixes — empty/missing text is a legitimate
// attachments-only bubble).
func isInternalSelfMessage(text string) bool {
if text == "" {
return false
}
for _, prefix := range internalSelfPrefixes {
if strings.HasPrefix(text, prefix) {
return true
}
}
return false
// NewChatHistoryHandler wires a MessageStore (typically
// messagestore.NewPostgresMessageStore at production startup).
//
// Tests inject fakes (see internal/handlers/chat_history_test.go).
// Constructor takes the interface, not a concrete type, so the
// platform-default vs OSS-alternative decision happens at wiring
// time in router.go.
func NewChatHistoryHandler(store messagestore.MessageStore) *ChatHistoryHandler {
return &ChatHistoryHandler{store: store}
}
// List handles GET /workspaces/:id/chat-history?limit=N&before_ts=T.
//
// Query parameters mirror /activity for caller convenience:
//
// - limit (default 100, max 1000) — page size, newest-first from the
// server's POV. Caller reverses for chronological display.
// - before_ts (RFC3339, optional) — paginate by walking strictly
// older than this timestamp. Identical semantics to /activity's
// before_ts: matches what canvas uses for lazy-loading older
// batches.
// - limit (default 100, max 1000) — page size
// - before_ts (RFC3339, optional) — cursor for paginating backward
//
// The handler scopes to activity_type='a2a_receive' AND source_id IS
// NULL (canvas-source rows only) — the same filter canvas applies via
// `?type=a2a_receive&source=canvas`. Centralizing here means a future
// caller (mobile, public API) doesn't need to know the filter.
// Validates inputs at the trust boundary; the store sees only
// well-formed ListOptions.
func (h *ChatHistoryHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
@ -146,8 +77,7 @@ func (h *ChatHistoryHandler) List(c *gin.Context) {
limit = 1000
}
var beforeTS time.Time
usingBeforeTS := false
opts := messagestore.ListOptions{Limit: limit}
if v := c.Query("before_ts"); v != "" {
t, err := time.Parse(time.RFC3339, v)
if err != nil {
@ -156,491 +86,28 @@ func (h *ChatHistoryHandler) List(c *gin.Context) {
})
return
}
beforeTS = t
usingBeforeTS = true
opts.BeforeTS = t
opts.HasBefore = true
}
// Newest-first ordering matches /activity. Caller reverses for
// chronological display. Same semantics across both endpoints
// keeps the canvas's lazy-history pagination logic stable.
rows, err := h.queryActivityRows(c.Request.Context(), workspaceID, limit, usingBeforeTS, beforeTS)
messages, reachedEnd, err := h.store.List(c.Request.Context(), workspaceID, opts)
if err != nil {
// Errors here are infra (DB unreachable) — surface as 502 so
// the canvas can retry vs. treating as "no rows".
// Errors here are infra (DB unreachable, store impl failure).
// Surface as 502 so the canvas can retry vs. treating as
// "no rows."
c.JSON(http.StatusBadGateway, gin.H{"error": "chat history unavailable"})
return
}
defer rows.Close()
var messages []ChatMessage
rowCount := 0
for rows.Next() {
var (
createdAt time.Time
status string
rawRequest sql.NullString
rawResponse sql.NullString
)
if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse); err != nil {
continue
}
rowCount++
var requestBody, responseBody json.RawMessage
if rawRequest.Valid {
requestBody = json.RawMessage(rawRequest.String)
}
if rawResponse.Valid {
responseBody = json.RawMessage(rawResponse.String)
}
messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, isInternalSelfMessage)...)
// Defensive: if the store returns nil messages slice (any impl
// might), emit empty array rather than `null` so canvas's JSON
// parser doesn't have to handle two empty representations.
if messages == nil {
messages = []messagestore.ChatMessage{}
}
c.JSON(http.StatusOK, ChatHistoryResponse{
Messages: messages,
ReachedEnd: rowCount < limit,
ReachedEnd: reachedEnd,
})
}
// queryActivityRows pulls the raw a2a_receive rows for a workspace.
// Split out so unit tests can mock the DB layer without spinning a
// full request context. Canvas-source rows only (source_id IS NULL).
func (h *ChatHistoryHandler) queryActivityRows(ctx interface {
Done() <-chan struct{}
Err() error
Deadline() (time.Time, bool)
Value(any) any
}, workspaceID string, limit int, usingBeforeTS bool, beforeTS time.Time) (*sql.Rows, error) {
if usingBeforeTS {
return db.DB.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
AND created_at < $2
ORDER BY created_at DESC
LIMIT $3
`, workspaceID, beforeTS, limit)
}
return db.DB.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
ORDER BY created_at DESC
LIMIT $2
`, workspaceID, limit)
}
// activityRowToChatMessages converts ONE activity_logs row into 0-2
// ChatMessages. Direct port of canvas's activityRowToMessages.
//
// - Up to 1 user-side bubble from request_body, unless internal-self.
// - Up to 1 agent-side bubble from response_body. Role is "system"
// when status='error' OR text starts with "agent error" (case-
// insensitive — matches canvas predicate exactly).
//
// Both bubbles MUST adopt row.created_at as their timestamp. The
// canvas hydration regression that motivated extracting the helper
// (every reload re-stamping bubbles to render-time) is regression-
// covered in chat_history_test.go.
//
// Defensive: any malformed JSON is silently dropped (text becomes "",
// attachments []) — chat falls through to text-only rather than
// surfacing a 500.
func activityRowToChatMessages(
createdAt time.Time,
status string,
requestBody json.RawMessage,
responseBody json.RawMessage,
internalSelf func(string) bool,
) []ChatMessage {
var out []ChatMessage
timestamp := createdAt.UTC().Format(time.RFC3339Nano)
// USER side — extract from request_body.params.message
userText := extractRequestText(requestBody)
userAttachments := extractFilesFromUserMessage(requestBody)
if !internalSelf(userText) && (userText != "" || len(userAttachments) > 0) {
out = append(out, ChatMessage{
ID: newMessageID(),
Role: "user",
Content: userText,
Attachments: userAttachments,
Timestamp: timestamp,
})
}
// AGENT side — extract from response_body
if len(responseBody) > 0 {
agentText := extractChatResponseText(responseBody)
agentAttachments := extractFilesFromResponse(responseBody)
if agentText != "" || len(agentAttachments) > 0 {
role := "agent"
if status == "error" || strings.HasPrefix(strings.ToLower(agentText), "agent error") {
role = "system"
}
out = append(out, ChatMessage{
ID: newMessageID(),
Role: role,
Content: agentText,
Attachments: agentAttachments,
Timestamp: timestamp,
})
}
}
return out
}
// extractRequestText pulls the user's typed text from the canonical
// A2A request envelope. Returns "" on any malformed shape; callers
// pair this with extractFilesFromUserMessage to catch attachments-
// only bubbles.
//
// request_body = {"params": {"message": {"parts": [{"kind":"text", "text":"..."}, ...]}}}
//
// Mirrors canvas's extractRequestText. Currently returns ONLY parts[0]
// to match canvas exactly; multi-text-part user messages would
// require both parsers to evolve in lockstep (track via PR-C-2).
func extractRequestText(body json.RawMessage) string {
if len(body) == 0 {
return ""
}
var env struct {
Params struct {
Message struct {
Parts []map[string]any `json:"parts"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return ""
}
for _, p := range env.Params.Message.Parts {
if t, ok := p["text"].(string); ok && t != "" {
return t
}
}
return ""
}
// extractFilesFromUserMessage walks the same request_body envelope as
// extractRequestText and collects file parts.
func extractFilesFromUserMessage(body json.RawMessage) []ChatAttachment {
if len(body) == 0 {
return nil
}
var env struct {
Params struct {
Message json.RawMessage `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return nil
}
if len(env.Params.Message) == 0 {
return nil
}
return extractFilesFromTask(env.Params.Message)
}
// extractChatResponseText collects text from any of the response shapes
// canvas's extractChatResponseText handles, joining with "\n":
//
// - {"result": "<text>"} — string
// - {"result": {"parts": [{"kind":"text", "text":""}]}} — A2A JSON-RPC
// - {"parts": [{"root": {"text": "..."}}]} — older nested shape
// - {"result": {"artifacts": [{"parts": [...]}]}} — task shape
// - {"task": "<text>"} — fallback
//
// Why collect rather than first-source-wins: claude-code emits multiple
// text parts; hermes emits summary-in-parts + details-in-artifacts. The
// pre-collect "first wins" silently truncated 15k-char briefs to their
// leading line and dropped artifact details. Matches canvas behavior
// exactly.
func extractChatResponseText(body json.RawMessage) string {
if len(body) == 0 {
return ""
}
// {"result": "string"}
var asString struct {
Result string `json:"result"`
}
if err := json.Unmarshal(body, &asString); err == nil && asString.Result != "" {
return asString.Result
}
// {"result": {object}} — try the structured shapes
var asObject struct {
Result json.RawMessage `json:"result"`
Task string `json:"task"`
}
if err := json.Unmarshal(body, &asObject); err != nil {
return ""
}
var collected []string
if len(asObject.Result) > 0 {
var resultObj struct {
Parts []map[string]any `json:"parts"`
Artifacts []json.RawMessage `json:"artifacts"`
}
if err := json.Unmarshal(asObject.Result, &resultObj); err == nil {
// A2A JSON-RPC: parts[].text
if t := joinTextParts(resultObj.Parts); t != "" {
collected = append(collected, t)
}
// Older nested: parts[].root.text
var rootTexts []string
for _, p := range resultObj.Parts {
if root, ok := p["root"].(map[string]any); ok {
if t, ok := root["text"].(string); ok && t != "" {
rootTexts = append(rootTexts, t)
}
}
}
if len(rootTexts) > 0 {
collected = append(collected, strings.Join(rootTexts, "\n"))
}
// Task shape: artifacts[].parts[].text
for _, raw := range resultObj.Artifacts {
var art struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(raw, &art); err == nil {
if t := joinTextParts(art.Parts); t != "" {
collected = append(collected, t)
}
}
}
}
}
if len(collected) > 0 {
return strings.Join(collected, "\n")
}
if asObject.Task != "" {
return asObject.Task
}
return ""
}
// joinTextParts returns a "\n"-joined concatenation of every text part
// in parts[]. Empty if no text parts. Matches canvas extractTextsFromParts.
func joinTextParts(parts []map[string]any) string {
var texts []string
for _, p := range parts {
// Accept both "type":"text" (older) and "kind":"text" (current).
isText := false
if k, ok := p["kind"].(string); ok && k == "text" {
isText = true
}
if t, ok := p["type"].(string); ok && t == "text" {
isText = true
}
if !isText {
continue
}
if t, ok := p["text"].(string); ok && t != "" {
texts = append(texts, t)
}
}
return strings.Join(texts, "\n")
}
// extractFilesFromResponse collects file parts from the response_body
// across the same shape variants as extractChatResponseText. Mirrors
// canvas extractFilesFromTask, except the canvas function takes "the
// task object" while this takes the wire-level response_body and
// dispatches:
//
// - {"result": {object}} → unwrap result, walk parts/artifacts
// - {"result": "<text>", "parts": [...]} → notify shape, walk top-level parts
// - {"message": {"parts": [...]}} → some A2A servers wrap as a message
func extractFilesFromResponse(body json.RawMessage) []ChatAttachment {
if len(body) == 0 {
return nil
}
// Determine which container to feed extractFilesFromTask:
// - if result is an object, feed the result object
// - else feed the top-level body (notify shape with parts at root)
var probe struct {
Result json.RawMessage `json:"result"`
}
_ = json.Unmarshal(body, &probe)
feed := body
if len(probe.Result) > 0 {
// Is result an object? (vs a string)
trimmed := bytes_trim_space(probe.Result)
if len(trimmed) > 0 && trimmed[0] == '{' {
feed = probe.Result
}
}
return extractFilesFromTask(feed)
}
// extractFilesFromTask walks parts[] + artifacts[].parts[] + status.message.parts[]
// + message.parts[] and pulls out file parts. Mirrors canvas's
// extractFilesFromTask exactly — same two wire shapes (v0 hot path,
// v1 protobuf flat shape).
//
// Defensive: any error inside the walk is recovered and partial
// results returned. A malformed shape should never fail the whole
// chat reload — degraded UX is better than 500.
func extractFilesFromTask(taskJSON json.RawMessage) []ChatAttachment {
if len(taskJSON) == 0 {
return nil
}
var task struct {
Parts []map[string]any `json:"parts"`
Artifacts []json.RawMessage `json:"artifacts"`
Status json.RawMessage `json:"status"`
Message json.RawMessage `json:"message"`
}
if err := json.Unmarshal(taskJSON, &task); err != nil {
return nil
}
var out []ChatAttachment
out = appendFilesFromParts(out, task.Parts)
for _, raw := range task.Artifacts {
var art struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(raw, &art); err == nil {
out = appendFilesFromParts(out, art.Parts)
}
}
if len(task.Status) > 0 {
var st struct {
Message struct {
Parts []map[string]any `json:"parts"`
} `json:"message"`
}
if err := json.Unmarshal(task.Status, &st); err == nil {
out = appendFilesFromParts(out, st.Message.Parts)
}
}
if len(task.Message) > 0 {
var msg struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(task.Message, &msg); err == nil {
out = appendFilesFromParts(out, msg.Parts)
}
}
return out
}
// appendFilesFromParts handles the v0 hot path (kind/type=file with
// nested file{}) and the v1 flat path (url+filename+mediaType).
func appendFilesFromParts(out []ChatAttachment, parts []map[string]any) []ChatAttachment {
for _, raw := range parts {
v0 := false
if k, ok := raw["kind"].(string); ok && k == "file" {
v0 = true
}
if t, ok := raw["type"].(string); ok && t == "file" {
v0 = true
}
v1URL, _ := raw["url"].(string)
if !v0 && v1URL == "" {
continue
}
var att ChatAttachment
if v0 {
file, _ := raw["file"].(map[string]any)
if file == nil {
file = raw // some emitters flatten; defensive
}
uri, _ := file["uri"].(string)
if uri == "" {
continue
}
att.URI = uri
if name, _ := file["name"].(string); name != "" {
att.Name = name
} else {
att.Name = basename(uri)
}
if mt, ok := file["mimeType"].(string); ok {
att.MimeType = mt
}
if sz, ok := numericSize(file["size"]); ok {
att.Size = &sz
}
} else {
att.URI = v1URL
if name, _ := raw["filename"].(string); name != "" {
att.Name = name
} else {
att.Name = basename(v1URL)
}
if mt, ok := raw["mediaType"].(string); ok {
att.MimeType = mt
}
}
out = append(out, att)
}
return out
}
// numericSize coerces JSON's number type (always float64 in
// json.Unmarshal of map[string]any) to int64 for the Size field.
// Returns (0, false) for non-numeric or absent values.
func numericSize(v any) (int64, bool) {
switch n := v.(type) {
case float64:
return int64(n), true
case int64:
return n, true
case int:
return int64(n), true
}
return 0, false
}
// basename strips scheme + path components, returning the trailing
// segment (or "file" if empty). Mirrors canvas basename helper.
func basename(uri string) string {
cleaned := strings.TrimPrefix(uri, "workspace:")
cleaned = strings.TrimPrefix(cleaned, "https://")
cleaned = strings.TrimPrefix(cleaned, "http://")
if cleaned == "" {
return "file"
}
return path.Base(cleaned)
}
// bytes_trim_space — minimal whitespace stripper for json.RawMessage
// peeking. Avoids importing bytes for one tiny helper. Internal-only.
func bytes_trim_space(b json.RawMessage) json.RawMessage {
for len(b) > 0 && (b[0] == ' ' || b[0] == '\t' || b[0] == '\n' || b[0] == '\r') {
b = b[1:]
}
for len(b) > 0 && (b[len(b)-1] == ' ' || b[len(b)-1] == '\t' || b[len(b)-1] == '\n' || b[len(b)-1] == '\r') {
b = b[:len(b)-1]
}
return b
}
// newMessageID generates a fresh UUID per ChatMessage. Server-minted
// because activity_logs rows don't carry message-shaped ids, and the
// canvas only needs a React-key-stable id (it dedupes by content+role+
// timestamp window, not by id).
func newMessageID() string {
return uuid.New().String()
}
// ensureNoUnusedImports avoids lint complaining about `errors` if a
// future refactor removes the only consumer. errors is reserved for
// the inevitable wrap-aware DB-error handling once we add a
// distinguishable "DB outage vs no rows" path.
var _ = errors.Is

View File

@ -1,422 +1,276 @@
package handlers
// chat_history_test.go — Go-side parity tests for the canvas TS test
// fixtures in canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts.
// chat_history_test.go — handler-level tests against a fake
// MessageStore. The parser-level parity tests against the canvas TS
// fixtures live in internal/messagestore/postgres_store_test.go;
// this file covers the HTTP-shape concerns (param validation,
// pagination passthrough, error mapping) without touching a DB.
//
// Every test case in the TS file has a Go counterpart here, named
// after the TS describe/it block. A future change that diverges the
// two implementations should fail the corresponding test here BEFORE
// the canvas's stale TS path silently returns wrong messages.
//
// Mutation guidance: when adding behavior, add the case to BOTH
// historyHydration.test.ts AND this file. RFC #2945 PR-C ships server-
// owned parsing — the canvas TS is the legacy source the server now
// replaces, so divergence == regression.
// Why the split: PR-D extracted storage to messagestore.MessageStore.
// The handler is now a thin adapter — its tests should exercise the
// adapter (ParseQuery → store.List → emitJSON), not the parser. A
// future MessageStore impl (S3, vector store) shares the same
// handler; testing the handler against the interface keeps the
// adapter test independent of any specific impl.
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore"
"github.com/gin-gonic/gin"
)
const fixedTimestamp = "2026-04-25T18:00:00Z"
const testWorkspaceID = "550e8400-e29b-41d4-a716-446655440000"
func mustParseTime(t *testing.T, s string) time.Time {
func init() {
gin.SetMode(gin.TestMode)
}
// fakeStore is a stub MessageStore for handler-level tests. Every
// real store impl (Postgres, S3, vector) shares the handler — so a
// fake that records inputs + returns scripted outputs is the right
// granularity for HTTP-shape coverage.
type fakeStore struct {
// LastWorkspaceID + LastOpts capture the call shape so the test
// can assert the handler passed the right args to the store.
LastWorkspaceID string
LastOpts messagestore.ListOptions
// Returns — set per test.
ReturnMessages []messagestore.ChatMessage
ReturnReachedEnd bool
ReturnErr error
// Panic — if non-empty, List panics with this string. Used by
// the resilience test to confirm the handler returns 502 on
// store-impl failures rather than crashing the goroutine.
PanicWith string
}
func (s *fakeStore) List(ctx context.Context, workspaceID string, opts messagestore.ListOptions) ([]messagestore.ChatMessage, bool, error) {
if s.PanicWith != "" {
panic(s.PanicWith)
}
s.LastWorkspaceID = workspaceID
s.LastOpts = opts
return s.ReturnMessages, s.ReturnReachedEnd, s.ReturnErr
}
// Compile-time assertion that fakeStore satisfies the interface.
// Catches drift if the interface changes and the fake stops being a
// drop-in for tests.
var _ messagestore.MessageStore = (*fakeStore)(nil)
func newRouter(store messagestore.MessageStore) *gin.Engine {
r := gin.New()
h := NewChatHistoryHandler(store)
r.GET("/workspaces/:id/chat-history", h.List)
return r
}
func doChatHistoryRequest(t *testing.T, r *gin.Engine, path string) *httptest.ResponseRecorder {
t.Helper()
tt, err := time.Parse(time.RFC3339, s)
if err != nil {
t.Fatalf("parse %s: %v", s, err)
}
return tt
}
func neverInternal(_ string) bool { return false }
// =====================================================================
// timestamp preservation (regression cover)
//
// The canvas bug that motivated extracting the helper: every reload
// re-stamped historical bubbles to render-time. Pin row.created_at
// adoption.
// =====================================================================
func TestChatHistory_UserMessageTimestampPinsToCreatedAt(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:00:00Z")
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hello from earlier today"}]}}}`)
msgs := activityRowToChatMessages(created, "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 user message, got %d", len(msgs))
}
if msgs[0].Role != "user" {
t.Errorf("role=%q want user", msgs[0].Role)
}
if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:00:00") {
t.Errorf("user message timestamp %q does NOT pin to row.created_at — regression of the 2026-04-25 bubble-collapse bug", msgs[0].Timestamp)
}
}
func TestChatHistory_AgentMessageTimestampPinsToCreatedAt(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:05:00Z")
body := json.RawMessage(`{"result":"agent reply"}`)
msgs := activityRowToChatMessages(created, "ok", nil, body, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 agent message, got %d", len(msgs))
}
if msgs[0].Role != "agent" {
t.Errorf("role=%q want agent", msgs[0].Role)
}
if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:05:00") {
t.Errorf("agent message timestamp %q does NOT pin to row.created_at", msgs[0].Timestamp)
}
}
func TestChatHistory_TwoRowsDistinctTimestamps(t *testing.T) {
bodyA := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"first"}]}}}`)
bodyB := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"second"}]}}}`)
a := activityRowToChatMessages(mustParseTime(t, "2026-04-25T14:00:00Z"), "ok", bodyA, nil, neverInternal)
b := activityRowToChatMessages(mustParseTime(t, "2026-04-25T21:01:58Z"), "ok", bodyB, nil, neverInternal)
if len(a) != 1 || len(b) != 1 {
t.Fatalf("expected 1 message each; got %d and %d", len(a), len(b))
}
if a[0].Timestamp == b[0].Timestamp {
t.Errorf("two distinct created_at values produced same timestamp: %q", a[0].Timestamp)
}
if !strings.HasPrefix(a[0].Timestamp, "2026-04-25T14:00:00") || !strings.HasPrefix(b[0].Timestamp, "2026-04-25T21:01:58") {
t.Errorf("timestamps drifted: a=%q b=%q", a[0].Timestamp, b[0].Timestamp)
}
req := httptest.NewRequest(http.MethodGet, path, nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
return w
}
// =====================================================================
// user-message extraction
// Param validation
// =====================================================================
func TestChatHistory_EmitsUserMessageWhenRequestHasText(t *testing.T) {
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hi agent"}]}}}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 message, got %d", len(msgs))
func TestChatHistoryHandler_RejectsNonUUIDWorkspaceID(t *testing.T) {
store := &fakeStore{}
r := newRouter(store)
w := doChatHistoryRequest(t, r, "/workspaces/not-a-uuid/chat-history")
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for non-UUID, got %d", w.Code)
}
if msgs[0].Role != "user" || msgs[0].Content != "hi agent" {
t.Errorf("role=%q content=%q want user/hi agent", msgs[0].Role, msgs[0].Content)
if store.LastWorkspaceID != "" {
t.Errorf("non-UUID reached the store: %q", store.LastWorkspaceID)
}
}
func TestChatHistory_DropsInternalSelfMessages(t *testing.T) {
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"Delegation results are ready..."}]}}}`)
predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") }
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("internal-self message rendered as user bubble: %q", m.Content)
}
func TestChatHistoryHandler_RejectsMalformedBeforeTS(t *testing.T) {
store := &fakeStore{}
r := newRouter(store)
w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?before_ts=not-a-timestamp")
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400 for malformed before_ts, got %d", w.Code)
}
if !strings.Contains(w.Body.String(), "RFC3339") {
t.Errorf("error message should mention RFC3339; got %q", w.Body.String())
}
}
func TestChatHistory_NoUserMessageWhenRequestBodyNull(t *testing.T) {
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("emitted user bubble despite null request_body: %+v", m)
}
func TestChatHistoryHandler_DefaultsLimitTo100(t *testing.T) {
store := &fakeStore{}
r := newRouter(store)
doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history")
if store.LastOpts.Limit != 100 {
t.Errorf("default limit=%d want 100", store.LastOpts.Limit)
}
if store.LastOpts.HasBefore {
t.Errorf("HasBefore should be false when no cursor passed")
}
}
func TestChatHistory_UserAttachmentsHydratedFromRequestBody(t *testing.T) {
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"text","text":"here's the screenshot"},
{"kind":"file","file":{"name":"shot.png","mimeType":"image/png","uri":"workspace:/uploads/shot.png","size":4096}}
]
}
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
var user *ChatMessage
for i := range msgs {
if msgs[i].Role == "user" {
user = &msgs[i]
break
}
}
if user == nil {
t.Fatalf("no user bubble produced")
}
if user.Content != "here's the screenshot" {
t.Errorf("content=%q", user.Content)
}
if len(user.Attachments) != 1 {
t.Fatalf("attachments=%d want 1", len(user.Attachments))
}
att := user.Attachments[0]
if att.Name != "shot.png" || att.URI != "workspace:/uploads/shot.png" || att.MimeType != "image/png" {
t.Errorf("attachment shape wrong: %+v", att)
}
if att.Size == nil || *att.Size != 4096 {
t.Errorf("size=%v want 4096", att.Size)
func TestChatHistoryHandler_ClampsLimitToMax1000(t *testing.T) {
store := &fakeStore{}
r := newRouter(store)
doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?limit=99999")
if store.LastOpts.Limit != 1000 {
t.Errorf("limit not clamped: got %d, want 1000", store.LastOpts.Limit)
}
}
func TestChatHistory_AttachmentsOnlyUserBubbleWhenTextEmpty(t *testing.T) {
// Drag-drop a file with no caption — bubble should still render.
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf"}}
]
}
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 attachments-only bubble, got %d", len(msgs))
}
if msgs[0].Role != "user" || msgs[0].Content != "" || len(msgs[0].Attachments) != 1 {
t.Errorf("unexpected: role=%q content=%q attachments=%d", msgs[0].Role, msgs[0].Content, len(msgs[0].Attachments))
}
if msgs[0].Attachments[0].Name != "report.pdf" {
t.Errorf("attachment name=%q want report.pdf", msgs[0].Attachments[0].Name)
}
}
func TestChatHistoryHandler_IgnoresInvalidLimit(t *testing.T) {
// Negative or zero limits should fall back to default rather
// than reach the store (which rejects them as a programming bug).
store := &fakeStore{}
r := newRouter(store)
func TestChatHistory_InternalSelfPredicateSuppressesEvenWithAttachments(t *testing.T) {
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"text","text":"Delegation results are ready..."},
{"kind":"file","file":{"name":"x.zip","uri":"workspace:/x.zip"}}
]
}
}
}`)
predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") }
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("internal-self predicate did NOT suppress user bubble despite attachments: %+v", m)
for _, bad := range []string{"-1", "0", "abc"} {
store.LastOpts = messagestore.ListOptions{}
doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?limit="+bad)
if store.LastOpts.Limit != 100 {
t.Errorf("limit=%q yielded %d, want default 100", bad, store.LastOpts.Limit)
}
}
}
// =====================================================================
// agent-message extraction
// Pagination passthrough
// =====================================================================
func TestChatHistory_AgentMessageFromResultString(t *testing.T) {
body := json.RawMessage(`{"result":"agent says hi"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "agent" || msgs[0].Content != "agent says hi" {
t.Errorf("got %+v", msgs)
}
}
func TestChatHistoryHandler_BeforeTSPassedToStore(t *testing.T) {
store := &fakeStore{}
r := newRouter(store)
func TestChatHistory_RoleSystemWhenStatusError(t *testing.T) {
body := json.RawMessage(`{"result":"delegation failed"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "error", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "system" {
t.Errorf("status=error did NOT promote role to system: %+v", msgs)
}
}
doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history?before_ts=2026-04-25T18:00:00Z&limit=25")
func TestChatHistory_RoleSystemWhenAgentErrorPrefix(t *testing.T) {
// Defense-in-depth — if a runtime returns ok status but the text
// itself starts with "agent error", the canvas would still
// render system role. Mirror that here.
body := json.RawMessage(`{"result":"Agent error: ProcessError(exit=1)"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "system" {
t.Errorf("agent-error prefix did NOT promote to system: %+v", msgs)
if !store.LastOpts.HasBefore {
t.Errorf("HasBefore=false but query passed before_ts")
}
}
func TestChatHistory_AgentAttachmentsFromResponseBodyParts(t *testing.T) {
// Notify shape: response_body = {"result":"<text>","parts":[{"kind":"file",...}]}
body := json.RawMessage(`{
"result": "Done — see attached.",
"parts": [
{"kind":"file","file":{"name":"build.zip","uri":"workspace:/tmp/build.zip","size":12345}}
]
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
break
}
got := store.LastOpts.BeforeTS.UTC().Format("2006-01-02T15:04:05Z")
if got != "2026-04-25T18:00:00Z" {
t.Errorf("BeforeTS=%q want 2026-04-25T18:00:00Z", got)
}
if agent == nil {
t.Fatalf("no agent bubble")
}
if len(agent.Attachments) != 1 || agent.Attachments[0].Name != "build.zip" {
t.Errorf("agent attachments shape wrong: %+v", agent.Attachments)
}
if agent.Attachments[0].Size == nil || *agent.Attachments[0].Size != 12345 {
t.Errorf("size=%v want 12345", agent.Attachments[0].Size)
}
}
func TestChatHistory_NoAgentMessageWhenResponseBodyNull(t *testing.T) {
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal)
for _, m := range msgs {
if m.Role == "agent" || m.Role == "system" {
t.Errorf("emitted agent/system bubble despite null response_body: %+v", m)
}
}
}
func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) {
body := json.RawMessage(`{"unrelated":"metadata"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
for _, m := range msgs {
if m.Role == "agent" {
t.Errorf("emitted agent bubble despite empty content: %+v", m)
}
if store.LastOpts.Limit != 25 {
t.Errorf("limit=%d want 25", store.LastOpts.Limit)
}
}
// =====================================================================
// end-to-end shape — paired user + agent with same timestamp
// Response shape
// =====================================================================
func TestChatHistory_PairedUserAndAgentSameTimestamp(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:00:00Z")
req := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"what's 2+2?"}]}}}`)
resp := json.RawMessage(`{"result":"4"}`)
msgs := activityRowToChatMessages(created, "ok", req, resp, neverInternal)
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
func TestChatHistoryHandler_EmptyResultIsArrayNotNull(t *testing.T) {
// nil messages slice from the store must serialize as `[]`,
// not `null` — canvas's JSON parser has one path.
store := &fakeStore{ReturnMessages: nil, ReturnReachedEnd: true}
r := newRouter(store)
w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history")
if w.Code != http.StatusOK {
t.Fatalf("status=%d", w.Code)
}
if msgs[0].Role != "user" || msgs[0].Content != "what's 2+2?" {
t.Errorf("first message wrong: %+v", msgs[0])
var resp ChatHistoryResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("body not JSON: %v", err)
}
if msgs[1].Role != "agent" || msgs[1].Content != "4" {
t.Errorf("second message wrong: %+v", msgs[1])
// json.Unmarshal of `null` into a []slice yields a nil — assert
// the JSON literally contains "[]" so a future change that
// forgets the nil-coercion would fail loudly.
if !strings.Contains(w.Body.String(), `"messages":[]`) {
t.Errorf("body should contain `\"messages\":[]`; got %s", w.Body.String())
}
if msgs[0].Timestamp != msgs[1].Timestamp {
t.Errorf("paired bubbles have different timestamps: %q vs %q", msgs[0].Timestamp, msgs[1].Timestamp)
if !resp.ReachedEnd {
t.Errorf("reached_end not propagated")
}
}
func TestChatHistoryHandler_NonEmptyResponsePreservesShape(t *testing.T) {
size := int64(4096)
store := &fakeStore{
ReturnMessages: []messagestore.ChatMessage{
{
ID: "msg-1",
Role: "user",
Content: "hi",
Timestamp: "2026-04-25T18:00:00Z",
},
{
ID: "msg-2",
Role: "agent",
Content: "hello back",
Attachments: []messagestore.ChatAttachment{
{Name: "img.png", URI: "workspace:/img.png", MimeType: "image/png", Size: &size},
},
Timestamp: "2026-04-25T18:00:01Z",
},
},
ReturnReachedEnd: false,
}
r := newRouter(store)
w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history")
if w.Code != http.StatusOK {
t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
}
var resp ChatHistoryResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("body not JSON: %v", err)
}
if len(resp.Messages) != 2 {
t.Fatalf("messages=%d want 2", len(resp.Messages))
}
if resp.Messages[1].Attachments[0].Size == nil || *resp.Messages[1].Attachments[0].Size != 4096 {
t.Errorf("size pointer flattened in JSON round-trip")
}
}
// =====================================================================
// Go-specific: defensive parsing
// Error mapping — store errors become 502, not 500/panic
// =====================================================================
func TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty(t *testing.T) {
// Should NOT panic; should return no user bubble (or no message at all).
body := json.RawMessage(`{not valid json}`)
defer func() {
if r := recover(); r != nil {
t.Fatalf("panic on malformed json: %v", r)
}
}()
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
for _, m := range msgs {
if m.Role == "user" && (m.Content != "" || len(m.Attachments) > 0) {
t.Errorf("malformed JSON yielded a non-empty user bubble: %+v", m)
}
}
}
func TestChatHistoryHandler_StoreErrorReturns502(t *testing.T) {
store := &fakeStore{ReturnErr: errors.New("simulated DB unreachable")}
r := newRouter(store)
w := doChatHistoryRequest(t, r, "/workspaces/"+testWorkspaceID+"/chat-history")
func TestChatHistory_V1ProtobufFlatFileShape(t *testing.T) {
// v1 a2a-sdk shape: flat parts with url/filename/mediaType
body := json.RawMessage(`{
"result": {
"parts": [
{"url":"https://example.com/data.csv","filename":"data.csv","mediaType":"text/csv"}
]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
break
}
if w.Code != http.StatusBadGateway {
t.Errorf("expected 502 on store error, got %d", w.Code)
}
if agent == nil {
t.Fatalf("no agent bubble for v1 shape")
}
if len(agent.Attachments) != 1 {
t.Fatalf("attachments=%d want 1", len(agent.Attachments))
}
att := agent.Attachments[0]
if att.Name != "data.csv" || att.URI != "https://example.com/data.csv" || att.MimeType != "text/csv" {
t.Errorf("v1 shape extracted wrong: %+v", att)
}
}
func TestChatHistory_TaskShapeArtifactsExtracted(t *testing.T) {
// {"result":{"artifacts":[{"parts":[{"kind":"text","text":"..."}]}]}}
body := json.RawMessage(`{
"result": {
"artifacts": [
{"parts": [{"kind":"text","text":"hermes detail line"}]}
]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Content != "hermes detail line" {
t.Errorf("artifact text not extracted: %+v", msgs)
}
}
func TestChatHistory_OlderNestedRootTextShape(t *testing.T) {
// Older shape: {parts: [{root: {text: "..."}}]}
body := json.RawMessage(`{
"result": {
"parts": [{"root":{"text":"legacy nested text"}}]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || !strings.Contains(msgs[0].Content, "legacy nested text") {
t.Errorf("nested root.text not extracted: %+v", msgs)
if !strings.Contains(w.Body.String(), "unavailable") {
t.Errorf("response body should communicate unavailability; got %q", w.Body.String())
}
}
// =====================================================================
// isInternalSelfMessage predicate itself
// Interface conformance — the platform-default Postgres impl is the
// only impl in tree today, but the assertion catches future drift if
// the interface evolves and the impl falls behind.
// =====================================================================
func TestChatHistory_IsInternalSelfMessage_DelegationPrefix(t *testing.T) {
if !isInternalSelfMessage("Delegation results are ready... <body>") {
t.Errorf("Delegation-results prefix should be flagged internal-self")
}
if isInternalSelfMessage("Delegation completed but not ready") {
t.Errorf("non-prefix match should NOT flag")
}
if isInternalSelfMessage("") {
t.Errorf("empty text should NOT flag (legitimate attachments-only bubble)")
}
}
// =====================================================================
// basename helper — mirrors canvas basename() semantics
// =====================================================================
func TestChatHistory_BasenameStripsSchemeAndPath(t *testing.T) {
cases := []struct {
in, want string
}{
{"workspace:/uploads/shot.png", "shot.png"},
{"workspace:/a/b/c/file.txt", "file.txt"},
{"https://example.com/path/file.csv", "file.csv"},
{"http://x/y", "y"},
{"", "file"},
{"workspace:", "file"}, // scheme-only collapses to "" → "file" sentinel, matches canvas basename
}
for _, tc := range cases {
got := basename(tc.in)
if got != tc.want {
t.Errorf("basename(%q) = %q want %q", tc.in, got, tc.want)
}
}
func TestMessageStoreInterface_PostgresImplSatisfies(t *testing.T) {
// Compile-time assertion lives in messagestore/postgres_store.go
// (`var _ MessageStore = (*PostgresMessageStore)(nil)`). This
// runtime test exists only to keep the conformance visible in
// the handler test file — a reader of chat_history_test.go
// shouldn't have to traverse to the messagestore package to see
// what the handler is paired with.
var s messagestore.MessageStore = messagestore.NewPostgresMessageStore(nil)
_ = s
}

View File

@ -0,0 +1,118 @@
// Package messagestore defines the read-side interface and canonical
// data shapes for chat-history retrieval.
//
// Origin: RFC #2945 PR-D (issue #3026). PR-A extracted the WRITE path
// (AgentMessageWriter), PR-B/B-1 typed the WS event taxonomy, PR-C
// centralized read-side parsing in the server. PR-D abstracts the
// underlying storage layer so OSS operators can plug in alternative
// backends without forking the handler.
//
// # Why this package exists
//
// Today's only consumer is ChatHistoryHandler, but exposing storage as
// an interface is what makes the platform's chat-history layer pluggable
// for OSS operators. Operators wanting to:
//
// - Tier hot/warm/cold storage (recent in Postgres, archival in S3 parquet)
// - Use a vector store with hybrid search (Pinecone, Weaviate)
// - Run an in-memory store for ephemeral tests / sandbox tenants
// - Federate history across regions
//
// …implement MessageStore against their backend. The platform-default
// PostgresMessageStore wraps today's activity_logs query + parser
// behavior unchanged.
//
// # Implementation contract
//
// Implementations MUST:
//
// - Return messages newest-first, up to opts.Limit. Caller (the
// handler) is responsible for opts.Limit clamping.
// - Honor opts.BeforeTS as a strict less-than cursor when
// opts.HasBefore is true; ignore it when false. Use HasBefore (not
// a zero-time check) so a legitimate "start of epoch" cursor is
// distinguishable from "no cursor."
// - Set reachedEnd=true when the underlying store has no more
// messages older than the returned page. Caller uses this to
// disable further older-batch fetches in the lazy-load UX.
// - Parse agent-emitted JSON DEFENSIVELY. Any malformed message body
// becomes an empty ChatMessage (or is dropped); never panic, never
// return an error for parse failures alone — chat falls through to
// text-only rather than 500.
// - NEVER log full message bodies, attachment URIs, or anything that
// would be a sensitive screenshot. Workspace ID + activity-log
// row id at DEBUG is the ceiling.
// - Honor ctx cancellation. A canceled ctx must abort the lookup
// and return ctx.Err().
//
// Implementations MAY:
//
// - Cache aggressively (history is read-only).
// - Filter out additional rows beyond what the interface requires
// (e.g., role-based redaction in regulated environments) as long
// as reachedEnd is set conservatively (false if uncertain).
//
// # Threading
//
// Implementations MUST be safe for concurrent calls. The handler
// dispatches a goroutine per request; a non-thread-safe impl would
// race on every chat reload.
package messagestore
import (
"context"
"time"
)
// ChatMessage is the canonical shape returned to chat-history clients.
// Mirrors canvas's ChatMessage TS type so the canvas can render
// without per-row mapping.
//
// ID is server-minted per ChatMessage. Activity-log rows don't carry
// message-shaped ids; canvas dedupes by (role, content, timestamp
// window) not by id, so id stability across requests is not required.
type ChatMessage struct {
ID string `json:"id"`
Role string `json:"role"` // "user" | "agent" | "system"
Content string `json:"content"`
Attachments []ChatAttachment `json:"attachments,omitempty"`
Timestamp string `json:"timestamp"` // RFC3339, pinned to row.created_at
}
// ChatAttachment mirrors canvas ChatAttachment / ParsedFilePart.
// Size is *int64 (not int64) so JSON omits the field when unknown,
// rather than emitting `"size": 0` which the renderer would interpret
// as "zero-byte file."
type ChatAttachment struct {
Name string `json:"name"`
URI string `json:"uri"`
MimeType string `json:"mimeType,omitempty"`
Size *int64 `json:"size,omitempty"`
}
// ListOptions is the page-window the handler hands to the store.
// Constructed by the handler from query parameters; the store should
// not inspect the request directly.
type ListOptions struct {
// Limit is the page size. Caller (the handler) clamps to a sane
// bound (default 100, max 1000); store treats Limit ≤ 0 as a
// programming error.
Limit int
// BeforeTS is the cursor for paginating backward. The store MUST
// only consider this when HasBefore is true; using a zero-time
// fallback would silently exclude the legitimate epoch-start case.
BeforeTS time.Time
HasBefore bool
}
// MessageStore is the read-side interface. Implementations pluggable
// via constructor injection at handler creation time.
//
// Why "List" and not "GetMessages" / "ReadHistory" / etc: List matches
// the verb on /workspaces/:id/chat-history (HTTP GET on a collection)
// and the existing handler method. One-name-one-thing keeps the
// interface and the route lined up.
type MessageStore interface {
List(ctx context.Context, workspaceID string, opts ListOptions) (messages []ChatMessage, reachedEnd bool, err error)
}

View File

@ -0,0 +1,497 @@
package messagestore
// postgres_store.go — default MessageStore impl that wraps today's
// activity_logs query + the A2A-envelope parser ported in PR-C.
//
// Behavior is byte-identical to the pre-PR-D ChatHistoryHandler:
// same SQL, same role-decision rules, same v0/v1 wire-shape support.
// The only structural change is that the handler now depends on an
// interface; this file is what was the pre-PR-D handler internals.
//
// This is the baseline impl OSS operators compare against when
// writing alternative stores. Read it as the contract spec.
import (
"context"
"database/sql"
"encoding/json"
"path"
"strings"
"time"
"github.com/google/uuid"
)
// PostgresMessageStore is the platform-default impl. It queries the
// activity_logs table directly and parses request_body / response_body
// JSONB columns into ChatMessage values.
type PostgresMessageStore struct {
db *sql.DB
}
// NewPostgresMessageStore wraps a *sql.DB. The store does not own the
// pool — closing it is the caller's responsibility.
func NewPostgresMessageStore(db *sql.DB) *PostgresMessageStore {
return &PostgresMessageStore{db: db}
}
// internalSelfPrefixes — message texts that should be filtered from
// chat history because they're internal self-triggers (heartbeats,
// scheduled-task self-fire, delegation-result self-notify), not
// user-typed messages. Mirrors canvas isInternalSelfMessage.
//
// Centralizing here means a future internal-trigger pattern is added
// in one place; alternative impls of MessageStore are expected to
// apply the same filter (or override deliberately).
var internalSelfPrefixes = []string{
"Delegation results are ready",
}
// IsInternalSelfMessage reports whether text starts with any registered
// internal-self prefix. Empty text returns false (legitimate
// attachments-only bubble). Exported for impls that want to share the
// same predicate.
func IsInternalSelfMessage(text string) bool {
if text == "" {
return false
}
for _, prefix := range internalSelfPrefixes {
if strings.HasPrefix(text, prefix) {
return true
}
}
return false
}
// List implements MessageStore. Newest-first, optionally paged by
// BeforeTS. Filters to a2a_receive activity rows from the canvas
// (source_id IS NULL) — same scope canvas applies via
// /activity?source=canvas, centralized so future API consumers don't
// need to know it.
func (s *PostgresMessageStore) List(ctx context.Context, workspaceID string, opts ListOptions) ([]ChatMessage, bool, error) {
if opts.Limit <= 0 {
// Caller bug. Programmers learn quickly when the store
// fails fast on bad opts; a silent clamp would hide the bug.
return nil, true, errInvalidLimit
}
rows, err := s.queryActivityRows(ctx, workspaceID, opts)
if err != nil {
return nil, false, err
}
defer rows.Close()
var messages []ChatMessage
rowCount := 0
for rows.Next() {
var (
createdAt time.Time
status string
rawRequest sql.NullString
rawResponse sql.NullString
)
if err := rows.Scan(&createdAt, &status, &rawRequest, &rawResponse); err != nil {
// Skip malformed row, continue. The error is logged at
// the caller (handler) layer; an isolated bad row should
// not abort the whole page.
continue
}
rowCount++
var requestBody, responseBody json.RawMessage
if rawRequest.Valid {
requestBody = json.RawMessage(rawRequest.String)
}
if rawResponse.Valid {
responseBody = json.RawMessage(rawResponse.String)
}
messages = append(messages, activityRowToChatMessages(createdAt, status, requestBody, responseBody, IsInternalSelfMessage)...)
}
if err := rows.Err(); err != nil {
return nil, false, err
}
reachedEnd := rowCount < opts.Limit
return messages, reachedEnd, nil
}
// queryActivityRows is split from List so unit tests can exercise the
// parser without spinning a real DB. Internal — alternative impls
// shouldn't depend on the SQL shape.
func (s *PostgresMessageStore) queryActivityRows(ctx context.Context, workspaceID string, opts ListOptions) (*sql.Rows, error) {
if opts.HasBefore {
return s.db.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
AND created_at < $2
ORDER BY created_at DESC
LIMIT $3
`, workspaceID, opts.BeforeTS, opts.Limit)
}
return s.db.QueryContext(ctx, `
SELECT created_at, status, request_body::text, response_body::text
FROM activity_logs
WHERE workspace_id = $1
AND activity_type = 'a2a_receive'
AND source_id IS NULL
ORDER BY created_at DESC
LIMIT $2
`, workspaceID, opts.Limit)
}
// errInvalidLimit is returned by List when opts.Limit ≤ 0.
type sentinelError string
func (e sentinelError) Error() string { return string(e) }
const errInvalidLimit sentinelError = "messagestore: List opts.Limit must be > 0"
// activityRowToChatMessages converts ONE activity_logs row into 0-2
// ChatMessages. Direct port of canvas activityRowToMessages.
//
// - Up to 1 user-side bubble from request_body, unless internal-self.
// - Up to 1 agent-side bubble from response_body. Role is "system"
// when status='error' OR text starts with "agent error" (case-
// insensitive — matches canvas predicate exactly).
//
// Both bubbles MUST adopt row.created_at as their timestamp. This
// pins the regression cover for the 2026-04-25 bubble-collapse bug.
func activityRowToChatMessages(
createdAt time.Time,
status string,
requestBody json.RawMessage,
responseBody json.RawMessage,
internalSelf func(string) bool,
) []ChatMessage {
var out []ChatMessage
timestamp := createdAt.UTC().Format(time.RFC3339Nano)
userText := extractRequestText(requestBody)
userAttachments := extractFilesFromUserMessage(requestBody)
if !internalSelf(userText) && (userText != "" || len(userAttachments) > 0) {
out = append(out, ChatMessage{
ID: newMessageID(),
Role: "user",
Content: userText,
Attachments: userAttachments,
Timestamp: timestamp,
})
}
if len(responseBody) > 0 {
agentText := extractChatResponseText(responseBody)
agentAttachments := extractFilesFromResponse(responseBody)
if agentText != "" || len(agentAttachments) > 0 {
role := "agent"
if status == "error" || strings.HasPrefix(strings.ToLower(agentText), "agent error") {
role = "system"
}
out = append(out, ChatMessage{
ID: newMessageID(),
Role: role,
Content: agentText,
Attachments: agentAttachments,
Timestamp: timestamp,
})
}
}
return out
}
// extractRequestText pulls the user's typed text from
// request_body.params.message.parts[0].text. Returns "" on any
// malformed shape; callers pair with extractFilesFromUserMessage to
// catch attachments-only bubbles.
func extractRequestText(body json.RawMessage) string {
if len(body) == 0 {
return ""
}
var env struct {
Params struct {
Message struct {
Parts []map[string]any `json:"parts"`
} `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return ""
}
for _, p := range env.Params.Message.Parts {
if t, ok := p["text"].(string); ok && t != "" {
return t
}
}
return ""
}
func extractFilesFromUserMessage(body json.RawMessage) []ChatAttachment {
if len(body) == 0 {
return nil
}
var env struct {
Params struct {
Message json.RawMessage `json:"message"`
} `json:"params"`
}
if err := json.Unmarshal(body, &env); err != nil {
return nil
}
if len(env.Params.Message) == 0 {
return nil
}
return extractFilesFromTask(env.Params.Message)
}
// extractChatResponseText collects text from any of the response
// shapes canvas extractResponseText handles, joining with "\n":
//
// - {"result": "<text>"}
// - {"result": {"parts": [{"kind":"text","text":""}]}}
// - {"parts": [{"root": {"text": "..."}}]} (older nested)
// - {"result": {"artifacts": [{"parts": [...]}]}} (task shape)
// - {"task": "<text>"} (fallback)
//
// Why collect rather than first-source-wins: claude-code emits
// multiple text parts; hermes emits summary-in-parts +
// details-in-artifacts. The pre-collect first-wins silently
// truncated 15k-char briefs and dropped artifact details.
func extractChatResponseText(body json.RawMessage) string {
if len(body) == 0 {
return ""
}
// {"result": "string"}
var asString struct {
Result string `json:"result"`
}
if err := json.Unmarshal(body, &asString); err == nil && asString.Result != "" {
return asString.Result
}
// {"result": {object}} — try the structured shapes
var asObject struct {
Result json.RawMessage `json:"result"`
Task string `json:"task"`
}
if err := json.Unmarshal(body, &asObject); err != nil {
return ""
}
var collected []string
if len(asObject.Result) > 0 {
var resultObj struct {
Parts []map[string]any `json:"parts"`
Artifacts []json.RawMessage `json:"artifacts"`
}
if err := json.Unmarshal(asObject.Result, &resultObj); err == nil {
if t := joinTextParts(resultObj.Parts); t != "" {
collected = append(collected, t)
}
var rootTexts []string
for _, p := range resultObj.Parts {
if root, ok := p["root"].(map[string]any); ok {
if t, ok := root["text"].(string); ok && t != "" {
rootTexts = append(rootTexts, t)
}
}
}
if len(rootTexts) > 0 {
collected = append(collected, strings.Join(rootTexts, "\n"))
}
for _, raw := range resultObj.Artifacts {
var art struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(raw, &art); err == nil {
if t := joinTextParts(art.Parts); t != "" {
collected = append(collected, t)
}
}
}
}
}
if len(collected) > 0 {
return strings.Join(collected, "\n")
}
if asObject.Task != "" {
return asObject.Task
}
return ""
}
func joinTextParts(parts []map[string]any) string {
var texts []string
for _, p := range parts {
isText := false
if k, ok := p["kind"].(string); ok && k == "text" {
isText = true
}
if t, ok := p["type"].(string); ok && t == "text" {
isText = true
}
if !isText {
continue
}
if t, ok := p["text"].(string); ok && t != "" {
texts = append(texts, t)
}
}
return strings.Join(texts, "\n")
}
func extractFilesFromResponse(body json.RawMessage) []ChatAttachment {
if len(body) == 0 {
return nil
}
var probe struct {
Result json.RawMessage `json:"result"`
}
_ = json.Unmarshal(body, &probe)
feed := body
if len(probe.Result) > 0 {
trimmed := bytesTrimSpace(probe.Result)
if len(trimmed) > 0 && trimmed[0] == '{' {
feed = probe.Result
}
}
return extractFilesFromTask(feed)
}
// extractFilesFromTask walks parts[] + artifacts[].parts[] +
// status.message.parts[] + message.parts[]. Mirrors canvas
// extractFilesFromTask exactly — same v0 hot path + v1 protobuf
// flat shape.
func extractFilesFromTask(taskJSON json.RawMessage) []ChatAttachment {
if len(taskJSON) == 0 {
return nil
}
var task struct {
Parts []map[string]any `json:"parts"`
Artifacts []json.RawMessage `json:"artifacts"`
Status json.RawMessage `json:"status"`
Message json.RawMessage `json:"message"`
}
if err := json.Unmarshal(taskJSON, &task); err != nil {
return nil
}
var out []ChatAttachment
out = appendFilesFromParts(out, task.Parts)
for _, raw := range task.Artifacts {
var art struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(raw, &art); err == nil {
out = appendFilesFromParts(out, art.Parts)
}
}
if len(task.Status) > 0 {
var st struct {
Message struct {
Parts []map[string]any `json:"parts"`
} `json:"message"`
}
if err := json.Unmarshal(task.Status, &st); err == nil {
out = appendFilesFromParts(out, st.Message.Parts)
}
}
if len(task.Message) > 0 {
var msg struct {
Parts []map[string]any `json:"parts"`
}
if err := json.Unmarshal(task.Message, &msg); err == nil {
out = appendFilesFromParts(out, msg.Parts)
}
}
return out
}
func appendFilesFromParts(out []ChatAttachment, parts []map[string]any) []ChatAttachment {
for _, raw := range parts {
v0 := false
if k, ok := raw["kind"].(string); ok && k == "file" {
v0 = true
}
if t, ok := raw["type"].(string); ok && t == "file" {
v0 = true
}
v1URL, _ := raw["url"].(string)
if !v0 && v1URL == "" {
continue
}
var att ChatAttachment
if v0 {
file, _ := raw["file"].(map[string]any)
if file == nil {
file = raw
}
uri, _ := file["uri"].(string)
if uri == "" {
continue
}
att.URI = uri
if name, _ := file["name"].(string); name != "" {
att.Name = name
} else {
att.Name = basename(uri)
}
if mt, ok := file["mimeType"].(string); ok {
att.MimeType = mt
}
if sz, ok := numericSize(file["size"]); ok {
att.Size = &sz
}
} else {
att.URI = v1URL
if name, _ := raw["filename"].(string); name != "" {
att.Name = name
} else {
att.Name = basename(v1URL)
}
if mt, ok := raw["mediaType"].(string); ok {
att.MimeType = mt
}
}
out = append(out, att)
}
return out
}
func numericSize(v any) (int64, bool) {
switch n := v.(type) {
case float64:
return int64(n), true
case int64:
return n, true
case int:
return int64(n), true
}
return 0, false
}
func basename(uri string) string {
cleaned := strings.TrimPrefix(uri, "workspace:")
cleaned = strings.TrimPrefix(cleaned, "https://")
cleaned = strings.TrimPrefix(cleaned, "http://")
if cleaned == "" {
return "file"
}
return path.Base(cleaned)
}
func bytesTrimSpace(b json.RawMessage) json.RawMessage {
for len(b) > 0 && (b[0] == ' ' || b[0] == '\t' || b[0] == '\n' || b[0] == '\r') {
b = b[1:]
}
for len(b) > 0 && (b[len(b)-1] == ' ' || b[len(b)-1] == '\t' || b[len(b)-1] == '\n' || b[len(b)-1] == '\r') {
b = b[:len(b)-1]
}
return b
}
func newMessageID() string {
return uuid.New().String()
}
// Compile-time assertion: PostgresMessageStore satisfies MessageStore.
// Catches any future drift between interface and impl at build time.
var _ MessageStore = (*PostgresMessageStore)(nil)

View File

@ -0,0 +1,422 @@
package messagestore
// postgres_store_test.go — parser-level parity tests against the
// canvas TS test fixtures in
// canvas/src/components/tabs/chat/__tests__/historyHydration.test.ts.
//
// Originally lived in handlers/chat_history_test.go (RFC #2945 PR-C);
// PR-D moved them here when the parser was extracted to this package.
// Every test case in the TS file has a Go counterpart, named after
// the TS describe/it block.
//
// Mutation guidance: when adding behavior, add the case to BOTH
// historyHydration.test.ts AND this file. The canvas TS is the
// legacy source the server replaces; divergence == regression.
import (
"encoding/json"
"strings"
"testing"
"time"
)
const fixedTimestamp = "2026-04-25T18:00:00Z"
func mustParseTime(t *testing.T, s string) time.Time {
t.Helper()
tt, err := time.Parse(time.RFC3339, s)
if err != nil {
t.Fatalf("parse %s: %v", s, err)
}
return tt
}
func neverInternal(_ string) bool { return false }
// =====================================================================
// timestamp preservation (regression cover)
//
// The canvas bug that motivated extracting the helper: every reload
// re-stamped historical bubbles to render-time. Pin row.created_at
// adoption.
// =====================================================================
func TestChatHistory_UserMessageTimestampPinsToCreatedAt(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:00:00Z")
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hello from earlier today"}]}}}`)
msgs := activityRowToChatMessages(created, "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 user message, got %d", len(msgs))
}
if msgs[0].Role != "user" {
t.Errorf("role=%q want user", msgs[0].Role)
}
if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:00:00") {
t.Errorf("user message timestamp %q does NOT pin to row.created_at — regression of the 2026-04-25 bubble-collapse bug", msgs[0].Timestamp)
}
}
func TestChatHistory_AgentMessageTimestampPinsToCreatedAt(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:05:00Z")
body := json.RawMessage(`{"result":"agent reply"}`)
msgs := activityRowToChatMessages(created, "ok", nil, body, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 agent message, got %d", len(msgs))
}
if msgs[0].Role != "agent" {
t.Errorf("role=%q want agent", msgs[0].Role)
}
if !strings.HasPrefix(msgs[0].Timestamp, "2026-04-25T18:05:00") {
t.Errorf("agent message timestamp %q does NOT pin to row.created_at", msgs[0].Timestamp)
}
}
func TestChatHistory_TwoRowsDistinctTimestamps(t *testing.T) {
bodyA := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"first"}]}}}`)
bodyB := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"second"}]}}}`)
a := activityRowToChatMessages(mustParseTime(t, "2026-04-25T14:00:00Z"), "ok", bodyA, nil, neverInternal)
b := activityRowToChatMessages(mustParseTime(t, "2026-04-25T21:01:58Z"), "ok", bodyB, nil, neverInternal)
if len(a) != 1 || len(b) != 1 {
t.Fatalf("expected 1 message each; got %d and %d", len(a), len(b))
}
if a[0].Timestamp == b[0].Timestamp {
t.Errorf("two distinct created_at values produced same timestamp: %q", a[0].Timestamp)
}
if !strings.HasPrefix(a[0].Timestamp, "2026-04-25T14:00:00") || !strings.HasPrefix(b[0].Timestamp, "2026-04-25T21:01:58") {
t.Errorf("timestamps drifted: a=%q b=%q", a[0].Timestamp, b[0].Timestamp)
}
}
// =====================================================================
// user-message extraction
// =====================================================================
func TestChatHistory_EmitsUserMessageWhenRequestHasText(t *testing.T) {
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"hi agent"}]}}}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 message, got %d", len(msgs))
}
if msgs[0].Role != "user" || msgs[0].Content != "hi agent" {
t.Errorf("role=%q content=%q want user/hi agent", msgs[0].Role, msgs[0].Content)
}
}
func TestChatHistory_DropsInternalSelfMessages(t *testing.T) {
body := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"Delegation results are ready..."}]}}}`)
predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") }
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("internal-self message rendered as user bubble: %q", m.Content)
}
}
}
func TestChatHistory_NoUserMessageWhenRequestBodyNull(t *testing.T) {
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("emitted user bubble despite null request_body: %+v", m)
}
}
}
func TestChatHistory_UserAttachmentsHydratedFromRequestBody(t *testing.T) {
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"text","text":"here's the screenshot"},
{"kind":"file","file":{"name":"shot.png","mimeType":"image/png","uri":"workspace:/uploads/shot.png","size":4096}}
]
}
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
var user *ChatMessage
for i := range msgs {
if msgs[i].Role == "user" {
user = &msgs[i]
break
}
}
if user == nil {
t.Fatalf("no user bubble produced")
}
if user.Content != "here's the screenshot" {
t.Errorf("content=%q", user.Content)
}
if len(user.Attachments) != 1 {
t.Fatalf("attachments=%d want 1", len(user.Attachments))
}
att := user.Attachments[0]
if att.Name != "shot.png" || att.URI != "workspace:/uploads/shot.png" || att.MimeType != "image/png" {
t.Errorf("attachment shape wrong: %+v", att)
}
if att.Size == nil || *att.Size != 4096 {
t.Errorf("size=%v want 4096", att.Size)
}
}
func TestChatHistory_AttachmentsOnlyUserBubbleWhenTextEmpty(t *testing.T) {
// Drag-drop a file with no caption — bubble should still render.
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"file","file":{"name":"report.pdf","uri":"workspace:/uploads/report.pdf"}}
]
}
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
if len(msgs) != 1 {
t.Fatalf("expected 1 attachments-only bubble, got %d", len(msgs))
}
if msgs[0].Role != "user" || msgs[0].Content != "" || len(msgs[0].Attachments) != 1 {
t.Errorf("unexpected: role=%q content=%q attachments=%d", msgs[0].Role, msgs[0].Content, len(msgs[0].Attachments))
}
if msgs[0].Attachments[0].Name != "report.pdf" {
t.Errorf("attachment name=%q want report.pdf", msgs[0].Attachments[0].Name)
}
}
func TestChatHistory_InternalSelfPredicateSuppressesEvenWithAttachments(t *testing.T) {
body := json.RawMessage(`{
"params": {
"message": {
"parts": [
{"kind":"text","text":"Delegation results are ready..."},
{"kind":"file","file":{"name":"x.zip","uri":"workspace:/x.zip"}}
]
}
}
}`)
predicate := func(t string) bool { return strings.HasPrefix(t, "Delegation results are ready") }
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, predicate)
for _, m := range msgs {
if m.Role == "user" {
t.Errorf("internal-self predicate did NOT suppress user bubble despite attachments: %+v", m)
}
}
}
// =====================================================================
// agent-message extraction
// =====================================================================
func TestChatHistory_AgentMessageFromResultString(t *testing.T) {
body := json.RawMessage(`{"result":"agent says hi"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "agent" || msgs[0].Content != "agent says hi" {
t.Errorf("got %+v", msgs)
}
}
func TestChatHistory_RoleSystemWhenStatusError(t *testing.T) {
body := json.RawMessage(`{"result":"delegation failed"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "error", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "system" {
t.Errorf("status=error did NOT promote role to system: %+v", msgs)
}
}
func TestChatHistory_RoleSystemWhenAgentErrorPrefix(t *testing.T) {
// Defense-in-depth — if a runtime returns ok status but the text
// itself starts with "agent error", the canvas would still
// render system role. Mirror that here.
body := json.RawMessage(`{"result":"Agent error: ProcessError(exit=1)"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Role != "system" {
t.Errorf("agent-error prefix did NOT promote to system: %+v", msgs)
}
}
func TestChatHistory_AgentAttachmentsFromResponseBodyParts(t *testing.T) {
// Notify shape: response_body = {"result":"<text>","parts":[{"kind":"file",...}]}
body := json.RawMessage(`{
"result": "Done — see attached.",
"parts": [
{"kind":"file","file":{"name":"build.zip","uri":"workspace:/tmp/build.zip","size":12345}}
]
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
break
}
}
if agent == nil {
t.Fatalf("no agent bubble")
}
if len(agent.Attachments) != 1 || agent.Attachments[0].Name != "build.zip" {
t.Errorf("agent attachments shape wrong: %+v", agent.Attachments)
}
if agent.Attachments[0].Size == nil || *agent.Attachments[0].Size != 12345 {
t.Errorf("size=%v want 12345", agent.Attachments[0].Size)
}
}
func TestChatHistory_NoAgentMessageWhenResponseBodyNull(t *testing.T) {
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, nil, neverInternal)
for _, m := range msgs {
if m.Role == "agent" || m.Role == "system" {
t.Errorf("emitted agent/system bubble despite null response_body: %+v", m)
}
}
}
func TestChatHistory_NoAgentMessageWhenResponseHasNoTextNoFiles(t *testing.T) {
body := json.RawMessage(`{"unrelated":"metadata"}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
for _, m := range msgs {
if m.Role == "agent" {
t.Errorf("emitted agent bubble despite empty content: %+v", m)
}
}
}
// =====================================================================
// end-to-end shape — paired user + agent with same timestamp
// =====================================================================
func TestChatHistory_PairedUserAndAgentSameTimestamp(t *testing.T) {
created := mustParseTime(t, "2026-04-25T18:00:00Z")
req := json.RawMessage(`{"params":{"message":{"parts":[{"kind":"text","text":"what's 2+2?"}]}}}`)
resp := json.RawMessage(`{"result":"4"}`)
msgs := activityRowToChatMessages(created, "ok", req, resp, neverInternal)
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[0].Role != "user" || msgs[0].Content != "what's 2+2?" {
t.Errorf("first message wrong: %+v", msgs[0])
}
if msgs[1].Role != "agent" || msgs[1].Content != "4" {
t.Errorf("second message wrong: %+v", msgs[1])
}
if msgs[0].Timestamp != msgs[1].Timestamp {
t.Errorf("paired bubbles have different timestamps: %q vs %q", msgs[0].Timestamp, msgs[1].Timestamp)
}
}
// =====================================================================
// Go-specific: defensive parsing
// =====================================================================
func TestChatHistory_MalformedJSONInRequestBodyReturnsEmpty(t *testing.T) {
// Should NOT panic; should return no user bubble (or no message at all).
body := json.RawMessage(`{not valid json}`)
defer func() {
if r := recover(); r != nil {
t.Fatalf("panic on malformed json: %v", r)
}
}()
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", body, nil, neverInternal)
for _, m := range msgs {
if m.Role == "user" && (m.Content != "" || len(m.Attachments) > 0) {
t.Errorf("malformed JSON yielded a non-empty user bubble: %+v", m)
}
}
}
func TestChatHistory_V1ProtobufFlatFileShape(t *testing.T) {
// v1 a2a-sdk shape: flat parts with url/filename/mediaType
body := json.RawMessage(`{
"result": {
"parts": [
{"url":"https://example.com/data.csv","filename":"data.csv","mediaType":"text/csv"}
]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
var agent *ChatMessage
for i := range msgs {
if msgs[i].Role == "agent" {
agent = &msgs[i]
break
}
}
if agent == nil {
t.Fatalf("no agent bubble for v1 shape")
}
if len(agent.Attachments) != 1 {
t.Fatalf("attachments=%d want 1", len(agent.Attachments))
}
att := agent.Attachments[0]
if att.Name != "data.csv" || att.URI != "https://example.com/data.csv" || att.MimeType != "text/csv" {
t.Errorf("v1 shape extracted wrong: %+v", att)
}
}
func TestChatHistory_TaskShapeArtifactsExtracted(t *testing.T) {
// {"result":{"artifacts":[{"parts":[{"kind":"text","text":"..."}]}]}}
body := json.RawMessage(`{
"result": {
"artifacts": [
{"parts": [{"kind":"text","text":"hermes detail line"}]}
]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || msgs[0].Content != "hermes detail line" {
t.Errorf("artifact text not extracted: %+v", msgs)
}
}
func TestChatHistory_OlderNestedRootTextShape(t *testing.T) {
// Older shape: {parts: [{root: {text: "..."}}]}
body := json.RawMessage(`{
"result": {
"parts": [{"root":{"text":"legacy nested text"}}]
}
}`)
msgs := activityRowToChatMessages(mustParseTime(t, fixedTimestamp), "ok", nil, body, neverInternal)
if len(msgs) != 1 || !strings.Contains(msgs[0].Content, "legacy nested text") {
t.Errorf("nested root.text not extracted: %+v", msgs)
}
}
// =====================================================================
// IsInternalSelfMessage predicate itself
// =====================================================================
func TestChatHistory_IsInternalSelfMessage_DelegationPrefix(t *testing.T) {
if !IsInternalSelfMessage("Delegation results are ready... <body>") {
t.Errorf("Delegation-results prefix should be flagged internal-self")
}
if IsInternalSelfMessage("Delegation completed but not ready") {
t.Errorf("non-prefix match should NOT flag")
}
if IsInternalSelfMessage("") {
t.Errorf("empty text should NOT flag (legitimate attachments-only bubble)")
}
}
// =====================================================================
// basename helper — mirrors canvas basename() semantics
// =====================================================================
func TestChatHistory_BasenameStripsSchemeAndPath(t *testing.T) {
cases := []struct {
in, want string
}{
{"workspace:/uploads/shot.png", "shot.png"},
{"workspace:/a/b/c/file.txt", "file.txt"},
{"https://example.com/path/file.csv", "file.csv"},
{"http://x/y", "y"},
{"", "file"},
{"workspace:", "file"}, // scheme-only collapses to "" → "file" sentinel, matches canvas basename
}
for _, tc := range cases {
got := basename(tc.in)
if got != tc.want {
t.Errorf("basename(%q) = %q want %q", tc.in, got, tc.want)
}
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/buildinfo"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/messagestore"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
@ -315,11 +316,16 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Chat history — RFC #2945 PR-C (issue #3017). Server-side
// rendering of activity_logs rows into the canonical
// ChatMessage shape so canvas (and future API consumers) don't
// re-implement the A2A-envelope walk per-client.
chh := handlers.NewChatHistoryHandler()
// Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue
// #3026). Server-side rendering of activity_logs rows into
// the canonical ChatMessage shape; storage is plugin-shaped
// via the messagestore.MessageStore interface so OSS
// operators can swap in S3 / vector / in-memory backends
// without forking the handler. Platform default uses
// PostgresMessageStore wrapping the existing activity_logs
// table.
chatStore := messagestore.NewPostgresMessageStore(db.DB)
chh := handlers.NewChatHistoryHandler(chatStore)
wsAuth.GET("/chat-history", chh.List)
// Config