Merge pull request #2944 from Molecule-AI/fix-mcp-send-message-to-user-persist
fix(mcp): persist send_message_to_user pushes to activity_log (reno-stars data loss)
This commit is contained in:
commit
d22fbb29b8
@ -0,0 +1,177 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"go/ast"
|
||||
"go/parser"
|
||||
"go/token"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestAgentMessageBroadcastsArePersisted is a forward-looking AST
|
||||
// gate: every function in this package that broadcasts an
|
||||
// `AGENT_MESSAGE` WebSocket event MUST also call
|
||||
// `INSERT INTO activity_logs` somewhere in its body.
|
||||
//
|
||||
// The reno-stars production data-loss bug (CEO Ryan PC's long-form
|
||||
// onboarding-friction message visible live but missing on reload)
|
||||
// happened because mcp_tools.go:toolSendMessageToUser broadcast WS
|
||||
// without a paired INSERT — while the HTTP /notify sibling DID
|
||||
// persist. The fix added the INSERT; this gate prevents the regression
|
||||
// class from re-emerging in any future chat-bearing tool.
|
||||
//
|
||||
// Why an AST gate vs a code-review checklist (per memory
|
||||
// feedback_behavior_based_ast_gates.md): "pin invariants by what a
|
||||
// function calls, not what it's named". The shape that loses data is:
|
||||
//
|
||||
// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion
|
||||
//
|
||||
// Any new tool that emits AGENT_MESSAGE must persist or the next
|
||||
// canvas refresh drops the message — same shape as reno-stars. A
|
||||
// reviewer can miss this; the AST walk can't.
|
||||
//
|
||||
// Allowlist: empty by intent. If a future use case genuinely needs
|
||||
// fire-and-forget broadcast (e.g., transient typing indicators that
|
||||
// should NOT survive reload), add an entry here AND document why.
|
||||
// "Doesn't need to persist" is rarely the right answer for chat —
|
||||
// the canvas history is the source of truth.
|
||||
func TestAgentMessageBroadcastsArePersisted(t *testing.T) {
|
||||
wd, err := os.Getwd()
|
||||
if err != nil {
|
||||
t.Fatalf("getwd: %v", err)
|
||||
}
|
||||
entries, err := os.ReadDir(wd)
|
||||
if err != nil {
|
||||
t.Fatalf("readdir %s: %v", wd, err)
|
||||
}
|
||||
|
||||
type violation struct {
|
||||
file string
|
||||
fn string
|
||||
}
|
||||
var violations []violation
|
||||
|
||||
for _, ent := range entries {
|
||||
name := ent.Name()
|
||||
if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
|
||||
continue
|
||||
}
|
||||
path := filepath.Join(wd, name)
|
||||
fset := token.NewFileSet()
|
||||
file, err := parser.ParseFile(fset, path, nil, parser.ParseComments)
|
||||
if err != nil {
|
||||
t.Fatalf("parse %s: %v", path, err)
|
||||
}
|
||||
|
||||
for _, decl := range file.Decls {
|
||||
fn, ok := decl.(*ast.FuncDecl)
|
||||
if !ok || fn.Body == nil {
|
||||
continue
|
||||
}
|
||||
if !funcEmitsAgentMessageBroadcast(fn) {
|
||||
continue
|
||||
}
|
||||
if !funcInsertsIntoActivityLogs(fn) {
|
||||
violations = append(violations, violation{file: name, fn: fn.Name.Name})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(violations) > 0 {
|
||||
sort.Slice(violations, func(i, j int) bool {
|
||||
if violations[i].file != violations[j].file {
|
||||
return violations[i].file < violations[j].file
|
||||
}
|
||||
return violations[i].fn < violations[j].fn
|
||||
})
|
||||
var buf strings.Builder
|
||||
for _, v := range violations {
|
||||
buf.WriteString(" - ")
|
||||
buf.WriteString(v.file)
|
||||
buf.WriteString(":")
|
||||
buf.WriteString(v.fn)
|
||||
buf.WriteString("\n")
|
||||
}
|
||||
t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs:
|
||||
|
||||
%s
|
||||
This is the reno-stars data-loss regression class: live message
|
||||
visible to the user, but missing on reload because activity_log was
|
||||
never written. Every chat-bearing broadcast MUST be paired with:
|
||||
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method,
|
||||
summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
|
||||
See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for
|
||||
the canonical shapes. Don't add an allowlist entry without a
|
||||
documented reason — the canvas chat history is the source of truth
|
||||
and silently dropping messages is a P0 user trust break.`,
|
||||
buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that
|
||||
// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`.
|
||||
func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool {
|
||||
var found bool
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
call, ok := n.(*ast.CallExpr)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
sel, ok := call.Fun.(*ast.SelectorExpr)
|
||||
if !ok || sel.Sel.Name != "BroadcastOnly" {
|
||||
return true
|
||||
}
|
||||
// BroadcastOnly(workspaceID, eventType, payload) — the second
|
||||
// arg is the event name. Match by string-literal value.
|
||||
if len(call.Args) < 2 {
|
||||
return true
|
||||
}
|
||||
lit, ok := call.Args[1].(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
return true
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if raw == "AGENT_MESSAGE" {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit
|
||||
// whose body contains `INSERT INTO activity_logs` (the SQL literal
|
||||
// passed to ExecContext). Matches the substring rather than a strict
|
||||
// regex because we don't care about the exact INSERT shape here —
|
||||
// only that the function persists. Specific shape pinning lives in
|
||||
// the per-handler test (see TestMCPHandler_SendMessageToUser_*).
|
||||
func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool {
|
||||
var found bool
|
||||
ast.Inspect(fn.Body, func(n ast.Node) bool {
|
||||
lit, ok := n.(*ast.BasicLit)
|
||||
if !ok || lit.Kind != token.STRING {
|
||||
return true
|
||||
}
|
||||
raw := lit.Value
|
||||
if unq, err := strconv.Unquote(raw); err == nil {
|
||||
raw = unq
|
||||
}
|
||||
if strings.Contains(raw, "INSERT INTO activity_logs") {
|
||||
found = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return found
|
||||
}
|
||||
@ -11,18 +11,21 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
|
||||
// sqlmock DB set up by setupTestDB.
|
||||
// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers
|
||||
// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the
|
||||
// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast.
|
||||
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
|
||||
t.Helper()
|
||||
mock := setupTestDB(t)
|
||||
h := NewMCPHandler(db.DB, events.NewBroadcaster(nil))
|
||||
h := NewMCPHandler(db.DB, newTestBroadcaster())
|
||||
return h, mock
|
||||
}
|
||||
|
||||
@ -628,6 +631,170 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the
|
||||
// "best-effort persistence" contract: when the activity_log INSERT
|
||||
// fails (DB hiccup, constraint violation, transient connection drop),
|
||||
// the tool MUST still return success to the agent because the WS
|
||||
// broadcast already succeeded — the user has seen the message.
|
||||
//
|
||||
// This matches /notify (activity.go) behavior. Returning an error
|
||||
// here would cause the agent to retry and re-broadcast, double-
|
||||
// rendering the message in the user's live chat panel for every
|
||||
// retry until the DB recovers.
|
||||
func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-err").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// INSERT fails — must NOT abort the tool response.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WillReturnError(errors.New("transient db error"))
|
||||
|
||||
w := mcpPost(t, h, "ws-err", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 100,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": "should not be lost from the live chat",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var resp mcpResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response was not valid JSON-RPC: %v", err)
|
||||
}
|
||||
// Tool response is success — INSERT failure logged, broadcast
|
||||
// already succeeded.
|
||||
if resp.Error != nil {
|
||||
t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("expected DB calls in order: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the
|
||||
// response_body JSON shape stored in activity_logs. This shape MUST
|
||||
// match what the canvas hydrater (extractResponseText in
|
||||
// historyHydration.ts) reads — specifically `{"result": "<text>"}`.
|
||||
// Any drift in the JSON shape silently breaks chat history without
|
||||
// failing the INSERT.
|
||||
//
|
||||
// Caught the same drift class flagged in
|
||||
// feedback_assert_exact_not_substring.md: a substring match on
|
||||
// "result" would pass even if the field were renamed; we assert the
|
||||
// exact JSON shape.
|
||||
func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
const userMessage = "Hi there from the agent"
|
||||
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-shape").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// Capture the response_body argument and assert its exact shape.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
"ws-shape",
|
||||
sqlmock.AnyArg(), // summary
|
||||
// The response_body MUST be JSON `{"result": "<message>"}`.
|
||||
// Any other shape (e.g., wrapping in a Task object) breaks
|
||||
// the canvas hydrater's `body.result` extractor.
|
||||
`{"result":"`+userMessage+`"}`,
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
w := mcpPost(t, h, "ws-shape", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 101,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": userMessage,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix
|
||||
// for the reno-stars / CEO Ryan PC chat-history data-loss bug:
|
||||
// external claude-code agents using molecule-mcp's send_message_to_user
|
||||
// tool route through THIS handler (not the HTTP /notify endpoint),
|
||||
// and the handler used to broadcast WS only — visible live, gone on
|
||||
// reload because nothing wrote to activity_logs.
|
||||
//
|
||||
// Pins:
|
||||
// - INSERT happens on the success path (broadcast + DB write).
|
||||
// - INSERT shape mirrors the HTTP /notify handler exactly:
|
||||
// activity_type='a2a_receive', method='notify', request_body NULL,
|
||||
// response_body={"result": message}, status='ok'. The canvas
|
||||
// hydration query (`type=a2a_receive&source=canvas`) treats
|
||||
// both writers as the same shape — drift here means the bug
|
||||
// re-surfaces silently.
|
||||
func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
|
||||
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
|
||||
h, mock := newMCPHandler(t)
|
||||
|
||||
// Workspace lookup — the handler verifies the workspace exists
|
||||
// before it does anything else. Returning a name lets the
|
||||
// broadcast payload populate; the test doesn't assert on the
|
||||
// broadcast (no observable WS in this fake), only on the DB.
|
||||
mock.ExpectQuery("SELECT name FROM workspaces").
|
||||
WithArgs("ws-msg").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
|
||||
|
||||
// The persistence INSERT — pin the exact shape so a future
|
||||
// refactor that switches columns or drops `method='notify'`
|
||||
// breaks the test loud, not silently. Match by regex on the
|
||||
// table + activity_type + method literals.
|
||||
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
|
||||
WithArgs(
|
||||
"ws-msg",
|
||||
sqlmock.AnyArg(), // summary "Agent message: ..."
|
||||
sqlmock.AnyArg(), // response_body JSON
|
||||
).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
w := mcpPost(t, h, "ws-msg", map[string]interface{}{
|
||||
"jsonrpc": "2.0",
|
||||
"id": 99,
|
||||
"method": "tools/call",
|
||||
"params": map[string]interface{}{
|
||||
"name": "send_message_to_user",
|
||||
"arguments": map[string]interface{}{
|
||||
"message": "Hello, this should persist!",
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
var resp mcpResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String())
|
||||
}
|
||||
if resp.Error != nil {
|
||||
t.Errorf("unexpected JSON-RPC error: %+v", resp.Error)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Parse error
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
|
||||
"name": wsName,
|
||||
})
|
||||
|
||||
// Persist to activity_logs so chat history loaders surface this
|
||||
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
|
||||
// the MCP-bridge variant of send_message_to_user broadcast WS
|
||||
// only — visible live, gone on reload — while the HTTP /notify
|
||||
// sibling already had this fix (activity.go:535). External
|
||||
// claude-code agents using molecule-mcp's send_message_to_user
|
||||
// tool route through THIS handler, not /notify, so they were
|
||||
// hitting the unfixed path.
|
||||
//
|
||||
// Shape mirrors activity.go's Notify handler exactly so the
|
||||
// canvas chat-history hydration treats both the same:
|
||||
// - activity_type='a2a_receive' joins the source=canvas filter
|
||||
// - source_id is omitted → defaults to NULL ("canvas-source")
|
||||
// - method='notify' tags it as a push (vs a real A2A receive)
|
||||
// - request_body=NULL so the loader doesn't draw a duplicate
|
||||
// "user" bubble
|
||||
// - response_body={"result": "<text>"} feeds extractResponseText
|
||||
// directly
|
||||
//
|
||||
// Errors are log-only — the broadcast already returned ok to the
|
||||
// caller, the user has seen the message, and the persistence
|
||||
// failure mode (DB hiccup) shouldn't block the tool call. The
|
||||
// downside is the same as pre-fix: message vanishes on reload
|
||||
// after a transient DB error. Log it so operators have a signal.
|
||||
respPayload := map[string]interface{}{"result": message}
|
||||
respJSON, _ := json.Marshal(respPayload)
|
||||
preview := message
|
||||
if len(preview) > 80 {
|
||||
preview = preview[:80] + "…"
|
||||
}
|
||||
if _, err := h.database.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
|
||||
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
|
||||
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
|
||||
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
return "Message sent.", nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user