From d99b3f2aeceb69aeed4d98b7fa494242bd23fe01 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 15:29:42 -0700 Subject: [PATCH 01/12] refactor(handlers): consolidate Notify + MCP send_message_to_user through AgentMessageWriter (RFC #2945 PR-A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-RFC-#2945 the broadcast + activity_log INSERT for "agent → user chat" was duplicated across two handlers — activity.go's Notify (HTTP /notify) and mcp_tools.go's toolSendMessageToUser (MCP tools/call). The duplication is exactly what produced the reno-stars production data-loss regression (PR #2944): the persistence-half fix landed for one handler and silently lagged for the other for months, dropping every long-form external-agent message on reload. PR #2944 added the missing INSERT to mcp_tools.go and a forward- looking AST gate. This PR removes the duplication at the source. What changes ------------ NEW: workspace-server/internal/handlers/agent_message_writer.go - AgentMessageWriter struct + NewAgentMessageWriter ctor. - Send(ctx, workspaceID, message, attachments) error: workspace lookup → broadcast WS AGENT_MESSAGE → INSERT activity_logs. - ErrWorkspaceNotFound for the lookup-miss path so callers can return 404 / JSON-RPC error cleanly. - Best-effort persistence: INSERT failure logs only, returns nil so the broadcast success isn't undone (matches previous behavior in both call sites — pinned by test). - Takes events.EventEmitter (interface) so tests can substitute a capturing fake without nil-panicking inside hub.Broadcast. UPDATED: activity.go:Notify - Replaced ~75 lines of inline broadcast+INSERT with a 12-line call to AgentMessageWriter.Send. - Attachment shape conversion (NotifyAttachment → AgentMessageAttachment) is local to the HTTP handler; the writer's API doesn't import the HTTP-binding-tagged type. 
UPDATED: mcp_tools.go:toolSendMessageToUser - Replaced ~40 lines (the post-#2944 broadcast+INSERT pair) with a 6-line call to the writer. - The attachments argument is nil today because the MCP tool args don't expose attachments yet. When the schema adds it, build the slice and pass through; the writer half is ready. Tests ----- agent_message_writer_test.go (7 tests, comprehensive): - TestAgentMessageWriter_Send_Success_NoAttachments — happy path, pins JSON `{"result":"hi"}`. - TestAgentMessageWriter_Send_Success_WithAttachments — pins file parts shape (kind=file, file.{uri,name,mimeType,size}). Uses a jsonMatcher that decodes + asserts via predicate (tolerant of map key ordering, exact on shape). - TestAgentMessageWriter_Send_WorkspaceNotFound — pins ErrWorkspaceNotFound + asserts NO broadcast NO INSERT. - TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil — pins best-effort persistence contract. - TestAgentMessageWriter_Send_PreviewTruncation — pins ≤80-char preview + ellipsis (Ryan's onboarding-friction report would have bloated activity_logs.summary by 2KB without this). - TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent — pins WS event name + payload shape via capturingEmitter. - TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty — pins the "no key when nil" wire contract. The existing AST gate from #2944 (TestAgentMessageBroadcastsArePersisted) still holds: any future function emitting AGENT_MESSAGE without an INSERT fails the test. With the writer in place that's now redundant — both producers go through it — but the gate is cheap to keep as defense-in-depth. Verified: go vet clean; all writer + caller tests pass; existing TestNotify_* + TestMCPHandler_SendMessage_* + the AST gate all green. Refs RFC #2945, PR #2944. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/activity.go | 92 ++--- .../internal/handlers/agent_message_writer.go | 164 +++++++++ .../handlers/agent_message_writer_test.go | 331 ++++++++++++++++++ .../internal/handlers/mcp_tools.go | 65 +--- 4 files changed, 533 insertions(+), 119 deletions(-) create mode 100644 workspace-server/internal/handlers/agent_message_writer.go create mode 100644 workspace-server/internal/handlers/agent_message_writer_test.go diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 7c90ff52..db63b155 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -465,78 +465,30 @@ func (h *ActivityHandler) Notify(c *gin.Context) { } } - // Verify workspace exists - var wsName string - err := db.DB.QueryRowContext(c.Request.Context(), - `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID, - ).Scan(&wsName) - if err != nil { - c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) - return + // Single source of truth for chat-bearing agent → user messages — + // see agent_message_writer.go for the contract. Pre-RFC-#2945, the + // broadcast + INSERT pair was inlined here and again in + // mcp_tools.go's send_message_to_user, and the duplication is what + // produced the reno-stars data-loss regression. Both paths now + // route through the same writer; future channels (Slack, Discord, + // Lark) hook in here too. 
+ attachments := make([]AgentMessageAttachment, 0, len(body.Attachments)) + for _, a := range body.Attachments { + attachments = append(attachments, AgentMessageAttachment{ + URI: a.URI, + Name: a.Name, + MimeType: a.MimeType, + Size: a.Size, + }) } - - broadcastPayload := map[string]interface{}{ - "message": body.Message, - "workspace_id": workspaceID, - "name": wsName, - } - if len(body.Attachments) > 0 { - broadcastPayload["attachments"] = body.Attachments - } - h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload) - - // Persist to activity_logs so the chat history loader restores this - // message after a page reload. Pre-fix, send_message_to_user pushes - // were broadcast-only — survived the WebSocket session but vanished - // when the user refreshed because nothing wrote them to the DB. - // - // Shape chosen to match the existing loader query - // (`type=a2a_receive&source=canvas`): - // - activity_type='a2a_receive' so it joins the same query path - // - source_id=NULL so the canvas-source filter accepts it - // - method='notify' to distinguish from real A2A receives in audits - // - request_body=NULL so the loader doesn't append a duplicate - // "user message" bubble for it - // - response_body={"result": ""} matches extractResponseText's - // simplest branch ({result: string} → take verbatim) - // - // Errors are logged-only — broadcast already succeeded, the user - // sees the message; persistence failure just means the message - // won't survive reload (pre-fix behavior). Don't fail the whole - // notify on a DB hiccup. - // response_body shape — chosen to feed BOTH: - // - extractResponseText: looks at body.result (string) and returns it - // - extractFilesFromTask: looks at body.parts[] for kind=file - // so a chat reload after a notify-with-attachments restores both - // the text bubble AND the download chips. 
- respPayload := map[string]interface{}{"result": body.Message} - if len(body.Attachments) > 0 { - fileParts := make([]map[string]interface{}, 0, len(body.Attachments)) - for _, a := range body.Attachments { - fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name} - if a.MimeType != "" { - fileMeta["mimeType"] = a.MimeType - } - if a.Size > 0 { - fileMeta["size"] = a.Size - } - fileParts = append(fileParts, map[string]interface{}{ - "kind": "file", - "file": fileMeta, - }) + writer := NewAgentMessageWriter(db.DB, h.broadcaster) + if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil { + if errors.Is(err, ErrWorkspaceNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return } - respPayload["parts"] = fileParts - } - respJSON, _ := json.Marshal(respPayload) - preview := body.Message - if len(preview) > 80 { - preview = preview[:80] + "…" - } - if _, err := db.DB.ExecContext(c.Request.Context(), ` - INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) - VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') - `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { - log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) + return } c.JSON(http.StatusOK, gin.H{"status": "sent"}) diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go new file mode 100644 index 00000000..b6856847 --- /dev/null +++ b/workspace-server/internal/handlers/agent_message_writer.go @@ -0,0 +1,164 @@ +package handlers + +// AgentMessageWriter is the SSOT for "agent → user" message delivery in the +// workspace-server. 
Every chat-bearing path that surfaces a message to the +// canvas — HTTP /notify (Notify handler), MCP tools/call +// send_message_to_user (toolSendMessageToUser), any future channel — MUST +// route through this writer rather than re-implement the broadcast + +// persist contract inline. +// +// Why: pre-consolidation, two handlers duplicated the same "broadcast then +// INSERT activity_logs" sequence. The reno-stars production data-loss +// incident (2026-05-05, RFC #2945, PR #2944) was the symptom — the +// persistence half landed for /notify but lagged for the MCP bridge by +// months, silently dropping every long-form external-agent message until +// reload. The AST gate from #2944 catches drift; this writer eliminates +// the *possibility* of drift by giving both call sites a single +// well-tested function to call. +// +// Contract: +// 1. Look up the workspace by id; ErrWorkspaceNotFound on miss so the +// caller can return 404 with a clean message. +// 2. Broadcast a WS AGENT_MESSAGE event with {message, workspace_id, +// name, attachments?}. +// 3. INSERT a row into activity_logs: +// type='a2a_receive', method='notify', source_id NULL, +// response_body={"result": message[, "parts": [file kind...]]}, +// status='ok' +// Best-effort — INSERT failure logs only, returns nil so the broadcast +// success isn't undone on the caller side. +// 4. Returns nil on success. +// +// The shape (especially the JSON response_body) is the wire contract the +// canvas's chat-history hydrator (canvas/src/.../historyHydration.ts) +// reads. Drift here silently breaks chat replay across all consumers, so +// changes to the JSON shape MUST be cross-verified against the hydrator +// in the same PR. 
+ +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "log" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" +) + +// ErrWorkspaceNotFound is returned by AgentMessageWriter.Send when the +// workspace lookup turns up nothing (or the workspace is in +// status='removed'). Callers translate to HTTP 404 / JSON-RPC error / +// whatever surface they expose. +var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found") + +// AgentMessageAttachment is one file attached to an agent → user +// message. Identical to handlers.NotifyAttachment in field set; kept +// distinct so the writer's API doesn't import a handler type with HTTP +// binding tags. +type AgentMessageAttachment struct { + URI string + Name string + MimeType string + Size int64 +} + +// AgentMessageWriter persists + broadcasts agent → user messages. Construct +// once per process via NewAgentMessageWriter; pass the same instance to +// every handler that delivers chat (Notify, toolSendMessageToUser, etc.). +// +// Takes events.EventEmitter (not the *Broadcaster concrete type) so tests +// can substitute a fake emitter and producers in other packages can wrap +// the real broadcaster behind their own metrics / retries without leaking +// the concrete dependency. +type AgentMessageWriter struct { + db *sql.DB + broadcaster events.EventEmitter +} + +// NewAgentMessageWriter binds the writer to the platform's DB pool + +// WebSocket broadcaster. +func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter { + return &AgentMessageWriter{db: db, broadcaster: broadcaster} +} + +// Send delivers a single agent → user message. Look up + broadcast + +// persist in that order; ErrWorkspaceNotFound short-circuits before any +// broadcast or DB write so callers can 404 cleanly. 
+// +// Returns nil on success — including on DB-INSERT failure (the broadcast +// already returned successfully and the user has seen the message; the +// persistence-failure mode is logged at WARN but the caller's response +// stays 200 so the agent doesn't retry and double-broadcast). +func (w *AgentMessageWriter) Send( + ctx context.Context, + workspaceID, message string, + attachments []AgentMessageAttachment, +) error { + // 1. Workspace lookup. status='removed' filter is the same shape /notify + // used pre-consolidation; deleted workspaces don't get notifications. + var wsName string + if err := w.db.QueryRowContext(ctx, + `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, + workspaceID, + ).Scan(&wsName); err != nil { + // Includes sql.ErrNoRows; canonicalize so callers don't have to + // import database/sql to compare. + return ErrWorkspaceNotFound + } + + // 2. Build broadcast payload + WS-emit. Same shape that ChatTab's + // AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has + // consumed since the canvas chat shipped — drift here would orphan + // every live chat panel. + broadcastPayload := map[string]interface{}{ + "message": message, + "workspace_id": workspaceID, + "name": wsName, + } + if len(attachments) > 0 { + broadcastPayload["attachments"] = attachments + } + w.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload) + + // 3. Persist for chat-history hydration. 
response_body shape MUST stay + // in sync with extractResponseText + extractFilesFromTask in + // canvas/src/components/tabs/chat/historyHydration.ts: + // - extractResponseText reads body.result (string) → renders text + // - extractFilesFromTask reads body.parts[] (kind=file) → renders chips + respPayload := map[string]interface{}{"result": message} + if len(attachments) > 0 { + fileParts := make([]map[string]interface{}, 0, len(attachments)) + for _, a := range attachments { + fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name} + if a.MimeType != "" { + fileMeta["mimeType"] = a.MimeType + } + if a.Size > 0 { + fileMeta["size"] = a.Size + } + fileParts = append(fileParts, map[string]interface{}{ + "kind": "file", + "file": fileMeta, + }) + } + respPayload["parts"] = fileParts + } + respJSON, _ := json.Marshal(respPayload) + preview := message + if len(preview) > 80 { + preview = preview[:80] + "…" + } + if _, err := w.db.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { + // Best-effort: the broadcast already returned ok and the user + // has seen the message. Logging a structured line lets operators + // notice persistence-failure rates spike if the DB is unhealthy, + // without breaking the tool response or causing the agent to + // retry-and-double-broadcast. 
+ log.Printf("agent_message: failed to persist for %s: %v", workspaceID, err) + } + + return nil +} diff --git a/workspace-server/internal/handlers/agent_message_writer_test.go b/workspace-server/internal/handlers/agent_message_writer_test.go new file mode 100644 index 00000000..d311c926 --- /dev/null +++ b/workspace-server/internal/handlers/agent_message_writer_test.go @@ -0,0 +1,331 @@ +package handlers + +import ( + "context" + "database/sql/driver" + "encoding/json" + "errors" + "strings" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +// AgentMessageWriter is the SSOT for agent → user chat delivery +// (RFC #2945 PR-A). These tests pin the contract the writer +// guarantees: workspace lookup, broadcast, INSERT, error semantics — +// every shape that producers (Notify, toolSendMessageToUser, future +// channels) rely on. +// +// Pre-consolidation, the broadcast-then-INSERT logic was duplicated +// across two handlers and they drifted (reno-stars, 2026-05-05). With +// the writer being the only place this logic lives, these tests are +// the regression line for every chat-bearing path simultaneously. + +// jsonMatcher is a sqlmock Argument matcher that decodes the actual +// SQL arg as JSON and runs a caller-supplied predicate over the +// resulting structure. Tighter than substring matching (which can +// false-pass on a renamed key) and tolerant of map-key ordering +// (which exact-string matching is not). +type jsonMatcher struct { + predicate func(parsed map[string]any) bool + desc string +} + +func (m jsonMatcher) Match(v driver.Value) bool { + s, ok := v.(string) + if !ok { + return false + } + var parsed map[string]any + if err := json.Unmarshal([]byte(s), &parsed); err != nil { + return false + } + return m.predicate(parsed) +} + +// stringMatcher pins exact prefix/suffix/equality checks against a +// driver.Value that's actually a string. 
+type stringMatcher func(string) bool + +func (f stringMatcher) Match(v driver.Value) bool { + s, ok := v.(string) + if !ok { + return false + } + return f(s) +} + +// capturingEmitter records every BroadcastOnly call so tests can pin +// the WS event shape without a real ws.Hub. RecordAndBroadcast is +// also captured for completeness — the writer doesn't call it today, +// but a future producer might, and a captured-but-unasserted record +// is easier to diagnose than a nil panic. +type capturingEmitter struct { + events []capturedEvent +} + +type capturedEvent struct { + workspaceID string + eventType string + payload interface{} +} + +func (c *capturingEmitter) BroadcastOnly(workspaceID string, eventType string, payload interface{}) { + c.events = append(c.events, capturedEvent{workspaceID, eventType, payload}) +} + +func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType string, workspaceID string, payload interface{}) error { + c.events = append(c.events, capturedEvent{workspaceID, eventType, payload}) + return nil +} + +// TestAgentMessageWriter_Send_Success_NoAttachments pins the happy +// path: workspace lookup, broadcast, INSERT, return nil. +func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) { + mock := setupTestDB(t) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-1", + sqlmock.AnyArg(), // summary + `{"result":"hi"}`, + ). 
+ WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := w.Send(context.Background(), "ws-1", "hi", nil); err != nil { + t.Fatalf("Send returned %v, want nil", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations: %v", err) + } +} + +// TestAgentMessageWriter_Send_Success_WithAttachments pins the file +// attachment shape — response_body MUST contain a parts[] array with +// kind=file entries so the canvas hydrator renders download chips. +// Drift here = chips disappear on chat reload. +func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) { + mock := setupTestDB(t) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-att"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan")) + + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-att", + sqlmock.AnyArg(), + jsonMatcher{ + desc: "response_body has result + parts with kind=file metadata", + predicate: func(p map[string]any) bool { + if p["result"] != "see attached" { + return false + } + parts, ok := p["parts"].([]any) + if !ok || len(parts) != 1 { + return false + } + part, ok := parts[0].(map[string]any) + if !ok { + return false + } + if part["kind"] != "file" { + return false + } + file, ok := part["file"].(map[string]any) + if !ok { + return false + } + return file["uri"] == "workspace://x.zip" && + file["name"] == "x.zip" && + file["mimeType"] == "application/zip" && + file["size"].(float64) == 1234 + }, + }, + ). 
+ WillReturnResult(sqlmock.NewResult(1, 1)) + + atts := []AgentMessageAttachment{ + {URI: "workspace://x.zip", Name: "x.zip", MimeType: "application/zip", Size: 1234}, + } + if err := w.Send(context.Background(), "ws-att", "see attached", atts); err != nil { + t.Fatalf("Send returned %v, want nil", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations: %v", err) + } +} + +// TestAgentMessageWriter_Send_WorkspaceNotFound pins ErrWorkspaceNotFound +// short-circuit. Must NOT broadcast, MUST NOT INSERT — caller will 404 +// or surface a JSON-RPC error. +func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) { + mock := setupTestDB(t) + emitter := &capturingEmitter{} + w := NewAgentMessageWriter(db.DB, emitter) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-missing"). + WillReturnRows(sqlmock.NewRows([]string{"name"})) + + err := w.Send(context.Background(), "ws-missing", "lost in the void", nil) + if !errors.Is(err, ErrWorkspaceNotFound) { + t.Errorf("Send returned %v, want ErrWorkspaceNotFound", err) + } + if len(emitter.events) != 0 { + t.Errorf("workspace-not-found path MUST NOT broadcast, got %d events", len(emitter.events)) + } + // Implicit: no INSERT expectation registered, so a stray INSERT + // would fail ExpectationsWereMet. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations (INSERT must NOT fire on workspace-not-found): %v", err) + } +} + +// TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil pins the +// "best-effort persistence" contract: when the activity_log INSERT +// fails (DB hiccup, transient connection, constraint), the writer +// MUST still return nil. The broadcast already succeeded; the user +// has seen the message; returning an error here would cause the +// caller (and the agent calling the tool) to retry and double- +// broadcast. 
+func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) { + mock := setupTestDB(t) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-dbfail"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnError(errors.New("transient db error")) + + err := w.Send(context.Background(), "ws-dbfail", "should not be lost from live chat", nil) + if err != nil { + t.Errorf("DB INSERT failure must return nil (broadcast already succeeded), got %v", err) + } +} + +// TestAgentMessageWriter_Send_PreviewTruncation pins the summary +// preview cap. Long messages (Ryan's onboarding-friction report was +// ~2k chars) must summarise to ≤80 chars + ellipsis so the activity +// table doesn't carry multi-KB summaries that bloat list queries. +func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) { + mock := setupTestDB(t) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-trunc"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan")) + + longMsg := strings.Repeat("x", 200) + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-trunc", + stringMatcher(func(s string) bool { + if !strings.HasPrefix(s, "Agent message: ") { + return false + } + preview := strings.TrimPrefix(s, "Agent message: ") + if !strings.HasSuffix(preview, "…") { + return false + } + body := strings.TrimSuffix(preview, "…") + return len(body) == 80 + }), + sqlmock.AnyArg(), + ). 
+ WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := w.Send(context.Background(), "ws-trunc", longMsg, nil); err != nil { + t.Fatalf("Send: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("preview truncation drift: %v", err) + } +} + +// TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent pins the +// WS event name + payload shape. The canvas's +// canvas-events.ts:AGENT_MESSAGE handler reads {message, workspace_id, +// name, attachments?} — drift here orphans every live chat panel. +func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) { + mock := setupTestDB(t) + emitter := &capturingEmitter{} + w := NewAgentMessageWriter(db.DB, emitter) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-bc"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name")) + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnResult(sqlmock.NewResult(1, 1)) + + atts := []AgentMessageAttachment{ + {URI: "workspace://a.txt", Name: "a.txt"}, + } + if err := w.Send(context.Background(), "ws-bc", "hi", atts); err != nil { + t.Fatalf("Send: %v", err) + } + + if len(emitter.events) != 1 { + t.Fatalf("expected exactly 1 broadcast, got %d", len(emitter.events)) + } + ev := emitter.events[0] + if ev.eventType != "AGENT_MESSAGE" { + t.Errorf("event type = %q, want AGENT_MESSAGE", ev.eventType) + } + if ev.workspaceID != "ws-bc" { + t.Errorf("workspace_id = %q, want ws-bc", ev.workspaceID) + } + pl, ok := ev.payload.(map[string]interface{}) + if !ok { + t.Fatalf("payload not a map: %T", ev.payload) + } + if pl["message"] != "hi" { + t.Errorf("payload.message = %v, want hi", pl["message"]) + } + if pl["workspace_id"] != "ws-bc" { + t.Errorf("payload.workspace_id = %v, want ws-bc", pl["workspace_id"]) + } + if pl["name"] != "Workspace Name" { + t.Errorf("payload.name = %v, want Workspace Name", pl["name"]) + } + if pl["attachments"] == nil { + t.Error("payload.attachments missing on 
attachment-bearing send") + } +} + +// TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty pins the +// "no key when nil" wire contract — extra empty fields would force +// canvas consumers to defensively check for [] vs undefined; the +// existing AGENT_MESSAGE handler treats absence as "no attachments". +func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) { + mock := setupTestDB(t) + emitter := &capturingEmitter{} + w := NewAgentMessageWriter(db.DB, emitter) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-noatt"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X")) + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnResult(sqlmock.NewResult(1, 1)) + + if err := w.Send(context.Background(), "ws-noatt", "plain text", nil); err != nil { + t.Fatalf("Send: %v", err) + } + if len(emitter.events) != 1 { + t.Fatalf("expected 1 event, got %d", len(emitter.events)) + } + pl := emitter.events[0].payload.(map[string]interface{}) + if _, present := pl["attachments"]; present { + t.Errorf("attachments key MUST NOT be present when empty (canvas treats absence as 'none'); payload=%v", pl) + } +} diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index 961921de..dfb93e48 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -11,6 +11,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "io" "log" @@ -330,57 +331,23 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri return "", fmt.Errorf("send_message_to_user is not enabled on this MCP bridge (set MOLECULE_MCP_ALLOW_SEND_MESSAGE=true)") } - var wsName string - err := h.database.QueryRowContext(ctx, - `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID, - ).Scan(&wsName) - if err != nil { - return "", fmt.Errorf("workspace not found") - } - - 
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", map[string]interface{}{ - "message": message, - "workspace_id": workspaceID, - "name": wsName, - }) - - // Persist to activity_logs so chat history loaders surface this - // message after a page reload. Pre-fix (reno-stars 2026-05-05), - // the MCP-bridge variant of send_message_to_user broadcast WS - // only — visible live, gone on reload — while the HTTP /notify - // sibling already had this fix (activity.go:535). External - // claude-code agents using molecule-mcp's send_message_to_user - // tool route through THIS handler, not /notify, so they were - // hitting the unfixed path. + // Single source of truth for chat-bearing agent → user messages — + // see agent_message_writer.go for the contract. The pre-RFC-#2945 + // duplication of broadcast + INSERT logic between this handler and + // activity.go:Notify is what produced the reno-stars data-loss + // regression; both paths now route through the same writer. // - // Shape mirrors activity.go's Notify handler exactly so the - // canvas chat-history hydration treats both the same: - // - activity_type='a2a_receive' joins the source=canvas filter - // - source_id is omitted → defaults to NULL ("canvas-source") - // - method='notify' tags it as a push (vs a real A2A receive) - // - request_body=NULL so the loader doesn't draw a duplicate - // "user" bubble - // - response_body={"result": ""} feeds extractResponseText - // directly - // - // Errors are log-only — the broadcast already returned ok to the - // caller, the user has seen the message, and the persistence - // failure mode (DB hiccup) shouldn't block the tool call. The - // downside is the same as pre-fix: message vanishes on reload - // after a transient DB error. Log it so operators have a signal. 
- respPayload := map[string]interface{}{"result": message} - respJSON, _ := json.Marshal(respPayload) - preview := message - if len(preview) > 80 { - preview = preview[:80] + "…" + // MCP send_message_to_user does not currently surface attachments + // (the tool args don't accept them); pass nil. If a future tool + // schema adds an attachments arg, build []AgentMessageAttachment + // and pass through. + writer := NewAgentMessageWriter(h.database, h.broadcaster) + if err := writer.Send(ctx, workspaceID, message, nil); err != nil { + if errors.Is(err, ErrWorkspaceNotFound) { + return "", fmt.Errorf("workspace not found") + } + return "", err } - if _, err := h.database.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) - VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') - `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { - log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err) - } - return "Message sent.", nil } From f01f37407218aa352b130a6060b0a4b1364767c2 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 15:44:36 -0700 Subject: [PATCH 02/12] feat(mcp): add `molecule-mcp doctor` onboarding diagnostic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #2934 item 6 — the deferred follow-up from Ryan's onboarding- friction report. Quote: "this single command would have saved me 30 of the 45 minutes." When push delivery fails or the install half-works, the operator today has no signal — they hand-grep the Claude Code binary or chase the `from versions: none` red herring. Doctor renders six checks in one screen with concrete next-step suggestions: 1. Python version >=3.11? (wheel's pin) 2. Wheel install molecule-ai-workspace-runtime importable + version surfaced 3. 
PATH for binary `molecule-mcp` resolves on PATH; if not, prints the resolved user-site bin dir to add (or recommends pipx) 4. Env vars PLATFORM_URL + WORKSPACE_ID + token (env or *_FILE or .auth_token) 5. Platform reach GET ${PLATFORM_URL}/healthz returns 2xx 6. Registry register POST /registry/register with the resolved token returns 2xx — end-to-end auth check Each line: `[OK|WARN|FAIL]