diff --git a/workspace-server/internal/handlers/agent_message_persistence_gate_test.go b/workspace-server/internal/handlers/agent_message_persistence_gate_test.go new file mode 100644 index 00000000..07998112 --- /dev/null +++ b/workspace-server/internal/handlers/agent_message_persistence_gate_test.go @@ -0,0 +1,177 @@ +package handlers + +import ( + "go/ast" + "go/parser" + "go/token" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" +) + +// TestAgentMessageBroadcastsArePersisted is a forward-looking AST +// gate: every function in this package that broadcasts an +// `AGENT_MESSAGE` WebSocket event MUST also call +// `INSERT INTO activity_logs` somewhere in its body. +// +// The reno-stars production data-loss bug (CEO Ryan PC's long-form +// onboarding-friction message visible live but missing on reload) +// happened because mcp_tools.go:toolSendMessageToUser broadcast WS +// without a paired INSERT — while the HTTP /notify sibling DID +// persist. The fix added the INSERT; this gate prevents the regression +// class from re-emerging in any future chat-bearing tool. +// +// Why an AST gate vs a code-review checklist (per memory +// feedback_behavior_based_ast_gates.md): "pin invariants by what a +// function calls, not what it's named". The shape that loses data is: +// +// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion +// +// Any new tool that emits AGENT_MESSAGE must persist or the next +// canvas refresh drops the message — same shape as reno-stars. A +// reviewer can miss this; the AST walk can't. +// +// Allowlist: empty by intent. If a future use case genuinely needs +// fire-and-forget broadcast (e.g., transient typing indicators that +// should NOT survive reload), add an entry here AND document why. +// "Doesn't need to persist" is rarely the right answer for chat — +// the canvas history is the source of truth. +func TestAgentMessageBroadcastsArePersisted(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + entries, err := os.ReadDir(wd) + if err != nil { + t.Fatalf("readdir %s: %v", wd, err) + } + + type violation struct { + file string + fn string + } + var violations []violation + + for _, ent := range entries { + name := ent.Name() + if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + path := filepath.Join(wd, name) + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, path, nil, parser.ParseComments) + if err != nil { + t.Fatalf("parse %s: %v", path, err) + } + + for _, decl := range file.Decls { + fn, ok := decl.(*ast.FuncDecl) + if !ok || fn.Body == nil { + continue + } + if !funcEmitsAgentMessageBroadcast(fn) { + continue + } + if !funcInsertsIntoActivityLogs(fn) { + violations = append(violations, violation{file: name, fn: fn.Name.Name}) + } + } + } + + if len(violations) > 0 { + sort.Slice(violations, func(i, j int) bool { + if violations[i].file != violations[j].file { + return violations[i].file < violations[j].file + } + return violations[i].fn < violations[j].fn + }) + var buf strings.Builder + for _, v := range violations { + buf.WriteString(" - ") + buf.WriteString(v.file) + buf.WriteString(":") + buf.WriteString(v.fn) + buf.WriteString("\n") + } + t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs: + +%s +This is the reno-stars data-loss regression class: live message +visible to the user, but missing on reload because activity_log was +never written. Every chat-bearing broadcast MUST be paired with: + + INSERT INTO activity_logs (workspace_id, activity_type, method, + summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + +See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for +the canonical shapes. Don't add an allowlist entry without a +documented reason — the canvas chat history is the source of truth +and silently dropping messages is a P0 user trust break.`, + buf.String()) + } +} + +// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that +// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`. +func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool { + var found bool + ast.Inspect(fn.Body, func(n ast.Node) bool { + call, ok := n.(*ast.CallExpr) + if !ok { + return true + } + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok || sel.Sel.Name != "BroadcastOnly" { + return true + } + // BroadcastOnly(workspaceID, eventType, payload) — the second + // arg is the event name. Match by string-literal value. + if len(call.Args) < 2 { + return true + } + lit, ok := call.Args[1].(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + return true + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if raw == "AGENT_MESSAGE" { + found = true + return false + } + return true + }) + return found +} + +// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit +// whose body contains `INSERT INTO activity_logs` (the SQL literal +// passed to ExecContext). Matches the substring rather than a strict +// regex because we don't care about the exact INSERT shape here — +// only that the function persists. Specific shape pinning lives in +// the per-handler test (see TestMCPHandler_SendMessageToUser_*). +func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool { + var found bool + ast.Inspect(fn.Body, func(n ast.Node) bool { + lit, ok := n.(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + return true + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if strings.Contains(raw, "INSERT INTO activity_logs") { + found = true + return false + } + return true + }) + return found +} diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 35acc95d..6ede0c2c 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -11,18 +11,21 @@ import ( "os" "testing" + "errors" + "github.com/DATA-DOG/go-sqlmock" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/gin-gonic/gin" ) // newMCPHandler is a test helper that constructs an MCPHandler backed by the -// sqlmock DB set up by setupTestDB. +// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers +// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the +// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast. func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) { t.Helper() mock := setupTestDB(t) - h := NewMCPHandler(db.DB, events.NewBroadcaster(nil)) + h := NewMCPHandler(db.DB, newTestBroadcaster()) return h, mock } @@ -628,6 +631,170 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) { } } +// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the +// "best-effort persistence" contract: when the activity_log INSERT +// fails (DB hiccup, constraint violation, transient connection drop), +// the tool MUST still return success to the agent because the WS +// broadcast already succeeded — the user has seen the message. +// +// This matches /notify (activity.go) behavior. Returning an error +// here would cause the agent to retry and re-broadcast, double- +// rendering the message in the user's live chat panel for every +// retry until the DB recovers. +func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-err"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // INSERT fails — must NOT abort the tool response. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WillReturnError(errors.New("transient db error")) + + w := mcpPost(t, h, "ws-err", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 100, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": "should not be lost from the live chat", + }, + }, + }) + + var resp mcpResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response was not valid JSON-RPC: %v", err) + } + // Tool response is success — INSERT failure logged, broadcast + // already succeeded. + if resp.Error != nil { + t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expected DB calls in order: %v", err) + } +} + +// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the +// response_body JSON shape stored in activity_logs. This shape MUST +// match what the canvas hydrater (extractResponseText in +// historyHydration.ts) reads — specifically `{"result": ""}`. +// Any drift in the JSON shape silently breaks chat history without +// failing the INSERT. +// +// Caught the same drift class flagged in +// feedback_assert_exact_not_substring.md: a substring match on +// "result" would pass even if the field were renamed; we assert the +// exact JSON shape. +func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + const userMessage = "Hi there from the agent" + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-shape"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // Capture the response_body argument and assert its exact shape. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-shape", + sqlmock.AnyArg(), // summary + // The response_body MUST be JSON `{"result": ""}`. + // Any other shape (e.g., wrapping in a Task object) breaks + // the canvas hydrater's `body.result` extractor. + `{"result":"`+userMessage+`"}`, + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := mcpPost(t, h, "ws-shape", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 101, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": userMessage, + }, + }, + }) + + if w.Code != 200 { + t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err) + } +} + +// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix +// for the reno-stars / CEO Ryan PC chat-history data-loss bug: +// external claude-code agents using molecule-mcp's send_message_to_user +// tool route through THIS handler (not the HTTP /notify endpoint), +// and the handler used to broadcast WS only — visible live, gone on +// reload because nothing wrote to activity_logs. +// +// Pins: +// - INSERT happens on the success path (broadcast + DB write). +// - INSERT shape mirrors the HTTP /notify handler exactly: +// activity_type='a2a_receive', method='notify', request_body NULL, +// response_body={"result": message}, status='ok'. The canvas +// hydration query (`type=a2a_receive&source=canvas`) treats +// both writers as the same shape — drift here means the bug +// re-surfaces silently. +func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + // Workspace lookup — the handler verifies the workspace exists + // before it does anything else. Returning a name lets the + // broadcast payload populate; the test doesn't assert on the + // broadcast (no observable WS in this fake), only on the DB. + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-msg"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // The persistence INSERT — pin the exact shape so a future + // refactor that switches columns or drops `method='notify'` + // breaks the test loud, not silently. Match by regex on the + // table + activity_type + method literals. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-msg", + sqlmock.AnyArg(), // summary "Agent message: ..." + sqlmock.AnyArg(), // response_body JSON + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := mcpPost(t, h, "ws-msg", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 99, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": "Hello, this should persist!", + }, + }, + }) + + var resp mcpResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String()) + } + if resp.Error != nil { + t.Errorf("unexpected JSON-RPC error: %+v", resp.Error) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err) + } +} + // ───────────────────────────────────────────────────────────────────────────── // Parse error // ───────────────────────────────────────────────────────────────────────────── diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index c74555a3..961921de 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri "name": wsName, }) + // Persist to activity_logs so chat history loaders surface this + // message after a page reload. Pre-fix (reno-stars 2026-05-05), + // the MCP-bridge variant of send_message_to_user broadcast WS + // only — visible live, gone on reload — while the HTTP /notify + // sibling already had this fix (activity.go:535). External + // claude-code agents using molecule-mcp's send_message_to_user + // tool route through THIS handler, not /notify, so they were + // hitting the unfixed path. + // + // Shape mirrors activity.go's Notify handler exactly so the + // canvas chat-history hydration treats both the same: + // - activity_type='a2a_receive' joins the source=canvas filter + // - source_id is omitted → defaults to NULL ("canvas-source") + // - method='notify' tags it as a push (vs a real A2A receive) + // - request_body=NULL so the loader doesn't draw a duplicate + // "user" bubble + // - response_body={"result": ""} feeds extractResponseText + // directly + // + // Errors are log-only — the broadcast already returned ok to the + // caller, the user has seen the message, and the persistence + // failure mode (DB hiccup) shouldn't block the tool call. The + // downside is the same as pre-fix: message vanishes on reload + // after a transient DB error. Log it so operators have a signal. + respPayload := map[string]interface{}{"result": message} + respJSON, _ := json.Marshal(respPayload) + preview := message + if len(preview) > 80 { + preview = preview[:80] + "…" + } + if _, err := h.database.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { + log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err) + } + return "Message sent.", nil }