refactor(handlers): consolidate Notify + MCP send_message_to_user through AgentMessageWriter (RFC #2945 PR-A)

Pre-RFC-#2945 the broadcast + activity_log INSERT for "agent → user
chat" was duplicated across two handlers — activity.go's Notify (HTTP
/notify) and mcp_tools.go's toolSendMessageToUser (MCP tools/call).
The duplication is exactly what produced the reno-stars production
data-loss regression (PR #2944): the persistence-half fix landed for
one handler and silently lagged for the other for months, dropping
every long-form external-agent message on reload.

PR #2944 added the missing INSERT to mcp_tools.go and a forward-
looking AST gate. This PR removes the duplication at the source.

What changes
------------

NEW: workspace-server/internal/handlers/agent_message_writer.go
- AgentMessageWriter struct + NewAgentMessageWriter ctor.
- Send(ctx, workspaceID, message, attachments) error: workspace
  lookup → broadcast WS AGENT_MESSAGE → INSERT activity_logs.
- ErrWorkspaceNotFound for the lookup-miss path so callers can
  return 404 / JSON-RPC error cleanly.
- Best-effort persistence: INSERT failure logs only, returns nil so
  the broadcast success isn't undone (matches previous behavior in
  both call sites — pinned by test).
- Takes events.EventEmitter (interface) so tests can substitute a
  capturing fake without nil-panicking inside hub.Broadcast.

UPDATED: activity.go:Notify
- Replaced ~75 lines of inline broadcast+INSERT with a 12-line
  call to AgentMessageWriter.Send.
- Attachment shape conversion (NotifyAttachment → AgentMessageAttachment)
  is local to the HTTP handler; the writer's API doesn't import the
  HTTP-binding-tagged type.

UPDATED: mcp_tools.go:toolSendMessageToUser
- Replaced ~40 lines (the post-#2944 broadcast+INSERT pair) with a
  6-line call to the writer.
- Attachments is nil today because the MCP tool args don't expose
  attachments yet. When the schema adds it, build the slice and
  pass through; the writer half is ready.

Tests
-----

agent_message_writer_test.go (8 tests, comprehensive):
- TestAgentMessageWriter_Send_Success_NoAttachments — happy path,
  pins JSON `{"result":"hi"}`.
- TestAgentMessageWriter_Send_Success_WithAttachments — pins file
  parts shape (kind=file, file.{uri,name,mimeType,size}). Uses a
  jsonMatcher that decodes + asserts via predicate (tolerant of
  map key ordering, exact on shape).
- TestAgentMessageWriter_Send_WorkspaceNotFound — pins
  ErrWorkspaceNotFound + asserts NO broadcast NO INSERT.
- TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil — pins
  best-effort persistence contract.
- TestAgentMessageWriter_Send_PreviewTruncation — pins ≤80-char
  preview + ellipsis (Ryan's onboarding-friction report would have
  bloated activity_logs.summary by 2KB without this).
- TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent — pins WS
  event name + payload shape via capturingEmitter.
- TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty — pins
  the "no key when nil" wire contract.

The existing AST gate from #2944
(TestAgentMessageBroadcastsArePersisted) still holds: any future
function emitting AGENT_MESSAGE without an INSERT fails the test.
With the writer in place that's now redundant — both producers go
through it — but the gate is cheap to keep as defense-in-depth.

Verified: go vet clean; all writer + caller tests pass; existing
TestNotify_* + TestMCPHandler_SendMessage_* + the AST gate all green.

Refs RFC #2945, PR #2944.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-05-05 15:29:42 -07:00
parent c4807a930d
commit d99b3f2aec
4 changed files with 533 additions and 119 deletions

View File

@ -465,78 +465,30 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
}
}
// Verify workspace exists
var wsName string
err := db.DB.QueryRowContext(c.Request.Context(),
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID,
).Scan(&wsName)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
// Single source of truth for chat-bearing agent → user messages —
// see agent_message_writer.go for the contract. Pre-RFC-#2945, the
// broadcast + INSERT pair was inlined here and again in
// mcp_tools.go's send_message_to_user, and the duplication is what
// produced the reno-stars data-loss regression. Both paths now
// route through the same writer; future channels (Slack, Discord,
// Lark) hook in here too.
attachments := make([]AgentMessageAttachment, 0, len(body.Attachments))
for _, a := range body.Attachments {
attachments = append(attachments, AgentMessageAttachment{
URI: a.URI,
Name: a.Name,
MimeType: a.MimeType,
Size: a.Size,
})
}
broadcastPayload := map[string]interface{}{
"message": body.Message,
"workspace_id": workspaceID,
"name": wsName,
}
if len(body.Attachments) > 0 {
broadcastPayload["attachments"] = body.Attachments
}
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload)
// Persist to activity_logs so the chat history loader restores this
// message after a page reload. Pre-fix, send_message_to_user pushes
// were broadcast-only — survived the WebSocket session but vanished
// when the user refreshed because nothing wrote them to the DB.
//
// Shape chosen to match the existing loader query
// (`type=a2a_receive&source=canvas`):
// - activity_type='a2a_receive' so it joins the same query path
// - source_id=NULL so the canvas-source filter accepts it
// - method='notify' to distinguish from real A2A receives in audits
// - request_body=NULL so the loader doesn't append a duplicate
// "user message" bubble for it
// - response_body={"result": "<text>"} matches extractResponseText's
// simplest branch ({result: string} → take verbatim)
//
// Errors are logged-only — broadcast already succeeded, the user
// sees the message; persistence failure just means the message
// won't survive reload (pre-fix behavior). Don't fail the whole
// notify on a DB hiccup.
// response_body shape — chosen to feed BOTH:
// - extractResponseText: looks at body.result (string) and returns it
// - extractFilesFromTask: looks at body.parts[] for kind=file
// so a chat reload after a notify-with-attachments restores both
// the text bubble AND the download chips.
respPayload := map[string]interface{}{"result": body.Message}
if len(body.Attachments) > 0 {
fileParts := make([]map[string]interface{}, 0, len(body.Attachments))
for _, a := range body.Attachments {
fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name}
if a.MimeType != "" {
fileMeta["mimeType"] = a.MimeType
}
if a.Size > 0 {
fileMeta["size"] = a.Size
}
fileParts = append(fileParts, map[string]interface{}{
"kind": "file",
"file": fileMeta,
})
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
respPayload["parts"] = fileParts
}
respJSON, _ := json.Marshal(respPayload)
preview := body.Message
if len(preview) > 80 {
preview = preview[:80] + "…"
}
if _, err := db.DB.ExecContext(c.Request.Context(), `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "sent"})

View File

@ -0,0 +1,164 @@
package handlers
// AgentMessageWriter is the SSOT for "agent → user" message delivery in the
// workspace-server. Every chat-bearing path that surfaces a message to the
// canvas — HTTP /notify (Notify handler), MCP tools/call
// send_message_to_user (toolSendMessageToUser), any future channel — MUST
// route through this writer rather than re-implement the broadcast +
// persist contract inline.
//
// Why: pre-consolidation, two handlers duplicated the same "broadcast then
// INSERT activity_logs" sequence. The reno-stars production data-loss
// incident (2026-05-05, RFC #2945, PR #2944) was the symptom — the
// persistence half landed for /notify but lagged for the MCP bridge by
// months, silently dropping every long-form external-agent message until
// reload. The AST gate from #2944 catches drift; this writer eliminates
// the *possibility* of drift by giving both call sites a single
// well-tested function to call.
//
// Contract:
// 1. Look up the workspace by id; ErrWorkspaceNotFound on miss so the
// caller can return 404 with a clean message.
// 2. Broadcast a WS AGENT_MESSAGE event with {message, workspace_id,
// name, attachments?}.
// 3. INSERT a row into activity_logs:
// type='a2a_receive', method='notify', source_id NULL,
// response_body={"result": message[, "parts": [file kind...]]},
// status='ok'
// Best-effort — INSERT failure logs only, returns nil so the broadcast
// success isn't undone on the caller side.
// 4. Returns nil on success.
//
// The shape (especially the JSON response_body) is the wire contract the
// canvas's chat-history hydrator (canvas/src/.../historyHydration.ts)
// reads. Drift here silently breaks chat replay across all consumers, so
// changes to the JSON shape MUST be cross-verified against the hydrator
// in the same PR.
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
)
// ErrWorkspaceNotFound is returned by AgentMessageWriter.Send when the
// workspace lookup turns up nothing (or the workspace is in
// status='removed'). Callers translate to HTTP 404 / JSON-RPC error /
// whatever surface they expose.
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
// AgentMessageAttachment is one file attached to an agent → user
// message. Identical to handlers.NotifyAttachment in field set; kept
// distinct so the writer's API doesn't import a handler type with HTTP
// binding tags.
type AgentMessageAttachment struct {
URI string
Name string
MimeType string
Size int64
}
// AgentMessageWriter persists + broadcasts agent → user messages. Construct
// once per process via NewAgentMessageWriter; pass the same instance to
// every handler that delivers chat (Notify, toolSendMessageToUser, etc.).
//
// Takes events.EventEmitter (not the *Broadcaster concrete type) so tests
// can substitute a fake emitter and producers in other packages can wrap
// the real broadcaster behind their own metrics / retries without leaking
// the concrete dependency.
type AgentMessageWriter struct {
db *sql.DB
broadcaster events.EventEmitter
}
// NewAgentMessageWriter binds the writer to the platform's DB pool +
// WebSocket broadcaster.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
}
// Send delivers a single agent → user message. Look up + broadcast +
// persist in that order; ErrWorkspaceNotFound short-circuits before any
// broadcast or DB write so callers can 404 cleanly.
//
// Returns nil on success — including on DB-INSERT failure (the broadcast
// already returned successfully and the user has seen the message; the
// persistence-failure mode is logged at WARN but the caller's response
// stays 200 so the agent doesn't retry and double-broadcast).
func (w *AgentMessageWriter) Send(
ctx context.Context,
workspaceID, message string,
attachments []AgentMessageAttachment,
) error {
// 1. Workspace lookup. status='removed' filter is the same shape /notify
// used pre-consolidation; deleted workspaces don't get notifications.
var wsName string
if err := w.db.QueryRowContext(ctx,
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
workspaceID,
).Scan(&wsName); err != nil {
// Includes sql.ErrNoRows; canonicalize so callers don't have to
// import database/sql to compare.
return ErrWorkspaceNotFound
}
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
// consumed since the canvas chat shipped — drift here would orphan
// every live chat panel.
broadcastPayload := map[string]interface{}{
"message": message,
"workspace_id": workspaceID,
"name": wsName,
}
if len(attachments) > 0 {
broadcastPayload["attachments"] = attachments
}
w.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload)
// 3. Persist for chat-history hydration. response_body shape MUST stay
// in sync with extractResponseText + extractFilesFromTask in
// canvas/src/components/tabs/chat/historyHydration.ts:
// - extractResponseText reads body.result (string) → renders text
// - extractFilesFromTask reads body.parts[] (kind=file) → renders chips
respPayload := map[string]interface{}{"result": message}
if len(attachments) > 0 {
fileParts := make([]map[string]interface{}, 0, len(attachments))
for _, a := range attachments {
fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name}
if a.MimeType != "" {
fileMeta["mimeType"] = a.MimeType
}
if a.Size > 0 {
fileMeta["size"] = a.Size
}
fileParts = append(fileParts, map[string]interface{}{
"kind": "file",
"file": fileMeta,
})
}
respPayload["parts"] = fileParts
}
respJSON, _ := json.Marshal(respPayload)
preview := message
if len(preview) > 80 {
preview = preview[:80] + "…"
}
if _, err := w.db.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
// Best-effort: the broadcast already returned ok and the user
// has seen the message. Logging a structured line lets operators
// notice persistence-failure rates spike if the DB is unhealthy,
// without breaking the tool response or causing the agent to
// retry-and-double-broadcast.
log.Printf("agent_message: failed to persist for %s: %v", workspaceID, err)
}
return nil
}

View File

@ -0,0 +1,331 @@
package handlers
import (
"context"
"database/sql/driver"
"encoding/json"
"errors"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// AgentMessageWriter is the SSOT for agent → user chat delivery
// (RFC #2945 PR-A). These tests pin the contract the writer
// guarantees: workspace lookup, broadcast, INSERT, error semantics —
// every shape that producers (Notify, toolSendMessageToUser, future
// channels) rely on.
//
// Pre-consolidation, the broadcast-then-INSERT logic was duplicated
// across two handlers and they drifted (reno-stars, 2026-05-05). With
// the writer being the only place this logic lives, these tests are
// the regression line for every chat-bearing path simultaneously.
// jsonMatcher is a sqlmock Argument matcher that decodes the actual
// SQL arg as JSON and runs a caller-supplied predicate over the
// resulting structure. Tighter than substring matching (which can
// false-pass on a renamed key) and tolerant of map-key ordering
// (which exact-string matching is not).
type jsonMatcher struct {
predicate func(parsed map[string]any) bool
desc string
}
func (m jsonMatcher) Match(v driver.Value) bool {
s, ok := v.(string)
if !ok {
return false
}
var parsed map[string]any
if err := json.Unmarshal([]byte(s), &parsed); err != nil {
return false
}
return m.predicate(parsed)
}
// stringMatcher pins exact prefix/suffix/equality checks against a
// driver.Value that's actually a string.
type stringMatcher func(string) bool
func (f stringMatcher) Match(v driver.Value) bool {
s, ok := v.(string)
if !ok {
return false
}
return f(s)
}
// capturingEmitter records every BroadcastOnly call so tests can pin
// the WS event shape without a real ws.Hub. RecordAndBroadcast is
// also captured for completeness — the writer doesn't call it today,
// but a future producer might, and a captured-but-unasserted record
// is easier to diagnose than a nil panic.
type capturingEmitter struct {
events []capturedEvent
}
type capturedEvent struct {
workspaceID string
eventType string
payload interface{}
}
func (c *capturingEmitter) BroadcastOnly(workspaceID string, eventType string, payload interface{}) {
c.events = append(c.events, capturedEvent{workspaceID, eventType, payload})
}
func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType string, workspaceID string, payload interface{}) error {
c.events = append(c.events, capturedEvent{workspaceID, eventType, payload})
return nil
}
// TestAgentMessageWriter_Send_Success_NoAttachments pins the happy
// path: workspace lookup, broadcast, INSERT, return nil.
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
"ws-1",
sqlmock.AnyArg(), // summary
`{"result":"hi"}`,
).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := w.Send(context.Background(), "ws-1", "hi", nil); err != nil {
t.Fatalf("Send returned %v, want nil", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations: %v", err)
}
}
// TestAgentMessageWriter_Send_Success_WithAttachments pins the file
// attachment shape — response_body MUST contain a parts[] array with
// kind=file entries so the canvas hydrater renders download chips.
// Drift here = chips disappear on chat reload.
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-att").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
"ws-att",
sqlmock.AnyArg(),
jsonMatcher{
desc: "response_body has result + parts with kind=file metadata",
predicate: func(p map[string]any) bool {
if p["result"] != "see attached" {
return false
}
parts, ok := p["parts"].([]any)
if !ok || len(parts) != 1 {
return false
}
part, ok := parts[0].(map[string]any)
if !ok {
return false
}
if part["kind"] != "file" {
return false
}
file, ok := part["file"].(map[string]any)
if !ok {
return false
}
return file["uri"] == "workspace://x.zip" &&
file["name"] == "x.zip" &&
file["mimeType"] == "application/zip" &&
file["size"].(float64) == 1234
},
},
).
WillReturnResult(sqlmock.NewResult(1, 1))
atts := []AgentMessageAttachment{
{URI: "workspace://x.zip", Name: "x.zip", MimeType: "application/zip", Size: 1234},
}
if err := w.Send(context.Background(), "ws-att", "see attached", atts); err != nil {
t.Fatalf("Send returned %v, want nil", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations: %v", err)
}
}
// TestAgentMessageWriter_Send_WorkspaceNotFound pins ErrWorkspaceNotFound
// short-circuit. Must NOT broadcast, MUST NOT INSERT — caller will 404
// or surface a JSON-RPC error.
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-missing").
WillReturnRows(sqlmock.NewRows([]string{"name"}))
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
if !errors.Is(err, ErrWorkspaceNotFound) {
t.Errorf("Send returned %v, want ErrWorkspaceNotFound", err)
}
if len(emitter.events) != 0 {
t.Errorf("workspace-not-found path MUST NOT broadcast, got %d events", len(emitter.events))
}
// Implicit: no INSERT expectation registered, so a stray INSERT
// would fail ExpectationsWereMet.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations (INSERT must NOT fire on workspace-not-found): %v", err)
}
}
// TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil pins the
// "best-effort persistence" contract: when the activity_log INSERT
// fails (DB hiccup, transient connection, constraint), the writer
// MUST still return nil. The broadcast already succeeded; the user
// has seen the message; returning an error here would cause the
// caller (and the agent calling the tool) to retry and double-
// broadcast.
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-dbfail").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(errors.New("transient db error"))
err := w.Send(context.Background(), "ws-dbfail", "should not be lost from live chat", nil)
if err != nil {
t.Errorf("DB INSERT failure must return nil (broadcast already succeeded), got %v", err)
}
}
// TestAgentMessageWriter_Send_PreviewTruncation pins the summary
// preview cap. Long messages (Ryan's onboarding-friction report was
// ~2k chars) must summarise to ≤80 chars + ellipsis so the activity
// table doesn't carry multi-KB summaries that bloat list queries.
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-trunc").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
longMsg := strings.Repeat("x", 200)
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
"ws-trunc",
stringMatcher(func(s string) bool {
if !strings.HasPrefix(s, "Agent message: ") {
return false
}
preview := strings.TrimPrefix(s, "Agent message: ")
if !strings.HasSuffix(preview, "…") {
return false
}
body := strings.TrimSuffix(preview, "…")
return len(body) == 80
}),
sqlmock.AnyArg(),
).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := w.Send(context.Background(), "ws-trunc", longMsg, nil); err != nil {
t.Fatalf("Send: %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("preview truncation drift: %v", err)
}
}
// TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent pins the
// WS event name + payload shape. The canvas's
// canvas-events.ts:AGENT_MESSAGE handler reads {message, workspace_id,
// name, attachments?} — drift here orphans every live chat panel.
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-bc").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
atts := []AgentMessageAttachment{
{URI: "workspace://a.txt", Name: "a.txt"},
}
if err := w.Send(context.Background(), "ws-bc", "hi", atts); err != nil {
t.Fatalf("Send: %v", err)
}
if len(emitter.events) != 1 {
t.Fatalf("expected exactly 1 broadcast, got %d", len(emitter.events))
}
ev := emitter.events[0]
if ev.eventType != "AGENT_MESSAGE" {
t.Errorf("event type = %q, want AGENT_MESSAGE", ev.eventType)
}
if ev.workspaceID != "ws-bc" {
t.Errorf("workspace_id = %q, want ws-bc", ev.workspaceID)
}
pl, ok := ev.payload.(map[string]interface{})
if !ok {
t.Fatalf("payload not a map: %T", ev.payload)
}
if pl["message"] != "hi" {
t.Errorf("payload.message = %v, want hi", pl["message"])
}
if pl["workspace_id"] != "ws-bc" {
t.Errorf("payload.workspace_id = %v, want ws-bc", pl["workspace_id"])
}
if pl["name"] != "Workspace Name" {
t.Errorf("payload.name = %v, want Workspace Name", pl["name"])
}
if pl["attachments"] == nil {
t.Error("payload.attachments missing on attachment-bearing send")
}
}
// TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty pins the
// "no key when nil" wire contract — extra empty fields would force
// canvas consumers to defensively check for [] vs undefined; the
// existing AGENT_MESSAGE handler treats absence as "no attachments".
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-noatt").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
if err := w.Send(context.Background(), "ws-noatt", "plain text", nil); err != nil {
t.Fatalf("Send: %v", err)
}
if len(emitter.events) != 1 {
t.Fatalf("expected 1 event, got %d", len(emitter.events))
}
pl := emitter.events[0].payload.(map[string]interface{})
if _, present := pl["attachments"]; present {
t.Errorf("attachments key MUST NOT be present when empty (canvas treats absence as 'none'); payload=%v", pl)
}
}

View File

@ -11,6 +11,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
@ -330,57 +331,23 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
return "", fmt.Errorf("send_message_to_user is not enabled on this MCP bridge (set MOLECULE_MCP_ALLOW_SEND_MESSAGE=true)")
}
var wsName string
err := h.database.QueryRowContext(ctx,
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID,
).Scan(&wsName)
if err != nil {
return "", fmt.Errorf("workspace not found")
}
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", map[string]interface{}{
"message": message,
"workspace_id": workspaceID,
"name": wsName,
})
// Persist to activity_logs so chat history loaders surface this
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
// the MCP-bridge variant of send_message_to_user broadcast WS
// only — visible live, gone on reload — while the HTTP /notify
// sibling already had this fix (activity.go:535). External
// claude-code agents using molecule-mcp's send_message_to_user
// tool route through THIS handler, not /notify, so they were
// hitting the unfixed path.
// Single source of truth for chat-bearing agent → user messages —
// see agent_message_writer.go for the contract. The pre-RFC-#2945
// duplication of broadcast + INSERT logic between this handler and
// activity.go:Notify is what produced the reno-stars data-loss
// regression; both paths now route through the same writer.
//
// Shape mirrors activity.go's Notify handler exactly so the
// canvas chat-history hydration treats both the same:
// - activity_type='a2a_receive' joins the source=canvas filter
// - source_id is omitted → defaults to NULL ("canvas-source")
// - method='notify' tags it as a push (vs a real A2A receive)
// - request_body=NULL so the loader doesn't draw a duplicate
// "user" bubble
// - response_body={"result": "<text>"} feeds extractResponseText
// directly
//
// Errors are log-only — the broadcast already returned ok to the
// caller, the user has seen the message, and the persistence
// failure mode (DB hiccup) shouldn't block the tool call. The
// downside is the same as pre-fix: message vanishes on reload
// after a transient DB error. Log it so operators have a signal.
respPayload := map[string]interface{}{"result": message}
respJSON, _ := json.Marshal(respPayload)
preview := message
if len(preview) > 80 {
preview = preview[:80] + "…"
// MCP send_message_to_user does not currently surface attachments
// (the tool args don't accept them); pass nil. If a future tool
// schema adds an attachments arg, build []AgentMessageAttachment
// and pass through.
writer := NewAgentMessageWriter(h.database, h.broadcaster)
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
return "", fmt.Errorf("workspace not found")
}
return "", err
}
if _, err := h.database.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
}
return "Message sent.", nil
}