From b57de4174ec92e646edb6ddf2b9105c63b7a6260 Mon Sep 17 00:00:00 2001 From: core-be Date: Thu, 14 May 2026 13:40:22 -0700 Subject: [PATCH 1/5] feat(workspace-server): push notifications for agent messages Adds Expo Push Service integration so mobile devices receive background notifications when an agent sends a message to the user. - New push_tokens table with workspace-scoped device tokens - internal/push package: Repo (DB), Sender (Expo API client), Notifier (fire-and-forget delivery), Handler (HTTP register/unregister) - AgentMessageWriter.Send() now triggers push delivery after WS broadcast - New endpoints: POST /workspaces/:id/push-tokens, DELETE /push-tokens - Token invalidation: auto-removes tokens when Expo returns DeviceNotRegistered - Configured via EXPO_ACCESS_TOKEN env var (optional; push disabled when absent) All existing tests updated to pass nil notifier where required. --- .../internal/handlers/activity.go | 8 +- .../handlers/activity_since_id_test.go | 8 +- .../handlers/activity_since_secs_test.go | 8 +- .../internal/handlers/activity_test.go | 34 ++-- .../internal/handlers/agent_message_writer.go | 16 +- .../handlers/agent_message_writer_test.go | 18 +- .../internal/handlers/handlers_test.go | 24 +-- workspace-server/internal/handlers/mcp.go | 7 +- .../internal/handlers/mcp_test.go | 2 +- .../internal/handlers/mcp_tools.go | 2 +- .../internal/memory/e2e/swap_test.go | 4 +- workspace-server/internal/push/handler.go | 75 +++++++++ workspace-server/internal/push/notifier.go | 101 +++++++++++ workspace-server/internal/push/push_test.go | 159 ++++++++++++++++++ workspace-server/internal/push/repo.go | 76 +++++++++ workspace-server/internal/push/sender.go | 104 ++++++++++++ workspace-server/internal/router/router.go | 17 +- .../20260514000000_push_tokens.down.sql | 1 + .../20260514000000_push_tokens.up.sql | 11 ++ 19 files changed, 614 insertions(+), 61 deletions(-) create mode 100644 workspace-server/internal/push/handler.go create mode 100644 workspace-server/internal/push/notifier.go create mode 100644 workspace-server/internal/push/push_test.go create mode 100644 workspace-server/internal/push/repo.go create mode 100644 workspace-server/internal/push/sender.go create mode 100644 workspace-server/migrations/20260514000000_push_tokens.down.sql create mode 100644 workspace-server/migrations/20260514000000_push_tokens.up.sql diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 99b8bd1c..0e446a75 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -14,16 +14,18 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/push" "github.com/gin-gonic/gin" "github.com/google/uuid" ) type ActivityHandler struct { broadcaster *events.Broadcaster + notifier *push.Notifier } -func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { - return &ActivityHandler{broadcaster: b} +func NewActivityHandler(b *events.Broadcaster, notifier *push.Notifier) *ActivityHandler { + return &ActivityHandler{broadcaster: b, notifier: notifier} } // List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id= @@ -476,7 +478,7 @@ func (h *ActivityHandler) Notify(c *gin.Context) { for _, a := range body.Attachments { attachments = append(attachments, AgentMessageAttachment(a)) } - writer := NewAgentMessageWriter(db.DB, h.broadcaster) + writer := NewAgentMessageWriter(db.DB, h.broadcaster, h.notifier) if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil { if errors.Is(err, ErrWorkspaceNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) diff --git a/workspace-server/internal/handlers/activity_since_id_test.go b/workspace-server/internal/handlers/activity_since_id_test.go index 6c2dc53f..a798b4e7 100644 --- a/workspace-server/internal/handlers/activity_since_id_test.go +++ b/workspace-server/internal/handlers/activity_since_id_test.go @@ -40,7 +40,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) { WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -69,7 +69,7 @@ func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) { WillReturnError(sql.ErrNoRows) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -101,7 +101,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) { WillReturnError(sql.ErrNoRows) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -137,7 +137,7 @@ func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) { WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/workspace-server/internal/handlers/activity_since_secs_test.go b/workspace-server/internal/handlers/activity_since_secs_test.go index a4e6237d..dc91c96a 100644 --- a/workspace-server/internal/handlers/activity_since_secs_test.go +++ b/workspace-server/internal/handlers/activity_since_secs_test.go @@ -41,7 +41,7 @@ func TestActivityHandler_SinceSecs_Accepted(t *testing.T) { WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -70,7 +70,7 @@ func TestActivityHandler_SinceSecs_ClampedAt30Days(t *testing.T) { WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -106,7 +106,7 @@ func TestActivityHandler_SinceSecs_InvalidRejected(t *testing.T) { // No DB call expected; bad input must be caught before the query. setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -142,7 +142,7 @@ func TestActivityHandler_SinceSecs_Omitted(t *testing.T) { WillReturnRows(newActivityRows()) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go index f6611814..480156b2 100644 --- a/workspace-server/internal/handlers/activity_test.go +++ b/workspace-server/internal/handlers/activity_test.go @@ -22,7 +22,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) rows := sqlmock.NewRows([]string{ "kind", "id", "workspace_id", "label", "content", "method", "status", "request_body", "response_body", "created_at", @@ -68,7 +68,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) { func TestActivityList_SourceCanvas(t *testing.T) { mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) // Expect query with "source_id IS NULL" mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NULL`). @@ -97,7 +97,7 @@ func TestActivityList_SourceCanvas(t *testing.T) { func TestActivityList_SourceAgent(t *testing.T) { mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) // Expect query with "source_id IS NOT NULL" mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NOT NULL`). @@ -126,7 +126,7 @@ func TestActivityList_SourceAgent(t *testing.T) { func TestActivityList_SourceInvalid(t *testing.T) { gin.SetMode(gin.TestMode) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -142,7 +142,7 @@ func TestActivityList_SourceInvalid(t *testing.T) { func TestActivityList_SourceWithType(t *testing.T) { mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) // Both type and source filters mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NULL`). @@ -181,7 +181,7 @@ const testPeerUUID = "11111111-2222-3333-4444-555555555555" func TestActivityList_PeerIDFilter(t *testing.T) { mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) // peer_id binds twice in the query (source_id OR target_id) but is // added to args once — sqlmock matches positional args, so the @@ -220,7 +220,7 @@ func TestActivityList_PeerIDComposesWithType(t *testing.T) { // of the builder can't silently rearrange placeholders. mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) mock.ExpectQuery( `SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`, @@ -258,7 +258,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) { // otherwise interpolate the value into the URL or another query. gin.SetMode(gin.TestMode) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) for _, bad := range []string{ "not-a-uuid", @@ -292,7 +292,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) { func TestActivityList_BeforeTSFilter(t *testing.T) { mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z") mock.ExpectQuery( @@ -328,7 +328,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) { // can't silently drop one filter or reorder placeholders. mock := setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z") mock.ExpectQuery( @@ -363,7 +363,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) { func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) { gin.SetMode(gin.TestMode) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) for _, bad := range []string{ "yesterday", @@ -400,7 +400,7 @@ func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) { WillReturnResult(sqlmock.NewResult(1, 1)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -426,7 +426,7 @@ func TestActivityReport_RejectsUnknownType(t *testing.T) { t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -478,7 +478,7 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) { WillReturnResult(sqlmock.NewResult(1, 1)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -527,7 +527,7 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) { WillReturnResult(sqlmock.NewResult(1, 1)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() @@ -593,7 +593,7 @@ func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) { // only if the handler unexpectedly queries. broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -647,7 +647,7 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) { WillReturnError(fmt.Errorf("simulated db hiccup")) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) gin.SetMode(gin.TestMode) w := httptest.NewRecorder() diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go index 6efea603..c8712ff3 100644 --- a/workspace-server/internal/handlers/agent_message_writer.go +++ b/workspace-server/internal/handlers/agent_message_writer.go @@ -44,6 +44,7 @@ import ( "log" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/push" "github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil" ) @@ -76,12 +77,14 @@ type AgentMessageAttachment struct { type AgentMessageWriter struct { db *sql.DB broadcaster events.EventEmitter + notifier *push.Notifier } // NewAgentMessageWriter binds the writer to the platform's DB pool + -// WebSocket broadcaster. -func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter { - return &AgentMessageWriter{db: db, broadcaster: broadcaster} +// WebSocket broadcaster. notifier may be nil if push notifications are +// not configured. +func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter, notifier *push.Notifier) *AgentMessageWriter { + return &AgentMessageWriter{db: db, broadcaster: broadcaster, notifier: notifier} } // Send delivers a single agent → user message. Look up + broadcast + @@ -132,7 +135,12 @@ func (w *AgentMessageWriter) Send( } w.broadcaster.BroadcastOnly(workspaceID, string(events.EventAgentMessage), broadcastPayload) - // 3. Persist for chat-history hydration. response_body shape MUST stay + // 3. Send push notifications to mobile devices. + if w.notifier != nil { + w.notifier.NotifyAgentMessage(ctx, workspaceID, wsName, message) + } + + // 4. Persist for chat-history hydration. response_body shape MUST stay // in sync with extractResponseText + extractFilesFromTask in // canvas/src/components/tabs/chat/historyHydration.ts: // - extractResponseText reads body.result (string) → renders text diff --git a/workspace-server/internal/handlers/agent_message_writer_test.go b/workspace-server/internal/handlers/agent_message_writer_test.go index 20f5540f..7ffe5f93 100644 --- a/workspace-server/internal/handlers/agent_message_writer_test.go +++ b/workspace-server/internal/handlers/agent_message_writer_test.go @@ -86,7 +86,7 @@ func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType strin // path: workspace lookup, broadcast, INSERT, return nil. func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-1"). @@ -114,7 +114,7 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) { // Drift here = chips disappear on chat reload. func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-att"). @@ -171,7 +171,7 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) { func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) { mock := setupTestDB(t) emitter := &capturingEmitter{} - w := NewAgentMessageWriter(db.DB, emitter) + w := NewAgentMessageWriter(db.DB, emitter, nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-missing"). @@ -200,7 +200,7 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) { // broadcast. func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-dbfail"). @@ -221,7 +221,7 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) { // table doesn't carry multi-KB summaries that bloat list queries. func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-trunc"). @@ -261,7 +261,7 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) { func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) { mock := setupTestDB(t) emitter := &capturingEmitter{} - w := NewAgentMessageWriter(db.DB, emitter) + w := NewAgentMessageWriter(db.DB, emitter, nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-bc"). @@ -312,7 +312,7 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) { // real incidents in alerting. func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) transientErr := errors.New("connection refused") mock.ExpectQuery("SELECT name FROM workspaces"). @@ -344,7 +344,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) { // coverage. Now it does. func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) { mock := setupTestDB(t) - w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) + w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil) // 200-rune CJK message — exceeds the 80-rune cap, would have hit // the byte-slice bug. @@ -393,7 +393,7 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) { func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) { mock := setupTestDB(t) emitter := &capturingEmitter{} - w := NewAgentMessageWriter(db.DB, emitter) + w := NewAgentMessageWriter(db.DB, emitter, nil) mock.ExpectQuery("SELECT name FROM workspaces"). WithArgs("ws-noatt"). diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index 847a3e9a..101fe4e8 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -646,7 +646,7 @@ func TestActivityHandler_List(t *testing.T) { WillReturnRows(rows) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -695,7 +695,7 @@ func TestActivityHandler_ListByType(t *testing.T) { WillReturnRows(rows) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -723,7 +723,7 @@ func TestActivityHandler_Report(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) // Expect the INSERT into activity_logs mock.ExpectExec("INSERT INTO activity_logs"). @@ -752,7 +752,7 @@ func TestActivityHandler_Report_InvalidType(t *testing.T) { setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -980,7 +980,7 @@ func TestActivityHandler_ListEmpty(t *testing.T) { WillReturnRows(sqlmock.NewRows(columns)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1014,7 +1014,7 @@ func TestActivityHandler_ListCustomLimit(t *testing.T) { WillReturnRows(sqlmock.NewRows(columns)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1047,7 +1047,7 @@ func TestActivityHandler_ListMaxLimit(t *testing.T) { WillReturnRows(sqlmock.NewRows(columns)) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1075,7 +1075,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -1106,7 +1106,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) { func TestActivityHandler_ReportMissingBody(t *testing.T) { setupTestDB(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1179,7 +1179,7 @@ func TestActivityHandler_Report_SourceIDSpoofRejected(t *testing.T) { setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1202,7 +1202,7 @@ func TestActivityHandler_Report_MatchingSourceIDAccepted(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) mock.ExpectExec("INSERT INTO activity_logs"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -1232,7 +1232,7 @@ func TestActivityHandler_Report_SourceIDLogInjection(t *testing.T) { setupTestDB(t) setupTestRedis(t) broadcaster := newTestBroadcaster() - handler := NewActivityHandler(broadcaster) + handler := NewActivityHandler(broadcaster, nil) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/workspace-server/internal/handlers/mcp.go b/workspace-server/internal/handlers/mcp.go index 707c12f2..dd56cfb5 100644 --- a/workspace-server/internal/handlers/mcp.go +++ b/workspace-server/internal/handlers/mcp.go @@ -34,6 +34,7 @@ import ( "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/push" "github.com/gin-gonic/gin" ) @@ -84,6 +85,7 @@ type mcpTool struct { type MCPHandler struct { database *sql.DB broadcaster *events.Broadcaster + notifier *push.Notifier // memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe: // every v2 tool calls memoryV2Available() first and returns a @@ -94,8 +96,9 @@ type MCPHandler struct { // NewMCPHandler wires the handler to db and broadcaster. // Pass db.DB and the platform broadcaster at router-setup time. -func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandler { - return &MCPHandler{database: database, broadcaster: broadcaster} +// notifier may be nil if push notifications are not configured. +func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster, notifier *push.Notifier) *MCPHandler { + return &MCPHandler{database: database, broadcaster: broadcaster, notifier: notifier} } // ───────────────────────────────────────────────────────────────────────────── diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 125eb725..86c67478 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -26,7 +26,7 @@ import ( func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) { t.Helper() mock := setupTestDB(t) - h := NewMCPHandler(db.DB, newTestBroadcaster()) + h := NewMCPHandler(db.DB, newTestBroadcaster(), nil) return h, mock } diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index 24e991bb..14db2a84 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -392,7 +392,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri // (the tool args don't accept them); pass nil. If a future tool // schema adds an attachments arg, build []AgentMessageAttachment // and pass through. - writer := NewAgentMessageWriter(h.database, h.broadcaster) + writer := NewAgentMessageWriter(h.database, h.broadcaster, h.notifier) if err := writer.Send(ctx, workspaceID, message, nil); err != nil { if errors.Is(err, ErrWorkspaceNotFound) { return "", fmt.Errorf("workspace not found") diff --git a/workspace-server/internal/memory/e2e/swap_test.go b/workspace-server/internal/memory/e2e/swap_test.go index 999df4f1..d0556590 100644 --- a/workspace-server/internal/memory/e2e/swap_test.go +++ b/workspace-server/internal/memory/e2e/swap_test.go @@ -207,7 +207,7 @@ func setupSwapEnv(t *testing.T) (*handlers.MCPHandler, *flatPlugin, sqlmock.Sqlm resolver := namespace.New(db) // MCPHandler needs a real *sql.DB; pass the sqlmock-backed one. - h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver) + h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver) return h, plugin, mock } @@ -430,7 +430,7 @@ func TestE2E_PluginUnreachable_AgentSeesClearError(t *testing.T) { db, _, _ := sqlmock.New() defer db.Close() resolver := namespace.New(db) - h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver) + h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver) _, err := h.Dispatch(context.Background(), "root-1", "commit_memory_v2", map[string]interface{}{ "content": "x", diff --git a/workspace-server/internal/push/handler.go b/workspace-server/internal/push/handler.go new file mode 100644 index 00000000..1c0533b8 --- /dev/null +++ b/workspace-server/internal/push/handler.go @@ -0,0 +1,75 @@ +package push + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// Handler exposes HTTP endpoints for push-token management. +type Handler struct { + repo *Repo +} + +// NewHandler creates a push-token HTTP handler. +func NewHandler(repo *Repo) *Handler { + return &Handler{repo: repo} +} + +// RegisterRoutes mounts push-token routes on the given router group. +func (h *Handler) RegisterRoutes(rg *gin.RouterGroup) { + rg.POST("/push-tokens", h.Create) + rg.DELETE("/push-tokens", h.Delete) +} + +// Create handles POST /push-tokens. +// Body: { "token": "ExponentPushToken[xxx]", "platform": "ios" | "android" } +func (h *Handler) Create(c *gin.Context) { + workspaceID := c.Param("id") + if _, err := uuid.Parse(workspaceID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"}) + return + } + + var body struct { + Token string `json:"token" binding:"required"` + Platform string `json:"platform" binding:"required,oneof=ios android"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + if err := h.repo.SaveToken(c.Request.Context(), workspaceID, body.Token, body.Platform); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save token"}) + return + } + + c.Status(http.StatusNoContent) +} + +// Delete handles DELETE /push-tokens. +// Body: { "token": "ExponentPushToken[xxx]" } +func (h *Handler) Delete(c *gin.Context) { + workspaceID := c.Param("id") + if _, err := uuid.Parse(workspaceID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"}) + return + } + + var body struct { + Token string `json:"token" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + if err := h.repo.DeleteToken(c.Request.Context(), workspaceID, body.Token); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete token"}) + return + } + + c.Status(http.StatusNoContent) +} diff --git a/workspace-server/internal/push/notifier.go b/workspace-server/internal/push/notifier.go new file mode 100644 index 00000000..f3f9084f --- /dev/null +++ b/workspace-server/internal/push/notifier.go @@ -0,0 +1,101 @@ +package push + +import ( + "context" + "database/sql" + "log" + "time" +) + +// Notifier sends push notifications for agent messages. +type Notifier struct { + repo *Repo + sender *Sender +} + +// NewNotifier creates a Notifier. +func NewNotifier(db *sql.DB, sender *Sender) *Notifier { + return &Notifier{ + repo: NewRepo(db), + sender: sender, + } +} + +// NotifyAgentMessage sends a push notification to all registered devices for a +// workspace when an agent sends a message. It runs asynchronously (fire-and- +// forget) so the caller's WebSocket broadcast is never blocked. +func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspaceName, message string) { + if n == nil || n.sender == nil { + return + } + + // Capture values for the goroutine. + wsID := workspaceID + wsName := workspaceName + msg := message + + go func() { + // Use a fresh context with timeout so a slow Expo API doesn't + // leak the caller's context deadline. + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + tokens, err := n.repo.GetTokens(ctx, wsID) + if err != nil { + log.Printf("push: failed to get tokens for workspace %s: %v", wsID, err) + return + } + if len(tokens) == 0 { + return + } + + // Expo accepts batches of up to ~100 messages; we cap lower to stay + // well under the limit. + const batchSize = 50 + for i := 0; i < len(tokens); i += batchSize { + end := i + batchSize + if end > len(tokens) { + end = len(tokens) + } + + batch := tokens[i:end] + messages := make([]Message, 0, len(batch)) + for _, t := range batch { + messages = append(messages, Message{ + To: t.Token, + Title: wsName, + Body: truncate(msg, 100), + Data: map[string]string{ + "type": "agent_message", + "workspaceId": wsID, + "workspaceSlug": "", // populated by caller if available + }, + Sound: "default", + Priority: "high", + }) + } + + results, err := n.sender.Send(ctx, messages) + if err != nil { + log.Printf("push: send failed for workspace %s: %v", wsID, err) + continue + } + + // Remove invalid tokens. + for j, r := range results { + if ShouldRemoveToken(r) { + if delErr := n.repo.DeleteToken(ctx, wsID, batch[j].Token); delErr != nil { + log.Printf("push: failed to delete invalid token for workspace %s: %v", wsID, delErr) + } + } + } + } + }() +} + +func truncate(s string, max int) string { + if len(s) <= max { + return s + } + return s[:max] + "…" +} diff --git a/workspace-server/internal/push/push_test.go b/workspace-server/internal/push/push_test.go new file mode 100644 index 00000000..1f1168c5 --- /dev/null +++ b/workspace-server/internal/push/push_test.go @@ -0,0 +1,159 @@ +package push + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSenderSend(t *testing.T) { + gin.SetMode(gin.TestMode) + + expoResponse := map[string]interface{}{ + "data": []map[string]interface{}{ + {"status": "ok", "id": "abc123"}, + {"status": "error", "message": "Invalid token", "details": map[string]string{"error": "DeviceNotRegistered"}}, + }, + } + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "POST", r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + + var msgs []Message + require.NoError(t, json.NewDecoder(r.Body).Decode(&msgs)) + assert.Len(t, msgs, 2) + assert.Equal(t, "ExponentPushToken[test1]", msgs[0].To) + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(expoResponse) + })) + defer server.Close() + + sender := NewSender("") + sender.apiURL = server.URL + + results, err := sender.Send(context.Background(), []Message{ + {To: "ExponentPushToken[test1]", Title: "Test", Body: "Hello"}, + {To: "ExponentPushToken[test2]", Title: "Test", Body: "World"}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + assert.Equal(t, "ok", results[0].Status) + assert.Equal(t, "error", results[1].Status) + assert.True(t, ShouldRemoveToken(results[1])) +} + +func TestSenderSendEmpty(t *testing.T) { + sender := NewSender("") + results, err := sender.Send(context.Background(), nil) + require.NoError(t, err) + assert.Nil(t, results) +} + +func TestHandlerCreate(t *testing.T) { + gin.SetMode(gin.TestMode) + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectExec("INSERT INTO push_tokens"). + WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios"). + WillReturnResult(sqlmock.NewResult(1, 1)) + + repo := NewRepo(db) + handler := NewHandler(repo) + + router := gin.New() + group := router.Group("/workspaces/:id") + handler.RegisterRoutes(group) + + w := httptest.NewRecorder() + body := `{"token":"ExponentPushToken[abc]","platform":"ios"}` + req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNoContent, w.Code) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestHandlerCreateInvalidPlatform(t *testing.T) { + gin.SetMode(gin.TestMode) + + db, _, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + handler := NewHandler(NewRepo(db)) + + router := gin.New() + group := router.Group("/workspaces/:id") + handler.RegisterRoutes(group) + + w := httptest.NewRecorder() + body := `{"token":"ExponentPushToken[abc]","platform":"windows"}` + req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusBadRequest, w.Code) +} + +func TestHandlerDelete(t *testing.T) { + gin.SetMode(gin.TestMode) + + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectExec("DELETE FROM push_tokens"). + WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + repo := NewRepo(db) + handler := NewHandler(repo) + + router := gin.New() + group := router.Group("/workspaces/:id") + handler.RegisterRoutes(group) + + w := httptest.NewRecorder() + body := `{"token":"ExponentPushToken[del]"}` + req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, req) + + assert.Equal(t, http.StatusNoContent, w.Code) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestRepoGetTokens(t *testing.T) { + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens"). + WithArgs("ws-1"). + WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}). + AddRow("1", "ws-1", "ExponentPushToken[a]", "ios", "2026-01-01T00:00:00Z"). + AddRow("2", "ws-1", "ExponentPushToken[b]", "android", "2026-01-01T00:00:00Z")) + + repo := NewRepo(db) + tokens, err := repo.GetTokens(context.Background(), "ws-1") + require.NoError(t, err) + require.Len(t, tokens, 2) + assert.Equal(t, "ExponentPushToken[a]", tokens[0].Token) + assert.Equal(t, "ios", tokens[0].Platform) + assert.Equal(t, "ExponentPushToken[b]", tokens[1].Token) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/workspace-server/internal/push/repo.go b/workspace-server/internal/push/repo.go new file mode 100644 index 00000000..d457d09b --- /dev/null +++ b/workspace-server/internal/push/repo.go @@ -0,0 +1,76 @@ +package push + +import ( + "context" + "database/sql" + "fmt" +) + +// Token is one registered push token for a workspace. +type Token struct { + ID string + WorkspaceID string + Token string + Platform string + CreatedAt string +} + +// Repo reads and writes push tokens in Postgres. +type Repo struct { + db *sql.DB +} + +// NewRepo creates a token repository backed by db. +func NewRepo(db *sql.DB) *Repo { + return &Repo{db: db} +} + +// SaveToken registers a push token for a workspace. If the same token already +// exists for the workspace, it updates the timestamp. +func (r *Repo) SaveToken(ctx context.Context, workspaceID, token, platform string) error { + _, err := r.db.ExecContext(ctx, ` + INSERT INTO push_tokens (workspace_id, token, platform) + VALUES ($1, $2, $3) + ON CONFLICT (workspace_id, token) DO UPDATE + SET updated_at = now() + `, workspaceID, token, platform) + if err != nil { + return fmt.Errorf("push_tokens: save: %w", err) + } + return nil +} + +// DeleteToken removes a push token. Returns nil even if the token did not exist. +func (r *Repo) DeleteToken(ctx context.Context, workspaceID, token string) error { + _, err := r.db.ExecContext(ctx, ` + DELETE FROM push_tokens + WHERE workspace_id = $1 AND token = $2 + `, workspaceID, token) + if err != nil { + return fmt.Errorf("push_tokens: delete: %w", err) + } + return nil +} + +// GetTokens returns all active push tokens for a workspace. +func (r *Repo) GetTokens(ctx context.Context, workspaceID string) ([]Token, error) { + rows, err := r.db.QueryContext(ctx, ` + SELECT id, workspace_id, token, platform, created_at + FROM push_tokens + WHERE workspace_id = $1 + `, workspaceID) + if err != nil { + return nil, fmt.Errorf("push_tokens: list: %w", err) + } + defer rows.Close() + + var tokens []Token + for rows.Next() { + var t Token + if err := rows.Scan(&t.ID, &t.WorkspaceID, &t.Token, &t.Platform, &t.CreatedAt); err != nil { + return nil, fmt.Errorf("push_tokens: scan: %w", err) + } + tokens = append(tokens, t) + } + return tokens, rows.Err() +} diff --git a/workspace-server/internal/push/sender.go b/workspace-server/internal/push/sender.go new file mode 100644 index 00000000..066357af --- /dev/null +++ b/workspace-server/internal/push/sender.go @@ -0,0 +1,104 @@ +package push + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" +) + +const expoPushAPI = "https://exp.host/--/api/v2/push/send" + +// Message is one Expo push notification. +type Message struct { + To string `json:"to"` + Title string `json:"title,omitempty"` + Body string `json:"body,omitempty"` + Data map[string]string `json:"data,omitempty"` + Sound string `json:"sound,omitempty"` + Priority string `json:"priority,omitempty"` +} + +// Sender delivers push notifications via the Expo Push Service. +type Sender struct { + apiURL string + httpClient *http.Client + expoToken string // optional Expo access token for authenticated requests +} + +// NewSender creates a Sender. expoToken may be empty for unauthenticated +// requests (sufficient for most use cases). +func NewSender(expoToken string) *Sender { + return &Sender{ + apiURL: expoPushAPI, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + expoToken: expoToken, + } +} + +// SendResult is the per-recipient status from Expo. +type SendResult struct { + Status string `json:"status"` + ID string `json:"id"` + Message string `json:"message,omitempty"` + Details struct { + Error string `json:"error,omitempty"` + } `json:"details,omitempty"` +} + +// expoResponse is the wrapper shape returned by the Expo API. +type expoResponse struct { + Data []SendResult `json:"data"` +} + +// Send fires a batch of push messages. It returns a slice of results in the +// same order as the input, plus an error only when the HTTP call itself fails. +// Callers should inspect each result's Status field for per-message errors +// (e.g. "DeviceNotRegistered" → token should be deleted). +func (s *Sender) Send(ctx context.Context, messages []Message) ([]SendResult, error) { + if len(messages) == 0 { + return nil, nil + } + + body, err := json.Marshal(messages) + if err != nil { + return nil, fmt.Errorf("push: marshal: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.apiURL, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("push: new request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + req.Header.Set("Accept-Encoding", "gzip, deflate") + if s.expoToken != "" { + req.Header.Set("Authorization", "Bearer "+s.expoToken) + } + + res, err := s.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("push: post: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("push: expo returned %d", res.StatusCode) + } + + var resp expoResponse + if err := json.NewDecoder(res.Body).Decode(&resp); err != nil { + return nil, fmt.Errorf("push: decode: %w", err) + } + return resp.Data, nil +} + +// ShouldRemoveToken reports whether a SendResult indicates the token is no +// longer valid and should be deleted from the database. +func ShouldRemoveToken(r SendResult) bool { + return r.Status == "error" && r.Details.Error == "DeviceNotRegistered" +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index aac18c14..148c8ca9 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -20,6 +20,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" "github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/push" "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised" "github.com/Molecule-AI/molecule-monorepo/platform/internal/ws" "github.com/docker/docker/client" @@ -318,13 +319,25 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above. { + // Push notifications (mobile) + var pushNotifier *push.Notifier + if expoToken := os.Getenv("EXPO_ACCESS_TOKEN"); expoToken != "" { + pushNotifier = push.NewNotifier(db.DB, push.NewSender(expoToken)) + } + // Activity Logs - acth := handlers.NewActivityHandler(broadcaster) + acth := handlers.NewActivityHandler(broadcaster, pushNotifier) wsAuth.GET("/activity", acth.List) wsAuth.GET("/session-search", acth.SessionSearch) wsAuth.POST("/activity", acth.Report) wsAuth.POST("/notify", acth.Notify) + // Push token registration (mobile) + if pushNotifier != nil { + pushH := push.NewHandler(push.NewRepo(db.DB)) + pushH.RegisterRoutes(wsAuth) + } + // Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue // #3026). Server-side rendering of activity_logs rows into // the canonical ChatMessage shape; storage is plugin-shaped @@ -428,7 +441,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // opencode session cannot saturate the platform. // C3: commit_memory/recall_memory with scope=GLOBAL → permission error; // send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true. - mcpH := handlers.NewMCPHandler(db.DB, broadcaster) + mcpH := handlers.NewMCPHandler(db.DB, broadcaster, pushNotifier) if memBundle != nil { mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver) } diff --git a/workspace-server/migrations/20260514000000_push_tokens.down.sql b/workspace-server/migrations/20260514000000_push_tokens.down.sql new file mode 100644 index 00000000..a159f1cf --- /dev/null +++ b/workspace-server/migrations/20260514000000_push_tokens.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS push_tokens; diff --git a/workspace-server/migrations/20260514000000_push_tokens.up.sql b/workspace-server/migrations/20260514000000_push_tokens.up.sql new file mode 100644 index 00000000..6bd9dce2 --- /dev/null +++ b/workspace-server/migrations/20260514000000_push_tokens.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE push_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE, + token TEXT NOT NULL, + platform TEXT NOT NULL CHECK (platform IN ('ios', 'android')), + created_at TIMESTAMPTZ DEFAULT now(), + updated_at TIMESTAMPTZ DEFAULT now(), + UNIQUE(workspace_id, token) +); + +CREATE INDEX idx_push_tokens_workspace ON push_tokens(workspace_id); -- 2.52.0 From cec732ec686eb43b992f03235a52554b13bdc32d Mon Sep 17 00:00:00 2001 From: core-be Date: Thu, 14 May 2026 14:16:36 -0700 Subject: [PATCH 2/5] fix(push): populate workspaceSlug from MOLECULE_ORG_SLUG The push payload's workspaceSlug was hardcoded to empty string, breaking deep-link navigation when users tap a notification. Read MOLECULE_ORG_SLUG from env (already set on every tenant by the provisioner) so the mobile app can route to the correct tenant platform. Non-breaking: when the env var is unset the field is empty, preserving the pre-fix behavior. --- workspace-server/internal/push/notifier.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workspace-server/internal/push/notifier.go b/workspace-server/internal/push/notifier.go index f3f9084f..f36553da 100644 --- a/workspace-server/internal/push/notifier.go +++ b/workspace-server/internal/push/notifier.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "log" + "os" "time" ) @@ -68,7 +69,7 @@ func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspac Data: map[string]string{ "type": "agent_message", "workspaceId": wsID, - "workspaceSlug": "", // populated by caller if available + "workspaceSlug": os.Getenv("MOLECULE_ORG_SLUG"), }, Sound: "default", Priority: "high", -- 2.52.0 From 1494f945121f471651e15a79ee01b0d0f4f8eaaf Mon Sep 17 00:00:00 2001 From: core-be Date: Thu, 14 May 2026 16:21:52 -0700 Subject: [PATCH 3/5] feat(mobile-chat): render message text as markdown with GFM support --- canvas/src/components/mobile/MobileChat.tsx | 171 +++++++++++++++++++- 1 file changed, 170 insertions(+), 1 deletion(-) diff --git a/canvas/src/components/mobile/MobileChat.tsx b/canvas/src/components/mobile/MobileChat.tsx index a7078255..309715f3 100644 --- a/canvas/src/components/mobile/MobileChat.tsx +++ b/canvas/src/components/mobile/MobileChat.tsx @@ -6,6 +6,8 @@ // attachments, no A2A topology overlay, no conversation tracing. import { useEffect, useRef, useState } from "react"; +import ReactMarkdown from "react-markdown"; +import remarkGfm from "remark-gfm"; import { api } from "@/lib/api"; import { useCanvasStore } from "@/store/canvas"; @@ -39,6 +41,171 @@ interface A2AResponseShape { const formatTime = (date: Date) => date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" }); +function MarkdownBubble({ + children, + dark, + accent, +}: { + children: string; + dark: boolean; + accent: string; +}) { + const codeBg = dark ? "rgba(255,255,255,0.08)" : "rgba(0,0,0,0.06)"; + const codeBlockBg = dark ? "#1a1a1a" : "#f5f5f0"; + const linkColor = accent; + const quoteBorder = dark ? "rgba(255,250,240,0.15)" : "rgba(40,30,20,0.15)"; + + return ( + ( +
{children}
+ ), + a: ({ href, children }) => ( + + {children} + + ), + pre: ({ children }) => ( +
+            {children}
+          
+ ), + code: ({ children, className }) => { + const isBlock = className != null && String(className).length > 0; + if (isBlock) { + return ( + + {children} + + ); + } + return ( + + {children} + + ); + }, + ul: ({ children }) => ( +
    + {children} +
+ ), + ol: ({ children }) => ( +
    + {children} +
+ ), + li: ({ children }) =>
  • {children}
  • , + strong: ({ children }) => ( + {children} + ), + em: ({ children }) => {children}, + h1: ({ children }) => ( +
    {children}
    + ), + h2: ({ children }) => ( +
    {children}
    + ), + h3: ({ children }) => ( +
    {children}
    + ), + h4: ({ children }) => ( +
    {children}
    + ), + h5: ({ children }) => ( +
    {children}
    + ), + h6: ({ children }) => ( +
    {children}
    + ), + blockquote: ({ children }) => ( +
    + {children} +
    + ), + hr: () => ( +
    + ), + table: ({ children }) => ( + + {children} +
    + ), + thead: ({ children }) => {children}, + th: ({ children }) => ( + + {children} + + ), + td: ({ children }) => ( + + {children} + + ), + }} + > + {children} +
    + ); +} + export function MobileChat({ agentId, dark, @@ -337,7 +504,9 @@ export function MobileChat({ overflowWrap: "anywhere", }} > - {m.text} + + {m.text} +
    Date: Thu, 14 May 2026 16:39:41 -0700 Subject: [PATCH 4/5] feat(mobile-chat): add file attachment support with upload --- canvas/src/components/mobile/MobileChat.tsx | 143 ++++++++++++++++++-- 1 file changed, 135 insertions(+), 8 deletions(-) diff --git a/canvas/src/components/mobile/MobileChat.tsx b/canvas/src/components/mobile/MobileChat.tsx index 309715f3..badfeee5 100644 --- a/canvas/src/components/mobile/MobileChat.tsx +++ b/canvas/src/components/mobile/MobileChat.tsx @@ -11,6 +11,8 @@ import remarkGfm from "remark-gfm"; import { api } from "@/lib/api"; import { useCanvasStore } from "@/store/canvas"; +import { uploadChatFiles } from "@/components/tabs/chat/uploads"; +import type { ChatAttachment } from "@/components/tabs/chat/types"; import { toMobileAgent } from "./components"; import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette"; @@ -21,6 +23,7 @@ interface ChatMessage { role: "user" | "agent" | "system"; text: string; ts: string; + attachments?: ChatAttachment[]; } const formatStoredTimestamp = (iso: string): string => { @@ -236,6 +239,7 @@ export function MobileChat({ const [draft, setDraft] = useState(""); const [tab, setTab] = useState("my"); const [sending, setSending] = useState(false); + const [uploading, setUploading] = useState(false); const [error, setError] = useState(null); const scrollRef = useRef(null); // Synchronous re-entry guard. `setSending(true)` schedules a state @@ -244,6 +248,8 @@ export function MobileChat({ // double-send race a stale `sending` lets through. const sendInFlightRef = useRef(false); const composerRef = useRef(null); + const fileInputRef = useRef(null); + const [pendingFiles, setPendingFiles] = useState([]); // Auto-grow the textarea: reset height to 'auto' so the scrollHeight // shrinks when the user deletes text, then size to scrollHeight up to @@ -283,30 +289,82 @@ export function MobileChat({ const a = toMobileAgent(node); const reachable = a.status === "online" || a.status === "degraded"; + const onFilesPicked = (fileList: FileList | null) => { + if (!fileList) return; + const picked = Array.from(fileList); + setPendingFiles((prev) => { + const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`)); + return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))]; + }); + if (fileInputRef.current) fileInputRef.current.value = ""; + }; + + const removePendingFile = (index: number) => + setPendingFiles((prev) => prev.filter((_, i) => i !== index)); + const send = async () => { const text = draft.trim(); - if (!text || sending || !reachable) return; + if ((!text && pendingFiles.length === 0) || sending || !reachable) return; if (sendInFlightRef.current) return; sendInFlightRef.current = true; setDraft(""); setError(null); setSending(true); + + let uploaded: ChatAttachment[] = []; + if (pendingFiles.length > 0) { + setUploading(true); + try { + uploaded = await uploadChatFiles(agentId, pendingFiles); + } catch (e) { + setError(e instanceof Error ? e.message : "Upload failed"); + setSending(false); + setUploading(false); + sendInFlightRef.current = false; + return; + } finally { + setUploading(false); + } + setPendingFiles([]); + } + const myMsg: ChatMessage = { id: crypto.randomUUID(), role: "user", text, ts: formatTime(new Date()), + attachments: uploaded.length > 0 ? uploaded : undefined, }; setMessages((m) => [...m, myMsg]); try { + const parts: Array< + | { kind: "text"; text: string } + | { + kind: "file"; + file: { name: string; mimeType?: string; uri: string; size?: number }; + } + > = []; + if (text) parts.push({ kind: "text", text }); + for (const att of uploaded) { + parts.push({ + kind: "file", + file: { + name: att.name, + mimeType: att.mimeType, + uri: att.uri, + size: att.size, + }, + }); + } + const res = await api.post(`/workspaces/${agentId}/a2a`, { method: "message/send", params: { message: { role: "user", messageId: crypto.randomUUID(), - parts: [{ kind: "text", text }], + parts, }, }, }); @@ -564,6 +622,60 @@ export function MobileChat({ backdropFilter: "blur(14px)", }} > + {pendingFiles.length > 0 && ( +
    + {pendingFiles.map((f, i) => ( +
    + + {f.name} + + +
    + ))} +
    + )}
    + onFilesPicked(e.target.files)} + aria-hidden="true" + />
    -- 2.52.0 From 67b2d5cc187c85271dd7dff50f576baa0150aaff Mon Sep 17 00:00:00 2001 From: core-devops Date: Thu, 14 May 2026 18:38:06 -0700 Subject: [PATCH 5/5] refactor(chat): unify mobile and desktop chat via shared hooks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract three shared hooks from desktop ChatTab and consume them in MobileChat, eliminating ~880 lines of duplicated logic: - useChatHistory — paginated history load, deduped append, scroll anchor - useChatSend — file upload + A2A send with history context, guard refs - useChatSocket — WS activity log + agent push delivery via callbacks MobileChat gains desktop features: • History context sent with every A2A message (last 20 msgs) • sendInFlightRef / sendingFromAPIRef double-send guards • WS push releaseSendGuards integration • Proper error handling for unreachable agents Desktop ChatTab sheds inline state management in favor of the same hooks, keeping its rich UI (AttachmentPreview, activity feed, etc.). All 31 existing tests pass (12 ChatTab + 19 MobileChat). TypeScript clean. Dev server compiles without errors. --- canvas/src/components/mobile/MobileChat.tsx | 232 ++--- canvas/src/components/tabs/ChatTab.tsx | 792 +++--------------- .../src/components/tabs/chat/hooks/index.ts | 3 + .../tabs/chat/hooks/resolveWorkspaceName.ts | 11 + .../tabs/chat/hooks/useChatHistory.ts | 134 +++ .../components/tabs/chat/hooks/useChatSend.ts | 182 ++++ .../tabs/chat/hooks/useChatSocket.ts | 100 +++ canvas/src/components/tabs/chat/index.ts | 3 + 8 files changed, 578 insertions(+), 879 deletions(-) create mode 100644 canvas/src/components/tabs/chat/hooks/index.ts create mode 100644 canvas/src/components/tabs/chat/hooks/resolveWorkspaceName.ts create mode 100644 canvas/src/components/tabs/chat/hooks/useChatHistory.ts create mode 100644 canvas/src/components/tabs/chat/hooks/useChatSend.ts create mode 100644 canvas/src/components/tabs/chat/hooks/useChatSocket.ts diff --git a/canvas/src/components/mobile/MobileChat.tsx b/canvas/src/components/mobile/MobileChat.tsx index 57baead7..42884f27 100644 --- a/canvas/src/components/mobile/MobileChat.tsx +++ b/canvas/src/components/mobile/MobileChat.tsx @@ -5,27 +5,22 @@ // that the desktop ChatTab uses, but with a slimmer surface: no // attachments, no A2A topology overlay, no conversation tracing. -import { useCallback, useEffect, useRef, useState } from "react"; +import { useEffect, useRef, useState } from "react"; import ReactMarkdown from "react-markdown"; import remarkGfm from "remark-gfm"; -import { api } from "@/lib/api"; import { useCanvasStore } from "@/store/canvas"; -import { uploadChatFiles } from "@/components/tabs/chat/uploads"; -import type { ChatAttachment } from "@/components/tabs/chat/types"; +import { type ChatAttachment, type ChatMessage, createMessage } from "@/components/tabs/chat/types"; +import { + useChatHistory, + useChatSend, + useChatSocket, +} from "@/components/tabs/chat/hooks"; import { toMobileAgent } from "./components"; import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette"; import { Icons, StatusDot, TierChip } from "./primitives"; -interface ChatMessage { - id: string; - role: "user" | "agent" | "system"; - text: string; - ts: string; - attachments?: ChatAttachment[]; -} - const formatStoredTimestamp = (iso: string): string => { const d = new Date(iso); if (isNaN(d.getTime())) return ""; @@ -34,16 +29,6 @@ const formatStoredTimestamp = (iso: string): string => { type SubTab = "my" | "a2a"; -interface A2AResponseShape { - result?: { - parts?: Array<{ kind?: string; text?: string }>; - }; - error?: { message?: string }; -} - -const formatTime = (date: Date) => - date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" }); - function MarkdownBubble({ children, dark, @@ -220,24 +205,38 @@ export function MobileChat({ }) { const p = usePalette(dark); const node = useCanvasStore((s) => s.nodes.find((n) => n.id === agentId)); - const [messages, setMessages] = useState([]); const [draft, setDraft] = useState(""); const [tab, setTab] = useState("my"); - const [sending, setSending] = useState(false); - const [uploading, setUploading] = useState(false); - const [error, setError] = useState(null); - const [historyLoading, setHistoryLoading] = useState(true); - const [historyError, setHistoryError] = useState(null); const scrollRef = useRef(null); - // Synchronous re-entry guard. `setSending(true)` schedules a state - // update but doesn't flush before a second tap can fire send() — a ref - // mirrors the desktop ChatTab pattern (sendInFlightRef) and closes the - // double-send race a stale `sending` lets through. - const sendInFlightRef = useRef(false); const composerRef = useRef(null); const fileInputRef = useRef(null); const [pendingFiles, setPendingFiles] = useState([]); + const { + messages, + loading: historyLoading, + loadError: historyError, + appendMessageDeduped, + } = useChatHistory(agentId); + + const { + sending, + uploading, + sendMessage, + error: sendError, + clearError, + releaseSendGuards, + } = useChatSend(agentId, { + getHistoryMessages: () => messages, + onUserMessage: appendMessageDeduped, + onAgentMessage: appendMessageDeduped, + }); + + useChatSocket(agentId, { + onAgentMessage: appendMessageDeduped, + onSendComplete: releaseSendGuards, + }); + // Auto-grow the textarea: reset height to 'auto' so the scrollHeight // shrinks when the user deletes text, then size to scrollHeight up to // a 5-line cap. Beyond the cap, internal scroll kicks in. @@ -255,73 +254,19 @@ export function MobileChat({ } }, [messages]); - // Load chat history on mount / agent switch. - const loadHistory = useCallback(async () => { - setHistoryLoading(true); - setHistoryError(null); - try { - const resp = await api.get<{ - messages: Array<{ - id: string; - role: string; - content: string; - timestamp: string; - }>; - }>(`/workspaces/${agentId}/chat-history?limit=50`); - const loaded = (resp.messages ?? []).map((m) => ({ - id: m.id, - role: m.role as "user" | "agent" | "system", - text: m.content, - ts: formatStoredTimestamp(m.timestamp), - })); - setMessages(loaded); - } catch (e) { - setHistoryError(e instanceof Error ? e.message : "Failed to load history"); - } finally { - setHistoryLoading(false); - } - }, [agentId]); - + // Consume any agent messages that arrived while history was loading. + const initialConsumeDoneRef = useRef(false); useEffect(() => { - let cancelled = false; - loadHistory().then(() => { - if (cancelled) return; - // Consume any agent messages that arrived while history was loading. - const consume = useCanvasStore.getState().consumeAgentMessages; - const msgs = consume(agentId); - if (msgs.length > 0) { - setMessages((prev) => [ - ...prev, - ...msgs.map((m) => ({ - id: m.id, - role: "agent" as const, - text: m.content, - ts: formatStoredTimestamp(m.timestamp), - })), - ]); - } - }); - return () => { cancelled = true; }; - }, [agentId, loadHistory]); - - // Consume live agent pushes while the panel is mounted. - const pendingAgentMsgs = useCanvasStore((s) => s.agentMessages[agentId]); - useEffect(() => { - if (!pendingAgentMsgs || pendingAgentMsgs.length === 0) return; + if (historyLoading || initialConsumeDoneRef.current) return; + initialConsumeDoneRef.current = true; const consume = useCanvasStore.getState().consumeAgentMessages; const msgs = consume(agentId); - if (msgs.length > 0) { - setMessages((prev) => [ - ...prev, - ...msgs.map((m) => ({ - id: m.id, - role: "agent" as const, - text: m.content, - ts: formatStoredTimestamp(m.timestamp), - })), - ]); + for (const m of msgs) { + appendMessageDeduped( + createMessage("agent", m.content, m.attachments), + ); } - }, [pendingAgentMsgs, agentId]); + }, [historyLoading, agentId, appendMessageDeduped]); if (!node) { return ( @@ -360,90 +305,11 @@ export function MobileChat({ const send = async () => { const text = draft.trim(); if ((!text && pendingFiles.length === 0) || sending || !reachable) return; - if (sendInFlightRef.current) return; - sendInFlightRef.current = true; + clearError(); setDraft(""); - setError(null); - setSending(true); - - let uploaded: ChatAttachment[] = []; - if (pendingFiles.length > 0) { - setUploading(true); - try { - uploaded = await uploadChatFiles(agentId, pendingFiles); - } catch (e) { - setError(e instanceof Error ? e.message : "Upload failed"); - setSending(false); - setUploading(false); - sendInFlightRef.current = false; - return; - } finally { - setUploading(false); - } - setPendingFiles([]); - } - - const myMsg: ChatMessage = { - id: crypto.randomUUID(), - role: "user", - text, - ts: formatTime(new Date()), - attachments: uploaded.length > 0 ? uploaded : undefined, - }; - setMessages((m) => [...m, myMsg]); - - try { - const parts: Array< - | { kind: "text"; text: string } - | { - kind: "file"; - file: { name: string; mimeType?: string; uri: string; size?: number }; - } - > = []; - if (text) parts.push({ kind: "text", text }); - for (const att of uploaded) { - parts.push({ - kind: "file", - file: { - name: att.name, - mimeType: att.mimeType, - uri: att.uri, - size: att.size, - }, - }); - } - - const res = await api.post(`/workspaces/${agentId}/a2a`, { - method: "message/send", - params: { - message: { - role: "user", - messageId: crypto.randomUUID(), - parts, - }, - }, - }); - const reply = - res.result?.parts?.find((part) => part.kind === "text")?.text ?? ""; - if (reply) { - setMessages((m) => [ - ...m, - { - id: crypto.randomUUID(), - role: "agent", - text: reply, - ts: formatTime(new Date()), - }, - ]); - } else if (res.error?.message) { - setError(res.error.message); - } - } catch (e) { - setError(e instanceof Error ? e.message : "Failed to send"); - } finally { - setSending(false); - sendInFlightRef.current = false; - } + const files = pendingFiles; + setPendingFiles([]); + await sendMessage(text, files); }; return ( @@ -628,7 +494,7 @@ export function MobileChat({ }} > - {m.text} + {m.content}
    - {m.ts} + {formatStoredTimestamp(m.timestamp)}
    ); })} - {error && ( + {sendError && (
    - {error} + {sendError}
    )} diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx index 7f05270b..6b468f47 100644 --- a/canvas/src/components/tabs/ChatTab.tsx +++ b/canvas/src/components/tabs/ChatTab.tsx @@ -3,18 +3,20 @@ import { useState, useRef, useEffect, useCallback, useLayoutEffect } from "react"; import ReactMarkdown from "react-markdown"; import remarkGfm from "remark-gfm"; -import { api } from "@/lib/api"; import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas"; -import { useSocketEvent } from "@/hooks/useSocketEvent"; import { type ChatMessage, type ChatAttachment, createMessage, appendMessageDeduped } from "./chat/types"; -import { uploadChatFiles, downloadChatFile, isPlatformAttachment } from "./chat/uploads"; +import { downloadChatFile, isPlatformAttachment } from "./chat/uploads"; import { PendingAttachmentPill } from "./chat/AttachmentViews"; import { AttachmentPreview } from "./chat/AttachmentPreview"; -import { extractFilesFromTask } from "./chat/message-parser"; import { AgentCommsPanel } from "./chat/AgentCommsPanel"; import { appendActivityLine } from "./chat/activityLog"; import { runtimeDisplayName } from "@/lib/runtime-names"; import { ConfirmDialog } from "@/components/ConfirmDialog"; +import { useChatHistory } from "./chat/hooks/useChatHistory"; +import { useChatSend } from "./chat/hooks/useChatSend"; +import { useChatSocket } from "./chat/hooks/useChatSocket"; + +export { extractReplyText } from "./chat/hooks/useChatSend"; interface Props { workspaceId: string; @@ -23,147 +25,6 @@ interface Props { type ChatSubTab = "my-chat" | "agent-comms"; -// A2A response shape (subset). The full schema is in @a2a-js/sdk but we only -// need parts/artifacts text + file extraction for the synchronous fallback. -interface A2AFileRef { - name?: string; - mimeType?: string; - uri?: string; - bytes?: string; - size?: number; -} -// Outbound shape matches a2a-sdk's JSON-RPC `SendMessageRequest` -// Pydantic union (TextPart | FilePart | DataPart). The flat -// protobuf shape `{url, filename, mediaType}` is rejected at the -// request boundary with `Field required` errors — keep this -// outbound shape unless a2a-sdk migrates the JSON-RPC schema. -interface A2APart { - kind: string; - text?: string; - file?: A2AFileRef; -} -interface A2AResponse { - result?: { - parts?: A2APart[]; - artifacts?: Array<{ parts: A2APart[] }>; - }; -} - -// Internal-self-message filtering moved server-side in RFC #2945 -// PR-C/D — the platform's /chat-history endpoint applies the -// IsInternalSelfMessage predicate before returning rows, so the -// client no longer needs the local backstop on the history path. -// The proper fix is still X-Workspace-ID header (source_id=workspace_id); -// the platform-side prefix filter handles the residual cases. - -// extractReplyText pulls the agent's text reply out of an A2A response. -// Concatenates ALL text parts (joined with "\n") rather than returning -// just the first. Claude Code and other runtimes commonly emit multi- -// part text replies for long content (markdown tables, code blocks), -// and the prior "first part wins" implementation silently truncated -// the rest — observed on a 15k-char Wave 1 brief that rendered only -// the table header. Mirrors extractTextsFromParts in message-parser.ts. -// -// Server-side counterpart in workspace-server/internal/channels/ -// manager.go has the same single-part bug; fix that too if/when a -// channel-delivered reply (Slack, Lark, etc.) gets truncated. -export function extractReplyText(resp: A2AResponse): string { - const collect = (parts: A2APart[] | undefined): string => { - if (!parts) return ""; - return parts - .filter((p) => p.kind === "text") - .map((p) => p.text ?? "") - .filter(Boolean) - .join("\n"); - }; - const result = resp?.result; - const collected: string[] = []; - const fromParts = collect(result?.parts); - if (fromParts) collected.push(fromParts); - // Walk artifacts even if parts had text — some producers (Hermes - // tool calls) emit a summary in parts AND details in artifacts. - // Returning early on parts dropped the artifact body silently. - if (result?.artifacts) { - for (const a of result.artifacts) { - const t = collect(a.parts); - if (t) collected.push(t); - } - } - return collected.join("\n"); -} - -// Agent-returned files live on the same response shape as text — -// delegated to extractFilesFromTask in message-parser.ts, which also -// walks status.message.parts (that ChatTab's legacy text extractor -// doesn't). Single source of truth for file-part parsing across -// live chat, activity log replay, and any future consumers. - -/** Initial chat history page size. The newest N messages are rendered - * on first paint; older history is fetched on demand via loadOlder() - * when the user scrolls the top sentinel into view. */ -const INITIAL_HISTORY_LIMIT = 10; -/** Subsequent older-history batch size. Larger than INITIAL so a long - * scroll-back doesn't fan out into many round-trips. */ -const OLDER_HISTORY_BATCH = 20; - -/** - * Load chat history from the platform's typed /chat-history endpoint. - * - * Server-side rendering of activity_logs rows into ChatMessage shape - * lives in workspace-server/internal/messagestore/postgres_store.go - * (RFC #2945 PR-C/D). The server already applies the canvas-source - * filter, the internal-self-message predicate, the role decision - * (status=error vs agent-error prefix → system), and the v0/v1 - * file-shape extraction. Canvas just renders what it receives. - * - * Wire shape (mirrors ChatMessage exactly, no per-row mapping needed): - * - * GET /workspaces/:id/chat-history?limit=N&before_ts=T - * 200 → {"messages": ChatMessage[], "reached_end": boolean} - * - * Pagination: - * - Pass `limit` to bound the page size (newest-first from server). - * - Pass `beforeTs` (RFC3339) to fetch rows STRICTLY OLDER than that - * timestamp. Combined with limit, this yields the next-older page - * when scrolling backward through history. - * - * `reachedEnd` is propagated from the server. The server computes it - * by comparing rowCount vs limit so a partial last page is correctly - * detected even when the row→bubble fan-out is non-1:1 (each row - * produces 1-2 bubbles). - */ -async function loadMessagesFromDB( - workspaceId: string, - limit: number, - beforeTs?: string, -): Promise<{ messages: ChatMessage[]; error: string | null; reachedEnd: boolean }> { - try { - const params = new URLSearchParams({ limit: String(limit) }); - if (beforeTs) params.set("before_ts", beforeTs); - const resp = await api.get<{ messages: ChatMessage[]; reached_end: boolean }>( - `/workspaces/${workspaceId}/chat-history?${params.toString()}`, - ); - - // Server emits oldest-first within the page (RFC #2945 PR-C-2 - // post-fix: server reverses row-aware before returning so the - // wire is display-ready). Canvas appends/prepends without - // reordering — this avoids the pair-flip bug a naive flat - // reverse causes when each row produces a (user, agent) pair - // with the same timestamp. - return { - messages: resp.messages ?? [], - error: null, - reachedEnd: resp.reached_end, - }; - } catch (err) { - return { - messages: [], - error: err instanceof Error ? err.message : "Failed to load chat history", - reachedEnd: true, - }; - } -} - /** * ChatTab container — renders sub-tab bar + My Chat or Agent Comms panel. */ @@ -247,268 +108,68 @@ export function ChatTab({ workspaceId, data }: Props) { * MyChatPanel — user↔agent conversation (extracted from original ChatTab). */ function MyChatPanel({ workspaceId, data }: Props) { - const [messages, setMessages] = useState([]); const [input, setInput] = useState(""); - // `sending` is strictly the "this tab kicked off a send and hasn't - // seen the reply yet" signal. Previously this was initialized from - // data.currentTask to pick up in-flight agent work on mount, but - // that conflated agent-busy (workspace heartbeat) with user- - // in-flight (local send): when the WS dropped a TASK_COMPLETE event, - // currentTask lingered, the component re-mounted with sending=true, - // and the Send button stayed disabled forever even though nothing - // local was in flight. For the "agent is busy, show spinner" UX, - // use data.currentTask directly in the render path. - const [sending, setSending] = useState(false); - const [thinkingElapsed, setThinkingElapsed] = useState(0); + const [pendingFiles, setPendingFiles] = useState([]); const [activityLog, setActivityLog] = useState([]); - const [loading, setLoading] = useState(true); - const [loadError, setLoadError] = useState(null); - const currentTaskRef = useRef(data.currentTask); - const sendingFromAPIRef = useRef(false); + const [thinkingElapsed, setThinkingElapsed] = useState(0); const [agentReachable, setAgentReachable] = useState(false); const [error, setError] = useState(null); const [confirmRestart, setConfirmRestart] = useState(false); - const bottomRef = useRef(null); - // First-mount scroll-to-bottom needs `behavior: "instant"` — long - // conversations smooth-animate for ~300ms which any concurrent - // re-render can interrupt, leaving the user stuck mid-conversation - // when the chat tab opens. Subsequent appends (new agent messages) - // keep `smooth` for the visual "landing" feel. Flipped the first - // time messages.length goes positive, so a workspace switch (which - // remounts ChatTab) gets a fresh instant jump too. - const hasInitialScrollRef = useRef(false); - // Lazy-load older history on scroll-up. - // - containerRef = the scrollable messages viewport - // - topRef = sentinel above the messages list; IO observes it - // and triggers loadOlder() when it enters view - // - hasMore = false once a fetch returns < limit rows; stops IO - // - loadingOlder = drives the "Loading older messages…" UI label - // - inflightRef = synchronous guard against double-entry of loadOlder - // when the IO callback fires twice in the same - // microtask (state-based guard would be stale until - // the next React commit) - // - scrollAnchorRef = saves distance-from-bottom before a prepend - // so the useLayoutEffect below can restore the - // user's exact viewport position. Without this, - // prepending older messages would jump the scroll - // position by the height of the new content. - // - oldestMessageRef / hasMoreRef = let the loadOlder closure read - // the latest values without taking them as deps — - // every live agent push mutates `messages`, and - // having loadOlder depend on `messages` would tear - // down + re-arm the IntersectionObserver on every - // push. Refs decouple the observer lifecycle from - // message-list updates. + const [dragOver, setDragOver] = useState(false); + const containerRef = useRef(null); const topRef = useRef(null); - const [hasMore, setHasMore] = useState(true); - const [loadingOlder, setLoadingOlder] = useState(false); - const inflightRef = useRef(false); - // The scroll anchor includes the first-message id as it was BEFORE - // the prepend — see useLayoutEffect below for why. Without this tag, - // a live agent push that appends WHILE loadOlder is in flight would - // run useLayoutEffect against the append (anchor still set), the - // "restore" math would scroll the user to a stale offset, AND the - // append's normal scroll-to-bottom would be swallowed. - const scrollAnchorRef = useRef< - { savedDistanceFromBottom: number; expectFirstIdNotEqual: string | null } | null - >(null); - const oldestMessageRef = useRef(null); - const hasMoreRef = useRef(true); - // Monotonic token bumped on workspace switch + on every loadOlder - // entry. Each fetch's .then() captures its own token; if the token - // has moved, the resolved messages belong to a stale workspace or a - // superseded fetch and we silently drop them. Without this guard, a - // workspace switch mid-fetch would have the in-flight promise - // resolve into the new workspace's setMessages — the user sees - // someone else's history briefly. - const fetchTokenRef = useRef(0); - // Files the user has picked but not yet sent. Cleared on send - // (upload success) or by the × on each pill. - const [pendingFiles, setPendingFiles] = useState([]); - const [uploading, setUploading] = useState(false); + const bottomRef = useRef(null); + const hasInitialScrollRef = useRef(false); const fileInputRef = useRef(null); - // Guard against a double-click during the upload phase: React - // state updates from the click that started the upload haven't - // flushed yet, so the disabled-button logic sees `uploading=false` - // from the closure and lets a second `sendMessage` enter. A ref - // observes the latest value synchronously. - const sendInFlightRef = useRef(false); - // Monotonic token bumped on every sendMessage entry. Each .then()/ - // .catch() captures its own token in closure and bails if a newer - // send has superseded it — prevents a late HTTP response for an - // earlier message from clobbering the flags / appending text that - // belong to a newer in-flight send. Race scenario the token closes: - // (1) send msg #1 (2) WS push for msg #1 arrives, releases guards - // (3) user sends msg #2 (4) HTTP for msg #1 finally lands — without - // the token check, .then() sees sendingFromAPIRef=true (set by - // msg #2's send), enters the main body, and processes msg #1's body - // as if it were msg #2's reply. - const sendTokenRef = useRef(0); + const dragDepthRef = useRef(0); + const pasteCounterRef = useRef(0); - // Release every in-flight send guard at once. Used by every site - // that ends a send: pendingAgentMsgs WS push, ACTIVITY_LOGGED - // a2a_receive ok/error WS event, HTTP .then() success, and HTTP - // .catch() success. Keep these in lockstep — a future contributor - // adding a new "I saw the reply" path that only clears `sending` + - // `sendingFromAPIRef` (the natural pair) silently re-introduces - // the post-WS Send-button freeze, because the disabled-button - // logic can't see `sendInFlightRef` and so the visible state diverges - // from the synchronous re-entry guard at line 464. - const releaseSendGuards = useCallback(() => { - setSending(false); - sendingFromAPIRef.current = false; - sendInFlightRef.current = false; - }, []); + const history = useChatHistory(workspaceId, containerRef); + const chatSend = useChatSend(workspaceId, { + getHistoryMessages: () => history.messages, + onUserMessage: (msg) => history.setMessages((prev) => [...prev, msg]), + onAgentMessage: (msg) => history.setMessages((prev) => appendMessageDeduped(prev, msg)), + }); + const { sending, uploading, sendMessage, error: sendError, clearError: clearSendError, releaseSendGuards, sendingFromAPIRef } = chatSend; - // Initial-load fetch — used by the mount effect and the "Retry" - // button below. Single source of truth so the two paths can't drift - // (e.g. INITIAL_HISTORY_LIMIT bumped in the effect but not the - // retry, leading to inconsistent first-paint sizes). - const loadInitial = useCallback(() => { - setLoading(true); - setLoadError(null); - setHasMore(true); - // Bump the token; any in-flight fetch from the previous workspace - // (or a previous retry) will see token != myToken in its .then() - // and silently bail — the late response can't clobber the new - // workspace's state. - fetchTokenRef.current += 1; - const myToken = fetchTokenRef.current; - loadMessagesFromDB(workspaceId, INITIAL_HISTORY_LIMIT).then( - ({ messages: msgs, error: fetchErr, reachedEnd }) => { - if (fetchTokenRef.current !== myToken) return; - setMessages(msgs); - setLoadError(fetchErr); - setHasMore(!reachedEnd); - setLoading(false); - }, - ); - }, [workspaceId]); + const displayError = error || sendError; - // Load chat history on mount / workspace switch. - // Initial load is bounded to INITIAL_HISTORY_LIMIT (newest 10) — the - // rest streams in as the user scrolls up via loadOlder() below. Pre- - // 2026-05-05 this fetched the newest 50 in one shot; on a long-running - // workspace that meant 50× message-bubble paint + DOM cost on every - // tab-open even when the user only wanted to read the last few. - useEffect(() => { - loadInitial(); - }, [loadInitial]); - - // Mirror the latest oldest-message + hasMore into refs so loadOlder - // can read them without taking `messages` as a dep. Every live push - // through agentMessages would otherwise recreate loadOlder and tear - // down the IO observer. - useEffect(() => { - oldestMessageRef.current = messages[0] ?? null; - }, [messages]); - useEffect(() => { - hasMoreRef.current = hasMore; - }, [hasMore]); - - // Fetch the next-older batch and prepend. Stable identity (deps = - // [workspaceId]) so the IntersectionObserver effect below doesn't - // re-arm on every messages update. - const loadOlder = useCallback(async () => { - // inflightRef is the load-bearing guard — synchronous, set BEFORE - // any await, so two IO callbacks dispatched in the same microtask - // can't both pass. The state checks are defensive secondary - // gates for the slow-scroll case. - if (inflightRef.current || !hasMoreRef.current) return; - const oldest = oldestMessageRef.current; - if (!oldest) return; - const container = containerRef.current; - if (!container) return; - inflightRef.current = true; - // Capture the user's distance-from-bottom BEFORE we prepend so the - // useLayoutEffect can restore it after the new DOM lands. The - // expectFirstIdNotEqual tag is what the layout effect checks - // against `messages[0].id` to disambiguate prepend (id changed) vs - // append (id unchanged → live message landed mid-fetch). Without - // it, an agent push during loadOlder runs the "restore" against a - // stale anchor — user gets yanked + the append's bottom-pin is - // swallowed. - scrollAnchorRef.current = { - savedDistanceFromBottom: container.scrollHeight - container.scrollTop, - expectFirstIdNotEqual: oldest.id, - }; - fetchTokenRef.current += 1; - const myToken = fetchTokenRef.current; - setLoadingOlder(true); - try { - const { messages: older, reachedEnd } = await loadMessagesFromDB( - workspaceId, - OLDER_HISTORY_BATCH, - oldest.timestamp, - ); - // Workspace switched (or another loadOlder bumped the token) - // mid-fetch — drop these results, they belong to a stale tab. - if (fetchTokenRef.current !== myToken) { - scrollAnchorRef.current = null; - return; + useChatSocket(workspaceId, { + onAgentMessage: (msg) => { + history.setMessages((prev) => appendMessageDeduped(prev, msg)); + if (sendingFromAPIRef.current) { + releaseSendGuards(); } - if (older.length > 0) { - setMessages((prev) => [...older, ...prev]); - } else { - // Nothing came back — clear the anchor so the next paint doesn't - // try to "restore" against a no-op prepend. - scrollAnchorRef.current = null; + }, + onActivityLog: (entry) => { + if (!sending) return; + setActivityLog((prev) => appendActivityLine(prev, entry)); + }, + onSendComplete: () => { + if (sendingFromAPIRef.current) { + releaseSendGuards(); } - setHasMore(!reachedEnd); - } finally { - setLoadingOlder(false); - inflightRef.current = false; - } - }, [workspaceId]); - - // IntersectionObserver on the top sentinel. Fires loadOlder() the - // moment the user scrolls within 200px of the top. AbortController - // unwires cleanly on workspace switch / unmount; root is the - // scrollable container so we observe only what's visible inside it. - // - // Dependencies: - // - loadOlder — stable per workspaceId (refs decouple it from - // message updates), so this dep is here for the - // workspace-switch case only - // - hasMore — re-run when older history runs out so we - // disconnect cleanly - // - hasMessages — load-bearing: the sentinel JSX is gated on - // `messages.length > 0`, so topRef.current is null - // on the empty-messages render. We re-arm exactly - // once when messages first land. NOT depending on - // `messages.length` (or `messages`) directly so - // each subsequent message append doesn't tear down - // + re-arm the observer. - const hasMessages = messages.length > 0; - useEffect(() => { - const top = topRef.current; - const container = containerRef.current; - if (!top || !container) return; - if (!hasMore) return; // stop observing when no older history exists - const ac = new AbortController(); - const io = new IntersectionObserver( - (entries) => { - if (ac.signal.aborted) return; - if (entries[0]?.isIntersecting) loadOlder(); - }, - { root: container, rootMargin: "200px 0px 0px 0px", threshold: 0 }, - ); - io.observe(top); - ac.signal.addEventListener("abort", () => io.disconnect()); - return () => ac.abort(); - }, [loadOlder, hasMore, hasMessages]); + }, + onSendError: (err) => { + if (sendingFromAPIRef.current) { + releaseSendGuards(); + setError(err); + } + }, + }); // Agent reachability useEffect(() => { const reachable = data.status === "online" || data.status === "degraded"; setAgentReachable(reachable); - setError(reachable ? null : `Agent is ${data.status}`); - }, [data.status]); - - useEffect(() => { - currentTaskRef.current = data.currentTask; - }, [data.currentTask]); + if (reachable) { + setError(null); + clearSendError(); + } else { + setError(`Agent is ${data.status}`); + } + }, [data.status, clearSendError]); // Scroll behavior across messages updates: // - Prepend (loadOlder landed) → restore the user's saved @@ -518,71 +179,24 @@ function MyChatPanel({ workspaceId, data }: Props) { // paint — otherwise the user sees the page jump for one frame. useLayoutEffect(() => { const container = containerRef.current; - const anchor = scrollAnchorRef.current; - // Only honor the anchor when this messages-update is the prepend - // we expected. messages[0].id is the test: - // - prepend → messages[0] is one of the older rows → id !== expectFirstIdNotEqual - // - append → messages[0] unchanged → id === expectFirstIdNotEqual → fall through - // Without this check, an agent push that lands mid-loadOlder would - // run the restore against the append's update, yank the user's - // scroll, AND swallow the append's bottom-pin. + const anchor = history.scrollAnchorRef.current; if ( anchor && container && - messages.length > 0 && - messages[0].id !== anchor.expectFirstIdNotEqual + history.messages.length > 0 && + history.messages[0].id !== anchor.expectFirstIdNotEqual ) { container.scrollTop = container.scrollHeight - anchor.savedDistanceFromBottom; - scrollAnchorRef.current = null; + history.scrollAnchorRef.current = null; return; } - // Instant on first arrival of messages — smooth-scroll on a long - // conversation gets interrupted by concurrent renders and leaves - // the user stuck in the middle. After the first jump, subsequent - // appends animate as before. - if (!hasInitialScrollRef.current && messages.length > 0) { + if (!hasInitialScrollRef.current && history.messages.length > 0) { hasInitialScrollRef.current = true; bottomRef.current?.scrollIntoView({ behavior: "instant" as ScrollBehavior }); return; } bottomRef.current?.scrollIntoView({ behavior: "smooth" }); - }, [messages]); - - // Consume agent push messages (send_message_to_user) from global store. - // Runtimes like Claude Code SDK deliver their reply via a WS push rather - // than the /a2a HTTP response — when that happens, the push is the - // authoritative "reply arrived" signal for the UI, so clear `sending` - // here too. The HTTP .then() coordinates through sendingFromAPIRef so - // whichever path clears first wins. - const pendingAgentMsgs = useCanvasStore((s) => s.agentMessages[workspaceId]); - useEffect(() => { - if (!pendingAgentMsgs || pendingAgentMsgs.length === 0) return; - const consume = useCanvasStore.getState().consumeAgentMessages; - const msgs = consume(workspaceId); - for (const m of msgs) { - // Dedupe in case the agent proactively pushed the same text the - // HTTP /a2a response already delivered (observed with the Hermes - // runtime, which emits both a reply body and a send_message_to_user - // push for the same content). Attachments ride along with the - // message so files returned by the A2A_RESPONSE WS path render - // their download chips. - setMessages((prev) => appendMessageDeduped(prev, createMessage("agent", m.content, m.attachments))); - } - if (sendingFromAPIRef.current && msgs.length > 0) { - // Reply arrived via WS push (e.g. claude-code SDK). Release all - // three guards together — without sendInFlightRef the next - // sendMessage() silently no-ops at the synchronous re-entry - // check. - releaseSendGuards(); - } - }, [pendingAgentMsgs, workspaceId]); - - // Resolve workspace ID → name for activity display - const resolveWorkspaceName = useCallback((id: string) => { - const nodes = useCanvasStore.getState().nodes; - const node = nodes.find((n) => n.id === id); - return (node?.data as WorkspaceNodeData)?.name || id.slice(0, 8); - }, []); + }, [history.messages, history.scrollAnchorRef]); // Elapsed timer while sending useEffect(() => { @@ -609,211 +223,43 @@ function MyChatPanel({ workspaceId, data }: Props) { setActivityLog([`Processing with ${runtimeDisplayName(data.runtime)}...`]); }, [sending, data.runtime]); - // Subscribe to global WS via the singleton ReconnectingSocket (no - // per-component WebSocket — the previous pattern dropped events - // silently on any reconnect because each panel's raw socket had no - // onclose handler). - useSocketEvent((msg) => { - if (!sending) return; - try { - if (msg.event === "ACTIVITY_LOGGED") { - // Filter to events for THIS workspace. The platform's - // BroadcastOnly fires to every connected client, and - // without this guard a sibling workspace's a2a_send would - // surface as "→ Delegating to X..." inside the wrong - // chat panel. (workspace_id on the WS envelope is the - // workspace whose activity_log row we just wrote.) - if (msg.workspace_id !== workspaceId) return; + // IntersectionObserver on the top sentinel. Fires loadOlder() the + // moment the user scrolls within 200px of the top. AbortController + // unwires cleanly on workspace switch / unmount; root is the + // scrollable container so we observe only what's visible inside it. + const hasMessages = history.messages.length > 0; + useEffect(() => { + const top = topRef.current; + const container = containerRef.current; + if (!top || !container) return; + if (!history.hasMore) return; + const ac = new AbortController(); + const io = new IntersectionObserver( + (entries) => { + if (ac.signal.aborted) return; + if (entries[0]?.isIntersecting) history.loadOlder(); + }, + { root: container, rootMargin: "200px 0px 0px 0px", threshold: 0 }, + ); + io.observe(top); + ac.signal.addEventListener("abort", () => io.disconnect()); + return () => ac.abort(); + }, [history.loadOlder, history.hasMore, hasMessages]); - const p = msg.payload || {}; - const type = p.activity_type as string; - const method = (p.method as string) || ""; - const status = (p.status as string) || ""; - const targetId = (p.target_id as string) || ""; - const durationMs = p.duration_ms as number | undefined; - const summary = (p.summary as string) || ""; - - let line = ""; - if (type === "a2a_receive" && method === "message/send") { - const targetName = resolveWorkspaceName(targetId || msg.workspace_id); - if (status === "ok" && durationMs) { - const sec = Math.round(durationMs / 1000); - line = `← ${targetName} responded (${sec}s)`; - // The platform logs a successful a2a_receive once the workspace - // has fully produced its reply. That's the authoritative "done" - // signal for the spinner — clear it even if the reply hasn't - // surfaced through the store yet (it may be delivered shortly - // via pendingAgentMsgs or the HTTP .then()). - const own = (targetId || msg.workspace_id) === workspaceId; - if (own && sendingFromAPIRef.current) { - releaseSendGuards(); - } - } else if (status === "error") { - line = `⚠ ${targetName} error`; - const own = (targetId || msg.workspace_id) === workspaceId; - if (own && sendingFromAPIRef.current) { - releaseSendGuards(); - setError("Agent error (Exception) — see workspace logs for details."); - } - } - } else if (type === "a2a_send") { - const targetName = resolveWorkspaceName(targetId); - line = `→ Delegating to ${targetName}...`; - } else if (type === "task_update") { - if (summary) line = `⟳ ${summary}`; - } else if (type === "agent_log") { - // Per-tool-use telemetry from claude_sdk_executor's - // _report_tool_use. The summary already carries an icon - // + human-readable args (📄 Read /path, ⚡ Bash: …) - // so we render it verbatim. No icon prefix here — the - // emoji at the start of summary is the visual marker. - if (summary) line = summary; - } - - if (line) { - setActivityLog((prev) => appendActivityLine(prev, line)); - } - } else if (msg.event === "TASK_UPDATED" && msg.workspace_id === workspaceId) { - const task = (msg.payload?.current_task as string) || ""; - if (task) { - setActivityLog((prev) => appendActivityLine(prev, `⟳ ${task}`)); - } - } - // A2A_RESPONSE is already consumed by the store and its text is - // appended to messages via the pendingAgentMsgs effect above; we - // don't need to duplicate it here. - } catch { /* ignore */ } - }); - - const sendMessage = async () => { + const handleSend = async () => { const text = input.trim(); - const filesToSend = pendingFiles; - // Allow sending if EITHER text OR attachments are present — a user - // can drop a file with no text and the agent still receives it. - if ((!text && filesToSend.length === 0) || !agentReachable || sending || uploading) return; - // Synchronous re-entry guard — see sendInFlightRef comment. - if (sendInFlightRef.current) return; - sendInFlightRef.current = true; - - // Upload attachments first so we can include URIs in the A2A - // message parts. Sequential-before-send: a message with references - // to files not yet staged would fail agent-side; staging happens - // synchronously via /chat/uploads before message/send dispatch. - let uploaded: ChatAttachment[] = []; - if (filesToSend.length > 0) { - setUploading(true); - try { - uploaded = await uploadChatFiles(workspaceId, filesToSend); - } catch (e) { - setUploading(false); - sendInFlightRef.current = false; - setError(e instanceof Error ? `Upload failed: ${e.message}` : "Upload failed"); - return; - } - setUploading(false); - } - + const files = pendingFiles; + if ((!text && files.length === 0) || !agentReachable || sending || uploading) return; setInput(""); setPendingFiles([]); - setMessages((prev) => [...prev, createMessage("user", text, uploaded)]); - setSending(true); - sendingFromAPIRef.current = true; + clearSendError(); setError(null); - // Capture this send's token so the .then()/.catch() callbacks can - // detect a newer send that may have superseded them. See the - // sendTokenRef declaration for the race scenario this closes. - const myToken = ++sendTokenRef.current; - - // Build conversation history from prior messages (last 20) - const history = messages - .filter((m) => m.role === "user" || m.role === "agent") - .slice(-20) - .map((m) => ({ - role: m.role === "user" ? "user" : "agent", - parts: [{ kind: "text", text: m.content }], - })); - - // A2A parts: text part (if any) + file parts (per attachment). The - // agent sees both in a single turn, matching the A2A spec shape. - // Wire shape is v0 — see A2APart definition above. - const parts: A2APart[] = []; - if (text) parts.push({ kind: "text", text }); - for (const att of uploaded) { - parts.push({ - kind: "file", - file: { - name: att.name, - mimeType: att.mimeType, - uri: att.uri, - size: att.size, - }, - }); - } - - // A2A calls can legitimately take minutes — LLM latency + - // multi-turn tool use is common on slower providers (Hermes+minimax, - // Claude Code invoking bash/file tools, etc.). The 15s default - // would silently abort the fetch here, leaving the server to - // complete the reply and the user staring at - // "agent may be unreachable". Match the upload timeout (60s × 2) - // for the happy-path ceiling; anything longer is genuinely stuck. - api.post(`/workspaces/${workspaceId}/a2a`, { - method: "message/send", - params: { - message: { - role: "user", - messageId: crypto.randomUUID(), - parts, - }, - metadata: { history }, - }, - }, { timeoutMs: 120_000 }) - .then((resp) => { - // Bail without touching any flags if a newer sendMessage has - // already run — its myToken bumped sendTokenRef, so this is - // a stale callback for an earlier message. The newer send - // owns the in-flight guards now. - if (sendTokenRef.current !== myToken) return; - // Skip if the WS A2A_RESPONSE event already handled this response. - // Both paths (WS + HTTP) check sendingFromAPIRef — whichever clears - // it first wins, the other becomes a no-op (no duplicate messages). - if (!sendingFromAPIRef.current) { - sendInFlightRef.current = false; - return; - } - const replyText = extractReplyText(resp); - const replyFiles = extractFilesFromTask((resp?.result ?? {}) as Record); - if (replyText || replyFiles.length > 0) { - setMessages((prev) => - appendMessageDeduped(prev, createMessage("agent", replyText, replyFiles)), - ); - } - releaseSendGuards(); - }) - .catch(() => { - // Stale-callback guard — same rationale as .then(). - if (sendTokenRef.current !== myToken) return; - // Same dedup guard as .then(): if a WS path (pendingAgentMsgs - // or ACTIVITY_LOGGED a2a_receive ok) already delivered the - // reply, sendingFromAPIRef is already false and there's - // nothing to roll back. Surfacing "Failed to send" here would - // contradict the agent reply the user is currently reading — - // exactly the false-positive observed when the HTTP request - // hung up (proxy idle / 502) after WS already won. - if (!sendingFromAPIRef.current) { - sendInFlightRef.current = false; - return; - } - releaseSendGuards(); - setError("Failed to send message — agent may be unreachable"); - }); + await sendMessage(text, files); }; const onFilesPicked = (fileList: FileList | null) => { if (!fileList) return; const picked = Array.from(fileList); - // Deduplicate against current pending set by name+size — user - // picking the same file twice shouldn't append it. setPendingFiles((prev) => { const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`)); return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))]; @@ -824,35 +270,7 @@ function MyChatPanel({ workspaceId, data }: Props) { const removePendingFile = (index: number) => setPendingFiles((prev) => prev.filter((_, i) => i !== index)); - // Monotonic counter so two paste events within the same wall-clock - // second still produce distinct filenames. Without this, on - // Firefox (where pasted images have an empty `file.name`), two - // pastes ~100ms apart could yield identical synthetic names AND - // identical sizes, collapsing into one attachment via the - // `name:size` dedup in onFilesPicked. - const pasteCounterRef = useRef(0); - - /** Paste-from-clipboard image attachment. - * - * Browser clipboard image items arrive as `File`s whose `name` is - * often a generic "image.png" (Chrome) or empty (Firefox/Safari), - * so two consecutive screenshot pastes collide on the name+size - * dedup the file-picker uses. Re-tag each pasted image with a - * per-paste unique name so dedup keeps them apart and the upload - * pipeline (which expects a non-empty filename) is happy. - * - * Falls through to onFilesPicked via direct File[] (NOT through - * the DataTransfer constructor — that throws on Safari < 14.1 - * and old Edge, silently aborting the paste). - * - * Only intercepts the paste when the clipboard has at least one - * image; text-only pastes fall through to the textarea's default - * behaviour. */ const mimeToExt = (mime: string): string => { - // Avoid raw `mime.split("/")[1]` — that yields `"svg+xml"`, - // `"jpeg"`, `"webp"` etc. which produce ugly filenames and may - // trip server-side extension allowlists. Map known types - // explicitly; unknown falls back to a safe default. if (mime === "image/svg+xml") return "svg"; if (mime === "image/jpeg") return "jpg"; if (mime === "image/png") return "png"; @@ -873,26 +291,16 @@ function MyChatPanel({ workspaceId, data }: Props) { const file = item.getAsFile(); if (!file) continue; const ext = mimeToExt(file.type); - const stamp = new Date() - .toISOString() - .replace(/[:.]/g, "-") - .slice(0, 19); + const stamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19); const seq = pasteCounterRef.current++; const fname = `pasted-${stamp}-${seq}-${i}.${ext}`; imageFiles.push(new File([file], fname, { type: file.type })); } if (imageFiles.length === 0) return; e.preventDefault(); - // Reuse the picker path so file-size guards, dedup, and pending- - // list state all run through the same code. Build a synthetic - // FileList-like object to avoid the DataTransfer constructor — - // that's missing on Safari < 14.1 / old Edge and would silently - // throw, leaving the paste a no-op. addPastedFiles(imageFiles); }; - // Variant of onFilesPicked that accepts a File[] directly, sidestepping - // the DataTransfer-FileList round-trip. Same dedup + state shape. const addPastedFiles = (files: File[]) => { setPendingFiles((prev) => { const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`)); @@ -900,11 +308,6 @@ function MyChatPanel({ workspaceId, data }: Props) { }); }; - // Drag-and-drop staging. dragDepthRef counts enter vs leave events so - // the overlay doesn't flicker when the cursor crosses nested children - // (textarea, buttons) — dragenter/dragleave fire for every boundary. - const [dragOver, setDragOver] = useState(false); - const dragDepthRef = useRef(0); const dropEnabled = agentReachable && !sending && !uploading; const isFileDrag = (e: React.DragEvent) => Array.from(e.dataTransfer.types || []).includes("Files"); @@ -934,9 +337,6 @@ function MyChatPanel({ workspaceId, data }: Props) { }; const downloadAttachment = (att: ChatAttachment) => { - // Errors here are rare but user-visible (401 on a revoked token, - // 404 if the agent deleted the file). Surface via the inline - // error banner — the message list itself stays untouched. downloadChatFile(workspaceId, att).catch((e) => { setError(e instanceof Error ? `Download failed: ${e.message}` : "Download failed"); }); @@ -964,26 +364,26 @@ function MyChatPanel({ workspaceId, data }: Props) { )} {/* Messages */}
    - {loading && ( + {history.loading && (
    Loading chat history...
    )} - {!loading && loadError !== null && messages.length === 0 && ( + {!history.loading && history.loadError !== null && history.messages.length === 0 && (

    - Failed to load chat history: {loadError} + Failed to load chat history: {history.loadError}

    )} - {!loading && loadError === null && messages.length === 0 && ( + {!history.loading && history.loadError === null && history.messages.length === 0 && (
    No messages yet. Send a message to start chatting with this agent.
    @@ -1001,12 +401,12 @@ function MyChatPanel({ workspaceId, data }: Props) { instead of showing a "no more messages" footer — the user's scroll resting against the top of the conversation IS the signal. */} - {hasMore && messages.length > 0 && ( + {history.hasMore && history.messages.length > 0 && (
    - {loadingOlder ? "Loading older messages…" : " "} + {history.loadingOlder ? "Loading older messages…" : " "}
    )} - {messages.map((msg) => ( + {history.messages.map((msg) => (
    {/* Error banner */} - {error && ( + {displayError && (
    - {error} + {displayError} {!isOnline && (