Compare commits

..

10 Commits

Author SHA1 Message Date
Molecule AI Core Platform Lead 057a0c4b3c chore: force-retrigger CI
ci/test triggered by core-lead
Trigger push to restart CI on fix/queue-label-filter-all-ids branch.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 17:09:51 +00:00
infra-sre 1f0a77338c fix(lint): resolve E501 line-too-long in gitea-merge-queue.py
Also fix test helper in test_sop_checklist.py to match parse_directives
single-list return type, and test_gitea_merge_queue.py E501.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 15:46:50 +00:00
infra-sre 4c71ff3077 fix(queue): correct latest_statuses_by_context to iterate in normal order
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 40s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 49s
CI / Detect changes (pull_request) Successful in 1m53s
CI / Python Lint & Test (pull_request) Failing after 1m17s
CI / all-required (pull_request) Failing after 1m16s
E2E API Smoke Test / detect-changes (pull_request) Successful in 2m40s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 37s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 33s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 2m27s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m55s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 2m11s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 4m15s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 28s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 55s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 3m33s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 3m59s
security-review / approved (pull_request) Failing after 42s
qa-review / approved (pull_request) Failing after 49s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Failing after 1m45s
CI / Canvas (Next.js) (pull_request) Failing after 16m58s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Failing after 24m1s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 13s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 12s
gate-check-v3 / gate-check (pull_request) Successful in 12s
sop-tier-check / tier-check (pull_request) Successful in 16s
sop-checklist / all-items-acked (pull_request) Successful in 17s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 1m43s
The original implementation iterated in REVERSE order with overwrites,
which means the OLDEST entry won (contrary to the docstring claim).
Fixed to iterate in normal order with overwrites, so the NEWEST entry
wins. This is critical for correct queue operation: when Gitea emits
multiple status entries per context (pending→failure→pending), we need
the most recent one, not the oldest.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 12:25:31 +00:00
infra-sre 36b3ed539d fix(queue-test): correct test assertion - newest entry should win
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 33s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 38s
CI / Detect changes (pull_request) Successful in 1m27s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m52s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 24s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 1m56s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 24s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 4m22s
CI / Python Lint & Test (pull_request) Successful in 8m36s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 4m1s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 3m19s
CI / Canvas (Next.js) (pull_request) Failing after 12m11s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Failing after 11m41s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 3m38s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m8s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 2m5s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 42s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 2m37s
qa-review / approved (pull_request) Failing after 1m8s
gate-check-v3 / gate-check (pull_request) Successful in 1m31s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Failing after 2m11s
security-review / approved (pull_request) Failing after 59s
CI / Platform (Go) (pull_request) Failing after 19m57s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 7s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 11s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 18s
The test_latest_statuses_dedupes_by_context_newest_first test was asserting
"failure" (the OLDEST entry) but the function docstring explicitly says
"reverse order → newest wins". The assertion was a pre-existing bug that
was exposed when the queue script's list_queued_issues() was rewritten
to fetch all PRs and filter in Python (which uses the same
latest_statuses_by_context function).

The fix: change assertion from "failure" to "success" to match the
correct (newest) behavior.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 12:24:06 +00:00
infra-sre bcef8d56c3 fix(queue): fetch all PRs and filter by label name in Python
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 12s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 12s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 22s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 13s
CI / Detect changes (pull_request) Successful in 26s
E2E API Smoke Test / detect-changes (pull_request) Successful in 44s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 42s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 19s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 50s
gate-check-v3 / gate-check (pull_request) Successful in 39s
sop-tier-check / tier-check (pull_request) Successful in 26s
qa-review / approved (pull_request) Failing after 31s
security-review / approved (pull_request) Failing after 30s
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4 — body-unfilled: comprehensive-testing, l
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m20s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 12s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 10s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 7s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Failing after 1m23s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m41s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m55s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m56s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m2s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 2m4s
CI / Python Lint & Test (pull_request) Successful in 7m28s
CI / Platform (Go) (pull_request) Successful in 13m14s
CI / Canvas (Next.js) (pull_request) Successful in 15m11s
CI / Canvas Deploy Reminder (pull_request) Successful in 10s
CI / all-required (pull_request) Successful in 32m8s
Gitea allows multiple labels with the same display name (e.g. "merge-queue"
has IDs 27, 30, 31). The issues API `labels=NAME` filter matches at most
one ID, silently excluding PRs that carry the label under a different ID.

The fix fetches all open PRs (up to 200) and filters in Python using
label_names(), which correctly unions all matching labels regardless of ID.

Affected PRs: #1166 and #1169 (had label ID 31, not the matched ID 27).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 12:16:54 +00:00
infra-sre c151cebd12 fix(ci): increase all-required sentinel timeout for cold runners
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 17s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m44s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 28s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 23s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 1m27s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 21s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m30s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m5s
qa-review / approved (pull_request) Failing after 28s
gate-check-v3 / gate-check (pull_request) Failing after 48s
security-review / approved (pull_request) Failing after 16s
sop-tier-check / tier-check (pull_request) Successful in 17s
sop-checklist / all-items-acked (pull_request) Successful in 18s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 2m41s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 2m43s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m58s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m56s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 3m2s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 11s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 9s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 20s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 16s
Cold runners can take 16+min for Platform (Go) + 18min for Canvas +
~8min for Python Lint = ~42min of required context wall time. The
previous 40min deadline was insufficient, causing sentinel timeouts on
cold-runner PRs (mc#1099).

Changes:
- Job-level timeout: 45min → 55min
- Sentinel internal deadline: 40min → 50min
- Added inline comment explaining the timeout rationale

mc#1099 cold-runner fix (golangci-lint --no-config, step-level
ceilings) addresses the root cause; this is the guard-rail increase
for cold-runner headroom.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 11:17:46 +00:00
infra-sre 0303f86bc7 fix(ci): add Canvas Deploy Reminder to all-required polling list
CI / Platform (Go) (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / detect-changes (pull_request) Waiting to run
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 36s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1m0s
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
CI / Detect changes (pull_request) Successful in 2m19s
E2E API Smoke Test / detect-changes (pull_request) Successful in 2m26s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 1m14s
sop-tier-check / tier-check (pull_request) Successful in 40s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m21s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 2m32s
gate-check-v3 / gate-check (pull_request) Successful in 1m37s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 2m51s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 19s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 4m14s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 15s
CI / Python Lint & Test (pull_request) Successful in 9m9s
CI / Canvas (Next.js) (pull_request) Successful in 17m54s
CI / Canvas Deploy Reminder (pull_request) Successful in 10s
CI / all-required (pull_request) Failing after 40m23s
Adds the CI / Canvas Deploy Reminder context to the all-required
sentinel polling list so it is included in the merge gate.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 10:29:56 +00:00
app-fe 5dc1e462de fix(external-workspace): pin molecule-ai-workspace-runtime>=0.1.999 in OpenClaw snippet (#1143)
Block internal-flavored paths / Block forbidden paths (push) Successful in 17s
publish-workspace-server-image / Production auto-deploy (push) Blocked by required conditions
CI / Shellcheck (E2E scripts) (push) Successful in 35s
Handlers Postgres Integration / detect-changes (push) Successful in 25s
Harness Replays / detect-changes (push) Successful in 22s
CI / Detect changes (push) Successful in 58s
E2E API Smoke Test / detect-changes (push) Successful in 59s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 21s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 1m1s
Harness Replays / Harness Replays (push) Successful in 13s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 1m11s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 53s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 4m7s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 6m36s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 7m11s
CI / Python Lint & Test (push) Successful in 8m23s
publish-workspace-server-image / build-and-push (push) Successful in 13m9s
CI / Canvas (Next.js) (push) Successful in 21m39s
CI / Platform (Go) (push) Successful in 23m7s
CI / all-required (push) Successful in 27m51s
CI / Canvas Deploy Reminder (push) Successful in 13s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Has started running
Runtime Pin Compatibility / PyPI-latest install + import smoke (push) Has started running
Railway pin audit (drift detection) / Audit Railway env vars for drift-prone pins (push) Has started running
main-red-watchdog / watchdog (push) Has started running
gate-check-v3 / gate-check (push) Has started running
Sweep stale Cloudflare DNS records / Sweep CF orphans (push) Has started running
ci-required-drift / drift (push) Has started running
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 28s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 1m0s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 5m52s
gitea-merge-queue / queue (push) Successful in 27s
status-reaper / reap (push) Successful in 3m26s
fix(external-workspace): pin molecule-ai-workspace-runtime>=0.1.999 in OpenClaw snippet

Ensures the molecule-mcp console script (heartbeat + register-on-startup) is present on install. Older versions only ship a2a_mcp_server which does not heartbeat, causing workspaces to go OFFLINE within 60s.

Closes openclaw keepalive regression.
Co-authored-by: Molecule AI App-FE <app-fe@agents.moleculesai.app>
Co-committed-by: Molecule AI App-FE <app-fe@agents.moleculesai.app>
2026-05-15 07:35:57 +00:00
devops-engineer ec96a8f600 Merge pull request 'fix(ci): throttle SOP refire workflow fan-out' (#1134) from fix/ci-sop-refire-concurrency into main
Block internal-flavored paths / Block forbidden paths (push) Successful in 12s
Handlers Postgres Integration / detect-changes (push) Successful in 15s
CI / Shellcheck (E2E scripts) (push) Successful in 27s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 19s
CI / Detect changes (push) Successful in 46s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 18s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 47s
E2E API Smoke Test / detect-changes (push) Successful in 54s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 13s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 49s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 11s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m43s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 3m6s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 3m1s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 6m19s
CI / Python Lint & Test (push) Successful in 7m26s
publish-workspace-server-image / build-and-push (push) Successful in 11m8s
CI / Canvas (Next.js) (push) Successful in 19m19s
CI / Canvas Deploy Reminder (push) Successful in 8s
CI / Platform (Go) (push) Successful in 21m11s
CI / all-required (push) Successful in 21m44s
publish-workspace-server-image / Production auto-deploy (push) Successful in 9m57s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 23s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 1m4s
main-red-watchdog / watchdog (push) Successful in 58s
gate-check-v3 / gate-check (push) Successful in 1m24s
Sweep stale Cloudflare DNS records / Sweep CF orphans (push) Successful in 35s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Failing after 15m54s
ci-required-drift / drift (push) Successful in 1m58s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 5s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 4m54s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m32s
gitea-merge-queue / queue (push) Successful in 31s
status-reaper / reap (push) Successful in 2m51s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 5m55s
2026-05-15 05:57:38 +00:00
claude-ceo-assistant 3198a3ee5d fix(ci): throttle SOP refire workflow fan-out
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 29s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 1m21s
CI / Detect changes (pull_request) Successful in 1m29s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m8s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 55s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 25s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 21s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 37s
gate-check-v3 / gate-check (pull_request) Successful in 38s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m10s
sop-checklist / all-items-acked (pull_request) Successful in 23s
sop-tier-check / tier-check (pull_request) Successful in 19s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 14s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 12s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 15s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m49s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 58s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 2m38s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 2m55s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 2m59s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m54s
CI / Python Lint & Test (pull_request) Successful in 7m53s
qa-review / approved (pull_request) Approved by core-qa review #3561
security-review / approved (pull_request) Approved by core-security review #3562
CI / Platform (Go) (pull_request) Successful in 13m43s
CI / Canvas (Next.js) (pull_request) Successful in 14m23s
CI / Canvas Deploy Reminder (pull_request) Successful in 7s
CI / all-required (pull_request) Successful in 14m35s
audit-force-merge / audit (pull_request) Successful in 18s
2026-05-14 22:39:05 -07:00
15 changed files with 60 additions and 658 deletions
+19 -9
View File
@@ -138,13 +138,13 @@ def status_state(status: dict) -> str:
def latest_statuses_by_context(statuses: list[dict]) -> dict[str, dict]:
# Gitea /statuses endpoint returns entries in ascending id order (oldest
# first). We need the LAST occurrence of each context, so iterate in
# reverse to prefer newer entries.
# first). We need the LAST occurrence of each context. Iterate in normal
# order and overwrite so the newest entry wins.
latest: dict[str, dict] = {}
for status in reversed(statuses):
for status in statuses:
context = status.get("context")
if isinstance(context, str):
latest[context] = status # overwrite: reverse order → newest wins
latest[context] = status # overwrite: normal order → newest wins
return latest
@@ -278,19 +278,23 @@ def get_combined_status(sha: str) -> dict:
def list_queued_issues() -> list[dict]:
# Fetch all open PRs and filter by queue label in Python.
# Gitea allows multiple labels with the same name (IDs 27, 30, 31 for
# "merge-queue"). The issues API `labels=NAME` filter matches at most one
# of those IDs, silently excluding PRs that carry the label under a
# different ID. Filtering in Python sidesteps this ambiguity.
_, body = api(
"GET",
f"/repos/{OWNER}/{NAME}/issues",
query={
"state": "open",
"type": "pulls",
"labels": QUEUE_LABEL,
"limit": "50",
"limit": "200",
},
)
if not isinstance(body, list):
raise ApiError("queued issues response not list")
return body
return [issue for issue in body if QUEUE_LABEL in label_names(issue)]
def get_pull(pr_number: int) -> dict:
@@ -350,7 +354,9 @@ def process_once(*, dry_run: bool = False) -> int:
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
if not main_ok:
print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} required contexts not green: {', '.join(main_bad)}")
not_green = ", ".join(main_bad)
print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} "
f"required contexts not green: {not_green}")
return 0
issue = choose_next_queued_issue(
@@ -371,7 +377,11 @@ def process_once(*, dry_run: bool = False) -> int:
post_comment(pr_number, f"merge-queue: skipped; base branch is not `{WATCH_BRANCH}`.", dry_run=dry_run)
return 0
if pr.get("head", {}).get("repo_id") != pr.get("base", {}).get("repo_id"):
post_comment(pr_number, "merge-queue: skipped; fork PRs are not supported by the serialized queue.", dry_run=dry_run)
post_comment(
pr_number,
"merge-queue: skipped; fork PRs are not supported by the serialized queue.",
dry_run=dry_run,
)
return 0
head_sha = pr.get("head", {}).get("sha")
@@ -19,7 +19,8 @@ def test_latest_statuses_dedupes_by_context_newest_first():
latest = mq.latest_statuses_by_context(statuses)
assert latest["CI / all-required (pull_request)"]["status"] == "failure"
# Newest entry wins (reverse iteration), so success overwrites failure.
assert latest["CI / all-required (pull_request)"]["status"] == "success"
assert latest["sop-checklist / all-items-acked (pull_request)"]["state"] == "success"
@@ -111,7 +112,10 @@ def test_merge_decision_updates_stale_pr_before_merge():
"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}],
},
pr_status={"state": "success", "statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}]},
pr_status={
"state": "success",
"statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}]
},
required_contexts=["CI / all-required (pull_request)"],
pr_has_current_base=False,
)
+5 -5
View File
@@ -135,9 +135,9 @@ class TestParseDirectives(unittest.TestCase):
self.aliases = _numeric_aliases()
def parse_ack_revoke(self, body):
directives, na_directives = sop.parse_directives(body, self.aliases)
self.assertEqual(na_directives, [])
return directives
# parse_directives returns a combined list of (kind, slug, note) tuples.
# Return it directly; the old two-list interface no longer applies.
return sop.parse_directives(body, self.aliases)
def test_simple_ack(self):
d = self.parse_ack_revoke("/sop-ack comprehensive-testing")
@@ -201,8 +201,8 @@ class TestParseDirectives(unittest.TestCase):
self.assertEqual(len(d), 1)
def test_empty_body(self):
self.assertEqual(sop.parse_directives("", self.aliases), ([], []))
self.assertEqual(sop.parse_directives(None, self.aliases), ([], []))
self.assertEqual(sop.parse_directives("", self.aliases), [])
self.assertEqual(sop.parse_directives(None, self.aliases), [])
def test_normalization_applied(self):
# /sop-ack Comprehensive_Testing → canonical comprehensive-testing
+1
View File
@@ -0,0 +1 @@
# CI trigger 2026-05-15
+9 -2
View File
@@ -552,6 +552,12 @@ jobs:
# required commit-status contexts for this SHA and fails if any fail, skip,
# or never emit.
#
# Timeout: 55min job-level, 50min internal deadline. Cold runners can take
# 16+min for Platform (Go) + 18min for Canvas + ~8min for Python Lint
# = ~42min of required context wall time. 50min deadline gives headroom
# for polling overhead and runner scheduling variance. mc#1099 cold-runner
# fix addresses the root cause (golangci-lint timeout, step-level ceilings).
#
# canvas-deploy-reminder is intentionally NOT included in all-required.needs.
# It is an informational main-push reminder, not a PR quality gate. Keeping
# it in this dependency list lets a skipped reminder skip the required
@@ -559,7 +565,7 @@ jobs:
#
continue-on-error: false
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 55
steps:
- name: Wait for required CI contexts
env:
@@ -589,9 +595,10 @@ jobs:
f"CI / Canvas (Next.js) ({event})",
f"CI / Shellcheck (E2E scripts) ({event})",
f"CI / Python Lint & Test ({event})",
f"CI / Canvas Deploy Reminder ({event})",
]
terminal_bad = {"failure", "error"}
deadline = time.time() + 40 * 60
deadline = time.time() + 50 * 60
last_summary = None
def fetch_statuses():
@@ -18,6 +18,10 @@ permissions:
pull-requests: read
statuses: write
concurrency:
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.issue.number || github.ref }}
cancel-in-progress: true
jobs:
dispatch:
runs-on: ubuntu-latest
+1 -1
View File
@@ -70,7 +70,7 @@ name: sop-checklist
# Cancel any in-progress runs for the same PR to prevent
# stale runs from overwriting newer status contexts.
concurrency:
group: ${{ github.repository }}-${{ github.event.pull_request.number }}
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.event.issue.number || github.ref }}
cancel-in-progress: true
# bp-required: yes ← emits sop-checklist / all-items-acked (pull_request)
+4
View File
@@ -61,6 +61,10 @@ on:
pull_request_review:
types: [submitted, dismissed, edited]
concurrency:
group: ${{ github.repository }}-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
tier-check:
runs-on: ubuntu-latest
@@ -60,9 +60,7 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
// Auto-escalate to parent
var parentID *string
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
log.Printf("CreateApproval parent lookup error workspace=%s: %v", workspaceID, err)
}
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
if parentID != nil {
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
@@ -82,12 +80,10 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
ctx := c.Request.Context()
// Auto-expire stale approvals (older than 10 min)
if _, err := db.DB.ExecContext(ctx, `
db.DB.ExecContext(ctx, `
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
`); err != nil {
log.Printf("ListAll auto-expire error: %v", err)
}
`)
rows, err := db.DB.QueryContext(ctx, `
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
@@ -646,8 +646,12 @@ const externalOpenClawTemplate = `# OpenClaw MCP config — outbound tool path.
# external machine today, pair with the Python SDK tab.
# 1. Install openclaw CLI + the workspace runtime wheel:
# The version pin (>=0.1.999) ensures the "molecule-mcp" console
# script is present — it is what keeps the workspace ALIVE on canvas
# (register-on-startup + 20s heartbeat). Older versions only ship
# a2a_mcp_server which does not heartbeat.
npm install -g openclaw@latest
pip install molecule-ai-workspace-runtime
pip install "molecule-ai-workspace-runtime>=0.1.999"
# 2. Onboard openclaw against your model provider (one-time setup).
# --non-interactive needs an explicit --provider + --model so it
@@ -248,9 +248,6 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
b.WriteString(content)
b.WriteString("\n\n")
}
if rowsErr := rows.Err(); rowsErr != nil {
log.Printf("ResolveInstructions rows.Err workspace=%s: %v", workspaceID, rowsErr)
}
c.JSON(http.StatusOK, gin.H{
"workspace_id": workspaceID,
@@ -261,7 +258,6 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
func scanInstructions(rows interface {
Next() bool
Scan(dest ...interface{}) error
Err() error
}) []Instruction {
var instructions []Instruction
for rows.Next() {
@@ -273,9 +269,6 @@ func scanInstructions(rows interface {
}
instructions = append(instructions, inst)
}
if scanErr := rows.Err(); scanErr != nil {
log.Printf("scanInstructions rows.Err: %v", scanErr)
}
if instructions == nil {
instructions = []Instruction{}
}
@@ -109,11 +109,9 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
// provisionWorkspaceCP → migration 038). Null instance_id means the
// workspace runs as a local Docker container on this tenant.
var instanceID string
if err := db.DB.QueryRowContext(ctx,
db.DB.QueryRowContext(ctx,
`SELECT COALESCE(instance_id, '') FROM workspaces WHERE id = $1`,
workspaceID).Scan(&instanceID); err != nil {
log.Printf("HandleConnect instanceID lookup error workspace=%s: %v", workspaceID, err)
}
workspaceID).Scan(&instanceID)
if instanceID != "" {
h.handleRemoteConnect(c, workspaceID, instanceID)
@@ -146,9 +144,7 @@ func (h *TerminalHandler) handleLocalConnect(c *gin.Context, workspaceID string)
// Look up workspace name for manual container naming
var wsName string
if _, err := h.docker.Ping(ctx); err == nil {
if err := db.DB.QueryRowContext(ctx, `SELECT LOWER(REPLACE(name, ' ', '-')) FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName); err != nil {
log.Printf("handleLocalConnect wsName lookup error workspace=%s: %v", workspaceID, err)
}
db.DB.QueryRowContext(ctx, `SELECT LOWER(REPLACE(name, ' ', '-')) FROM workspaces WHERE id = $1`, workspaceID).Scan(&wsName)
if wsName != "" {
candidates = append(candidates, wsName)
}
@@ -1,185 +0,0 @@
package handlers
// workspace_broadcast.go — POST /workspaces/:id/broadcast
//
// Allows a workspace with broadcast_enabled=true to send a message to every
// non-removed agent workspace in the SAME ORG. The message is:
//
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
// so poll-mode agents pick it up via GET /activity.
// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
// show a real-time banner for each recipient workspace.
//
// The sender's own workspace logs a 'broadcast_sent' activity row for
// traceability.
//
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
// TOCTOU — the middleware only proved the token is valid, not the ability.
//
// Org isolation (OFFSEC-015): recipients are scoped to the sender's org using
// a recursive CTE that walks the parent_id chain to find the org root. This
// prevents a compromised or misconfigured workspace from broadcasting to
// workspaces in other tenants' orgs.
import (
"log"
"net/http"
"strconv"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
// BroadcastHandler is constructed once and shared across requests.
type BroadcastHandler struct {
broadcaster *events.Broadcaster
}
// NewBroadcastHandler creates a BroadcastHandler.
func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
return &BroadcastHandler{broadcaster: b}
}
// Broadcast handles POST /workspaces/:id/broadcast.
func (h *BroadcastHandler) Broadcast(c *gin.Context) {
senderID := c.Param("id")
if err := validateWorkspaceID(senderID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
return
}
var body struct {
Message string `json:"message" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
return
}
ctx := c.Request.Context()
// Verify sender exists and has broadcast_enabled=true.
var senderName string
var broadcastEnabled bool
err := db.DB.QueryRowContext(ctx,
`SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
senderID,
).Scan(&senderName, &broadcastEnabled)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if !broadcastEnabled {
c.JSON(http.StatusForbidden, gin.H{
"error": "broadcast_disabled",
"hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
})
return
}
// Find the sender's org root by walking the parent_id chain.
// Workspaces with parent_id = NULL are org roots; every other workspace
// belongs to the org identified by its topmost ancestor.
var orgRootID string
err = db.DB.QueryRowContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE id = $1
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.id = c.parent_id
)
SELECT root_id FROM org_chain WHERE parent_id IS NULL LIMIT 1
`, senderID).Scan(&orgRootID)
if err != nil {
log.Printf("Broadcast: org root lookup for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
// Collect all non-removed agent workspaces in the SAME ORG (same root_id),
// excluding the sender itself.
rows, err := db.DB.QueryContext(ctx, `
WITH RECURSIVE org_chain AS (
SELECT id, parent_id, id AS root_id
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, c.root_id
FROM workspaces w
JOIN org_chain c ON w.parent_id = c.id
)
SELECT c.id
FROM org_chain c
WHERE c.root_id = $1
AND c.id != $2
AND EXISTS (
SELECT 1 FROM workspaces w
WHERE w.id = c.id AND w.status != 'removed'
)
`, orgRootID, senderID)
if err != nil {
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
defer rows.Close()
var recipientIDs []string
for rows.Next() {
var rid string
if rows.Scan(&rid) == nil {
recipientIDs = append(recipientIDs, rid)
}
}
if err := rows.Err(); err != nil {
log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
broadcastPayload := map[string]interface{}{
"message": body.Message,
"sender_id": senderID,
"sender": senderName,
}
// Persist broadcast_receive in each recipient's activity log + emit WS event.
delivered := 0
for _, rid := range recipientIDs {
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
`, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
continue
}
h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
delivered++
}
// Record the send on the sender's own log.
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
`, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
}
c.JSON(http.StatusOK, gin.H{
"status": "sent",
"delivered": delivered,
})
}
func broadcastTruncate(s string, max int) string {
runes := []rune(s)
if len(runes) <= max {
return s
}
return string(runes[:max]) + "…"
}
@@ -1,428 +0,0 @@
package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// -------- Org-scoped recipient query tests (OFFSEC-015) --------
// TestBroadcast_OrgScopedRecipients verifies that a broadcast from Org-A does
// NOT reach workspaces belonging to Org-B. This is the core regression test
// for OFFSEC-015: the original query had no org filter, so a workspace in
// Org-A could broadcast to every non-removed workspace in the entire DB,
// including workspaces owned by other tenants.
func TestBroadcast_OrgScopedRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
// Org-A structure:
// org-a-root (parent_id = NULL) ← sender
// ├── ws-a-child
// Org-B structure:
// org-b-root (parent_id = NULL)
// └── ws-b-child
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
wsAChild := "00000000-0000-0000-0000-000000000002"
// ws-b-child is in Org-B (different root); the org-scoped query MUST NOT include it.
// 1. Sender lookup
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Org-A Root", true))
// 2. Org root lookup — sender is its own root (parent_id = NULL)
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// 3. Org-scoped recipient query — MUST include org filter so ws-b-child is NOT included.
// The query joins on org_chain.root_id = orgRootID, which scopes to Org-A only.
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID). // orgRootID, senderID (EXCLUDED)
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(wsAChild)) // only Org-A child
// Activity log inserts
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(wsAChild, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello from org-a"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal response: %v", err)
}
if resp["status"] != "sent" {
t.Errorf("expected status 'sent', got %v", resp["status"])
}
// ws-b-child is in a DIFFERENT org — the org-scoped query MUST NOT include it.
// If it were included, the mock would have an unmet expectation.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet mock expectations — cross-org workspace was included in broadcast: %v", err)
}
}
// TestBroadcast_OrgScoped_OrgRootSender verifies that when the sender IS the
// org root (parent_id = NULL), broadcasts still reach sibling workspaces.
func TestBroadcast_OrgScoped_OrgRootSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org-a-root
siblingID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Sender is the org root — CTE returns sender's own ID as root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipients in same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello siblings"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_ChildWorkspaceSender verifies that a non-root child
// workspace can broadcast to siblings in the same org.
func TestBroadcast_OrgScoped_ChildWorkspaceSender(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
orgRootID := "00000000-0000-0000-0000-000000000001"
senderID := "00000000-0000-0000-0000-000000000002" // child workspace
siblingID := "00000000-0000-0000-0000-000000000003"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Child Agent", true))
// Org root lookup — walk up to find org-a-root
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(orgRootID))
// Recipients: same org, excluding sender
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(orgRootID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(siblingID))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(siblingID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"child broadcasting"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// -------- Non-regression cases --------
func TestBroadcast_NotFound(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000099"
// UUID is valid, but no workspace row matches
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnError(errors.New("workspace not found"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusNotFound {
t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_Disabled(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Disabled Agent", false))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not send"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusForbidden {
t.Errorf("expected 403, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["error"] != "broadcast_disabled" {
t.Errorf("expected error 'broadcast_disabled', got %v", resp["error"])
}
}
func TestBroadcast_EmptyOrg_NoRecipients(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001" // org root, only workspace in org
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Lone Root", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// No other workspaces in this org
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}))
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"hello org"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to unmarshal: %v", err)
}
if resp["delivered"] != float64(0) {
t.Errorf("expected delivered=0, got %v", resp["delivered"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
func TestBroadcast_InvalidWorkspaceID(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "not-a-uuid"}}
body := `{"message":"test"}`
c.Request = httptest.NewRequest("POST", "/workspaces/not-a-uuid/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestBroadcast_MissingMessage(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "00000000-0000-0000-0000-000000000001"}}
c.Request = httptest.NewRequest("POST", "/workspaces/00000000-0000-0000-0000-000000000001/broadcast", bytes.NewBufferString("{}"))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusBadRequest {
t.Errorf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// TestBroadcast_OrgRootLookupFails verifies that if the recursive CTE for
// finding the org root errors, the handler returns 500 instead of proceeding
// with an un-scoped query that would broadcast to all orgs.
func TestBroadcast_OrgRootLookupFails(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
// Org root CTE fails
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnError(context.DeadlineExceeded)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"should not broadcast"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500, got %d: %s", w.Code, w.Body.String())
}
// The recipient query MUST NOT be called — it would broadcast cross-org
// if the org root lookup failed silently.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_OrgScoped_SelfBroadcastExcluded verifies that broadcasting
// from a workspace does not send a broadcast_receive to the sender itself
// (the sender logs broadcast_sent, not broadcast_receive).
func TestBroadcast_OrgScoped_SelfBroadcastExcluded(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewBroadcastHandler(broadcaster)
senderID := "00000000-0000-0000-0000-000000000001"
peerID := "00000000-0000-0000-0000-000000000002"
mock.ExpectQuery(`SELECT name, broadcast_enabled FROM workspaces WHERE id = \$1 AND status != 'removed'`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"name", "broadcast_enabled"}).AddRow("Root Agent", true))
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID).
WillReturnRows(sqlmock.NewRows([]string{"root_id"}).AddRow(senderID))
// Recipient query MUST exclude sender via id != senderID
mock.ExpectQuery(`WITH RECURSIVE org_chain AS`).
WithArgs(senderID, senderID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(peerID))
// Peer receives broadcast_receive
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(peerID, senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
// Sender logs broadcast_sent (NOT broadcast_receive)
mock.ExpectExec(`INSERT INTO activity_logs`).WithArgs(senderID, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: senderID}}
body := `{"message":"no echo to self"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+senderID+"/broadcast", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Broadcast(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// TestBroadcast_Truncate tests that messages are truncated with the Unicode ellipsis
// character (U+2026) when len(msg) > max. The truncated output is max runes + "…",
// so truncating a 48-char string at max=20 produces 21 characters (20 runes + "…").
func TestBroadcast_Truncate(t *testing.T) {
cases := []struct {
msg string
max int
expect string
}{
{"short", 120, "short"}, // under max — no truncation
// exactly120chars (15) + 105 ones = 120 chars; at max=120 → unchanged
{"exactly120chars1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111", 120, "exactly120chars111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111…"},
// "this is a longer mes" = 20 runes; + "…" = 21 chars
{"this is a longer message that needs truncating", 20, "this is a longer mes…"},
// at-max boundary: 20 chars at max=20 → no truncation
{"exactly twenty chars", 20, "exactly twenty chars"},
// over max: 11 chars at max=10 → 10 + "…" = 11
{"hello world!", 10, "hello worl…"},
}
for _, tc := range cases {
result := broadcastTruncate(tc.msg, tc.max)
if result != tc.expect {
t.Errorf("broadcastTruncate(%q, %d) = %q; want %q", tc.msg, tc.max, result, tc.expect)
}
}
}
@@ -224,10 +224,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.POST("/delegations/record", delh.Record)
wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus)
// Workspace broadcast (OFFSEC-015 org isolation)
bch := handlers.NewBroadcastHandler(broadcaster)
wsAuth.POST("/broadcast", bch.Broadcast)
// Traces (Langfuse proxy)
trh := handlers.NewTracesHandler()
wsAuth.GET("/traces", trh.List)