fix(platform/notify): persist agent send_message_to_user pushes

Pre-fix, POST /workspaces/:id/notify (the side-channel agents use to push
interim updates and follow-up results) only broadcast via WebSocket — no
DB write. When the user refreshed the page, the chat-history loader
(which queries activity_logs) couldn't restore those messages and they
vanished from the chat.

Hits the most common path: when the platform's POST /a2a times out (idle),
the runtime keeps working and eventually pushes its reply via
send_message_to_user. The reply rendered live but disappeared on reload.

Fix: also INSERT an activity_logs row with a shape the existing loader
already understands (type=a2a_receive, source_id=NULL, response_body=
{result: text}). Persistence is best-effort — a DB hiccup doesn't block
the WebSocket push (which the user is already seeing).

🤖 Generated with [Claude Code](https://claude.com/claude-code)
This commit is contained in:
Hongming Wang 2026-04-26 10:14:47 -07:00
parent 7ed50824b6
commit 09972486e8
2 changed files with 111 additions and 0 deletions

View File

@ -286,6 +286,37 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
"name": wsName,
})
// Persist to activity_logs so the chat history loader restores this
// message after a page reload. Pre-fix, send_message_to_user pushes
// were broadcast-only — survived the WebSocket session but vanished
// when the user refreshed because nothing wrote them to the DB.
//
// Shape chosen to match the existing loader query
// (`type=a2a_receive&source=canvas`):
// - activity_type='a2a_receive' so it joins the same query path
// - source_id=NULL so the canvas-source filter accepts it
// - method='notify' to distinguish from real A2A receives in audits
// - request_body=NULL so the loader doesn't append a duplicate
// "user message" bubble for it
// - response_body={"result": "<text>"} matches extractResponseText's
// simplest branch ({result: string} → take verbatim)
//
// Errors are logged-only — broadcast already succeeded, the user
// sees the message; persistence failure just means the message
// won't survive reload (pre-fix behavior). Don't fail the whole
// notify on a DB hiccup.
respJSON, _ := json.Marshal(map[string]interface{}{"result": body.Message})
preview := body.Message
if len(preview) > 80 {
preview = preview[:80] + "…"
}
if _, err := db.DB.ExecContext(c.Request.Context(), `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status)
VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok')
`, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil {
log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err)
}
c.JSON(http.StatusOK, gin.H{"status": "sent"})
}

View File

@ -217,6 +217,86 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) {
}
}
// TestNotify_PersistsToActivityLogsForReloadRecovery is a regression guard for
// the "responses gone on reload" bug. send_message_to_user pushes (which route
// through Notify) used to be broadcast-only — they rendered in the canvas but
// vanished on page reload because nothing wrote them to activity_logs. The
// chat history loader queries `type=a2a_receive&source=canvas`, so the
// persisted row must:
//   - Use activity_type='a2a_receive' (loader's filter)
//   - Have source_id NULL (canvas-source filter)
//   - Carry the message text in response_body so extractResponseText
//     can reconstruct the agent reply on reload
func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
	const (
		workspaceID = "ws-notify"
		// Shorter than the 80-byte preview limit, so the summary carries the
		// whole message without the truncation ellipsis.
		message = "agent reply that arrived after the sync POST timed out"
	)

	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	db.DB = mockDB

	// Workspace existence check
	mock.ExpectQuery(`SELECT name FROM workspaces`).
		WithArgs(workspaceID).
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))

	// Persistence INSERT — pin the actual shape rather than accepting any
	// arguments: the statement must carry the 'a2a_receive' / 'notify'
	// literals, and the bound values must be the workspace ID, the summary
	// preview, and the {"result": <text>} payload the loader reads back.
	mock.ExpectExec(`INSERT INTO activity_logs[\s\S]*'a2a_receive',\s*'notify'`).
		WithArgs(
			workspaceID,
			"Agent message: "+message,    // summary
			`{"result":"`+message+`"}`, // response_body JSON
		).
		WillReturnResult(sqlmock.NewResult(1, 1))

	broadcaster := newTestBroadcaster()
	handler := NewActivityHandler(broadcaster)

	gin.SetMode(gin.TestMode)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: workspaceID}}
	body := `{"message":"` + message + `"}`
	c.Request = httptest.NewRequest("POST", "/workspaces/"+workspaceID+"/notify", strings.NewReader(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Notify(c)

	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("DB expectations not met: %v", err)
	}
}
// TestNotify_DBFailure_StillBroadcastsAnd200 checks that persistence is
// best-effort — a DB hiccup must NOT block the WebSocket push (which the user
// is already seeing in their open canvas). Pre-fix the WS push always
// succeeded; we don't want the new persistence step to regress that path.
func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	defer mockDB.Close()
	db.DB = mockDB

	mock.ExpectQuery(`SELECT name FROM workspaces`).
		WithArgs("ws-x").
		WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
	mock.ExpectExec(`INSERT INTO activity_logs`).
		WillReturnError(fmt.Errorf("simulated db hiccup"))

	broadcaster := newTestBroadcaster()
	handler := NewActivityHandler(broadcaster)

	gin.SetMode(gin.TestMode)
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-x"}}
	body := `{"message":"hi"}`
	c.Request = httptest.NewRequest("POST", "/workspaces/ws-x/notify", strings.NewReader(body))
	c.Request.Header.Set("Content-Type", "application/json")

	handler.Notify(c)

	if w.Code != http.StatusOK {
		t.Errorf("DB failure must not break the response; got %d: %s", w.Code, w.Body.String())
	}
	// Without this check the test would pass even if Notify never attempted
	// the INSERT, in which case the failure path under test was never hit.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("DB expectations not met: %v", err)
	}
}
// ==================== Direct unit tests for SessionSearch helpers ====================
// --- parseSessionSearchParams ---