diff --git a/workspace-server/internal/handlers/workspace_broadcast.go b/workspace-server/internal/handlers/workspace_broadcast.go index 6afd21e0a..668475661 100644 --- a/workspace-server/internal/handlers/workspace_broadcast.go +++ b/workspace-server/internal/handlers/workspace_broadcast.go @@ -3,7 +3,7 @@ package handlers // workspace_broadcast.go — POST /workspaces/:id/broadcast // // Allows a workspace with broadcast_enabled=true to send a message to every -// non-removed agent workspace in the org. The message is: +// non-removed agent workspace in the SAME ORG. The message is: // // • Persisted in each recipient's activity_logs (type='broadcast_receive') // so poll-mode agents pick it up via GET /activity. @@ -16,6 +16,11 @@ package handlers // Auth: WorkspaceAuth (the agent triggers this with its own bearer token). // The handler re-validates broadcast_enabled inside the DB lookup to prevent // TOCTOU — the middleware only proved the token is valid, not the ability. +// +// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using +// a recursive CTE that walks the parent_id chain to find the org root. This +// prevents a compromised or misconfigured workspace from broadcasting to +// workspaces in other tenants' orgs. import ( "log" @@ -74,11 +79,49 @@ func (h *BroadcastHandler) Broadcast(c *gin.Context) { return } - // Collect all non-removed agent workspaces (excludes the sender itself). - rows, err := db.DB.QueryContext(ctx, - `SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`, - senderID, - ) + // Find the sender's org root by walking the parent_id chain. + // Workspaces with parent_id = NULL are org roots; every other workspace + // belongs to the org identified by its topmost ancestor. + var orgRootID string + err = db.DB.QueryRowContext(ctx, ` + WITH RECURSIVE org_chain AS ( + SELECT id, parent_id, id AS root_id + FROM workspaces + WHERE id = $1 + UNION ALL + SELECT w.id, w.parent_id, c.root_id + FROM workspaces w + JOIN org_chain c ON w.id = c.parent_id + ) + SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1 + `, senderID).Scan(&orgRootID) + if err != nil { + log.Printf("Broadcast: org root lookup for %s: %v", senderID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) + return + } + + // Collect all non-removed agent workspaces in the SAME ORG (same root_id), + // excluding the sender itself. + rows, err := db.DB.QueryContext(ctx, ` + WITH RECURSIVE org_chain AS ( + SELECT id, parent_id, id AS root_id + FROM workspaces + WHERE parent_id IS NULL + UNION ALL + SELECT w.id, w.parent_id, c.root_id + FROM workspaces w + JOIN org_chain c ON w.parent_id = c.id + ) + SELECT c.id + FROM org_chain c + WHERE c.root_id = $1 + AND c.id != $2 + AND EXISTS ( + SELECT 1 FROM workspaces w + WHERE w.id = c.id AND w.status != 'removed' + ) + `, orgRootID, senderID) if err != nil { log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err) c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"}) diff --git a/workspace-server/internal/handlers/workspace_broadcast_test.go b/workspace-server/internal/handlers/workspace_broadcast_test.go new file mode 100644 index 000000000..81ca3d1e9 --- /dev/null +++ b/workspace-server/internal/handlers/workspace_broadcast_test.go @@ -0,0 +1,666 @@ +package handlers + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/ws" + "github.com/gin-gonic/gin" +) + +// setupBroadcastDB uses QueryMatcherEqual so SQL strings with quoted literals +// (e.g. status != 'removed') are compared verbatim, not as regex. +func setupBroadcastDB(t *testing.T) sqlmock.Sqlmock { + t.Helper() + mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("failed to create sqlmock: %v", err) + } + prevDB := db.DB + db.DB = mockDB + t.Cleanup(func() { db.DB = prevDB; mockDB.Close() }) + return mock +} + +// broadcastTestUUID is a properly formatted test UUID. +const broadcastTestUUID = "bbbbbbbb-0001-0001-0001-000000000001" + +// buildBroadcastCtx creates a gin.Context wired for POST /workspaces/:id/broadcast. +func buildBroadcastCtx(id, body string) (*gin.Context, *httptest.ResponseRecorder) { + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + req := httptest.NewRequest(http.MethodPost, "/workspaces/"+id+"/broadcast", bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + c.Request = req.WithContext(context.Background()) + c.Params = gin.Params{{Key: "id", Value: id}} + return c, w +} + +// ─── Pure function ──────────────────────────────────────────────────────────── + +func TestBroadcastTruncate(t *testing.T) { + tests := []struct { + name string + s string + max int + want string + }{ + {"empty string", "", 10, ""}, + {"under limit", "hello", 10, "hello"}, + {"exactly at limit", "hello", 5, "hello"}, + {"over limit", "hello world", 5, "hello…"}, + {"unicode over limit", "こんにちは世界", 5, "こんにちは…"}, + {"ascii over limit", "abcdefghij", 5, "abcde…"}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := broadcastTruncate(tc.s, tc.max) + if got != tc.want { + t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.s, tc.max, got, tc.want) + } + }) + } +} + +// ─── Validation ──────────────────────────────────────────────────────────────── + +func TestBroadcast_InvalidWorkspaceID(t *testing.T) { + c, w := buildBroadcastCtx("not-a-uuid", `{"message":"hello"}`) + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + if w.Code != http.StatusBadRequest { + t.Errorf("want 400, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestBroadcast_MissingMessage(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{}`) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("want 400, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_MalformedJSON(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `not json`) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("want 400, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Auth / Authz ───────────────────────────────────────────────────────────── + +func TestBroadcast_WorkspaceNotFound(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + // Workspace lookup returns no rows. + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnError(sql.ErrNoRows) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusNotFound { + t.Errorf("want 404, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_WorkspaceLookupQueryError(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnError(sql.ErrConnDone) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusNotFound { + t.Errorf("want 404, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_BroadcastDisabled(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + // Workspace found but broadcast_enabled=false. + rows := sqlmock.NewRows([]string{"name", "broadcast_enabled"}). + AddRow("test-workspace", false) + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(rows) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusForbidden { + t.Errorf("want 403, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Org root lookup error (blocks cross-org broadcast) ────────────────────── + +func TestBroadcast_OrgRootLookupError(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + // Workspace lookup succeeds. + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true)) + + // Org root CTE fails — handler must NOT proceed to the recipient query + // (which would broadcast cross-org if org root lookup failed silently). + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnError(context.DeadlineExceeded) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("want 500, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// ─── DB error paths ─────────────────────────────────────────────────────────── + +func TestBroadcast_RecipientQueryError(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + // Workspace lookup succeeds with broadcast_enabled=true. + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true)) + + // Org root lookup succeeds. + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // Recipient query fails. + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnError(sql.ErrConnDone) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("want 500, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_RecipientRowsError(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("test-workspace", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // Recipient query succeeds but rows.Err() fails. + badRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-2").RowError(0, sql.ErrConnDone) + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(badRows) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("want 500, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Success paths ─────────────────────────────────────────────────────────── + +func TestBroadcast_Success_OneRecipient(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello world"}`) + + // Workspace lookup. + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("sender-workspace", true)) + + // Org root lookup. + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // Recipient query: one recipient. + recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-recipient-1") + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(recipRows) + + // Activity log insert for recipient. + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs("ws-recipient-1", broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Activity log insert for sender (broadcast_sent). + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("want 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_Success_NoRecipients(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("solo-workspace", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // No recipients. + recipRows := sqlmock.NewRows([]string{"id"}) + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(recipRows) + + // Activity log insert for sender (broadcast_sent). + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("want 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +func TestBroadcast_Success_MultipleRecipients(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // Three recipients. + recipRows := sqlmock.NewRows([]string{"id"}). + AddRow("ws-1").AddRow("ws-2").AddRow("ws-3") + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(recipRows) + + // Each recipient gets a broadcast_receive log. + for _, rid := range []string{"ws-1", "ws-2", "ws-3"} { + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs(rid, broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + } + + // Sender log. + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("want 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Recipient insert failure (logged, continues) ───────────────────────────── + +func TestBroadcast_RecipientInsertError_ContinuesAndSucceeds(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + // Two recipients. + recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-1").AddRow("ws-2") + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(recipRows) + + // First recipient insert fails (logged, continues). + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs("ws-1", broadcastTestUUID, sqlmock.AnyArg()). + WillReturnError(sql.ErrConnDone) + + // Second recipient insert succeeds. + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs("ws-2", broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Sender log. + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + // Handler returns 200 even though one insert failed — it logs and continues. + if w.Code != http.StatusOK { + t.Errorf("want 200 despite insert error, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Sender activity log insert failure (logged, still 200) ─────────────────── + +func TestBroadcast_SenderLogInsertError_Still200(t *testing.T) { + mock := setupBroadcastDB(t) + c, w := buildBroadcastCtx(broadcastTestUUID, `{"message":"hello"}`) + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("broadcaster", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(broadcastTestUUID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(broadcastTestUUID)) + + recipRows := sqlmock.NewRows([]string{"id"}).AddRow("ws-1") + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(broadcastTestUUID, broadcastTestUUID). + WillReturnRows(recipRows) + + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs("ws-1", broadcastTestUUID, sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Sender log fails — but handler still returns 200 (logged only). + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(broadcastTestUUID, sqlmock.AnyArg()). + WillReturnError(sql.ErrConnDone) + + handler := NewBroadcastHandler(events.NewBroadcaster(ws.NewHub(nil))) + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("want 200 despite sender log error, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations: %v", err) + } +} + +// ─── Org-scoped recipient query tests (OFFSEC-015) ──────────────────────────── + +// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does +// NOT reach workspaces belonging to Org-B. This is the core regression test +// for OFFSEC-015: the original query had no org filter, so a workspace in +// Org-A could broadcast to every non-removed workspace in the entire DB. +func TestBroadcast_OrgScopedRecipients(t *testing.T) { + mock := setupBroadcastDB(t) + broadcaster := newTestBroadcaster() + handler := NewBroadcastHandler(broadcaster) + + senderID := "00000000-0000-0000-0000-000000000001" // org-a-root + wsAChild := "00000000-0000-0000-0000-000000000002" + + // 1. Sender lookup + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true)) + + // 2. Org root lookup — sender is its own root (parent_id = NULL) + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID)) + + // 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included. + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(senderID, senderID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) + + // Activity log inserts + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: senderID}} + body := `{"message":"hello from org-a"}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to unmarshal response: %v", err) + } + if resp["status"] != "sent" { + t.Errorf("expected status 'sent', got %v", resp["status"]) + } + // ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet mock expectations — cross-org workspace was included: %v", err) + } +} + +// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the +// org root (parent_id = NULL), broadcasts still reach sibling workspaces. +func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) { + mock := setupBroadcastDB(t) + broadcaster := newTestBroadcaster() + handler := NewBroadcastHandler(broadcaster) + + senderID := "00000000-0000-0000-0000-000000000001" + siblingID := "00000000-0000-0000-0000-000000000002" + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(senderID, senderID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID)) + + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: senderID}} + body := `{"message":"hello siblings"}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child +// workspace can broadcast to siblings in the same org. +func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) { + mock := setupBroadcastDB(t) + broadcaster := newTestBroadcaster() + handler := NewBroadcastHandler(broadcaster) + + orgRootID := "00000000-0000-0000-0000-000000000001" + senderID := "00000000-0000-0000-0000-000000000002" + siblingID := "00000000-0000-0000-0000-000000000003" + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(orgRootID, senderID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID)) + + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: senderID}} + body := `{"message":"child broadcasting"}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting +// from a workspace does not send a broadcast_receive to the sender itself. +func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) { + mock := setupBroadcastDB(t) + broadcaster := newTestBroadcaster() + handler := NewBroadcastHandler(broadcaster) + + senderID := "00000000-0000-0000-0000-000000000001" + peerID := "00000000-0000-0000-0000-000000000002" + + mock.ExpectQuery("SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true)) + + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE id = $1 UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.id = c.parent_id ) SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1"). + WithArgs(senderID). + WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID)) + + // Recipient query MUST exclude sender via id != senderID + mock.ExpectQuery("WITH RECURSIVE org_chain AS ( SELECT id, parent_id, id AS root_id FROM workspaces WHERE parent_id IS NULL UNION ALL SELECT w.id, w.parent_id, c.root_id FROM workspaces w JOIN org_chain c ON w.parent_id = c.id ) SELECT c.id FROM org_chain c WHERE c.root_id = $1 AND c.id != $2 AND EXISTS ( SELECT 1 FROM workspaces w WHERE w.id = c.id AND w.status != 'removed' )"). + WithArgs(senderID, senderID). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID)) + + // Peer receives broadcast_receive + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status) VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')"). + WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + // Sender logs broadcast_sent (NOT broadcast_receive) + mock.ExpectExec("INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status) VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')"). + WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: senderID}} + body := `{"message":"no echo to self"}` + c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body)) + c.Request.Header.Set("Content-Type", "application/json") + + handler.Broadcast(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +}