From 09972486e8ef3f044428c5b2112ea3c9fc3ddaad Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Sun, 26 Apr 2026 10:14:47 -0700 Subject: [PATCH] fix(platform/notify): persist agent send_message_to_user pushes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix, POST /workspaces/:id/notify (the side-channel agents use to push interim updates and follow-up results) only broadcast via WebSocket — no DB write. When the user refreshed the page, the chat-history loader (which queries activity_logs) couldn't restore those messages and they vanished from the chat. Hits the most common path: when the platform's POST /a2a times out (idle), the runtime keeps working and eventually pushes its reply via send_message_to_user. The reply rendered live but disappeared on reload. Fix: also INSERT an activity_logs row with shape the existing loader already understands (type=a2a_receive, source_id=NULL, response_body= {result: text}). Persistence is best-effort — a DB hiccup doesn't block the WebSocket push (which the user is already seeing). 🤖 Generated with [Claude Code](https://claude.com/claude-code) --- .../internal/handlers/activity.go | 31 +++++++ .../internal/handlers/activity_test.go | 80 +++++++++++++++++++ 2 files changed, 111 insertions(+) diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index a38603af..ba6d9f0f 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -286,6 +286,37 @@ func (h *ActivityHandler) Notify(c *gin.Context) { "name": wsName, }) + // Persist to activity_logs so the chat history loader restores this + // message after a page reload. Pre-fix, send_message_to_user pushes + // were broadcast-only — survived the WebSocket session but vanished + // when the user refreshed because nothing wrote them to the DB. + // + // Shape chosen to match the existing loader query + // (`type=a2a_receive&source=canvas`): + // - activity_type='a2a_receive' so it joins the same query path + // - source_id=NULL so the canvas-source filter accepts it + // - method='notify' to distinguish from real A2A receives in audits + // - request_body=NULL so the loader doesn't append a duplicate + // "user message" bubble for it + // - response_body={"result": ""} matches extractResponseText's + // simplest branch ({result: string} → take verbatim) + // + // Errors are logged-only — broadcast already succeeded, the user + // sees the message; persistence failure just means the message + // won't survive reload (pre-fix behavior). Don't fail the whole + // notify on a DB hiccup. + respJSON, _ := json.Marshal(map[string]interface{}{"result": body.Message}) + preview := body.Message + if len(preview) > 80 { + preview = preview[:80] + "…" + } + if _, err := db.DB.ExecContext(c.Request.Context(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, summary, response_body, status) + VALUES ($1, 'a2a_receive', 'notify', $2, $3::jsonb, 'ok') + `, workspaceID, "Agent message: "+preview, string(respJSON)); err != nil { + log.Printf("Notify: failed to persist message for %s: %v", workspaceID, err) + } + c.JSON(http.StatusOK, gin.H{"status": "sent"}) } diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go index 1780be3b..9cba5873 100644 --- a/workspace-server/internal/handlers/activity_test.go +++ b/workspace-server/internal/handlers/activity_test.go @@ -217,6 +217,86 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) { } } +func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) { + // Regression guard for the "responses gone on reload" bug. send_message_to_user + // pushes (which route through Notify) used to be broadcast-only — they + // rendered in the canvas but vanished on page reload because nothing + // wrote them to activity_logs. The chat history loader queries + // `type=a2a_receive&source=canvas`, so the persisted row must: + // - Use activity_type='a2a_receive' (loader's filter) + // - Have source_id NULL (canvas-source filter) + // - Carry the message text in response_body so extractResponseText + // can reconstruct the agent reply on reload + mockDB, mock, _ := sqlmock.New() + defer mockDB.Close() + db.DB = mockDB + + // Workspace existence check + mock.ExpectQuery(`SELECT name FROM workspaces`). + WithArgs("ws-notify"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + + // Persistence INSERT — verify shape + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-notify", + sqlmock.AnyArg(), // summary + sqlmock.AnyArg(), // response_body JSON + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-notify"}} + body := `{"message":"agent reply that arrived after the sync POST timed out"}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-notify/notify", strings.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Notify(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("DB expectations not met: %v", err) + } +} + +func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) { + // Persistence is best-effort — a DB hiccup must NOT block the + // WebSocket push (which the user is already seeing in their open + // canvas). Pre-fix the WS push always succeeded; we don't want + // the new persistence step to regress that path. + mockDB, mock, _ := sqlmock.New() + defer mockDB.Close() + db.DB = mockDB + + mock.ExpectQuery(`SELECT name FROM workspaces`). + WithArgs("ws-x"). + WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnError(fmt.Errorf("simulated db hiccup")) + + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-x"}} + body := `{"message":"hi"}` + c.Request = httptest.NewRequest("POST", "/workspaces/ws-x/notify", strings.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + handler.Notify(c) + + if w.Code != http.StatusOK { + t.Errorf("DB failure must not break the response; got %d", w.Code) + } +} + // ==================== Direct unit tests for SessionSearch helpers ==================== // --- parseSessionSearchParams ---