fix(handlers): HOTFIX OFFSEC-015 — scope broadcast recipients to sender org #1243

Open
release-manager wants to merge 2 commits from fix/offsec-015-staging into staging
2 changed files with 715 additions and 6 deletions

View File

@ -3,7 +3,7 @@ package handlers
// workspace_broadcast.go — POST /workspaces/:id/broadcast
//
// Allows a workspace with broadcast_enabled=true to send a message to every
// non-removed agent workspace in the org. The message is:
// non-removed agent workspace in the SAME ORG. The message is:
//
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
// so poll-mode agents pick it up via GET /activity.
@ -16,6 +16,11 @@ package handlers
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
// TOCTOU — the middleware only proved the token is valid, not the ability.
//
// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
// a recursive CTE that walks the parent_id chain to find the org root. This
// prevents a compromised or misconfigured workspace from broadcasting to
// workspaces in other tenants' orgs.
import (
"log"
@ -74,11 +79,49 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) {
return
}
// Collect all non-removed agent workspaces (excludes the sender itself).
rows, err := db.DB.QueryContext(ctx,
`SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`,
senderID,
)
// Find the sender's org root by walking the parent_id chain.
// Workspaces with parent_id = NULL are org roots; every other workspace
// belongs to the org identified by its topmost ancestor.
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
// Collect all non-removed agent workspaces in the SAME ORG (same root_id),
// excluding the sender itself.
rows, err := db.DB.QueryContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.parent_id = c.id
)
SELECT c.id
FROM org_chain c
WHERE c.root_id = $1
AND c.id != $2
AND EXISTS (
SELECT 1 FROM workspaces w
WHERE w.id = c.id AND w.status != 'removed'
)
`, orgRootID, senderID)
if err != nil {
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})

View File

@ -0,0 +1,666 @@
package handlers
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/gin-gonic/gin"
)
// setupBroadcastDB uses QueryMatcherEqual so SQL strings with quoted literals
// (e.g. status != 'removed') are compared verbatim, not as regex.
func setupBroadcastDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
return mock
}
// broadcastTestUUID is a properly formatted test UUID.
const broadcastTestUUID = "bbbbbbbb-0001-0001-0001-000000000001"
// buildBroadcastCtx creates a gin.Context wired for POST /workspaces/:id/broadcast.
func buildBroadcastCtx(id, body string) (*gin.Context, *httptest.ResponseRecorder) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
req := httptest.NewRequest(http.MethodPost, "/workspaces/"+id+"/broadcast", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
c.Request = req.WithContext(context.Background())
c.Params = gin.Params{{Key: "id", Value: id}}
return c, w
}
// ─── Pure function ────────────────────────────────────────────────────────────
func TestBroadcastTruncate(t *testing.T) {
tests := []struct {
name string
s string
max int
want string
}{
{"empty string", "", 10, ""},
{"under limit", "hello", 10, "hello"},
{"exactly at limit", "hello", 5, "hello"},
{"over limit", "hello world", 5, "hello…"},
{"unicode over limit", "こんにちは世界", 5, "こんにちは…"},
{"ascii over limit", "abcdefghij", 5, "abcde…"},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := broadcastTruncate(tc.s, tc.max)
if got != tc.want {
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.s, tc.max, got, tc.want)
}
})
}
}
// ─── Validation ────────────────────────────────────────────────────────────────
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
c, w := buildBroadcastCtx("not-a-uuid", `{"message":"hello"}`)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("want 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{}`)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("want 400, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_MalformedJSON(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `not json`)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("want 400, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Auth / Authz ─────────────────────────────────────────────────────────────
func TestBroadcast_WorkspaceNotFound(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
// Workspace lookup returns no rows.
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnError(sql.ErrNoRows)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusNotFound {
t.Errorf("want 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_WorkspaceLookupQueryError(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnError(sql.ErrConnDone)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusNotFound {
t.Errorf("want 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_BroadcastDisabled(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
// Workspace found but broadcast_enabled=false.
rows := sqlmock.NewRows([]string{"name", "broadcast_enabled"}).
AddRow("test-workspace", false)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(rows)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusForbidden {
t.Errorf("want 403, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Org root lookup error (blocks cross-org broadcast) ──────────────────────
func TestBroadcast_OrgRootLookupError(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
// Workspace lookup succeeds.
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true))
// Org root CTE fails — handler must NOT proceed to the recipient query
// (which would broadcast cross-org if org root lookup failed silently).
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnError(context.DeadlineExceeded)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("want 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// ─── DB error paths ───────────────────────────────────────────────────────────
func TestBroadcast_RecipientQueryError(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
// Workspace lookup succeeds with broadcast_enabled=true.
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true))
// Org root lookup succeeds.
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// Recipient query fails.
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnError(sql.ErrConnDone)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("want 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_RecipientRowsError(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// Recipient query succeeds but rows.Err() fails.
badRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-2").RowError(0, sql.ErrConnDone)
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(badRows)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("want 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Success paths ───────────────────────────────────────────────────────────
func TestBroadcast_Success_OneRecipient(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello world"}`)
// Workspace lookup.
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("sender-workspace", true))
// Org root lookup.
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// Recipient query: one recipient.
recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-recipient-1")
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(recipRows)
// Activity log insert for recipient.
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs("ws-recipient-1", broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Activity log insert for sender (broadcast_sent).
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("want 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_Success_NoRecipients(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("solo-workspace", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// No recipients.
recipRows := sqlmock.NewRows([]string{"id"})
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(recipRows)
// Activity log insert for sender (broadcast_sent).
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("want 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
func TestBroadcast_Success_MultipleRecipients(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// Three recipients.
recipRows := sqlmock.NewRows([]string{"id"}).
AddRow("ws-1").AddRow("ws-2").AddRow("ws-3")
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(recipRows)
// Each recipient gets a broadcast_receive log.
for _, rid := range []string{"ws-1", "ws-2", "ws-3"} {
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs(rid, broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
}
// Sender log.
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("want 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Recipient insert failure (logged, continues) ─────────────────────────────
func TestBroadcast_RecipientInsertError_ContinuesAndSucceeds(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
// Two recipients.
recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-1").AddRow("ws-2")
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(recipRows)
// First recipient insert fails (logged, continues).
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs("ws-1", broadcastTestUUID, sqlmock.AnyArg()).
WillReturnError(sql.ErrConnDone)
// Second recipient insert succeeds.
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs("ws-2", broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Sender log.
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
// Handler returns 200 even though one insert failed — it logs and continues.
if w.Code != http.StatusOK {
t.Errorf("want 200 despite insert error, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Sender activity log insert failure (logged, still 200) ───────────────────
func TestBroadcast_SenderLogInsertError_Still200(t *testing.T) {
mock := setupBroadcastDB(t)
c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`)
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(broadcastTestUUID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID))
recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-1")
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(broadcastTestUUID, broadcastTestUUID).
WillReturnRows(recipRows)
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs("ws-1", broadcastTestUUID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
// Sender log fails — but handler still returns 200 (logged only).
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(broadcastTestUUID, sqlmock.AnyArg()).
WillReturnError(sql.ErrConnDone)
handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil)))
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("want 200 despite sender log error, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations: %v", err)
}
}
// ─── Org-scoped recipient query tests (OFFSEC-015) ────────────────────────────
// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
// NOT reach workspaces belonging to Org-B. This is the core regression test
// for OFFSEC-015: the original query had no org filter, so a workspace in
// Org-A could broadcast to every non-removed workspace in the entire DB.
func TestBroadcast_OrgScopedRecipients(t *testing.T) {
mock := setupBroadcastDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
wsAChild := "00000000-0000-0000-0000-000000000002"
// 1. Sender lookup
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
// 2. Org root lookup — sender is its own root (parent_id = NULL)
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild))
// Activity log inserts
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello from org-a"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
}
if resp["status"] != "sent" {
t.Errorf("expected status 'sent', got %v", resp["status"])
}
// ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations — cross-org workspace was included: %v", err)
}
}
// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
mock := setupBroadcastDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
siblingID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello siblings"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
// workspace can broadcast to siblings in the same org.
func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
mock := setupBroadcastDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
orgRootID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000002"
siblingID := "00000000-0000-0000-0000-000000000003"
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(orgRootID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"child broadcasting"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
// from a workspace does not send a broadcast_receive to the sender itself.
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
mock := setupBroadcastDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
peerID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1").
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipient query MUST exclude sender via id != senderID
mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )").
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
// Peer receives broadcast_receive
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')").
WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
// Sender logs broadcast_sent (NOT broadcast_receive)
mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')").
WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"no echo to self"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}