fix(mcp): persist send_message_to_user pushes to activity_log (reno-stars data loss)

Reported on production tenant reno-stars: an external claude-code agent
(CEO Ryan PC workspace) sent a long-form message via send_message_to_user;
the user saw it live in the chat panel but it vanished after a refresh.
Confirmed via direct production query — the message is NOT in
activity_logs at all (only short test pings around it are persisted).

Root cause: there are TWO server-side handlers for send_message_to_user:

  1. HTTP `/workspaces/:id/notify` (activity.go:Notify) — broadcasts WS
     AND inserts a row into activity_logs. This is the path the
     in-container runtime's tool_send_message_to_user calls.

  2. MCP-bridge `tools/call name=send_message_to_user`
     (mcp_tools.go:toolSendMessageToUser) — broadcasts WS only,
     **never persisted**. This is the path EXTERNAL agents using
     molecule-mcp's send_message_to_user tool route through.

The persistence fix landed for path 1 months ago but was never mirrored
on path 2. External agents — exactly the case in reno-stars/CEO Ryan PC
— have been silently losing every long-form notification on reload.

Fix: mirror the activity.go INSERT shape inside toolSendMessageToUser:

  INSERT INTO activity_logs
    (workspace_id, activity_type, method, summary, response_body, status)
  VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')

Same wire shape as /notify so the canvas's chat-history hydration
(`type=a2a_receive&source=canvas`) treats both writers identically.
Errors are log-only — broadcast already succeeded, persistence failure
shouldn't block the tool response (matches /notify behavior; downside
is the same data-loss-on-DB-error risk, surfaced via log.Printf).

Tests
-----

- `TestMCPHandler_SendMessageToUser_PersistsToActivityLog` — pins both
  the workspace-name lookup AND the INSERT shape. Regex-matches
  `'a2a_receive'` + `'notify'` literals so a future refactor that
  changes activity_type or method breaks the test loud, not silently
  re-introducing the data-loss bug.
- Updated newMCPHandler to use newTestBroadcaster() (real ws.Hub) —
  events.NewBroadcaster(nil) crashes inside hub.Broadcast in the
  send_message_to_user path. Same shape every other handler test uses.

Verified `go test ./internal/handlers/ -run TestMCPHandler_SendMessage`
green; full vet clean.

Refs reno-stars production incident 2026-05-05.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-05-05 14:47:48 -07:00
parent 1ad107cc15
commit cdfc9f743f
2 changed files with 104 additions and 3 deletions

View File

@ -13,16 +13,17 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
// newMCPHandler is a test helper that constructs an MCPHandler backed by the
// sqlmock DB set up by setupTestDB.
// sqlmock DB set up by setupTestDB. Uses newTestBroadcaster so handlers
// that BroadcastOnly (send_message_to_user, etc.) don't nil-panic on the
// hub — events.NewBroadcaster(nil) crashes inside hub.Broadcast.
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
h := NewMCPHandler(db.DB, events.NewBroadcaster(nil))
h := NewMCPHandler(db.DB, newTestBroadcaster())
return h, mock
}
@ -628,6 +629,69 @@ func TestMCPHandler_SendMessageToUser_Blocked_WhenEnvNotSet(t *testing.T) {
}
}
// TestMCPHandler_SendMessageToUser_PersistsToActivityLog pins the fix
// for the reno-stars / CEO Ryan PC chat-history data-loss bug:
// external claude-code agents using molecule-mcp's send_message_to_user
// tool route through THIS handler (not the HTTP /notify endpoint),
// and the handler used to broadcast WS only — visible live, gone on
// reload because nothing wrote to activity_logs.
//
// Pins:
// - INSERT happens on the success path (broadcast + DB write).
// - INSERT shape mirrors the HTTP /notify handler exactly:
// activity_type='a2a_receive', method='notify', request_body NULL,
// response_body={"result": message}, status='ok'. The canvas
// hydration query (`type=a2a_receive&source=canvas`) treats
// both writers as the same shape — drift here means the bug
// re-surfaces silently.
func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
// Workspace lookup — the handler verifies the workspace exists
// before it does anything else. Returning a name lets the
// broadcast payload populate; the test doesn't assert on the
// broadcast (no observable WS in this fake), only on the DB.
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-msg").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
// The persistence INSERT — pin the exact shape so a future
// refactor that switches columns or drops `method='notify'`
// breaks the test loud, not silently. Match by regex on the
// table + activity_type + method literals.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
"ws-msg",
sqlmock.AnyArg(), // summary "Agent message: ..."
sqlmock.AnyArg(), // response_body JSON
).
WillReturnResult(sqlmock.NewResult(1, 1))
w := mcpPost(t, h, "ws-msg", map[string]interface{}{
"jsonrpc": "2.0",
"id": 99,
"method": "tools/call",
"params": map[string]interface{}{
"name": "send_message_to_user",
"arguments": map[string]interface{}{
"message": "Hello, this should persist!",
},
},
})
var resp mcpResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("response was not valid JSON-RPC: %v\nbody=%s", err, w.Body.String())
}
if resp.Error != nil {
t.Errorf("unexpected JSON-RPC error: %+v", resp.Error)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("DB expectations not met (INSERT missing → reno-stars data-loss regression): %v", err)
}
}
// ─────────────────────────────────────────────────────────────────────────────
// Parse error
// ─────────────────────────────────────────────────────────────────────────────

View File

@ -344,6 +344,43 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
"name": wsName,
})
// Persist to activity_logs so chat history loaders surface this
// message after a page reload. Pre-fix (reno-stars 2026-05-05),
// the MCP-bridge variant of send_message_to_user broadcast WS
// only — visible live, gone on reload — while the HTTP /notify
// sibling already had this fix (activity.go:535). External
// claude-code agents using molecule-mcp's send_message_to_user
// tool route through THIS handler, not /notify, so they were
// hitting the unfixed path.
//
// Shape mirrors activity.go's Notify handler exactly so the
// canvas chat-history hydration treats both the same:
// - activity_type='a2a_receive' joins the source=canvas filter
// - source_id is omitted → defaults to NULL ("canvas-source")
// - method='notify' tags it as a push (vs a real A2A receive)
// - request_body=NULL so the loader doesn't draw a duplicate
// "user" bubble
// - response_body={"result": "<text>"} feeds extractResponseText
// directly
//
// Errors are log-only — the broadcast already returned ok to the
// caller, the user has seen the message, and the persistence
// failure mode (DB hiccup) shouldn't block the tool call. The
// downside is the same as pre-fix: message vanishes on reload
// after a transient DB error. Log it so operators have a signal.
respPayload := map[string]interface{}{"result": message}
respJSON, _ := json.Marshal(respPayload)
preview := message
if len(preview) > 80 {
preview = preview[:80] + "…"
}
if _, err := h.database.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
log.Printf("MCP send_message_to_user: failed to persist for %s: %v", workspaceID, err)
}
return "Message sent.", nil
}