Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8538d8ef8c | |||
| 6b11830a98 | |||
| bf719f7570 | |||
| 66d98074ef | |||
| fb9c373a4b | |||
| 9871e7b3c3 | |||
| 6498ed758b |
@@ -18,10 +18,6 @@ permissions:
|
||||
pull-requests: read
|
||||
statuses: write
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.issue.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
dispatch:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -70,7 +70,7 @@ name: sop-checklist
|
||||
# Cancel any in-progress runs for the same PR to prevent
|
||||
# stale runs from overwriting newer status contexts.
|
||||
concurrency:
|
||||
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.event.issue.number || github.ref }}
|
||||
group: ${{ github.repository }}-${{ github.event.pull_request.number }}
|
||||
cancel-in-progress: true
|
||||
|
||||
# bp-required: yes ← emits sop-checklist / all-items-acked (pull_request)
|
||||
|
||||
@@ -61,10 +61,6 @@ on:
|
||||
pull_request_review:
|
||||
types: [submitted, dismissed, edited]
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
tier-check:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -63,31 +63,6 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionSearch_DBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("WITH session_items AS").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-123/session-search?q=test", bytes.NewBufferString(""))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-123"}}
|
||||
|
||||
handler.SessionSearch(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB error, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity List source filter ----------
|
||||
|
||||
func TestActivityList_SourceCanvas(t *testing.T) {
|
||||
|
||||
@@ -60,7 +60,9 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
|
||||
// Auto-escalate to parent
|
||||
var parentID *string
|
||||
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
|
||||
log.Printf("CreateApproval parent lookup error workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
if parentID != nil {
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
@@ -80,10 +82,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Auto-expire stale approvals (older than 10 min)
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
|
||||
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
|
||||
`)
|
||||
`); err != nil {
|
||||
log.Printf("ListAll auto-expire error: %v", err)
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
|
||||
|
||||
@@ -543,33 +543,6 @@ func TestDelegationRecord_RejectsInvalidUUID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationRecord_DBInsertFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
h := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnError(fmt.Errorf("connection refused"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"}}
|
||||
body := `{"target_id":"550e8400-e29b-41d4-a716-446655440001","task":"hello","delegation_id":"del-xyz"}`
|
||||
c.Request = httptest.NewRequest("POST", "/delegations/record", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.Record(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB insert failure, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationUpdateStatus_CompletedInsertsResultRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
@@ -646,12 +646,8 @@ const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path.
|
||||
# external machine today, pair with the Python SDK tab.
|
||||
|
||||
# 1. Install openclaw CLI + the workspace runtime wheel:
|
||||
# The version pin (>=0.1.999) ensures the "molecule-mcp" console
|
||||
# script is present — it is what keeps the workspace ALIVE on canvas
|
||||
# (register-on-startup + 20s heartbeat). Older versions only ship
|
||||
# a2a_mcp_server which does not heartbeat.
|
||||
npm install -g openclaw@latest
|
||||
pip install "molecule-ai-workspace-runtime>=0.1.999"
|
||||
pip install molecule-ai-workspace-runtime
|
||||
|
||||
# 2. Onboard openclaw against your model provider (one-time setup).
|
||||
# --non-interactive needs an explicit --provider + --model so it
|
||||
|
||||
@@ -248,6 +248,9 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
|
||||
b.WriteString(content)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
if rowsErr := rows.Err(); rowsErr != nil {
|
||||
log.Printf("ResolveInstructions rows.Err workspace=%s: %v", workspaceID, rowsErr)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"workspace_id": workspaceID,
|
||||
@@ -258,6 +261,7 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
|
||||
func scanInstructions(rows interface {
|
||||
Next() bool
|
||||
Scan(dest ...interface{}) error
|
||||
Err() error
|
||||
}) []Instruction {
|
||||
var instructions []Instruction
|
||||
for rows.Next() {
|
||||
@@ -269,6 +273,9 @@ func scanInstructions(rows interface {
|
||||
}
|
||||
instructions = append(instructions, inst)
|
||||
}
|
||||
if scanErr := rows.Err(); scanErr != nil {
|
||||
log.Printf("scanInstructions rows.Err: %v", scanErr)
|
||||
}
|
||||
if instructions == nil {
|
||||
instructions = []Instruction{}
|
||||
}
|
||||
|
||||
@@ -109,9 +109,11 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
|
||||
// provisionWorkspaceCP → migration 038). Null instance_id means the
|
||||
// workspace runs as a local Docker container on this tenant.
|
||||
var instanceID string
|
||||
db.DB.QueryRowContext(ctx,
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
|
||||
workspaceID).Scan(&instanceID)
|
||||
workspaceID).Scan(&instanceID); err != nil {
|
||||
log.Printf("HandleConnect instanceID lookup error workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
|
||||
if instanceID != "" {
|
||||
h.handleRemoteConnect(c, workspaceID, instanceID)
|
||||
@@ -144,7 +146,9 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
|
||||
// Look up workspace name for manual container naming
|
||||
var wsName string
|
||||
if _, err := h.docker.Ping(ctx); err == nil {
|
||||
db.DB.QueryRowContext(ctx, `SELECT LOWER(REPLACE(name, ' ', '-')) FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT LOWER(REPLACE(name, ' ', '-')) FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName); err != nil {
|
||||
log.Printf("handleLocalConnect wsName lookup error workspace=%s: %v", workspaceID, err)
|
||||
}
|
||||
if wsName != "" {
|
||||
candidates = append(candidates, wsName)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,185 @@
|
||||
package handlers
|
||||
|
||||
// workspace_broadcast.go — POST /workspaces/:id/broadcast
|
||||
//
|
||||
// Allows a workspace with broadcast_enabled=true to send a message to every
|
||||
// non-removed agent workspace in the SAME ORG. The message is:
|
||||
//
|
||||
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
|
||||
// so poll-mode agents pick it up via GET /activity.
|
||||
// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
|
||||
// show a real-time banner for each recipient workspace.
|
||||
//
|
||||
// The sender's own workspace logs a 'broadcast_sent' activity row for
|
||||
// traceability.
|
||||
//
|
||||
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
|
||||
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
|
||||
// TOCTOU — the middleware only proved the token is valid, not the ability.
|
||||
//
|
||||
// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
|
||||
// a recursive CTE that walks the parent_id chain to find the org root. This
|
||||
// prevents a compromised or misconfigured workspace from broadcasting to
|
||||
// workspaces in other tenants' orgs.
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// BroadcastHandler is constructed once and shared across requests.
|
||||
type BroadcastHandler struct {
|
||||
broadcaster *events.Broadcaster
|
||||
}
|
||||
|
||||
// NewBroadcastHandler creates a BroadcastHandler.
|
||||
func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
|
||||
return &BroadcastHandler{broadcaster: b}
|
||||
}
|
||||
|
||||
// Broadcast handles POST /workspaces/:id/broadcast.
|
||||
func (h *BroadcastHandler) Broadcast(c *gin.Context) {
|
||||
senderID := c.Param("id")
|
||||
if err := validateWorkspaceID(senderID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Message string `json:"message" binding:"required"`
|
||||
}
|
||||
if err := c.ShouldBindJSON(&body); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
|
||||
return
|
||||
}
|
||||
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Verify sender exists and has broadcast_enabled=true.
|
||||
var senderName string
|
||||
var broadcastEnabled bool
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
|
||||
senderID,
|
||||
).Scan(&senderName, &broadcastEnabled)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
|
||||
return
|
||||
}
|
||||
if !broadcastEnabled {
|
||||
c.JSON(http.StatusForbidden, gin.H{
|
||||
"error": "broadcast_disabled",
|
||||
"hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Find the sender's org root by walking the parent_id chain.
|
||||
// Workspaces with parent_id = NULL are org roots; every other workspace
|
||||
// belongs to the org identified by its topmost ancestor.
|
||||
var orgRootID string
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
WITH RECURSIVE org_chain AS (
|
||||
SELECT id, parent_id, id AS root_id
|
||||
FROM workspaces
|
||||
WHERE id = $1
|
||||
UNION ALL
|
||||
SELECT w.id, w.parent_id, c.root_id
|
||||
FROM workspaces w
|
||||
JOIN org_chain c ON w.id = c.parent_id
|
||||
)
|
||||
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
|
||||
`, senderID).Scan(&orgRootID)
|
||||
if err != nil {
|
||||
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all non-removed agent workspaces in the SAME ORG (same root_id),
|
||||
// excluding the sender itself.
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
WITH RECURSIVE org_chain AS (
|
||||
SELECT id, parent_id, id AS root_id
|
||||
FROM workspaces
|
||||
WHERE parent_id IS NULL
|
||||
UNION ALL
|
||||
SELECT w.id, w.parent_id, c.root_id
|
||||
FROM workspaces w
|
||||
JOIN org_chain c ON w.parent_id = c.id
|
||||
)
|
||||
SELECT c.id
|
||||
FROM org_chain c
|
||||
WHERE c.root_id = $1
|
||||
AND c.id != $2
|
||||
AND EXISTS (
|
||||
SELECT 1 FROM workspaces w
|
||||
WHERE w.id = c.id AND w.status != 'removed'
|
||||
)
|
||||
`, orgRootID, senderID)
|
||||
if err != nil {
|
||||
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var recipientIDs []string
|
||||
for rows.Next() {
|
||||
var rid string
|
||||
if rows.Scan(&rid) == nil {
|
||||
recipientIDs = append(recipientIDs, rid)
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
|
||||
return
|
||||
}
|
||||
|
||||
broadcastPayload := map[string]interface{}{
|
||||
"message": body.Message,
|
||||
"sender_id": senderID,
|
||||
"sender": senderName,
|
||||
}
|
||||
|
||||
// Persist broadcast_receive in each recipient's activity log + emit WS event.
|
||||
delivered := 0
|
||||
for _, rid := range recipientIDs {
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
|
||||
VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
|
||||
`, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
|
||||
log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
|
||||
continue
|
||||
}
|
||||
h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
|
||||
delivered++
|
||||
}
|
||||
|
||||
// Record the send on the sender's own log.
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
|
||||
VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
|
||||
`, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
|
||||
log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "sent",
|
||||
"delivered": delivered,
|
||||
})
|
||||
}
|
||||
|
||||
func broadcastTruncate(s string, max int) string {
|
||||
runes := []rune(s)
|
||||
if len(runes) <= max {
|
||||
return s
|
||||
}
|
||||
return string(runes[:max]) + "…"
|
||||
}
|
||||
@@ -0,0 +1,428 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// -------- Org-scoped recipient query tests (OFFSEC-015) --------
|
||||
|
||||
// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
|
||||
// NOT reach workspaces belonging to Org-B. This is the core regression test
|
||||
// for OFFSEC-015: the original query had no org filter, so a workspace in
|
||||
// Org-A could broadcast to every non-removed workspace in the entire DB,
|
||||
// including workspaces owned by other tenants.
|
||||
func TestBroadcast_OrgScopedRecipients(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
// Org-A structure:
|
||||
// org-a-root (parent_id = NULL) ← sender
|
||||
// ├── ws-a-child
|
||||
// Org-B structure:
|
||||
// org-b-root (parent_id = NULL)
|
||||
// └── ws-b-child
|
||||
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
|
||||
wsAChild := "00000000-0000-0000-0000-000000000002"
|
||||
// ws-b-child is in Org-B (different root); the org-scoped query MUST NOT include it.
|
||||
|
||||
// 1. Sender lookup
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
|
||||
|
||||
// 2. Org root lookup — sender is its own root (parent_id = NULL)
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
|
||||
|
||||
// 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
|
||||
// The query joins on org_chain.root_id = orgRootID, which scopes to Org-A only.
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID, senderID). // orgRootID, senderID (EXCLUDED)
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) // only Org-A child
|
||||
|
||||
// Activity log inserts
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"hello from org-a"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal response: %v", err)
|
||||
}
|
||||
if resp["status"] != "sent" {
|
||||
t.Errorf("expected status 'sent', got %v", resp["status"])
|
||||
}
|
||||
// ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
|
||||
// If it were included, the mock would have an unmet expectation.
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet mock expectations — cross-org workspace was included in broadcast: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
|
||||
// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
|
||||
func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
|
||||
siblingID := "00000000-0000-0000-0000-000000000002"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
|
||||
|
||||
// Sender is the org root — CTE returns sender's own ID as root
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
|
||||
|
||||
// Recipients in same org, excluding sender
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID, senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"hello siblings"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
|
||||
// workspace can broadcast to siblings in the same org.
|
||||
func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
orgRootID := "00000000-0000-0000-0000-000000000001"
|
||||
senderID := "00000000-0000-0000-0000-000000000002" // child workspace
|
||||
siblingID := "00000000-0000-0000-0000-000000000003"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
|
||||
|
||||
// Org root lookup — walk up to find org-a-root
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
|
||||
|
||||
// Recipients: same org, excluding sender
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(orgRootID, senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"child broadcasting"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// -------- Non-regression cases --------
|
||||
|
||||
func TestBroadcast_NotFound(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000099"
|
||||
// UUID is valid, but no workspace row matches
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnError(errors.New("workspace not found"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"test"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusNotFound {
|
||||
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_Disabled(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"should not send"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusForbidden {
|
||||
t.Errorf("expected 403, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["error"] != "broadcast_disabled" {
|
||||
t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Lone Root", true))
|
||||
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
|
||||
|
||||
// No other workspaces in this org
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID, senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}))
|
||||
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"hello org"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp map[string]interface{}
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("failed to unmarshal: %v", err)
|
||||
}
|
||||
if resp["delivered"] != float64(0) {
|
||||
t.Errorf("expected delivered=0, got %v", resp["delivered"])
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
|
||||
body := `{"message":"test"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestBroadcast_MissingMessage(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
|
||||
// finding the org root errors, the handler returns 500 instead of proceeding
|
||||
// with an un-scoped query that would broadcast to all orgs.
|
||||
func TestBroadcast_OrgRootLookupFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
|
||||
|
||||
// Org root CTE fails
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"should not broadcast"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
// The recipient query MUST NOT be called — it would broadcast cross-org
|
||||
// if the org root lookup failed silently.
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
|
||||
// from a workspace does not send a broadcast_receive to the sender itself
|
||||
// (the sender logs broadcast_sent, not broadcast_receive).
|
||||
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewBroadcastHandler(broadcaster)
|
||||
|
||||
senderID := "00000000-0000-0000-0000-000000000001"
|
||||
peerID := "00000000-0000-0000-0000-000000000002"
|
||||
|
||||
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
|
||||
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
|
||||
|
||||
// Recipient query MUST exclude sender via id != senderID
|
||||
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
|
||||
WithArgs(senderID, senderID).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
|
||||
|
||||
// Peer receives broadcast_receive
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
// Sender logs broadcast_sent (NOT broadcast_receive)
|
||||
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: senderID}}
|
||||
body := `{"message":"no echo to self"}`
|
||||
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
handler.Broadcast(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
|
||||
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
|
||||
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
|
||||
// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
|
||||
func TestBroadcast_Truncate(t *testing.T) {
|
||||
cases := []struct {
|
||||
msg string
|
||||
max int
|
||||
expect string
|
||||
}{
|
||||
{"short", 120, "short"}, // under max — no truncation
|
||||
// exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
|
||||
{"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
|
||||
// "this is a longer mes" = 20 runes; + "…" = 21 chars
|
||||
{"this is a longer message that needs truncating", 20, "this is a longer mes…"},
|
||||
// at-max boundary: 20 chars at max=20 → no truncation
|
||||
{"exactly twenty chars", 20, "exactly twenty chars"},
|
||||
// over max: 11 chars at max=10 → 10 + "…" = 11
|
||||
{"hello world!", 10, "hello worl…"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
result := broadcastTruncate(tc.msg, tc.max)
|
||||
if result != tc.expect {
|
||||
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.msg, tc.max, result, tc.expect)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -224,6 +224,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
wsAuth.POST("/delegations/record", delh.Record)
|
||||
wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus)
|
||||
|
||||
// Workspace broadcast (OFFSEC-015 org isolation)
|
||||
bch := handlers.NewBroadcastHandler(broadcaster)
|
||||
wsAuth.POST("/broadcast", bch.Broadcast)
|
||||
|
||||
// Traces (Langfuse proxy)
|
||||
trh := handlers.NewTracesHandler()
|
||||
wsAuth.GET("/traces", trh.List)
|
||||
|
||||
Reference in New Issue
Block a user