diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx index 7f05270b..055d7e00 100644 --- a/canvas/src/components/tabs/ChatTab.tsx +++ b/canvas/src/components/tabs/ChatTab.tsx @@ -962,6 +962,32 @@ function MyChatPanel({ workspaceId, data }: Props) { )} + {/* talk_to_user disabled banner — shown when the workspace has + talk_to_user_enabled=false. The agent cannot send canvas messages; + the user can re-enable the ability from here without opening settings. */} + {data.talkToUserEnabled === false && ( +
+ + + Agent is not enabled to chat with you. + + +
+ )} {/* Messages */}
{loading && ( diff --git a/canvas/src/store/canvas-topology.ts b/canvas/src/store/canvas-topology.ts index 12a1cc45..1bed943b 100644 --- a/canvas/src/store/canvas-topology.ts +++ b/canvas/src/store/canvas-topology.ts @@ -519,6 +519,10 @@ export function buildNodesAndEdges( // #2054 — server-declared per-workspace provisioning timeout. // Falls through to the runtime profile when null/absent. provisionTimeoutMs: ws.provision_timeout_ms ?? null, + // Workspace abilities — defaults preserved for old platform versions + // that don't yet include these columns in the GET response. + broadcastEnabled: ws.broadcast_enabled ?? false, + talkToUserEnabled: ws.talk_to_user_enabled ?? true, }, }; if (hasParent) { diff --git a/canvas/src/store/canvas.ts b/canvas/src/store/canvas.ts index 38129468..1baa0e66 100644 --- a/canvas/src/store/canvas.ts +++ b/canvas/src/store/canvas.ts @@ -99,6 +99,13 @@ export interface WorkspaceNodeData extends Record { * @/lib/runtimeProfiles. Lets a slow runtime declare its cold-boot * expectation without a canvas release. */ provisionTimeoutMs?: number | null; + /** When true the workspace may POST /broadcast to send org-wide messages. + * Default false. Toggled by user/admin via PATCH /workspaces/:id/abilities. */ + broadcastEnabled?: boolean; + /** When false the workspace cannot deliver canvas chat messages. + * send_message_to_user / POST /notify return 403 and the canvas + * shows a "not enabled" state with a button to re-enable. Default true. */ + talkToUserEnabled?: boolean; } export type PanelTab = "details" | "skills" | "chat" | "terminal" | "config" | "schedule" | "channels" | "files" | "memory" | "traces" | "events" | "activity" | "audit"; diff --git a/canvas/src/store/socket.ts b/canvas/src/store/socket.ts index 81114ae9..7b2adcd3 100644 --- a/canvas/src/store/socket.ts +++ b/canvas/src/store/socket.ts @@ -299,6 +299,9 @@ export interface WorkspaceData { * `@/lib/runtimeProfiles` when absent (the default behavior for any * template that hasn't yet declared the field). */ provision_timeout_ms?: number | null; + /** Workspace ability flags (migration 20260514). */ + broadcast_enabled?: boolean; + talk_to_user_enabled?: boolean; } let socket: ReconnectingSocket | null = null; diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 99b8bd1c..56dd7a1b 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -482,6 +482,13 @@ func (h *ActivityHandler) Notify(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) return } + if errors.Is(err, ErrTalkToUserDisabled) { + c.JSON(http.StatusForbidden, gin.H{ + "error": "talk_to_user_disabled", + "hint": "This workspace is not allowed to send messages directly to the user. Forward your update to a parent workspace using delegate_task — they may be able to reach the user.", + }) + return + } c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) return } diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go index f6611814..ffb93d70 100644 --- a/workspace-server/internal/handlers/activity_test.go +++ b/workspace-server/internal/handlers/activity_test.go @@ -464,9 +464,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) { t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) // Workspace existence check - mock.ExpectQuery(`SELECT name FROM workspaces`). + mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`). WithArgs("ws-notify"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true)) // Persistence INSERT — verify shape mock.ExpectExec(`INSERT INTO activity_logs`). @@ -511,9 +511,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) { db.DB = mockDB t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) - mock.ExpectQuery(`SELECT name FROM workspaces`). + mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`). WithArgs("ws-attach"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true)) // Capture the JSONB arg so we can assert on the persisted shape // AFTER the call (must include parts[].kind=file so reload @@ -640,9 +640,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) { db.DB = mockDB t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) - mock.ExpectQuery(`SELECT name FROM workspaces`). + mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`). WithArgs("ws-x"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true)) mock.ExpectExec(`INSERT INTO activity_logs`). WillReturnError(fmt.Errorf("simulated db hiccup")) diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go index 6efea603..82f18a8e 100644 --- a/workspace-server/internal/handlers/agent_message_writer.go +++ b/workspace-server/internal/handlers/agent_message_writer.go @@ -54,6 +54,11 @@ import ( // timeout) surface as wrapped errors and should be treated as 503. var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found") +// ErrTalkToUserDisabled is returned when the workspace has +// talk_to_user_enabled=false. Callers surface HTTP 403 so the Python tool +// can detect it and suggest forwarding to a parent workspace. +var ErrTalkToUserDisabled = errors.New("agent_message: talk_to_user disabled") + // AgentMessageAttachment is one file attached to an agent → user // message. Identical to handlers.NotifyAttachment in field set; kept // distinct so the writer's API doesn't import a handler type with HTTP @@ -107,16 +112,20 @@ func (w *AgentMessageWriter) Send( // notify call surfaced as "workspace not found" and masked real // incidents in the alert path. var wsName string + var talkToUserEnabled bool err := w.db.QueryRowContext(ctx, - `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`, + `SELECT name, talk_to_user_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`, workspaceID, - ).Scan(&wsName) + ).Scan(&wsName, &talkToUserEnabled) if errors.Is(err, sql.ErrNoRows) { return ErrWorkspaceNotFound } if err != nil { return fmt.Errorf("agent_message: workspace lookup: %w", err) } + if !talkToUserEnabled { + return ErrTalkToUserDisabled + } // 2. Build broadcast payload + WS-emit. Same shape that ChatTab's // AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has diff --git a/workspace-server/internal/handlers/agent_message_writer_test.go b/workspace-server/internal/handlers/agent_message_writer_test.go index 20f5540f..c75a3edd 100644 --- a/workspace-server/internal/handlers/agent_message_writer_test.go +++ b/workspace-server/internal/handlers/agent_message_writer_test.go @@ -88,9 +88,9 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) { mock := setupTestDB(t) w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-1"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). WithArgs( @@ -116,9 +116,9 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) { mock := setupTestDB(t) w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-att"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true)) mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). WithArgs( @@ -173,9 +173,9 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) { emitter := &capturingEmitter{} w := NewAgentMessageWriter(db.DB, emitter) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-missing"). - WillReturnRows(sqlmock.NewRows([]string{"name"})) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"})) err := w.Send(context.Background(), "ws-missing", "lost in the void", nil) if !errors.Is(err, ErrWorkspaceNotFound) { @@ -202,9 +202,9 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) { mock := setupTestDB(t) w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-dbfail"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) mock.ExpectExec(`INSERT INTO activity_logs`). WillReturnError(errors.New("transient db error")) @@ -223,9 +223,9 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) { mock := setupTestDB(t) w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-trunc"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true)) longMsg := strings.Repeat("x", 200) mock.ExpectExec(`INSERT INTO activity_logs`). @@ -263,9 +263,9 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) { emitter := &capturingEmitter{} w := NewAgentMessageWriter(db.DB, emitter) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-bc"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Workspace Name", true)) mock.ExpectExec(`INSERT INTO activity_logs`). WillReturnResult(sqlmock.NewResult(1, 1)) @@ -315,7 +315,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) { w := NewAgentMessageWriter(db.DB, newTestBroadcaster()) transientErr := errors.New("connection refused") - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-dbdown"). WillReturnError(transientErr) @@ -350,9 +350,9 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) { // the byte-slice bug. msg := strings.Repeat("你", 200) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-cjk"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) mock.ExpectExec(`INSERT INTO activity_logs`). WithArgs( @@ -395,9 +395,9 @@ func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) { emitter := &capturingEmitter{} w := NewAgentMessageWriter(db.DB, emitter) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-noatt"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("X", true)) mock.ExpectExec(`INSERT INTO activity_logs`). WillReturnResult(sqlmock.NewResult(1, 1)) diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go index c08d138f..0e13600d 100644 --- a/workspace-server/internal/handlers/handlers_additional_test.go +++ b/workspace-server/internal/handlers/handlers_additional_test.go @@ -230,20 +230,21 @@ func TestWorkspaceList_WithData(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) - // 21 cols — see scanWorkspaceRow for order (max_concurrent_tasks - // lands between active_tasks and last_error_rate). + // 23 cols — broadcast_enabled + talk_to_user_enabled added after monthly_spend + // (migration 20260514). Column order must match scanWorkspaceRow exactly. columns := []string{ "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } rows := sqlmock.NewRows(columns). AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001", - nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)). + nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0), false, true). AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "", - nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0)) + nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0), false, true) mock.ExpectQuery("SELECT w.id, w.name"). WillReturnRows(rows) diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go index eb4db75b..958858f0 100644 --- a/workspace-server/internal/handlers/handlers_test.go +++ b/workspace-server/internal/handlers/handlers_test.go @@ -392,21 +392,21 @@ func TestWorkspaceList(t *testing.T) { broadcaster := newTestBroadcaster() handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs") - // 21 cols: `max_concurrent_tasks` added between active_tasks and - // last_error_rate (see scanWorkspaceRow + COALESCE(w.max_concurrent_tasks, 1) - // in workspace.go). Column order must match that scan exactly. + // 23 cols: broadcast_enabled + talk_to_user_enabled added after monthly_spend + // (migration 20260514). Column order must match scanWorkspaceRow exactly. columns := []string{ "id", "name", "role", "tier", "status", "agent_card", "url", "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } rows := sqlmock.NewRows(columns). AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001", - nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)). + nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0), false, true). AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "", - nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0)) + nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0), false, true) mock.ExpectQuery("SELECT w.id, w.name"). WillReturnRows(rows) @@ -1120,13 +1120,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs("dddddddd-0004-0000-0000-000000000000"). WillReturnRows(sqlmock.NewRows(columns).AddRow( "dddddddd-0004-0000-0000-000000000000", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000", nil, 2, 1, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false, - nil, int64(0), + nil, int64(0), false, true, )) w := httptest.NewRecorder() diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go index 125eb725..3a274fbf 100644 --- a/workspace-server/internal/handlers/mcp_test.go +++ b/workspace-server/internal/handlers/mcp_test.go @@ -751,9 +751,9 @@ func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) { t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true") h, mock := newMCPHandler(t) - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-err"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) // INSERT fails — must NOT abort the tool response. mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). @@ -802,9 +802,9 @@ func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) { const userMessage = "Hi there from the agent" - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-shape"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) // Capture the response_body argument and assert its exact shape. mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`). @@ -861,9 +861,9 @@ func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) { // before it does anything else. Returning a name lets the // broadcast payload populate; the test doesn't assert on the // broadcast (no observable WS in this fake), only on the DB. - mock.ExpectQuery("SELECT name FROM workspaces"). + mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces"). WithArgs("ws-msg"). - WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC")) + WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true)) // The persistence INSERT — pin the exact shape so a future // refactor that switches columns or drops `method='notify'` diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index ff97728b..b3651d2a 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -595,7 +595,7 @@ func scanWorkspaceRow(rows interface { var id, name, role, status, url, sampleError, currentTask, runtime, workspaceDir string var tier, activeTasks, maxConcurrentTasks, uptimeSeconds int var errorRate, x, y float64 - var collapsed bool + var collapsed, broadcastEnabled, talkToUserEnabled bool var parentID *string var agentCard []byte var budgetLimit sql.NullInt64 @@ -604,7 +604,7 @@ func scanWorkspaceRow(rows interface { err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url, &parentID, &activeTasks, &maxConcurrentTasks, &errorRate, &sampleError, &uptimeSeconds, ¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed, - &budgetLimit, &monthlySpend) + &budgetLimit, &monthlySpend, &broadcastEnabled, &talkToUserEnabled) if err != nil { return nil, err } @@ -628,6 +628,8 @@ func scanWorkspaceRow(rows interface { "x": x, "y": y, "collapsed": collapsed, + "broadcast_enabled": broadcastEnabled, + "talk_to_user_enabled": talkToUserEnabled, } // budget_limit: nil when no limit set, int64 otherwise @@ -663,7 +665,8 @@ const workspaceListQuery = ` COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'), COALESCE(w.workspace_dir, ''), COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false), - w.budget_limit, COALESCE(w.monthly_spend, 0) + w.budget_limit, COALESCE(w.monthly_spend, 0), + w.broadcast_enabled, w.talk_to_user_enabled FROM workspaces w LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id WHERE w.status != 'removed' @@ -723,7 +726,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) { COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'), COALESCE(w.workspace_dir, ''), COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false), - w.budget_limit, COALESCE(w.monthly_spend, 0) + w.budget_limit, COALESCE(w.monthly_spend, 0), + w.broadcast_enabled, w.talk_to_user_enabled FROM workspaces w LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id WHERE w.id = $1 diff --git a/workspace-server/internal/handlers/workspace_abilities.go b/workspace-server/internal/handlers/workspace_abilities.go new file mode 100644 index 00000000..71fa48f9 --- /dev/null +++ b/workspace-server/internal/handlers/workspace_abilities.go @@ -0,0 +1,82 @@ +package handlers + +// workspace_abilities.go — PATCH /workspaces/:id/abilities +// +// Allows users and admin agents to toggle two workspace-level ability flags: +// +// broadcast_enabled — workspace may POST /broadcast to send org-wide messages +// talk_to_user_enabled — workspace may deliver canvas chat messages via +// send_message_to_user / POST /notify +// +// Gated behind AdminAuth so workspace agents cannot self-modify their own +// ability flags (that would let any agent grant itself broadcast rights or +// suppress its own chat-silence constraint). + +import ( + "log" + "net/http" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +// AbilitiesPayload carries the subset of ability flags the caller wants to +// update. Fields are pointers so that the handler can distinguish "caller +// supplied false" from "caller omitted the field" (omitempty semantics). +type AbilitiesPayload struct { + BroadcastEnabled *bool `json:"broadcast_enabled"` + TalkToUserEnabled *bool `json:"talk_to_user_enabled"` +} + +// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth). +func PatchAbilities(c *gin.Context) { + id := c.Param("id") + if err := validateWorkspaceID(id); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + + var body AbilitiesPayload + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"}) + return + } + + ctx := c.Request.Context() + + var exists bool + if err := db.DB.QueryRowContext(ctx, + `SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`, id, + ).Scan(&exists); err != nil || !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return + } + + if body.BroadcastEnabled != nil { + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`, + id, *body.BroadcastEnabled, + ); err != nil { + log.Printf("PatchAbilities broadcast_enabled for %s: %v", id, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) + return + } + } + + if body.TalkToUserEnabled != nil { + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`, + id, *body.TalkToUserEnabled, + ); err != nil { + log.Printf("PatchAbilities talk_to_user_enabled for %s: %v", id, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) + return + } + } + + c.JSON(http.StatusOK, gin.H{"status": "updated"}) +} diff --git a/workspace-server/internal/handlers/workspace_broadcast.go b/workspace-server/internal/handlers/workspace_broadcast.go new file mode 100644 index 00000000..6afd21e0 --- /dev/null +++ b/workspace-server/internal/handlers/workspace_broadcast.go @@ -0,0 +1,142 @@ +package handlers + +// workspace_broadcast.go — POST /workspaces/:id/broadcast +// +// Allows a workspace with broadcast_enabled=true to send a message to every +// non-removed agent workspace in the org. The message is: +// +// • Persisted in each recipient's activity_logs (type='broadcast_receive') +// so poll-mode agents pick it up via GET /activity. +// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can +// show a real-time banner for each recipient workspace. +// +// The sender's own workspace logs a 'broadcast_sent' activity row for +// traceability. +// +// Auth: WorkspaceAuth (the agent triggers this with its own bearer token). +// The handler re-validates broadcast_enabled inside the DB lookup to prevent +// TOCTOU — the middleware only proved the token is valid, not the ability. + +import ( + "log" + "net/http" + "strconv" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/gin-gonic/gin" +) + +// BroadcastHandler is constructed once and shared across requests. +type BroadcastHandler struct { + broadcaster *events.Broadcaster +} + +// NewBroadcastHandler creates a BroadcastHandler. +func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler { + return &BroadcastHandler{broadcaster: b} +} + +// Broadcast handles POST /workspaces/:id/broadcast. +func (h *BroadcastHandler) Broadcast(c *gin.Context) { + senderID := c.Param("id") + if err := validateWorkspaceID(senderID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + + var body struct { + Message string `json:"message" binding:"required"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"}) + return + } + + ctx := c.Request.Context() + + // Verify sender exists and has broadcast_enabled=true. + var senderName string + var broadcastEnabled bool + err := db.DB.QueryRowContext(ctx, + `SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`, + senderID, + ).Scan(&senderName, &broadcastEnabled) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return + } + if !broadcastEnabled { + c.JSON(http.StatusForbidden, gin.H{ + "error": "broadcast_disabled", + "hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.", + }) + return + } + + // Collect all non-removed agent workspaces (excludes the sender itself). + rows, err := db.DB.QueryContext(ctx, + `SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`, + senderID, + ) + if err != nil { + log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) + return + } + defer rows.Close() + + var recipientIDs []string + for rows.Next() { + var rid string + if rows.Scan(&rid) == nil { + recipientIDs = append(recipientIDs, rid) + } + } + if err := rows.Err(); err != nil { + log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) + return + } + + broadcastPayload := map[string]interface{}{ + "message": body.Message, + "sender_id": senderID, + "sender": senderName, + } + + // Persist broadcast_receive in each recipient's activity log + emit WS event. + delivered := 0 + for _, rid := range recipientIDs { + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) + VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok') + `, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil { + log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err) + continue + } + h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload) + delivered++ + } + + // Record the send on the sender's own log. + if _, err := db.DB.ExecContext(ctx, ` + INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) + VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok') + `, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil { + log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err) + } + + c.JSON(http.StatusOK, gin.H{ + "status": "sent", + "delivered": delivered, + }) +} + +func broadcastTruncate(s string, max int) string { + runes := []rune(s) + if len(runes) <= max { + return s + } + return string(runes[:max]) + "…" +} diff --git a/workspace-server/internal/handlers/workspace_budget_test.go b/workspace-server/internal/handlers/workspace_budget_test.go index 920dad9c..4652e293 100644 --- a/workspace-server/internal/handlers/workspace_budget_test.go +++ b/workspace-server/internal/handlers/workspace_budget_test.go @@ -33,6 +33,7 @@ var wsColumns = []string{ "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } // ==================== GET — financial fields stripped from open endpoint ==================== @@ -52,8 +53,10 @@ func TestWorkspaceBudget_Get_NilLimit(t *testing.T) { []byte(`{}`), "http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, - nil, // budget_limit NULL - 0)) // monthly_spend 0 + nil, // budget_limit NULL + 0, // monthly_spend 0 + false, // broadcast_enabled + true)) // talk_to_user_enabled w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -96,7 +99,8 @@ func TestWorkspaceBudget_Get_WithLimit(t *testing.T) { nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, int64(500), // budget_limit = $5.00 in DB - int64(123))) // monthly_spend = $1.23 in DB + int64(123), // monthly_spend = $1.23 in DB + false, true)) // broadcast_enabled, talk_to_user_enabled w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index 9d5b1a77..32367e14 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -29,6 +29,7 @@ func TestWorkspaceGet_Success(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs("cccccccc-0001-0000-0000-000000000000"). @@ -36,7 +37,7 @@ func TestWorkspaceGet_Success(t *testing.T) { AddRow("cccccccc-0001-0000-0000-000000000000", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`), "http://localhost:8001", nil, 2, 1, 0.05, "", 3600, "working", "langgraph", "", 10.0, 20.0, false, - nil, 0)) + nil, 0, false, true)) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -118,6 +119,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs(id). @@ -125,7 +127,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) { AddRow(id, "Old Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`), "", nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, - nil, 0)) + nil, 0, false, true)) mock.ExpectQuery(`SELECT updated_at FROM workspaces`). WithArgs(id). WillReturnRows(sqlmock.NewRows([]string{"updated_at"}).AddRow(removedAt)) @@ -181,6 +183,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure( "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs(id). @@ -188,7 +191,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure( AddRow(id, "Vanished", "worker", 1, string(models.StatusRemoved), []byte(`null`), "", nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, - nil, 0)) + nil, 0, false, true)) // Simulate the row vanishing between the two queries. mock.ExpectQuery(`SELECT updated_at FROM workspaces`). WithArgs(id). @@ -243,6 +246,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs(id). @@ -250,7 +254,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) { AddRow(id, "Audit Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`), "", nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, - nil, 0)) + nil, 0, false, true)) // last_outbound_at follow-up query (existing path) mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`). WithArgs(id). @@ -676,6 +680,7 @@ func TestWorkspaceList_Empty(t *testing.T) { "parent_id", "active_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", })) w := httptest.NewRecorder() @@ -1379,6 +1384,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } // Populate with non-zero financial values to confirm they are stripped. mock.ExpectQuery("SELECT w.id, w.name"). @@ -1387,7 +1393,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) { AddRow("cccccccc-0010-0000-0000-000000000000", "Finance Test", "worker", 1, "online", []byte(`{}`), "http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 0.0, 0.0, false, - int64(50000), int64(12500))) // budget_limit=500 USD, spend=125 USD + int64(50000), int64(12500), false, true)) // budget_limit=500 USD, spend=125 USD w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -1435,6 +1441,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) { "parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error", "uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed", "budget_limit", "monthly_spend", + "broadcast_enabled", "talk_to_user_enabled", } mock.ExpectQuery("SELECT w.id, w.name"). WithArgs("cccccccc-0955-0000-0000-000000000000"). @@ -1447,7 +1454,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) { "langgraph", "/home/user/secret-projects/client-work", 0.0, 0.0, false, - nil, 0)) + nil, 0, false, true)) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index 11284473..9139fc5b 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -36,6 +36,15 @@ type Workspace struct { // to activity_logs, agent reads via GET /activity?since_id=). See // migration 045 + RFC #2339. DeliveryMode string `json:"delivery_mode" db:"delivery_mode"` + // BroadcastEnabled: when true the workspace may call POST /broadcast to + // deliver a message to all non-removed agent workspaces in the org. + // Default false — only privileged orchestrators should hold this ability. + BroadcastEnabled bool `json:"broadcast_enabled" db:"broadcast_enabled"` + // TalkToUserEnabled: when false the workspace's send_message_to_user calls + // and POST /notify requests are rejected with HTTP 403 so the agent is + // forced to route updates through a parent workspace. Default true + // (preserves existing behaviour for all workspaces). + TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"` // Canvas layout fields (from JOIN) X float64 `json:"x"` Y float64 `json:"y"` diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index aac18c14..6e7026ab 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -146,6 +146,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAdmin.GET("/workspaces", wh.List) wsAdmin.POST("/workspaces", wh.Create) wsAdmin.DELETE("/workspaces/:id", wh.Delete) + // Ability toggles — admin-only so workspace agents cannot self-modify + // broadcast_enabled or talk_to_user_enabled. + wsAdmin.PATCH("/workspaces/:id/abilities", handlers.PatchAbilities) // Out-of-band bootstrap signal: CP's watcher POSTs here when it // detects "RUNTIME CRASHED" in a workspace EC2 console output, // so the canvas flips to failed in seconds instead of waiting @@ -201,6 +204,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // to 'hibernated'. The workspace auto-wakes on the next A2A message. wsAuth.POST("/hibernate", wh.Hibernate) + // Broadcast — send a message to all non-removed workspaces in the org. + // Requires broadcast_enabled=true on the source workspace (checked + // inside the handler). WorkspaceAuth on wsAuth proves token ownership. + broadcastH := handlers.NewBroadcastHandler(broadcaster) + wsAuth.POST("/broadcast", broadcastH.Broadcast) + // External-workspace credential lifecycle (issue #319 follow-up to // the Create flow). Both endpoints reject runtime ≠ external with // 400 — see external_rotate.go for the rationale. diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.down.sql b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql new file mode 100644 index 00000000..12b5f846 --- /dev/null +++ b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE workspaces + DROP COLUMN IF EXISTS broadcast_enabled, + DROP COLUMN IF EXISTS talk_to_user_enabled; diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.up.sql b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql new file mode 100644 index 00000000..f172c30f --- /dev/null +++ b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql @@ -0,0 +1,16 @@ +-- Workspace abilities: opt-in flags that gate platform-level behaviours. +-- +-- broadcast_enabled (default FALSE): when TRUE the workspace may call +-- POST /workspaces/:id/broadcast to send a message to every non-removed +-- agent workspace in the org. Off by default — only privileged +-- orchestrator workspaces should hold this ability. +-- +-- talk_to_user_enabled (default TRUE): when FALSE the workspace is not +-- allowed to deliver messages to the canvas user via send_message_to_user / +-- POST /notify. The platform returns HTTP 403 so the agent can forward its +-- update to a parent workspace instead. Default TRUE preserves existing +-- behaviour for all current workspaces. + +ALTER TABLE workspaces + ADD COLUMN IF NOT EXISTS broadcast_enabled BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS talk_to_user_enabled BOOLEAN NOT NULL DEFAULT TRUE; diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 1b1ef267..eb26e622 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -137,6 +137,7 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli # identically. from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports) _upload_chat_files, + tool_broadcast_message, tool_chat_history, tool_get_workspace_info, tool_list_peers, diff --git a/workspace/a2a_tools_messaging.py b/workspace/a2a_tools_messaging.py index dea24f90..9b832a2b 100644 --- a/workspace/a2a_tools_messaging.py +++ b/workspace/a2a_tools_messaging.py @@ -101,6 +101,50 @@ async def _upload_chat_files( return uploaded, None +async def tool_broadcast_message( + message: str, + workspace_id: str | None = None, +) -> str: + """Send a broadcast message to ALL agent workspaces in the org. + + Requires the workspace to have broadcast_enabled=true (set by a user or + admin via PATCH /workspaces/:id/abilities). Use for urgent org-wide + signals — status changes, critical alerts, coordination instructions. + Every non-removed workspace receives the message in its activity log so + poll-mode agents pick it up, and push-mode canvases get a real-time + BROADCAST_MESSAGE WebSocket event. + + Args: + message: The broadcast text. Keep it concise — all agents receive + this, so avoid lengthy prose that floods every context. + workspace_id: Optional. Which registered workspace to send the + broadcast from. Single-workspace agents omit this. + """ + if not message: + return "Error: message is required" + target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post( + f"{PLATFORM_URL}/workspaces/{target_workspace_id}/broadcast", + json={"message": message}, + headers=_auth_headers_for_heartbeat(target_workspace_id), + ) + if resp.status_code == 200: + data = resp.json() + delivered = data.get("delivered", "?") + return f"Broadcast sent to {delivered} workspace(s)" + if resp.status_code == 403: + try: + hint = resp.json().get("hint", "") + except Exception: + hint = "" + return f"Error: broadcast ability not enabled.{(' ' + hint) if hint else ''}" + return f"Error: platform returned {resp.status_code}" + except Exception as e: + return f"Error sending broadcast: {e}" + + async def tool_send_message_to_user( message: str, attachments: list[str] | None = None, @@ -151,6 +195,20 @@ async def tool_send_message_to_user( if uploaded: return f"Message sent to user with {len(uploaded)} attachment(s)" return "Message sent to user" + if resp.status_code == 403: + try: + body = resp.json() + if body.get("error") == "talk_to_user_disabled": + hint = body.get("hint", "") + return ( + "Error: this workspace is not allowed to send messages " + "directly to the user (talk_to_user is disabled). " + + (hint + " " if hint else "") + + "Use delegate_task to forward your update to a parent " + "or supervisor workspace that can reach the user." + ) + except Exception: + pass return f"Error: platform returned {resp.status_code}" except Exception as e: return f"Error sending message: {e}" diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py index 3343dee5..aba334f9 100644 --- a/workspace/executor_helpers.py +++ b/workspace/executor_helpers.py @@ -340,6 +340,10 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = { "delegate_task_async": "delegate --async", "check_task_status": "status", "get_workspace_info": "info", + # `broadcast_message` is not exposed via the CLI subprocess interface + # today — it's an MCP-first capability. If a2a_cli grows a `broadcast` + # subcommand, map it here and the alignment test will gate the change. + "broadcast_message": None, # `send_message_to_user` is not exposed via the CLI subprocess # interface today — it requires a structured `attachments` field # that wouldn't survive a positional-arg shell invocation cleanly. diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py index f4fa773e..6550c9e7 100644 --- a/workspace/platform_tools/registry.py +++ b/workspace/platform_tools/registry.py @@ -51,6 +51,7 @@ from dataclasses import dataclass from typing import Any, Literal from a2a_tools import ( + tool_broadcast_message, tool_chat_history, tool_check_task_status, tool_commit_memory, @@ -288,6 +289,44 @@ _GET_WORKSPACE_INFO = ToolSpec( section=A2A_SECTION, ) +_BROADCAST_MESSAGE = ToolSpec( + name="broadcast_message", + short=( + "Send a message to ALL agent workspaces in the org simultaneously. " + "Requires broadcast_enabled=true on this workspace (set by user/admin)." + ), + when_to_use=( + "Use for urgent, org-wide signals: critical status changes, emergency " + "stop instructions, coordinated task announcements. Every non-removed " + "workspace receives the message in its activity log (poll-mode agents " + "see it on their next poll; push-mode canvases get a real-time banner). " + "This tool returns an error if broadcast_enabled is false — a user or " + "admin must enable it via the workspace abilities settings first." + ), + input_schema={ + "type": "object", + "properties": { + "message": { + "type": "string", + "description": ( + "The broadcast text. Keep it concise — every agent in the " + "org receives this in their activity feed." + ), + }, + "workspace_id": { + "type": "string", + "description": ( + "Optional. Multi-workspace mode: the registered workspace " + "to broadcast from. Single-workspace agents omit this." + ), + }, + }, + "required": ["message"], + }, + impl=tool_broadcast_message, + section=A2A_SECTION, +) + _SEND_MESSAGE_TO_USER = ToolSpec( name="send_message_to_user", short=( @@ -603,6 +642,7 @@ TOOLS: list[ToolSpec] = [ _CHECK_TASK_STATUS, _LIST_PEERS, _GET_WORKSPACE_INFO, + _BROADCAST_MESSAGE, _SEND_MESSAGE_TO_USER, # Inbox (standalone-only; in-container returns informational error) _WAIT_FOR_MESSAGE, diff --git a/workspace/tests/snapshots/a2a_instructions_mcp.txt b/workspace/tests/snapshots/a2a_instructions_mcp.txt index 6bcf471e..3f0213e1 100644 --- a/workspace/tests/snapshots/a2a_instructions_mcp.txt +++ b/workspace/tests/snapshots/a2a_instructions_mcp.txt @@ -5,6 +5,7 @@ - **check_task_status**: Poll the status of a task started with delegate_task_async; returns result when done. - **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each. - **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status. +- **broadcast_message**: Send a message to ALL agent workspaces in the org simultaneously. Requires broadcast_enabled=true on this workspace (set by user/admin). - **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out. - **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses. - **inbox_peek**: List pending inbound messages without removing them. @@ -26,6 +27,9 @@ Call this first when you need to delegate but don't know the target's ID. Access ### get_workspace_info Use to introspect your own identity (e.g. before reporting back to the user, or to determine whether you're a tier-0 root that can write GLOBAL memory). +### broadcast_message +Use for urgent, org-wide signals: critical status changes, emergency stop instructions, coordinated task announcements. Every non-removed workspace receives the message in its activity log (poll-mode agents see it on their next poll; push-mode canvases get a real-time banner). This tool returns an error if broadcast_enabled is false — a user or admin must enable it via the workspace abilities settings first. + ### send_message_to_user Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).