diff --git a/canvas/src/components/tabs/ChatTab.tsx b/canvas/src/components/tabs/ChatTab.tsx
index 21e9f665..01b067e1 100644
--- a/canvas/src/components/tabs/ChatTab.tsx
+++ b/canvas/src/components/tabs/ChatTab.tsx
@@ -962,6 +962,32 @@ function MyChatPanel({ workspaceId, data }: Props) {
)}
+ {/* talk_to_user disabled banner — shown when the workspace has
+ talk_to_user_enabled=false. The agent cannot send canvas messages;
+ the user can re-enable the ability from here without opening settings. */}
+ {data.talkToUserEnabled === false && (
+
+
+
+ Agent is not enabled to chat with you.
+
+
+
+ )}
{/* Messages */}
{loading && (
diff --git a/canvas/src/store/canvas-topology.ts b/canvas/src/store/canvas-topology.ts
index 334dcff7..a2df69af 100644
--- a/canvas/src/store/canvas-topology.ts
+++ b/canvas/src/store/canvas-topology.ts
@@ -507,6 +507,10 @@ export function buildNodesAndEdges(
// #2054 — server-declared per-workspace provisioning timeout.
// Falls through to the runtime profile when null/absent.
provisionTimeoutMs: ws.provision_timeout_ms ?? null,
+ // Workspace abilities — defaults preserved for old platform versions
+ // that don't yet include these columns in the GET response.
+ broadcastEnabled: ws.broadcast_enabled ?? false,
+ talkToUserEnabled: ws.talk_to_user_enabled ?? true,
},
};
if (hasParent) {
diff --git a/canvas/src/store/canvas.ts b/canvas/src/store/canvas.ts
index 6f09907d..219fe8e6 100644
--- a/canvas/src/store/canvas.ts
+++ b/canvas/src/store/canvas.ts
@@ -99,6 +99,13 @@ export interface WorkspaceNodeData extends Record {
* @/lib/runtimeProfiles. Lets a slow runtime declare its cold-boot
* expectation without a canvas release. */
provisionTimeoutMs?: number | null;
+ /** When true the workspace may POST /broadcast to send org-wide messages.
+ * Default false. Toggled by user/admin via PATCH /workspaces/:id/abilities. */
+ broadcastEnabled?: boolean;
+ /** When false the workspace cannot deliver canvas chat messages.
+ * send_message_to_user / POST /notify return 403 and the canvas
+ * shows a "not enabled" state with a button to re-enable. Default true. */
+ talkToUserEnabled?: boolean;
}
export type PanelTab = "details" | "skills" | "chat" | "terminal" | "config" | "schedule" | "channels" | "files" | "memory" | "traces" | "events" | "activity" | "audit";
diff --git a/canvas/src/store/socket.ts b/canvas/src/store/socket.ts
index 81114ae9..7b2adcd3 100644
--- a/canvas/src/store/socket.ts
+++ b/canvas/src/store/socket.ts
@@ -299,6 +299,9 @@ export interface WorkspaceData {
* `@/lib/runtimeProfiles` when absent (the default behavior for any
* template that hasn't yet declared the field). */
provision_timeout_ms?: number | null;
+ /** Workspace ability flags (migration 20260514). */
+ broadcast_enabled?: boolean;
+ talk_to_user_enabled?: boolean;
}
let socket: ReconnectingSocket | null = null;
diff --git a/tests/e2e/test_workspace_abilities_e2e.sh b/tests/e2e/test_workspace_abilities_e2e.sh
new file mode 100755
index 00000000..72a32c51
--- /dev/null
+++ b/tests/e2e/test_workspace_abilities_e2e.sh
@@ -0,0 +1,296 @@
+#!/usr/bin/env bash
+# E2E test: workspace broadcast and talk-to-user platform abilities.
+#
+# What this proves:
+# 1. talk_to_user_enabled (default true) — POST /notify works out-of-the-box.
+# 2. PATCH /workspaces/:id/abilities { talk_to_user_enabled: false } disables
+# delivery: /notify → 403 with error="talk_to_user_disabled" + delegate hint.
+# 3. Re-enabling talk_to_user_enabled restores delivery.
+# 4. broadcast_enabled (default false) — POST /broadcast → 403 when disabled.
+# 5. PATCH { broadcast_enabled: true } enables fan-out.
+# 6. POST /broadcast delivers to all non-sender, non-removed workspaces:
+# - Returns {"status":"sent","delivered":N}
+# - Receiver's activity log has a broadcast_receive entry with the message.
+# - Sender's activity log has a broadcast_sent entry.
+# 7. The sender itself does NOT receive a broadcast_receive entry.
+#
+# Usage: tests/e2e/test_workspace_abilities_e2e.sh
+# Prereqs: workspace-server on http://localhost:8080, MOLECULE_ENV != production
+
+set -euo pipefail
+
+source "$(dirname "$0")/_lib.sh"
+
+PASS=0
+FAIL=0
+SENDER_ID=""
+RECEIVER_ID=""
+
+cleanup() {
+ for wid in "$SENDER_ID" "$RECEIVER_ID"; do
+ if [ -n "$wid" ]; then
+ curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" > /dev/null || true
+ fi
+ done
+}
+trap cleanup EXIT INT TERM
+
+assert() {
+ local label="$1" actual="$2" expected="$3"
+ if [ "$actual" = "$expected" ]; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label"
+ echo " expected: $expected"
+ echo " actual: $actual"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+assert_contains() {
+ local label="$1" haystack="$2" needle="$3"
+ if echo "$haystack" | grep -qF "$needle"; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label"
+ echo " needle: $needle"
+ echo " haystack: $haystack"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+assert_not_contains() {
+ local label="$1" haystack="$2" needle="$3"
+ if ! echo "$haystack" | grep -qF "$needle"; then
+ echo " PASS — $label"
+ PASS=$((PASS+1))
+ else
+ echo " FAIL — $label (unexpected match)"
+ echo " needle: $needle"
+ echo " haystack: $haystack"
+ FAIL=$((FAIL+1))
+ fi
+}
+
+# ── Pre-sweep: remove any stale leftover workspaces from a prior aborted run ──
+echo "=== Setup ==="
+for NAME in "Abilities Sender" "Abilities Receiver"; do
+ PRIOR=$(curl -s "$BASE/workspaces" | python3 -c "
+import json, sys
+try:
+ print(' '.join(w['id'] for w in json.load(sys.stdin) if w.get('name') == '$NAME'))
+except Exception:
+ pass
+")
+ for _wid in $PRIOR; do
+ echo "Sweeping leftover '$NAME' workspace: $_wid"
+ curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" > /dev/null || true
+ done
+done
+
+R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
+ -d '{"name":"Abilities Sender","tier":1}')
+SENDER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
+[ -n "$SENDER_ID" ] || { echo "Failed to create sender workspace: $R"; exit 1; }
+echo "Created sender workspace: $SENDER_ID"
+
+R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
+ -d '{"name":"Abilities Receiver","tier":1}')
+RECEIVER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
+[ -n "$RECEIVER_ID" ] || { echo "Failed to create receiver workspace: $R"; exit 1; }
+echo "Created receiver workspace: $RECEIVER_ID"
+
+# Mint workspace-scoped bearer tokens (test-only endpoint, disabled in prod).
+SENDER_TOKEN=$(e2e_mint_test_token "$SENDER_ID")
+[ -n "$SENDER_TOKEN" ] || { echo "Failed to mint sender token"; exit 1; }
+SENDER_AUTH="Authorization: Bearer $SENDER_TOKEN"
+
+# Admin token — any live workspace bearer satisfies AdminAuth in local dev.
+# In production-like envs, set MOLECULE_ADMIN_TOKEN.
+ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:-$SENDER_TOKEN}"
+ADMIN_AUTH="Authorization: Bearer $ADMIN_TOKEN"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Part 1: talk_to_user ability ==="
+
+echo ""
+echo "--- 1a: /notify works with default talk_to_user_enabled=true ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Hello from sender"}')
+assert "POST /notify returns 200 when talk_to_user_enabled=true (default)" "$CODE" "200"
+
+echo ""
+echo "--- 1b: Disable talk_to_user ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": false}')
+assert "PATCH /abilities talk_to_user_enabled=false returns 200" "$CODE" "200"
+
+# Verify the flag is reflected in the workspace GET response.
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
+assert "GET /workspaces/:id reflects talk_to_user_enabled=false" "$FLAG" "False"
+
+echo ""
+echo "--- 1c: /notify blocked when talk_to_user disabled ---"
+BODY=$(curl -s -w "" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+assert "POST /notify returns 403 when talk_to_user_enabled=false" "$CODE" "403"
+
+ERR=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("error",""))' 2>/dev/null || echo "")
+assert_contains "403 body contains talk_to_user_disabled error code" "$ERR" "talk_to_user_disabled"
+
+HINT=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("hint",""))' 2>/dev/null || echo "")
+assert_contains "403 body contains delegate_task hint" "$HINT" "delegate_task"
+
+echo ""
+echo "--- 1d: Re-enable talk_to_user and verify /notify works again ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": true}')
+assert "PATCH /abilities talk_to_user_enabled=true returns 200" "$CODE" "200"
+
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Re-enabled, should work"}')
+assert "POST /notify returns 200 after re-enabling talk_to_user" "$CODE" "200"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Part 2: broadcast ability ==="
+
+echo ""
+echo "--- 2a: Broadcast blocked by default (broadcast_enabled=false) ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Should be blocked"}')
+assert "POST /broadcast returns 403 when broadcast_enabled=false (default)" "$CODE" "403"
+
+echo ""
+echo "--- 2b: Enable broadcast ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"broadcast_enabled": true}')
+assert "PATCH /abilities broadcast_enabled=true returns 200" "$CODE" "200"
+
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
+assert "GET /workspaces/:id reflects broadcast_enabled=true" "$FLAG" "True"
+
+echo ""
+echo "--- 2c: Successful broadcast fan-out ---"
+BCAST=$(curl -s -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":"Org-wide notice: scheduled maintenance in 5 minutes."}')
+BSTATUS=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("status",""))' 2>/dev/null || echo "")
+BDELIVERED=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("delivered","-1"))' 2>/dev/null || echo "-1")
+assert "POST /broadcast returns status=sent" "$BSTATUS" "sent"
+
+# delivered count must be >= 1 (the receiver workspace).
+echo " INFO — broadcast delivered=$BDELIVERED"
+if python3 -c "import sys; sys.exit(0 if int('$BDELIVERED') >= 1 else 1)" 2>/dev/null; then
+ echo " PASS — delivered count >= 1"
+ PASS=$((PASS+1))
+else
+ echo " FAIL — expected delivered >= 1, got $BDELIVERED"
+ FAIL=$((FAIL+1))
+fi
+
+echo ""
+echo "--- 2d: Receiver activity log has broadcast_receive entry ---"
+RECEIVER_TOKEN=$(e2e_mint_test_token "$RECEIVER_ID")
+[ -n "$RECEIVER_TOKEN" ] || { echo "Failed to mint receiver token"; exit 1; }
+RECEIVER_AUTH="Authorization: Bearer $RECEIVER_TOKEN"
+
+ACT=$(curl -s -H "$RECEIVER_AUTH" "$BASE/workspaces/$RECEIVER_ID/activity?source=agent&limit=20")
+ROW=$(echo "$ACT" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_receive":
+ print(json.dumps(r))
+ break
+')
+[ -n "$ROW" ] || {
+ echo " FAIL — could not find broadcast_receive row in receiver activity"
+ FAIL=$((FAIL+1))
+}
+
+if [ -n "$ROW" ]; then
+ # Message is stored in summary field.
+ MSG=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("summary",""))')
+ assert_contains "broadcast_receive row summary has original message" "$MSG" "scheduled maintenance"
+ # Sender ID is stored in source_id field.
+ SRC=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("source_id",""))')
+ assert "broadcast_receive row source_id is sender workspace" "$SRC" "$SENDER_ID"
+fi
+
+echo ""
+echo "--- 2e: Sender activity log has broadcast_sent entry ---"
+ACT_SENDER=$(curl -s -H "$SENDER_AUTH" "$BASE/workspaces/$SENDER_ID/activity?limit=20")
+SENT_ROW=$(echo "$ACT_SENDER" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_sent":
+ print(json.dumps(r))
+ break
+')
+[ -n "$SENT_ROW" ] || {
+ echo " FAIL — could not find broadcast_sent row in sender activity"
+ FAIL=$((FAIL+1))
+}
+
+if [ -n "$SENT_ROW" ]; then
+ # Delivered count is baked into the summary field (no response_body for sender row).
+ SUMMARY=$(echo "$SENT_ROW" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("summary",""))')
+ assert_contains "broadcast_sent summary mentions workspace count" "$SUMMARY" "workspace"
+fi
+
+echo ""
+echo "--- 2f: Sender does NOT receive a broadcast_receive entry ---"
+SELF_RECV=$(echo "$ACT_SENDER" | python3 -c '
+import json, sys
+rows = json.load(sys.stdin) or []
+for r in rows:
+ if r.get("activity_type") == "broadcast_receive":
+ print("found")
+ break
+')
+assert_not_contains "sender has no broadcast_receive in own activity log" "${SELF_RECV:-}" "found"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "--- 2g: Empty message is rejected ---"
+CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
+ -H "Content-Type: application/json" -H "$SENDER_AUTH" \
+ -d '{"message":""}')
+assert "POST /broadcast with empty message returns 400" "$CODE" "400"
+
+echo ""
+echo "--- 2h: Partial PATCH does not clobber other flags ---"
+# Set talk_to_user=false, then patch only broadcast — talk_to_user must stay false.
+curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"talk_to_user_enabled": false}'
+curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
+ -H "Content-Type: application/json" -H "$ADMIN_AUTH" \
+ -d '{"broadcast_enabled": false}'
+WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
+TUF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
+BEF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
+assert "partial PATCH preserves talk_to_user_enabled=false" "$TUF" "False"
+assert "partial PATCH sets broadcast_enabled=false" "$BEF" "False"
+
+# ─────────────────────────────────────────────────────────────────────────────
+echo ""
+echo "=== Results: $PASS passed, $FAIL failed ==="
+[ "$FAIL" -eq 0 ]
diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go
index cd1ef86d..c2af800d 100644
--- a/workspace-server/internal/handlers/activity.go
+++ b/workspace-server/internal/handlers/activity.go
@@ -487,6 +487,13 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
+ if errors.Is(err, ErrTalkToUserDisabled) {
+ c.JSON(http.StatusForbidden, gin.H{
+ "error": "talk_to_user_disabled",
+ "hint": "This workspace is not allowed to send messages directly to the user. Forward your update to a parent workspace using delegate_task — they may be able to reach the user.",
+ })
+ return
+ }
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
diff --git a/workspace-server/internal/handlers/activity_test.go b/workspace-server/internal/handlers/activity_test.go
index b6b3c42e..b60d80ab 100644
--- a/workspace-server/internal/handlers/activity_test.go
+++ b/workspace-server/internal/handlers/activity_test.go
@@ -452,9 +452,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
db.DB = mockDB
// Workspace existence check
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-notify").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Persistence INSERT — verify shape
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -495,9 +495,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
defer mockDB.Close()
db.DB = mockDB
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-attach").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Capture the JSONB arg so we can assert on the persisted shape
// AFTER the call (must include parts[].kind=file so reload
@@ -616,9 +616,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
defer mockDB.Close()
db.DB = mockDB
- mock.ExpectQuery(`SELECT name FROM workspaces`).
+ mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-x").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(fmt.Errorf("simulated db hiccup"))
diff --git a/workspace-server/internal/handlers/agent_message_writer.go b/workspace-server/internal/handlers/agent_message_writer.go
index 6efea603..82f18a8e 100644
--- a/workspace-server/internal/handlers/agent_message_writer.go
+++ b/workspace-server/internal/handlers/agent_message_writer.go
@@ -54,6 +54,11 @@ import (
// timeout) surface as wrapped errors and should be treated as 503.
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
+// ErrTalkToUserDisabled is returned when the workspace has
+// talk_to_user_enabled=false. Callers surface HTTP 403 so the Python tool
+// can detect it and suggest forwarding to a parent workspace.
+var ErrTalkToUserDisabled = errors.New("agent_message: talk_to_user disabled")
+
// AgentMessageAttachment is one file attached to an agent → user
// message. Identical to handlers.NotifyAttachment in field set; kept
// distinct so the writer's API doesn't import a handler type with HTTP
@@ -107,16 +112,20 @@ func (w *AgentMessageWriter) Send(
// notify call surfaced as "workspace not found" and masked real
// incidents in the alert path.
var wsName string
+ var talkToUserEnabled bool
err := w.db.QueryRowContext(ctx,
- `SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
+ `SELECT name, talk_to_user_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
workspaceID,
- ).Scan(&wsName)
+ ).Scan(&wsName, &talkToUserEnabled)
if errors.Is(err, sql.ErrNoRows) {
return ErrWorkspaceNotFound
}
if err != nil {
return fmt.Errorf("agent_message: workspace lookup: %w", err)
}
+ if !talkToUserEnabled {
+ return ErrTalkToUserDisabled
+ }
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
diff --git a/workspace-server/internal/handlers/agent_message_writer_test.go b/workspace-server/internal/handlers/agent_message_writer_test.go
index 20f5540f..c75a3edd 100644
--- a/workspace-server/internal/handlers/agent_message_writer_test.go
+++ b/workspace-server/internal/handlers/agent_message_writer_test.go
@@ -88,9 +88,9 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-1").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -116,9 +116,9 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-att").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -173,9 +173,9 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-missing").
- WillReturnRows(sqlmock.NewRows([]string{"name"}))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}))
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
if !errors.Is(err, ErrWorkspaceNotFound) {
@@ -202,9 +202,9 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbfail").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(errors.New("transient db error"))
@@ -223,9 +223,9 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-trunc").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
longMsg := strings.Repeat("x", 200)
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -263,9 +263,9 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-bc").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Workspace Name", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
@@ -315,7 +315,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
transientErr := errors.New("connection refused")
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbdown").
WillReturnError(transientErr)
@@ -350,9 +350,9 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
// the byte-slice bug.
msg := strings.Repeat("你", 200)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-cjk").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
@@ -395,9 +395,9 @@ func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-noatt").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("X", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
diff --git a/workspace-server/internal/handlers/handlers_additional_test.go b/workspace-server/internal/handlers/handlers_additional_test.go
index c08d138f..0e13600d 100644
--- a/workspace-server/internal/handlers/handlers_additional_test.go
+++ b/workspace-server/internal/handlers/handlers_additional_test.go
@@ -230,20 +230,21 @@ func TestWorkspaceList_WithData(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
- // 21 cols — see scanWorkspaceRow for order (max_concurrent_tasks
- // lands between active_tasks and last_error_rate).
+ // 23 cols — broadcast_enabled + talk_to_user_enabled added after monthly_spend
+ // (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001",
- nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)).
+ nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "",
- nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0))
+ nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
diff --git a/workspace-server/internal/handlers/handlers_test.go b/workspace-server/internal/handlers/handlers_test.go
index d57e5811..141693cc 100644
--- a/workspace-server/internal/handlers/handlers_test.go
+++ b/workspace-server/internal/handlers/handlers_test.go
@@ -391,21 +391,21 @@ func TestWorkspaceList(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
- // 21 cols: `max_concurrent_tasks` added between active_tasks and
- // last_error_rate (see scanWorkspaceRow + COALESCE(w.max_concurrent_tasks, 1)
- // in workspace.go). Column order must match that scan exactly.
+ // 23 cols: broadcast_enabled + talk_to_user_enabled added after monthly_spend
+ // (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001",
- nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)).
+ nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "",
- nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0))
+ nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
@@ -1119,13 +1119,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("dddddddd-0004-0000-0000-000000000000").
WillReturnRows(sqlmock.NewRows(columns).AddRow(
"dddddddd-0004-0000-0000-000000000000", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000",
nil, 2, 1, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false,
- nil, int64(0),
+ nil, int64(0), false, true,
))
w := httptest.NewRecorder()
diff --git a/workspace-server/internal/handlers/mcp_test.go b/workspace-server/internal/handlers/mcp_test.go
index 6ede0c2c..cb92fb04 100644
--- a/workspace-server/internal/handlers/mcp_test.go
+++ b/workspace-server/internal/handlers/mcp_test.go
@@ -645,9 +645,9 @@ func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-err").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// INSERT fails — must NOT abort the tool response.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -696,9 +696,9 @@ func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
const userMessage = "Hi there from the agent"
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-shape").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// Capture the response_body argument and assert its exact shape.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -755,9 +755,9 @@ func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
// before it does anything else. Returning a name lets the
// broadcast payload populate; the test doesn't assert on the
// broadcast (no observable WS in this fake), only on the DB.
- mock.ExpectQuery("SELECT name FROM workspaces").
+ mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-msg").
- WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
+ WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// The persistence INSERT — pin the exact shape so a future
// refactor that switches columns or drops `method='notify'`
diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go
index a163cee9..75426d68 100644
--- a/workspace-server/internal/handlers/workspace.go
+++ b/workspace-server/internal/handlers/workspace.go
@@ -523,7 +523,7 @@ func scanWorkspaceRow(rows interface {
var id, name, role, status, url, sampleError, currentTask, runtime, workspaceDir string
var tier, activeTasks, maxConcurrentTasks, uptimeSeconds int
var errorRate, x, y float64
- var collapsed bool
+ var collapsed, broadcastEnabled, talkToUserEnabled bool
var parentID *string
var agentCard []byte
var budgetLimit sql.NullInt64
@@ -532,7 +532,7 @@ func scanWorkspaceRow(rows interface {
err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url,
&parentID, &activeTasks, &maxConcurrentTasks, &errorRate, &sampleError, &uptimeSeconds,
¤tTask, &runtime, &workspaceDir, &x, &y, &collapsed,
- &budgetLimit, &monthlySpend)
+ &budgetLimit, &monthlySpend, &broadcastEnabled, &talkToUserEnabled)
if err != nil {
return nil, err
}
@@ -556,6 +556,8 @@ func scanWorkspaceRow(rows interface {
"x": x,
"y": y,
"collapsed": collapsed,
+ "broadcast_enabled": broadcastEnabled,
+ "talk_to_user_enabled": talkToUserEnabled,
}
// budget_limit: nil when no limit set, int64 otherwise
@@ -591,7 +593,8 @@ const workspaceListQuery = `
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
- w.budget_limit, COALESCE(w.monthly_spend, 0)
+ w.budget_limit, COALESCE(w.monthly_spend, 0),
+ w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.status != 'removed'
@@ -651,7 +654,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
- w.budget_limit, COALESCE(w.monthly_spend, 0)
+ w.budget_limit, COALESCE(w.monthly_spend, 0),
+ w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.id = $1
diff --git a/workspace-server/internal/handlers/workspace_abilities.go b/workspace-server/internal/handlers/workspace_abilities.go
new file mode 100644
index 00000000..71fa48f9
--- /dev/null
+++ b/workspace-server/internal/handlers/workspace_abilities.go
@@ -0,0 +1,82 @@
+package handlers
+
+// workspace_abilities.go — PATCH /workspaces/:id/abilities
+//
+// Allows users and admin agents to toggle two workspace-level ability flags:
+//
+// broadcast_enabled — workspace may POST /broadcast to send org-wide messages
+// talk_to_user_enabled — workspace may deliver canvas chat messages via
+// send_message_to_user / POST /notify
+//
+// Gated behind AdminAuth so workspace agents cannot self-modify their own
+// ability flags (that would let any agent grant itself broadcast rights or
+// suppress its own chat-silence constraint).
+
+import (
+ "log"
+ "net/http"
+
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+ "github.com/gin-gonic/gin"
+)
+
+// AbilitiesPayload carries the subset of ability flags the caller wants to
+// update. Fields are pointers so that the handler can distinguish "caller
+// supplied false" from "caller omitted the field" (omitempty semantics).
+type AbilitiesPayload struct {
+ BroadcastEnabled *bool `json:"broadcast_enabled"`
+ TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
+}
+
+// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth).
+func PatchAbilities(c *gin.Context) {
+ id := c.Param("id")
+ if err := validateWorkspaceID(id); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+ return
+ }
+
+ var body AbilitiesPayload
+ if err := c.ShouldBindJSON(&body); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
+ return
+ }
+ if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"})
+ return
+ }
+
+ ctx := c.Request.Context()
+
+ var exists bool
+ if err := db.DB.QueryRowContext(ctx,
+ `SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`, id,
+ ).Scan(&exists); err != nil || !exists {
+ c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
+ return
+ }
+
+ if body.BroadcastEnabled != nil {
+ if _, err := db.DB.ExecContext(ctx,
+ `UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
+ id, *body.BroadcastEnabled,
+ ); err != nil {
+ log.Printf("PatchAbilities broadcast_enabled for %s: %v", id, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
+ return
+ }
+ }
+
+ if body.TalkToUserEnabled != nil {
+ if _, err := db.DB.ExecContext(ctx,
+ `UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
+ id, *body.TalkToUserEnabled,
+ ); err != nil {
+ log.Printf("PatchAbilities talk_to_user_enabled for %s: %v", id, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
+ return
+ }
+ }
+
+ c.JSON(http.StatusOK, gin.H{"status": "updated"})
+}
diff --git a/workspace-server/internal/handlers/workspace_broadcast.go b/workspace-server/internal/handlers/workspace_broadcast.go
new file mode 100644
index 00000000..f04ef2ff
--- /dev/null
+++ b/workspace-server/internal/handlers/workspace_broadcast.go
@@ -0,0 +1,150 @@
+package handlers
+
+// workspace_broadcast.go — POST /workspaces/:id/broadcast
+//
+// Allows a workspace with broadcast_enabled=true to send a message to every
+// non-removed agent workspace in the org. The message is:
+//
+// • Persisted in each recipient's activity_logs (type='broadcast_receive')
+// so poll-mode agents pick it up via GET /activity.
+// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
+// show a real-time banner for each recipient workspace.
+//
+// The sender's own workspace logs a 'broadcast_sent' activity row for
+// traceability.
+//
+// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
+// The handler re-validates broadcast_enabled inside the DB lookup to prevent
+// TOCTOU — the middleware only proved the token is valid, not the ability.
+//
+// Security: recipient query is scoped by parent_id (org) to prevent cross-tenant
+// message injection. See OFFSEC-015. parent_id IS NOT DISTINCT FROM is used so
+// NULL parent_id (root org workspace) correctly broadcasts to other NULL workspaces.
+
+import (
+ "log"
+ "net/http"
+ "strconv"
+
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
+ "github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
+ "github.com/gin-gonic/gin"
+)
+
+// BroadcastHandler is constructed once and shared across requests.
+type BroadcastHandler struct {
+ broadcaster *events.Broadcaster
+}
+
+// NewBroadcastHandler creates a BroadcastHandler.
+func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
+ return &BroadcastHandler{broadcaster: b}
+}
+
+// Broadcast handles POST /workspaces/:id/broadcast.
+func (h *BroadcastHandler) Broadcast(c *gin.Context) {
+ senderID := c.Param("id")
+ if err := validateWorkspaceID(senderID); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+ return
+ }
+
+ var body struct {
+ Message string `json:"message" binding:"required"`
+ }
+ if err := c.ShouldBindJSON(&body); err != nil {
+ c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
+ return
+ }
+
+ ctx := c.Request.Context()
+
+ // Verify sender exists and has broadcast_enabled=true.
+ var senderName string
+ var broadcastEnabled bool
+ var parentID *string
+ err := db.DB.QueryRowContext(ctx,
+ `SELECT name, broadcast_enabled, parent_id FROM workspaces WHERE id = $1 AND status != 'removed'`,
+ senderID,
+ ).Scan(&senderName, &broadcastEnabled, &parentID)
+ if err != nil {
+ c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
+ return
+ }
+ if !broadcastEnabled {
+ c.JSON(http.StatusForbidden, gin.H{
+ "error": "broadcast_disabled",
+ "hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
+ })
+ return
+ }
+
+ // Collect all non-removed agent workspaces in the same org (same parent_id),
+ // excluding the sender itself. parent_id IS NOT DISTINCT FROM $2 handles
+ // both NULL (root workspace broadcasts to other root workspaces) and
+ // non-NULL (workspace broadcasts to sibling workspaces in the same org) cases.
+ rows, err := db.DB.QueryContext(ctx,
+ `SELECT id FROM workspaces WHERE status != 'removed' AND id != $1 AND parent_id IS NOT DISTINCT FROM $2`,
+ senderID, parentID,
+ )
+ if err != nil {
+ log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+ return
+ }
+ defer rows.Close()
+
+ var recipientIDs []string
+ for rows.Next() {
+ var rid string
+ if rows.Scan(&rid) == nil {
+ recipientIDs = append(recipientIDs, rid)
+ }
+ }
+ if err := rows.Err(); err != nil {
+ log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
+ c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
+ return
+ }
+
+ broadcastPayload := map[string]interface{}{
+ "message": body.Message,
+ "sender_id": senderID,
+ "sender": senderName,
+ }
+
+ // Persist broadcast_receive in each recipient's activity log + emit WS event.
+ delivered := 0
+ for _, rid := range recipientIDs {
+ if _, err := db.DB.ExecContext(ctx, `
+ INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
+ VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
+ `, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
+ log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
+ continue
+ }
+ h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
+ delivered++
+ }
+
+ // Record the send on the sender's own log.
+ if _, err := db.DB.ExecContext(ctx, `
+ INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
+ VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
+ `, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
+ log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
+ }
+
+ c.JSON(http.StatusOK, gin.H{
+ "status": "sent",
+ "delivered": delivered,
+ })
+}
+
+func broadcastTruncate(s string, max int) string {
+ runes := []rune(s)
+ if len(runes) <= max {
+ return s
+ }
+ return string(runes[:max]) + "…"
+}
diff --git a/workspace-server/internal/handlers/workspace_budget_test.go b/workspace-server/internal/handlers/workspace_budget_test.go
index 920dad9c..4652e293 100644
--- a/workspace-server/internal/handlers/workspace_budget_test.go
+++ b/workspace-server/internal/handlers/workspace_budget_test.go
@@ -33,6 +33,7 @@ var wsColumns = []string{
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
// ==================== GET — financial fields stripped from open endpoint ====================
@@ -52,8 +53,10 @@ func TestWorkspaceBudget_Get_NilLimit(t *testing.T) {
[]byte(`{}`), "http://localhost:9001",
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
- nil, // budget_limit NULL
- 0)) // monthly_spend 0
+ nil, // budget_limit NULL
+ 0, // monthly_spend 0
+ false, // broadcast_enabled
+ true)) // talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -96,7 +99,8 @@ func TestWorkspaceBudget_Get_WithLimit(t *testing.T) {
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
int64(500), // budget_limit = $5.00 in DB
- int64(123))) // monthly_spend = $1.23 in DB
+ int64(123), // monthly_spend = $1.23 in DB
+ false, true)) // broadcast_enabled, talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go
index 4e17ca6a..8bd46afe 100644
--- a/workspace-server/internal/handlers/workspace_test.go
+++ b/workspace-server/internal/handlers/workspace_test.go
@@ -29,6 +29,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0001-0000-0000-000000000000").
@@ -36,7 +37,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
AddRow("cccccccc-0001-0000-0000-000000000000", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`),
"http://localhost:8001", nil, 2, 1, 0.05, "", 3600, "working", "langgraph",
"", 10.0, 20.0, false,
- nil, 0))
+ nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -118,6 +119,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -125,7 +127,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
AddRow(id, "Old Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"updated_at"}).AddRow(removedAt))
@@ -181,6 +183,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -188,7 +191,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
AddRow(id, "Vanished", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
// Simulate the row vanishing between the two queries.
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
@@ -243,6 +246,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -250,7 +254,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
AddRow(id, "Audit Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
// last_outbound_at follow-up query (existing path)
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
WithArgs(id).
@@ -535,6 +539,7 @@ func TestWorkspaceList_Empty(t *testing.T) {
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}))
w := httptest.NewRecorder()
@@ -1226,6 +1231,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
// Populate with non-zero financial values to confirm they are stripped.
mock.ExpectQuery("SELECT w.id, w.name").
@@ -1234,7 +1240,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
AddRow("cccccccc-0010-0000-0000-000000000000", "Finance Test", "worker", 1, "online", []byte(`{}`),
"http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
- int64(50000), int64(12500))) // budget_limit=500 USD, spend=125 USD
+ int64(50000), int64(12500), false, true)) // budget_limit=500 USD, spend=125 USD
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1282,6 +1288,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
+ "broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0955-0000-0000-000000000000").
@@ -1294,7 +1301,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"langgraph",
"/home/user/secret-projects/client-work",
0.0, 0.0, false,
- nil, 0))
+ nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go
index 11284473..9139fc5b 100644
--- a/workspace-server/internal/models/workspace.go
+++ b/workspace-server/internal/models/workspace.go
@@ -36,6 +36,15 @@ type Workspace struct {
// to activity_logs, agent reads via GET /activity?since_id=). See
// migration 045 + RFC #2339.
DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
+ // BroadcastEnabled: when true the workspace may call POST /broadcast to
+ // deliver a message to all non-removed agent workspaces in the org.
+ // Default false — only privileged orchestrators should hold this ability.
+ BroadcastEnabled bool `json:"broadcast_enabled" db:"broadcast_enabled"`
+ // TalkToUserEnabled: when false the workspace's send_message_to_user calls
+ // and POST /notify requests are rejected with HTTP 403 so the agent is
+ // forced to route updates through a parent workspace. Default true
+ // (preserves existing behaviour for all workspaces).
+ TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"`
// Canvas layout fields (from JOIN)
X float64 `json:"x"`
Y float64 `json:"y"`
diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go
index 770b0a66..556904c5 100644
--- a/workspace-server/internal/router/router.go
+++ b/workspace-server/internal/router/router.go
@@ -137,6 +137,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAdmin.GET("/workspaces", wh.List)
wsAdmin.POST("/workspaces", wh.Create)
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
+ // Ability toggles — admin-only so workspace agents cannot self-modify
+ // broadcast_enabled or talk_to_user_enabled.
+ wsAdmin.PATCH("/workspaces/:id/abilities", handlers.PatchAbilities)
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
// so the canvas flips to failed in seconds instead of waiting
@@ -192,6 +195,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
wsAuth.POST("/hibernate", wh.Hibernate)
+ // Broadcast — send a message to all non-removed workspaces in the org.
+ // Requires broadcast_enabled=true on the source workspace (checked
+ // inside the handler). WorkspaceAuth on wsAuth proves token ownership.
+ broadcastH := handlers.NewBroadcastHandler(broadcaster)
+ wsAuth.POST("/broadcast", broadcastH.Broadcast)
+
// External-workspace credential lifecycle (issue #319 follow-up to
// the Create flow). Both endpoints reject runtime ≠ external with
// 400 — see external_rotate.go for the rationale.
diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.down.sql b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql
new file mode 100644
index 00000000..12b5f846
--- /dev/null
+++ b/workspace-server/migrations/20260514120000_workspace_abilities.down.sql
@@ -0,0 +1,3 @@
+ALTER TABLE workspaces
+ DROP COLUMN IF EXISTS broadcast_enabled,
+ DROP COLUMN IF EXISTS talk_to_user_enabled;
diff --git a/workspace-server/migrations/20260514120000_workspace_abilities.up.sql b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql
new file mode 100644
index 00000000..f172c30f
--- /dev/null
+++ b/workspace-server/migrations/20260514120000_workspace_abilities.up.sql
@@ -0,0 +1,16 @@
+-- Workspace abilities: opt-in flags that gate platform-level behaviours.
+--
+-- broadcast_enabled (default FALSE): when TRUE the workspace may call
+-- POST /workspaces/:id/broadcast to send a message to every non-removed
+-- agent workspace in the org. Off by default — only privileged
+-- orchestrator workspaces should hold this ability.
+--
+-- talk_to_user_enabled (default TRUE): when FALSE the workspace is not
+-- allowed to deliver messages to the canvas user via send_message_to_user /
+-- POST /notify. The platform returns HTTP 403 so the agent can forward its
+-- update to a parent workspace instead. Default TRUE preserves existing
+-- behaviour for all current workspaces.
+
+ALTER TABLE workspaces
+ ADD COLUMN IF NOT EXISTS broadcast_enabled BOOLEAN NOT NULL DEFAULT FALSE,
+ ADD COLUMN IF NOT EXISTS talk_to_user_enabled BOOLEAN NOT NULL DEFAULT TRUE;
diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py
index 1b1ef267..eb26e622 100644
--- a/workspace/a2a_tools.py
+++ b/workspace/a2a_tools.py
@@ -137,6 +137,7 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli
# identically.
from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports)
_upload_chat_files,
+ tool_broadcast_message,
tool_chat_history,
tool_get_workspace_info,
tool_list_peers,
diff --git a/workspace/a2a_tools_messaging.py b/workspace/a2a_tools_messaging.py
index dea24f90..9b832a2b 100644
--- a/workspace/a2a_tools_messaging.py
+++ b/workspace/a2a_tools_messaging.py
@@ -101,6 +101,50 @@ async def _upload_chat_files(
return uploaded, None
+async def tool_broadcast_message(
+ message: str,
+ workspace_id: str | None = None,
+) -> str:
+ """Send a broadcast message to ALL agent workspaces in the org.
+
+ Requires the workspace to have broadcast_enabled=true (set by a user or
+ admin via PATCH /workspaces/:id/abilities). Use for urgent org-wide
+ signals — status changes, critical alerts, coordination instructions.
+ Every non-removed workspace receives the message in its activity log so
+ poll-mode agents pick it up, and push-mode canvases get a real-time
+ BROADCAST_MESSAGE WebSocket event.
+
+ Args:
+ message: The broadcast text. Keep it concise — all agents receive
+ this, so avoid lengthy prose that floods every context.
+ workspace_id: Optional. Which registered workspace to send the
+ broadcast from. Single-workspace agents omit this.
+ """
+ if not message:
+ return "Error: message is required"
+ target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
+ try:
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ resp = await client.post(
+ f"{PLATFORM_URL}/workspaces/{target_workspace_id}/broadcast",
+ json={"message": message},
+ headers=_auth_headers_for_heartbeat(target_workspace_id),
+ )
+ if resp.status_code == 200:
+ data = resp.json()
+ delivered = data.get("delivered", "?")
+ return f"Broadcast sent to {delivered} workspace(s)"
+ if resp.status_code == 403:
+ try:
+ hint = resp.json().get("hint", "")
+ except Exception:
+ hint = ""
+ return f"Error: broadcast ability not enabled.{(' ' + hint) if hint else ''}"
+ return f"Error: platform returned {resp.status_code}"
+ except Exception as e:
+ return f"Error sending broadcast: {e}"
+
+
async def tool_send_message_to_user(
message: str,
attachments: list[str] | None = None,
@@ -151,6 +195,20 @@ async def tool_send_message_to_user(
if uploaded:
return f"Message sent to user with {len(uploaded)} attachment(s)"
return "Message sent to user"
+ if resp.status_code == 403:
+ try:
+ body = resp.json()
+ if body.get("error") == "talk_to_user_disabled":
+ hint = body.get("hint", "")
+ return (
+ "Error: this workspace is not allowed to send messages "
+ "directly to the user (talk_to_user is disabled). "
+ + (hint + " " if hint else "")
+ + "Use delegate_task to forward your update to a parent "
+ "or supervisor workspace that can reach the user."
+ )
+ except Exception:
+ pass
return f"Error: platform returned {resp.status_code}"
except Exception as e:
return f"Error sending message: {e}"
diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py
index 95ac65fc..2257959f 100644
--- a/workspace/executor_helpers.py
+++ b/workspace/executor_helpers.py
@@ -326,6 +326,10 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
"delegate_task_async": "delegate --async",
"check_task_status": "status",
"get_workspace_info": "info",
+ # `broadcast_message` is not exposed via the CLI subprocess interface
+ # today — it's an MCP-first capability. If a2a_cli grows a `broadcast`
+ # subcommand, map it here and the alignment test will gate the change.
+ "broadcast_message": None,
# `send_message_to_user` is not exposed via the CLI subprocess
# interface today — it requires a structured `attachments` field
# that wouldn't survive a positional-arg shell invocation cleanly.
diff --git a/workspace/platform_tools/registry.py b/workspace/platform_tools/registry.py
index f4fa773e..6550c9e7 100644
--- a/workspace/platform_tools/registry.py
+++ b/workspace/platform_tools/registry.py
@@ -51,6 +51,7 @@ from dataclasses import dataclass
from typing import Any, Literal
from a2a_tools import (
+ tool_broadcast_message,
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
@@ -288,6 +289,44 @@ _GET_WORKSPACE_INFO = ToolSpec(
section=A2A_SECTION,
)
+_BROADCAST_MESSAGE = ToolSpec(
+ name="broadcast_message",
+ short=(
+ "Send a message to ALL agent workspaces in the org simultaneously. "
+ "Requires broadcast_enabled=true on this workspace (set by user/admin)."
+ ),
+ when_to_use=(
+ "Use for urgent, org-wide signals: critical status changes, emergency "
+ "stop instructions, coordinated task announcements. Every non-removed "
+ "workspace receives the message in its activity log (poll-mode agents "
+ "see it on their next poll; push-mode canvases get a real-time banner). "
+ "This tool returns an error if broadcast_enabled is false — a user or "
+ "admin must enable it via the workspace abilities settings first."
+ ),
+ input_schema={
+ "type": "object",
+ "properties": {
+ "message": {
+ "type": "string",
+ "description": (
+ "The broadcast text. Keep it concise — every agent in the "
+ "org receives this in their activity feed."
+ ),
+ },
+ "workspace_id": {
+ "type": "string",
+ "description": (
+ "Optional. Multi-workspace mode: the registered workspace "
+ "to broadcast from. Single-workspace agents omit this."
+ ),
+ },
+ },
+ "required": ["message"],
+ },
+ impl=tool_broadcast_message,
+ section=A2A_SECTION,
+)
+
_SEND_MESSAGE_TO_USER = ToolSpec(
name="send_message_to_user",
short=(
@@ -603,6 +642,7 @@ TOOLS: list[ToolSpec] = [
_CHECK_TASK_STATUS,
_LIST_PEERS,
_GET_WORKSPACE_INFO,
+ _BROADCAST_MESSAGE,
_SEND_MESSAGE_TO_USER,
# Inbox (standalone-only; in-container returns informational error)
_WAIT_FOR_MESSAGE,
diff --git a/workspace/tests/snapshots/a2a_instructions_mcp.txt b/workspace/tests/snapshots/a2a_instructions_mcp.txt
index 6bcf471e..3f0213e1 100644
--- a/workspace/tests/snapshots/a2a_instructions_mcp.txt
+++ b/workspace/tests/snapshots/a2a_instructions_mcp.txt
@@ -5,6 +5,7 @@
- **check_task_status**: Poll the status of a task started with delegate_task_async; returns result when done.
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
+- **broadcast_message**: Send a message to ALL agent workspaces in the org simultaneously. Requires broadcast_enabled=true on this workspace (set by user/admin).
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
- **inbox_peek**: List pending inbound messages without removing them.
@@ -26,6 +27,9 @@ Call this first when you need to delegate but don't know the target's ID. Access
### get_workspace_info
Use to introspect your own identity (e.g. before reporting back to the user, or to determine whether you're a tier-0 root that can write GLOBAL memory).
+### broadcast_message
+Use for urgent, org-wide signals: critical status changes, emergency stop instructions, coordinated task announcements. Every non-removed workspace receives the message in its activity log (poll-mode agents see it on their next poll; push-mode canvases get a real-time banner). This tool returns an error if broadcast_enabled is false — a user or admin must enable it via the workspace abilities settings first.
+
### send_message_to_user
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).