forked from molecule-ai/molecule-core
Merge pull request #2949 from Molecule-AI/rfc-2945-pr-a-agent-message-writer
refactor(handlers): AgentMessageWriter SSOT — consolidate Notify + MCP send_message_to_user (RFC #2945 PR-A)
This commit is contained in:
commit
ada27fdb5d
@ -465,78 +465,30 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// Verify workspace exists
|
||||
var wsName string
|
||||
err := db.DB.QueryRowContext(c.Request.Context(),
|
||||
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID,
|
||||
).Scan(&wsName)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
// Single source of truth for chat-bearing agent → user messages —
|
||||
// see agent_message_writer.go for the contract. Pre-RFC-#2945, the
|
||||
// broadcast + INSERT pair was inlined here and again in
|
||||
// mcp_tools.go's send_message_to_user, and the duplication is what
|
||||
// produced the reno-stars data-loss regression. Both paths now
|
||||
// route through the same writer; future channels (Slack, Discord,
|
||||
// Lark) hook in here too.
|
||||
attachments := make([]AgentMessageAttachment, 0, len(body.Attachments))
|
||||
for _, a := range body.Attachments {
|
||||
attachments = append(attachments, AgentMessageAttachment{
|
||||
URI: a.URI,
|
||||
Name: a.Name,
|
||||
MimeType: a.MimeType,
|
||||
Size: a.Size,
|
||||
})
|
||||
}
|
||||
|
||||
broadcastPayload := map[string]interface{}{
|
||||
"message": body.Message,
|
||||
"workspace_id": workspaceID,
|
||||
"name": wsName,
|
||||
}
|
||||
if len(body.Attachments) > 0 {
|
||||
broadcastPayload["attachments"] = body.Attachments
|
||||
}
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload)
|
||||
|
||||
// Persist to activity_logs so the chat history loader restores this
|
||||
// message after a page reload. Pre-fix, send_message_to_user pushes
|
||||
// were broadcast-only — survived the WebSocket session but vanished
|
||||
// when the user refreshed because nothing wrote them to the DB.
|
||||
//
|
||||
// Shape chosen to match the existing loader query
|
||||
// (`type=a2a_receive&source=canvas`):
|
||||
// - activity_type='a2a_receive' so it joins the same query path
|
||||
// - source_id=NULL so the canvas-source filter accepts it
|
||||
// - method='notify' to distinguish from real A2A receives in audits
|
||||
// - request_body=NULL so the loader doesn't append a duplicate
|
||||
// "user message" bubble for it
|
||||
// - response_body={"result": "<text>"} matches extractResponseText's
|
||||
// simplest branch ({result: string} → take verbatim)
|
||||
//
|
||||
// Errors are logged-only — broadcast already succeeded, the user
|
||||
// sees the message; persistence failure just means the message
|
||||
// won't survive reload (pre-fix behavior). Don't fail the whole
|
||||
// notify on a DB hiccup.
|
||||
// response_body shape — chosen to feed BOTH:
|
||||
// - extractResponseText: looks at body.result (string) and returns it
|
||||
// - extractFilesFromTask: looks at body.parts[] for kind=file
|
||||
// so a chat reload after a notify-with-attachments restores both
|
||||
// the text bubble AND the download chips.
|
||||
respPayload := map[string]interface{}{"result": body.Message}
|
||||
if len(body.Attachments) > 0 {
|
||||
fileParts := make([]map[string]interface{}, 0, len(body.Attachments))
|
||||
for _, a := range body.Attachments {
|
||||
fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name}
|
||||
if a.MimeType != "" {
|
||||
fileMeta["mimeType"] = a.MimeType
|
||||
}
|
||||
if a.Size > 0 {
|
||||
fileMeta["size"] = a.Size
|
||||
}
|
||||
fileParts = append(fileParts, map[string]interface{}{
|
||||
"kind": "file",
|
||||
"file": fileMeta,
|
||||
})
|
||||
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
|
||||
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
|
||||
if errors.Is(err, ErrWorkspaceNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
respPayload["parts"] = fileParts
|
||||
}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
preview := body.Message
|
||||
if len(preview) > 80 {
|
||||
preview = preview[:80] + "…"
|
||||
}
|
||||
if _, err := db.DB.ExecContext(c.Request.Context(), `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||
log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "sent"})
|
||||
|
||||
164
workspace-server/internal/handlers/agent_message_writer.go
Normal file
164
workspace-server/internal/handlers/agent_message_writer.go
Normal file
@ -0,0 +1,164 @@
|
||||
package handlers
|
||||
|
||||
// AgentMessageWriter is the SSOT for "agent → user" message delivery in the
|
||||
// workspace-server. Every chat-bearing path that surfaces a message to the
|
||||
// canvas — HTTP /notify (Notify handler), MCP tools/call
|
||||
// send_message_to_user (toolSendMessageToUser), any future channel — MUST
|
||||
// route through this writer rather than re-implement the broadcast +
|
||||
// persist contract inline.
|
||||
//
|
||||
// Why: pre-consolidation, two handlers duplicated the same "broadcast then
|
||||
// INSERT activity_logs" sequence. The reno-stars production data-loss
|
||||
// incident (2026-05-05, RFC #2945, PR #2944) was the symptom — the
|
||||
// persistence half landed for /notify but lagged for the MCP bridge by
|
||||
// months, silently dropping every long-form external-agent message until
|
||||
// reload. The AST gate from #2944 catches drift; this writer eliminates
|
||||
// the *possibility* of drift by giving both call sites a single
|
||||
// well-tested function to call.
|
||||
//
|
||||
// Contract:
|
||||
// 1. Look up the workspace by id; ErrWorkspaceNotFound on miss so the
|
||||
// caller can return 404 with a clean message.
|
||||
// 2. Broadcast a WS AGENT_MESSAGE event with {message, workspace_id,
|
||||
// name, attachments?}.
|
||||
// 3. INSERT a row into activity_logs:
|
||||
// type='a2a_receive', method='notify', source_id NULL,
|
||||
// response_body={"result": message[, "parts": [file kind...]]},
|
||||
// status='ok'
|
||||
// Best-effort — INSERT failure logs only, returns nil so the broadcast
|
||||
// success isn't undone on the caller side.
|
||||
// 4. Returns nil on success.
|
||||
//
|
||||
// The shape (especially the JSON response_body) is the wire contract the
|
||||
// canvas's chat-history hydrator (canvas/src/.../historyHydration.ts)
|
||||
// reads. Drift here silently breaks chat replay across all consumers, so
|
||||
// changes to the JSON shape MUST be cross-verified against the hydrator
|
||||
// in the same PR.
|
||||
|
||||
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"unicode/utf8"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
)
|
||||
|
||||
// ErrWorkspaceNotFound is returned by AgentMessageWriter.Send when the
// workspace lookup turns up nothing (or the workspace is in
// status='removed'). Callers translate it to HTTP 404 / JSON-RPC error /
// whatever surface they expose; compare with errors.Is, not ==, so
// wrapping stays possible.
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
|
||||
|
||||
// AgentMessageAttachment is one file attached to an agent → user
// message. Identical to handlers.NotifyAttachment in field set; kept
// distinct so the writer's API doesn't import a handler type with HTTP
// binding tags.
type AgentMessageAttachment struct {
	URI      string // file location as supplied by the producer (e.g. workspace://…)
	Name     string // display filename
	MimeType string // optional — omitted from the persisted file part when empty
	Size     int64  // size in bytes — omitted from the persisted file part when <= 0
}
|
||||
|
||||
// AgentMessageWriter persists + broadcasts agent → user messages. Construct
// once per process via NewAgentMessageWriter; pass the same instance to
// every handler that delivers chat (Notify, toolSendMessageToUser, etc.).
//
// Takes events.EventEmitter (not the *Broadcaster concrete type) so tests
// can substitute a fake emitter and producers in other packages can wrap
// the real broadcaster behind their own metrics / retries without leaking
// the concrete dependency.
type AgentMessageWriter struct {
	db          *sql.DB             // workspace lookup + activity_logs INSERT
	broadcaster events.EventEmitter // live WS delivery (BroadcastOnly)
}
|
||||
|
||||
// NewAgentMessageWriter binds the writer to the platform's DB pool +
|
||||
// WebSocket broadcaster.
|
||||
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
|
||||
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
|
||||
}
|
||||
|
||||
// Send delivers a single agent → user message. Look up + broadcast +
|
||||
// persist in that order; ErrWorkspaceNotFound short-circuits before any
|
||||
// broadcast or DB write so callers can 404 cleanly.
|
||||
//
|
||||
// Returns nil on success — including on DB-INSERT failure (the broadcast
|
||||
// already returned successfully and the user has seen the message; the
|
||||
// persistence-failure mode is logged at WARN but the caller's response
|
||||
// stays 200 so the agent doesn't retry and double-broadcast).
|
||||
func (w *AgentMessageWriter) Send(
|
||||
ctx context.Context,
|
||||
workspaceID, message string,
|
||||
attachments []AgentMessageAttachment,
|
||||
) error {
|
||||
// 1. Workspace lookup. status='removed' filter is the same shape /notify
|
||||
// used pre-consolidation; deleted workspaces don't get notifications.
|
||||
var wsName string
|
||||
if err := w.db.QueryRowContext(ctx,
|
||||
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
|
||||
workspaceID,
|
||||
).Scan(&wsName); err != nil {
|
||||
// Includes sql.ErrNoRows; canonicalize so callers don't have to
|
||||
// import database/sql to compare.
|
||||
return ErrWorkspaceNotFound
|
||||
}
|
||||
|
||||
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
|
||||
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
|
||||
// consumed since the canvas chat shipped — drift here would orphan
|
||||
// every live chat panel.
|
||||
broadcastPayload := map[string]interface{}{
|
||||
"message": message,
|
||||
"workspace_id": workspaceID,
|
||||
"name": wsName,
|
||||
}
|
||||
if len(attachments) > 0 {
|
||||
broadcastPayload["attachments"] = attachments
|
||||
}
|
||||
w.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", broadcastPayload)
|
||||
|
||||
// 3. Persist for chat-history hydration. response_body shape MUST stay
|
||||
// in sync with extractResponseText + extractFilesFromTask in
|
||||
// canvas/src/components/tabs/chat/historyHydration.ts:
|
||||
// - extractResponseText reads body.result (string) → renders text
|
||||
// - extractFilesFromTask reads body.parts[] (kind=file) → renders chips
|
||||
respPayload := map[string]interface{}{"result": message}
|
||||
if len(attachments) > 0 {
|
||||
fileParts := make([]map[string]interface{}, 0, len(attachments))
|
||||
for _, a := range attachments {
|
||||
fileMeta := map[string]interface{}{"uri": a.URI, "name": a.Name}
|
||||
if a.MimeType != "" {
|
||||
fileMeta["mimeType"] = a.MimeType
|
||||
}
|
||||
if a.Size > 0 {
|
||||
fileMeta["size"] = a.Size
|
||||
}
|
||||
fileParts = append(fileParts, map[string]interface{}{
|
||||
"kind": "file",
|
||||
"file": fileMeta,
|
||||
})
|
||||
}
|
||||
respPayload["parts"] = fileParts
|
||||
}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
preview := message
|
||||
if len(preview) > 80 {
|
||||
preview = preview[:80] + "…"
|
||||
}
|
||||
if _, err := w.db.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||
// Best-effort: the broadcast already returned ok and the user
|
||||
// has seen the message. Logging a structured line lets operators
|
||||
// notice persistence-failure rates spike if the DB is unhealthy,
|
||||
// without breaking the tool response or causing the agent to
|
||||
// retry-and-double-broadcast.
|
||||
log.Printf("agent_message: failed to persist for %s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
331
workspace-server/internal/handlers/agent_message_writer_test.go
Normal file
331
workspace-server/internal/handlers/agent_message_writer_test.go
Normal file
@ -0,0 +1,331 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql/driver"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// AgentMessageWriter is the SSOT for agent → user chat delivery
|
||||
// (RFC #2945 PR-A). These tests pin the contract the writer
|
||||
// guarantees: workspace lookup, broadcast, INSERT, error semantics —
|
||||
// every shape that producers (Notify, toolSendMessageToUser, future
|
||||
// channels) rely on.
|
||||
//
|
||||
// Pre-consolidation, the broadcast-then-INSERT logic was duplicated
|
||||
// across two handlers and they drifted (reno-stars, 2026-05-05). With
|
||||
// the writer being the only place this logic lives, these tests are
|
||||
// the regression line for every chat-bearing path simultaneously.
|
||||
|
||||
// jsonMatcher is a sqlmock Argument matcher that decodes the actual
// SQL arg as JSON and runs a caller-supplied predicate over the
// resulting structure. Tighter than substring matching (which can
// false-pass on a renamed key) and tolerant of map-key ordering
// (which exact-string matching is not).
type jsonMatcher struct {
	// predicate returns true when the decoded JSON object matches the
	// expected shape.
	predicate func(parsed map[string]any) bool
	// desc is a human-readable label of the expected shape
	// (documentation for test authors; not read by Match).
	desc string
}
|
||||
|
||||
func (m jsonMatcher) Match(v driver.Value) bool {
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
var parsed map[string]any
|
||||
if err := json.Unmarshal([]byte(s), &parsed); err != nil {
|
||||
return false
|
||||
}
|
||||
return m.predicate(parsed)
|
||||
}
|
||||
|
||||
// stringMatcher pins exact prefix/suffix/equality checks against a
// driver.Value that's actually a string. The func itself is the
// predicate; non-string values never match.
type stringMatcher func(string) bool
|
||||
|
||||
func (f stringMatcher) Match(v driver.Value) bool {
|
||||
s, ok := v.(string)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return f(s)
|
||||
}
|
||||
|
||||
// capturingEmitter records every BroadcastOnly call so tests can pin
// the WS event shape without a real ws.Hub. RecordAndBroadcast is
// also captured for completeness — the writer doesn't call it today,
// but a future producer might, and a captured-but-unasserted record
// is easier to diagnose than a nil panic.
type capturingEmitter struct {
	events []capturedEvent // in call order; nil until the first call
}
|
||||
|
||||
// capturedEvent is one recorded emitter call: which workspace it
// targeted, the event type string, and the payload exactly as passed.
type capturedEvent struct {
	workspaceID string
	eventType   string
	payload     interface{}
}
|
||||
|
||||
func (c *capturingEmitter) BroadcastOnly(workspaceID string, eventType string, payload interface{}) {
|
||||
c.events = append(c.events, capturedEvent{workspaceID, eventType, payload})
|
||||
}
|
||||
|
||||
func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType string, workspaceID string, payload interface{}) error {
|
||||
c.events = append(c.events, capturedEvent{workspaceID, eventType, payload})
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestAgentMessageWriter_Send_Success_NoAttachments pins the happy
|
||||
// path: workspace lookup, broadcast, INSERT, return nil.
|
||||
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-1").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
"ws-1",
|
||||
sqlmock.AnyArg(), // summary
|
||||
`{"result":"hi"}`,
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
if err := w.Send(context.Background(), "ws-1", "hi", nil); err != nil {
|
||||
t.Fatalf("Send returned %v, want nil", err)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAgentMessageWriter_Send_Success_WithAttachments pins the file
// attachment shape — response_body MUST contain a parts[] array with
// kind=file entries so the canvas hydrater renders download chips.
// Drift here = chips disappear on chat reload.
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
	mock := setupTestDB(t)
	w := NewAgentMessageWriter(db.DB, newTestBroadcaster())

	mock.ExpectQuery("SELECT name FROM workspaces").
		WithArgs("ws-att").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))

	mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
		WithArgs(
			"ws-att",
			sqlmock.AnyArg(), // summary — covered by the truncation test
			jsonMatcher{
				desc: "response_body has result + parts with kind=file metadata",
				predicate: func(p map[string]any) bool {
					if p["result"] != "see attached" {
						return false
					}
					// Exactly one part for the single attachment sent below.
					parts, ok := p["parts"].([]any)
					if !ok || len(parts) != 1 {
						return false
					}
					part, ok := parts[0].(map[string]any)
					if !ok {
						return false
					}
					if part["kind"] != "file" {
						return false
					}
					file, ok := part["file"].(map[string]any)
					if !ok {
						return false
					}
					// JSON numbers decode as float64, hence the cast on size.
					return file["uri"] == "workspace://x.zip" &&
						file["name"] == "x.zip" &&
						file["mimeType"] == "application/zip" &&
						file["size"].(float64) == 1234
				},
			},
		).
		WillReturnResult(sqlmock.NewResult(1, 1))

	atts := []AgentMessageAttachment{
		{URI: "workspace://x.zip", Name: "x.zip", MimeType: "application/zip", Size: 1234},
	}
	if err := w.Send(context.Background(), "ws-att", "see attached", atts); err != nil {
		t.Fatalf("Send returned %v, want nil", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("DB expectations: %v", err)
	}
}
|
||||
|
||||
// TestAgentMessageWriter_Send_WorkspaceNotFound pins ErrWorkspaceNotFound
|
||||
// short-circuit. Must NOT broadcast, MUST NOT INSERT — caller will 404
|
||||
// or surface a JSON-RPC error.
|
||||
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-missing").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}))
|
||||
|
||||
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
|
||||
if !errors.Is(err, ErrWorkspaceNotFound) {
|
||||
t.Errorf("Send returned %v, want ErrWorkspaceNotFound", err)
|
||||
}
|
||||
if len(emitter.events) != 0 {
|
||||
t.Errorf("workspace-not-found path MUST NOT broadcast, got %d events", len(emitter.events))
|
||||
}
|
||||
// Implicit: no INSERT expectation registered, so a stray INSERT
|
||||
// would fail ExpectationsWereMet.
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations (INSERT must NOT fire on workspace-not-found): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil pins the
|
||||
// "best-effort persistence" contract: when the activity_log INSERT
|
||||
// fails (DB hiccup, transient connection, constraint), the writer
|
||||
// MUST still return nil. The broadcast already succeeded; the user
|
||||
// has seen the message; returning an error here would cause the
|
||||
// caller (and the agent calling the tool) to retry and double-
|
||||
// broadcast.
|
||||
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-dbfail").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnError(errors.New("transient db error"))
|
||||
|
||||
err := w.Send(context.Background(), "ws-dbfail", "should not be lost from live chat", nil)
|
||||
if err != nil {
|
||||
t.Errorf("DB INSERT failure must return nil (broadcast already succeeded), got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestAgentMessageWriter_Send_PreviewTruncation pins the summary
// preview cap. Long messages (Ryan's onboarding-friction report was
// ~2k chars) must summarise to ≤80 chars + ellipsis so the activity
// table doesn't carry multi-KB summaries that bloat list queries.
//
// NOTE(review): len() counts bytes, not runes — the 200×'x' input here
// is pure ASCII so bytes == chars; a multi-byte message near the cap
// is not covered by this test.
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
	mock := setupTestDB(t)
	w := NewAgentMessageWriter(db.DB, newTestBroadcaster())

	mock.ExpectQuery("SELECT name FROM workspaces").
		WithArgs("ws-trunc").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))

	longMsg := strings.Repeat("x", 200)
	mock.ExpectExec(`INSERT INTO activity_logs`).
		WithArgs(
			"ws-trunc",
			// Summary must be "Agent message: " + exactly 80 bytes of
			// preview + a single ellipsis.
			stringMatcher(func(s string) bool {
				if !strings.HasPrefix(s, "Agent message: ") {
					return false
				}
				preview := strings.TrimPrefix(s, "Agent message: ")
				if !strings.HasSuffix(preview, "…") {
					return false
				}
				body := strings.TrimSuffix(preview, "…")
				return len(body) == 80
			}),
			sqlmock.AnyArg(), // response_body — pinned by the other tests
		).
		WillReturnResult(sqlmock.NewResult(1, 1))

	if err := w.Send(context.Background(), "ws-trunc", longMsg, nil); err != nil {
		t.Fatalf("Send: %v", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("preview truncation drift: %v", err)
	}
}
|
||||
|
||||
// TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent pins the
// WS event name + payload shape. The canvas's
// canvas-events.ts:AGENT_MESSAGE handler reads {message, workspace_id,
// name, attachments?} — drift here orphans every live chat panel.
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
	mock := setupTestDB(t)
	emitter := &capturingEmitter{}
	w := NewAgentMessageWriter(db.DB, emitter)

	mock.ExpectQuery("SELECT name FROM workspaces").
		WithArgs("ws-bc").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
	mock.ExpectExec(`INSERT INTO activity_logs`).
		WillReturnResult(sqlmock.NewResult(1, 1))

	atts := []AgentMessageAttachment{
		{URI: "workspace://a.txt", Name: "a.txt"},
	}
	if err := w.Send(context.Background(), "ws-bc", "hi", atts); err != nil {
		t.Fatalf("Send: %v", err)
	}

	// Exactly one BroadcastOnly call — the writer must not double-emit.
	if len(emitter.events) != 1 {
		t.Fatalf("expected exactly 1 broadcast, got %d", len(emitter.events))
	}
	ev := emitter.events[0]
	if ev.eventType != "AGENT_MESSAGE" {
		t.Errorf("event type = %q, want AGENT_MESSAGE", ev.eventType)
	}
	if ev.workspaceID != "ws-bc" {
		t.Errorf("workspace_id = %q, want ws-bc", ev.workspaceID)
	}
	pl, ok := ev.payload.(map[string]interface{})
	if !ok {
		t.Fatalf("payload not a map: %T", ev.payload)
	}
	if pl["message"] != "hi" {
		t.Errorf("payload.message = %v, want hi", pl["message"])
	}
	if pl["workspace_id"] != "ws-bc" {
		t.Errorf("payload.workspace_id = %v, want ws-bc", pl["workspace_id"])
	}
	// name comes from the workspace lookup, not the caller.
	if pl["name"] != "Workspace Name" {
		t.Errorf("payload.name = %v, want Workspace Name", pl["name"])
	}
	if pl["attachments"] == nil {
		t.Error("payload.attachments missing on attachment-bearing send")
	}
}
|
||||
|
||||
// TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty pins the
|
||||
// "no key when nil" wire contract — extra empty fields would force
|
||||
// canvas consumers to defensively check for [] vs undefined; the
|
||||
// existing AGENT_MESSAGE handler treats absence as "no attachments".
|
||||
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
emitter := &capturingEmitter{}
|
||||
w := NewAgentMessageWriter(db.DB, emitter)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-noatt").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
if err := w.Send(context.Background(), "ws-noatt", "plain text", nil); err != nil {
|
||||
t.Fatalf("Send: %v", err)
|
||||
}
|
||||
if len(emitter.events) != 1 {
|
||||
t.Fatalf("expected 1 event, got %d", len(emitter.events))
|
||||
}
|
||||
pl := emitter.events[0].payload.(map[string]interface{})
|
||||
if _, present := pl["attachments"]; present {
|
||||
t.Errorf("attachments key MUST NOT be present when empty (canvas treats absence as 'none'); payload=%v", pl)
|
||||
}
|
||||
}
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@ -330,57 +331,23 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
return "", fmt.Errorf("send_message_to_user is not enabled on this MCP bridge (set MOLECULE_MCP_ALLOW_SEND_MESSAGE=true)")
|
||||
}
|
||||
|
||||
var wsName string
|
||||
err := h.database.QueryRowContext(ctx,
|
||||
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID,
|
||||
).Scan(&wsName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("workspace not found")
|
||||
}
|
||||
|
||||
h.broadcaster.BroadcastOnly(workspaceID, "AGENT_MESSAGE", map[string]interface{}{
|
||||
"message": message,
|
||||
"workspace_id": workspaceID,
|
||||
"name": wsName,
|
||||
})
|
||||
|
||||
// Persist to activity_logs so chat history loaders surface this
|
||||
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
|
||||
// the MCP-bridge variant of send_message_to_user broadcast WS
|
||||
// only — visible live, gone on reload — while the HTTP /notify
|
||||
// sibling already had this fix (activity.go:535). External
|
||||
// claude-code agents using molecule-mcp's send_message_to_user
|
||||
// tool route through THIS handler, not /notify, so they were
|
||||
// hitting the unfixed path.
|
||||
// Single source of truth for chat-bearing agent → user messages —
|
||||
// see agent_message_writer.go for the contract. The pre-RFC-#2945
|
||||
// duplication of broadcast + INSERT logic between this handler and
|
||||
// activity.go:Notify is what produced the reno-stars data-loss
|
||||
// regression; both paths now route through the same writer.
|
||||
//
|
||||
// Shape mirrors activity.go's Notify handler exactly so the
|
||||
// canvas chat-history hydration treats both the same:
|
||||
// - activity_type='a2a_receive' joins the source=canvas filter
|
||||
// - source_id is omitted → defaults to NULL ("canvas-source")
|
||||
// - method='notify' tags it as a push (vs a real A2A receive)
|
||||
// - request_body=NULL so the loader doesn't draw a duplicate
|
||||
// "user" bubble
|
||||
// - response_body={"result": "<text>"} feeds extractResponseText
|
||||
// directly
|
||||
//
|
||||
// Errors are log-only — the broadcast already returned ok to the
|
||||
// caller, the user has seen the message, and the persistence
|
||||
// failure mode (DB hiccup) shouldn't block the tool call. The
|
||||
// downside is the same as pre-fix: message vanishes on reload
|
||||
// after a transient DB error. Log it so operators have a signal.
|
||||
respPayload := map[string]interface{}{"result": message}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
preview := message
|
||||
if len(preview) > 80 {
|
||||
preview = preview[:80] + "…"
|
||||
// MCP send_message_to_user does not currently surface attachments
|
||||
// (the tool args don't accept them); pass nil. If a future tool
|
||||
// schema adds an attachments arg, build []AgentMessageAttachment
|
||||
// and pass through.
|
||||
writer := NewAgentMessageWriter(h.database, h.broadcaster)
|
||||
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
|
||||
if errors.Is(err, ErrWorkspaceNotFound) {
|
||||
return "", fmt.Errorf("workspace not found")
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
if _, err := h.database.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
return "Message sent.", nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user