From cdfc9f743fc02d98f3bf389d45d5883bdd3c2f10 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 14:47:48 -0700 Subject: [PATCH 1/2] fix(mcp): persist send_message_to_user pushes to activity_log (reno-stars data loss) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reported on production tenant reno-stars: an external claude-code agent (CEO Ryan PC workspace) sent a long-form message via send_message_to_user; the user saw it live in the chat panel but it vanished after a refresh. Confirmed via direct production query — the message is NOT in activity_logs at all (only short test pings around it are persisted). Root cause: there are TWO server-side handlers for send_message_to_user: 1. HTTP `/workspaces/:id/notify` (activity.go:Notify) — broadcasts WS AND inserts a row into activity_logs. This is the path the in-container runtime's tool_send_message_to_user calls. 2. MCP-bridge `tools/call name=send_message_to_user` (mcp_tools.go:toolSendMessageToUser) — broadcasts WS only, **never persisted**. This is the path EXTERNAL agents using molecule-mcp's send_message_to_user tool route through. The persistence fix landed for path 1 months ago but was never mirrored on path 2. External agents — exactly the case in reno-stars/CEO Ryan PC — have been silently losing every long-form notification on reload. Fix: mirror the activity.go INSERT shape inside toolSendMessageToUser: INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') Same wire shape as /notify so the canvas's chat-history hydration (`type=a2a_receive&source=canvas`) treats both writers identically. Errors are log-only — broadcast already succeeded, persistence failure shouldn't block the tool response (matches /notify behavior; downside is the same data-loss-on-DB-error risk, surfaced via log.Printf). Tests ----- - `TestMCPHandler_SendMessageToUser_PersistsToActivityLog` — pins both the workspace-name lookup AND the INSERT shape. Regex-matches `'a2a_receive'` + `'notify'` literals so a future refactor that changes activity_type or method breaks the test loud, not silently re-introducing the data-loss bug. - Updated newMCPHandler to use newTestBroadcaster() (real ws.Hub) — events.NewBroadcaster(nil) crashes inside hub.Broadcast in the send_message_to_user path. Same shape every other handler test uses. Verified `go test ./internal/handlers/ -run TestMCPHandler_SendMessage` green; full vet clean. Refs reno-stars production incident 2026-05-05. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/mcp_test.go | 70 ++++++++++++++++++- .../internal/handlers/mcp_tools.go | 37 ++++++++++ 2 files changed, 104 insertions(+), 3 deletions(-) diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 35acc95d..7db2849c 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -13,16 +13,17 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/gin-gonic/gin" ) // newMCPHandler is a test helper that constructs an MCPHandler backed by the -// sqlmock DB set up by setupTestDB. +// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers +// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the +// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast. func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) { t.Helper() mock := setupTestDB(t) - h := NewMCPHandler(db.DB, events.NewBroadcaster(nil)) + h := NewMCPHandler(db.DB, newTestBroadcaster()) return h, mock } @@ -628,6 +629,69 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) { } } +// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix +// for the reno-stars / CEO Ryan PC chat-history data-loss bug: +// external claude-code agents using molecule-mcp's send_message_to_user +// tool route through THIS handler (not the HTTP /notify endpoint), +// and the handler used to broadcast WS only — visible live, gone on +// reload because nothing wrote to activity_logs. +// +// Pins: +// - INSERT happens on the success path (broadcast + DB write). +// - INSERT shape mirrors the HTTP /notify handler exactly: +// activity_type='a2a_receive', method='notify', request_body NULL, +// response_body={"result": message}, status='ok'. The canvas +// hydration query (`type=a2a_receive&source=canvas`) treats +// both writers as the same shape — drift here means the bug +// re-surfaces silently. +func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + // Workspace lookup — the handler verifies the workspace exists + // before it does anything else. Returning a name lets the + // broadcast payload populate; the test doesn't assert on the + // broadcast (no observable WS in this fake), only on the DB. + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-msg"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // The persistence INSERT — pin the exact shape so a future + // refactor that switches columns or drops `method='notify'` + // breaks the test loud, not silently. Match by regex on the + // table + activity_type + method literals. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-msg", + sqlmock.AnyArg(), // summary "Agent message: ..." + sqlmock.AnyArg(), // response_body JSON + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := mcpPost(t, h, "ws-msg", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 99, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": "Hello, this should persist!", + }, + }, + }) + + var resp mcpResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String()) + } + if resp.Error != nil { + t.Errorf("unexpected JSON-RPC error: %+v", resp.Error) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err) + } +} + // ───────────────────────────────────────────────────────────────────────────── // Parse error // ───────────────────────────────────────────────────────────────────────────── diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index c74555a3..961921de 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri "name": wsName, }) + // Persist to activity_logs so chat history loaders surface this + // message after a page reload. Pre-fix (reno-stars 2026-05-05), + // the MCP-bridge variant of send_message_to_user broadcast WS + // only — visible live, gone on reload — while the HTTP /notify + // sibling already had this fix (activity.go:535). External + // claude-code agents using molecule-mcp's send_message_to_user + // tool route through THIS handler, not /notify, so they were + // hitting the unfixed path. + // + // Shape mirrors activity.go's Notify handler exactly so the + // canvas chat-history hydration treats both the same: + // - activity_type='a2a_receive' joins the source=canvas filter + // - source_id is omitted → defaults to NULL ("canvas-source") + // - method='notify' tags it as a push (vs a real A2A receive) + // - request_body=NULL so the loader doesn't draw a duplicate + // "user" bubble + // - response_body={"result": ""} feeds extractResponseText + // directly + // + // Errors are log-only — the broadcast already returned ok to the + // caller, the user has seen the message, and the persistence + // failure mode (DB hiccup) shouldn't block the tool call. The + // downside is the same as pre-fix: message vanishes on reload + // after a transient DB error. Log it so operators have a signal. + respPayload := map[string]interface{}{"result": message} + respJSON, _ := json.Marshal(respPayload) + preview := message + if len(preview) > 80 { + preview = preview[:80] + "…" + } + if _, err := h.database.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { + log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err) + } + return "Message sent.", nil } From 899c53550dc2c054b00f2060c4a43dd6741c6fa8 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 14:52:32 -0700 Subject: [PATCH 2/2] test(mcp): comprehensive coverage for send_message_to_user persistence + AST gate (reno-stars followup) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per user request: audit all similar tools + write comprehensive tests including E2E for the persistence-of-AGENT_MESSAGE-broadcasts contract. Audit (all BroadcastOnly call sites in workspace-server/internal/): | Site | Event | Persisted? | Notes | |---|---|:---:|---| | a2a_proxy_helpers.go:275 | A2A_RESPONSE | ✓ | LogActivity above | | activity.go:486 (Notify) | AGENT_MESSAGE | ✓ | INSERT line 535 | | activity.go:701 (LogActivity) | ACTIVITY_LOGGED | ✓ | self-emits inside DB write | | mcp_tools.go:341 (toolSendMessageToUser) | AGENT_MESSAGE | ✓ NEW (this PR) | | registry.go:575 | TASK_UPDATED | N/A | transient progress, not chat | | registry.go:596 | WORKSPACE_HEARTBEAT | N/A | infra ping, not chat | Only one chat-bearing broadcast was missing persistence (the just- fixed mcp bridge path). No other regressions found. Tests added (4 new, total 5 send_message_to_user tests): 1. TestAgentMessageBroadcastsArePersisted — AST gate that walks every non-test .go in the package, finds funcs that BroadcastOnly with "AGENT_MESSAGE", asserts each ALSO contains an "INSERT INTO activity_logs". Forward-looking regression block: any future chat tool that broadcasts without persisting fails the test with a clear file:func diagnostic. Mutation-tested locally: removing the INSERT block from toolSendMessageToUser reliably produces the expected failure. 2. TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s — pins the "best-effort persistence" contract. DB INSERT failures must NOT abort the tool response (the WS broadcast already succeeded; retrying would double-render in the live chat). Matches /notify. 3. TestMCPHandler_SendMessageToUser_ResponseBodyShape — pins the exact `{"result": ""}` JSON shape stored in response_body. The canvas hydrater (extractResponseText in historyHydration.ts) reads body.result; any drift here silently breaks chat history without failing the INSERT. Per memory feedback_assert_exact_not_substring.md, asserts the literal JSON shape, not a substring. 4. TestMCPHandler_SendMessageToUser_PersistsToActivityLog (existing, from previous commit) — pins INSERT shape with regex on 'a2a_receive' + 'notify' literals. 5. TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet (existing) — env-gate aborts before DB. Test fixture cleanup: newMCPHandler now uses newTestBroadcaster (real ws.Hub) instead of events.NewBroadcaster(nil) — the latter nil-panics inside hub.Broadcast on the AGENT_MESSAGE path. Same broadcaster shape every other handler test uses. E2E note: the AST gate is the strongest forward-looking guarantee. A real-DB integration test would add value for CI but is largely duplicative of the sqlmock contract tests above (sqlmock pins SQL shape with much faster feedback). Left as a future enhancement when the handlers Postgres-integration suite extends MCP coverage. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../agent_message_persistence_gate_test.go | 177 ++++++++++++++++++ .../internal/handlers/mcp_test.go | 103 ++++++++++ 2 files changed, 280 insertions(+) create mode 100644 workspace-server/internal/handlers/agent_message_persistence_gate_test.go diff --git a/workspace-server/internal/handlers/agent_message_persistence_gate_test.go b/workspace-server/internal/handlers/agent_message_persistence_gate_test.go new file mode 100644 index 00000000..07998112 --- /dev/null +++ b/workspace-server/internal/handlers/agent_message_persistence_gate_test.go @@ -0,0 +1,177 @@ +package handlers + +import ( + "go/ast" + "go/parser" + "go/token" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" +) + +// TestAgentMessageBroadcastsArePersisted is a forward-looking AST +// gate: every function in this package that broadcasts an +// `AGENT_MESSAGE` WebSocket event MUST also call +// `INSERT INTO activity_logs` somewhere in its body. +// +// The reno-stars production data-loss bug (CEO Ryan PC's long-form +// onboarding-friction message visible live but missing on reload) +// happened because mcp_tools.go:toolSendMessageToUser broadcast WS +// without a paired INSERT — while the HTTP /notify sibling DID +// persist. The fix added the INSERT; this gate prevents the regression +// class from re-emerging in any future chat-bearing tool. +// +// Why an AST gate vs a code-review checklist (per memory +// feedback_behavior_based_ast_gates.md): "pin invariants by what a +// function calls, not what it's named". The shape that loses data is: +// +// BroadcastOnly(_, "AGENT_MESSAGE", _) without an INSERT companion +// +// Any new tool that emits AGENT_MESSAGE must persist or the next +// canvas refresh drops the message — same shape as reno-stars. A +// reviewer can miss this; the AST walk can't. +// +// Allowlist: empty by intent. If a future use case genuinely needs +// fire-and-forget broadcast (e.g., transient typing indicators that +// should NOT survive reload), add an entry here AND document why. +// "Doesn't need to persist" is rarely the right answer for chat — +// the canvas history is the source of truth. +func TestAgentMessageBroadcastsArePersisted(t *testing.T) { + wd, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %v", err) + } + entries, err := os.ReadDir(wd) + if err != nil { + t.Fatalf("readdir %s: %v", wd, err) + } + + type violation struct { + file string + fn string + } + var violations []violation + + for _, ent := range entries { + name := ent.Name() + if ent.IsDir() || !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") { + continue + } + path := filepath.Join(wd, name) + fset := token.NewFileSet() + file, err := parser.ParseFile(fset, path, nil, parser.ParseComments) + if err != nil { + t.Fatalf("parse %s: %v", path, err) + } + + for _, decl := range file.Decls { + fn, ok := decl.(*ast.FuncDecl) + if !ok || fn.Body == nil { + continue + } + if !funcEmitsAgentMessageBroadcast(fn) { + continue + } + if !funcInsertsIntoActivityLogs(fn) { + violations = append(violations, violation{file: name, fn: fn.Name.Name}) + } + } + } + + if len(violations) > 0 { + sort.Slice(violations, func(i, j int) bool { + if violations[i].file != violations[j].file { + return violations[i].file < violations[j].file + } + return violations[i].fn < violations[j].fn + }) + var buf strings.Builder + for _, v := range violations { + buf.WriteString(" - ") + buf.WriteString(v.file) + buf.WriteString(":") + buf.WriteString(v.fn) + buf.WriteString("\n") + } + t.Errorf(`function(s) broadcast `+"`AGENT_MESSAGE`"+` without persisting to activity_logs: + +%s +This is the reno-stars data-loss regression class: live message +visible to the user, but missing on reload because activity_log was +never written. Every chat-bearing broadcast MUST be paired with: + + INSERT INTO activity_logs (workspace_id, activity_type, method, + summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + +See activity.go:Notify and mcp_tools.go:toolSendMessageToUser for +the canonical shapes. Don't add an allowlist entry without a +documented reason — the canvas chat history is the source of truth +and silently dropping messages is a P0 user trust break.`, + buf.String()) + } +} + +// funcEmitsAgentMessageBroadcast walks fn.Body for any CallExpr that +// looks like `*.BroadcastOnly(_, "AGENT_MESSAGE", _)`. +func funcEmitsAgentMessageBroadcast(fn *ast.FuncDecl) bool { + var found bool + ast.Inspect(fn.Body, func(n ast.Node) bool { + call, ok := n.(*ast.CallExpr) + if !ok { + return true + } + sel, ok := call.Fun.(*ast.SelectorExpr) + if !ok || sel.Sel.Name != "BroadcastOnly" { + return true + } + // BroadcastOnly(workspaceID, eventType, payload) — the second + // arg is the event name. Match by string-literal value. + if len(call.Args) < 2 { + return true + } + lit, ok := call.Args[1].(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + return true + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if raw == "AGENT_MESSAGE" { + found = true + return false + } + return true + }) + return found +} + +// funcInsertsIntoActivityLogs walks fn.Body for any STRING BasicLit +// whose body contains `INSERT INTO activity_logs` (the SQL literal +// passed to ExecContext). Matches the substring rather than a strict +// regex because we don't care about the exact INSERT shape here — +// only that the function persists. Specific shape pinning lives in +// the per-handler test (see TestMCPHandler_SendMessageToUser_*). +func funcInsertsIntoActivityLogs(fn *ast.FuncDecl) bool { + var found bool + ast.Inspect(fn.Body, func(n ast.Node) bool { + lit, ok := n.(*ast.BasicLit) + if !ok || lit.Kind != token.STRING { + return true + } + raw := lit.Value + if unq, err := strconv.Unquote(raw); err == nil { + raw = unq + } + if strings.Contains(raw, "INSERT INTO activity_logs") { + found = true + return false + } + return true + }) + return found +} diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 7db2849c..6ede0c2c 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -11,6 +11,8 @@ import ( "os" "testing" + "errors" + "github.com/DATA-DOG/go-sqlmock" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" @@ -629,6 +631,107 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) { } } +// TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s pins the +// "best-effort persistence" contract: when the activity_log INSERT +// fails (DB hiccup, constraint violation, transient connection drop), +// the tool MUST still return success to the agent because the WS +// broadcast already succeeded — the user has seen the message. +// +// This matches /notify (activity.go) behavior. Returning an error +// here would cause the agent to retry and re-broadcast, double- +// rendering the message in the user's live chat panel for every +// retry until the DB recovers. +func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-err"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // INSERT fails — must NOT abort the tool response. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WillReturnError(errors.New("transient db error")) + + w := mcpPost(t, h, "ws-err", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 100, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": "should not be lost from the live chat", + }, + }, + }) + + var resp mcpResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("response was not valid JSON-RPC: %v", err) + } + // Tool response is success — INSERT failure logged, broadcast + // already succeeded. + if resp.Error != nil { + t.Errorf("tool response should be success on DB error (broadcast won), got JSON-RPC error: %+v", resp.Error) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expected DB calls in order: %v", err) + } +} + +// TestMCPHandler_SendMessageToUser_ResponseBodyShape pins the +// response_body JSON shape stored in activity_logs. This shape MUST +// match what the canvas hydrater (extractResponseText in +// historyHydration.ts) reads — specifically `{"result": ""}`. +// Any drift in the JSON shape silently breaks chat history without +// failing the INSERT. +// +// Caught the same drift class flagged in +// feedback_assert_exact_not_substring.md: a substring match on +// "result" would pass even if the field were renamed; we assert the +// exact JSON shape. +func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) { + t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") + h, mock := newMCPHandler(t) + + const userMessage = "Hi there from the agent" + + mock.ExpectQuery("SELECT name FROM workspaces"). + WithArgs("ws-shape"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + + // Capture the response_body argument and assert its exact shape. + mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). + WithArgs( + "ws-shape", + sqlmock.AnyArg(), // summary + // The response_body MUST be JSON `{"result": ""}`. + // Any other shape (e.g., wrapping in a Task object) breaks + // the canvas hydrater's `body.result` extractor. + `{"result":"`+userMessage+`"}`, + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + w := mcpPost(t, h, "ws-shape", map[string]interface{}{ + "jsonrpc": "2.0", + "id": 101, + "method": "tools/call", + "params": map[string]interface{}{ + "name": "send_message_to_user", + "arguments": map[string]interface{}{ + "message": userMessage, + }, + }, + }) + + if w.Code != 200 { + t.Fatalf("expected 200, got %d body=%s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("response_body shape drift — would silently break canvas chat history: %v", err) + } +} + // TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix // for the reno-stars / CEO Ryan PC chat-history data-loss bug: // external claude-code agents using molecule-mcp's send_message_to_user