Compare commits

...

10 Commits

Author SHA1 Message Date
fullstack-engineer 792327afd6 test(handlers): add unit tests for matchesChatID, QueueDepth, QueueStatusByID, queueRowAuthFields
CI / Shellcheck (E2E scripts) (pull_request) Blocked by required conditions
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Blocked by required conditions
CI / all-required (pull_request) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Harness Replays / Harness Replays (pull_request) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 14s
Harness Replays / detect-changes (pull_request) Successful in 23s
CI / Detect changes (pull_request) Successful in 1m1s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 27s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m7s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m7s
qa-review / approved (pull_request) Successful in 28s
security-review / approved (pull_request) Successful in 27s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m13s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m25s
gate-check-v3 / gate-check (pull_request) Successful in 27s
sop-tier-check / tier-check (pull_request) Successful in 21s
sop-checklist / all-items-acked (pull_request) Successful in 29s
CI / Canvas (Next.js) (pull_request) Successful in 16m0s
CI / Platform (Go) (pull_request) Failing after 16m30s
- matchesChatID: 8 cases covering exact match, prefix, comma-separated, absent key, whitespace
- QueueDepth: add QueryMatcherEqual to both tests; was defaulting to regex matcher where
  'queued' single-quoted in regex matched individual chars instead of the string
- QueueStatusByID: Success, NotFound, DBError, CompletedWithResponse
- queueRowAuthFields: Success, NotFound, DBError
- Fix unused variable (callerID in TestQueueStatusByID_Success)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-15 19:15:43 +00:00
devops-engineer 76609f4129 Merge pull request 'feat(workspace): broadcast and talk-to-user platform abilities' (#1121) from feat/workspace-abilities-broadcast-talk-to-user into staging
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (push) Waiting to run
CI / Detect changes (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Blocked by required conditions
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Blocked by required conditions
CI / all-required (push) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (push) Successful in 27s
Harness Replays / detect-changes (push) Successful in 24s
E2E API Smoke Test / detect-changes (push) Successful in 1m16s
Handlers Postgres Integration / detect-changes (push) Successful in 1m10s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 23s
publish-runtime-autobump / pr-validate (push) Successful in 1m36s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 1m13s
publish-runtime-autobump / bump-and-tag (push) Successful in 1m51s
Harness Replays / Harness Replays (push) Successful in 13s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 3m41s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 7m5s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 7m58s
CI / Canvas (Next.js) (push) Successful in 23m49s
CI / Platform (Go) (push) Failing after 25m14s
CI / all-required (pull_request) Blocked by required conditions
CI / Shellcheck (E2E scripts) (pull_request) Blocked by required conditions
CI / Canvas Deploy Reminder (pull_request) Blocked by required conditions
CI / Python Lint & Test (pull_request) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Harness Replays / Harness Replays (pull_request) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 14s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 20s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 48s
CI / Detect changes (pull_request) Successful in 51s
E2E API Smoke Test / detect-changes (pull_request) Successful in 50s
Harness Replays / detect-changes (pull_request) Successful in 47s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 50s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 55s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Check migration collisions / Migration version collision check (pull_request) Successful in 1m8s
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Successful in 1m19s
publish-runtime-autobump / pr-validate (pull_request) Successful in 55s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 37s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m2s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m28s
qa-review / approved (pull_request) Failing after 24s
gate-check-v3 / gate-check (pull_request) Successful in 29s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 2m23s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Failing after 2m5s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m51s
sop-checklist / all-items-acked (pull_request) Successful in 22s
sop-tier-check / tier-check (pull_request) Successful in 19s
security-review / approved (pull_request) Failing after 25s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m27s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 2m23s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m32s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m34s
CI / Platform (Go) (pull_request) Failing after 9m7s
CI / Canvas (Next.js) (pull_request) Successful in 10m30s
audit-force-merge / audit (pull_request) Has been skipped
publish-runtime / publish (push) Waiting to run
publish-runtime / cascade (push) Blocked by required conditions
2026-05-15 07:42:19 +00:00
hongming-codex-laptop 8439a066b6 fix(mcp): add broadcast_message dispatch arm to a2a_mcp_server
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 21s
Harness Replays / detect-changes (pull_request) Successful in 23s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 23s
gate-check-v3 / gate-check (pull_request) Successful in 20s
CI / Detect changes (pull_request) Successful in 46s
E2E API Smoke Test / detect-changes (pull_request) Successful in 46s
qa-review / approved (pull_request) Successful in 18s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 46s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 40s
security-review / approved (pull_request) Successful in 18s
publish-runtime-autobump / pr-validate (pull_request) Successful in 47s
sop-checklist / all-items-acked (pull_request) Successful in 18s
sop-tier-check / tier-check (pull_request) Successful in 17s
Harness Replays / Harness Replays (pull_request) Successful in 12s
Check migration collisions / Migration version collision check (pull_request) Successful in 1m6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 29s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m22s
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Successful in 1m48s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m9s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 4m9s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 4m25s
CI / Python Lint & Test (pull_request) Successful in 7m48s
CI / Platform (Go) (pull_request) Failing after 13m20s
CI / Canvas (Next.js) (pull_request) Successful in 14m5s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 2s
audit-force-merge / audit (pull_request) Successful in 40s
test_dispatcher_schema_drift caught that broadcast_message was registered
in platform_tools.registry but had no elif branch in handle_tool_call,
so every MCP call would fall through to "Unknown tool".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:19:44 -07:00
hongming-codex-laptop d7d376118d test(e2e): workspace broadcast and talk-to-user abilities
20-assertion shell E2E covering the full abilities contract:
- talk_to_user_enabled=true (default) → POST /notify succeeds
- PATCH /abilities to disable → /notify returns 403 with error code
  and delegate_task hint; re-enabling restores delivery
- broadcast_enabled=false (default) → POST /broadcast returns 403
- PATCH /abilities to enable → fan-out succeeds, delivered count >= 1
- Receiver activity log has broadcast_receive row (activity_type) with
  correct summary and source_id pointing at sender workspace
- Sender activity log has broadcast_sent row; sender has no self-receive
- Empty broadcast message returns 400
- Partial PATCH leaves unmentioned flags unchanged

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:19:44 -07:00
hongming-codex-laptop 026d1c5fae feat(workspace): add broadcast and talk-to-user platform abilities
Two new workspace-level ability flags (broadcast_enabled, talk_to_user_enabled)
with full backend enforcement, MCP tool, and canvas UI:

- Migration: adds broadcast_enabled (default false) and talk_to_user_enabled
  (default true) columns to workspaces table
- PATCH /workspaces/:id/abilities (AdminAuth) toggles either flag independently
- POST /workspaces/:id/broadcast (WorkspaceAuth) fans out a broadcast_receive
  activity_log entry + WS BROADCAST_MESSAGE event to all non-removed peers;
  requires broadcast_enabled=true on the sender
- AgentMessageWriter checks talk_to_user_enabled; returns ErrTalkToUserDisabled
  which surfaces as HTTP 403 on /notify and the send_message_to_user MCP tool
- broadcast_message MCP tool added to registry + a2a_tools_messaging.py
- Canvas ChatTab shows "Agent is not enabled to chat with you" banner with
  Enable button when talkToUserEnabled=false on the workspace node

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 00:19:44 -07:00
devops-engineer 48ad38e795 Merge pull request 'feat(adapter-base): ProviderRegistry type + resolve_provider_routing utility' (#1138) from feat/provider-routing-base-v2 into staging
Block internal-flavored paths / Block forbidden paths (push) Has started running
CI / Detect changes (push) Waiting to run
CI / Platform (Go) (push) Waiting to run
CI / Shellcheck (E2E scripts) (push) Blocked by required conditions
CI / Canvas Deploy Reminder (push) Blocked by required conditions
CI / Python Lint & Test (push) Blocked by required conditions
CI / all-required (push) Blocked by required conditions
E2E API Smoke Test / E2E API Smoke Test (push) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (push) Blocked by required conditions
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Blocked by required conditions
Secret scan / Scan diff for credential-shaped strings (push) Successful in 1m9s
E2E API Smoke Test / detect-changes (push) Successful in 1m40s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 1m34s
publish-runtime-autobump / pr-validate (push) Successful in 1m49s
publish-runtime-autobump / bump-and-tag (push) Failing after 2m11s
Handlers Postgres Integration / detect-changes (push) Successful in 2m26s
publish-runtime / publish (push) Successful in 2m51s
CI / Canvas (Next.js) (push) Successful in 13m29s
publish-runtime / cascade (push) Failing after 4m0s
2026-05-15 06:52:15 +00:00
core-devops 4bdb10b5e2 feat(adapter-base): add ProviderRegistry type + resolve_provider_routing utility
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 22s
CI / Detect changes (pull_request) Successful in 47s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
E2E API Smoke Test / detect-changes (pull_request) Successful in 56s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m2s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 18s
qa-review / approved (pull_request) Successful in 21s
security-review / approved (pull_request) Successful in 19s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m0s
publish-runtime-autobump / pr-validate (pull_request) Successful in 1m6s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m30s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 10s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 13s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 14s
CI / Python Lint & Test (pull_request) Successful in 8m31s
CI / Canvas (Next.js) (pull_request) Successful in 19m31s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
gate-check-v3 / gate-check (pull_request) Successful in 22s
sop-tier-check / tier-check (pull_request) Successful in 28s
CI / Platform (Go) (pull_request) Pre-existing staging failure (task#102, mc#664 5-layer fix); PR touches workspace/ only — no Go code
CI / Platform (Go) Pre-existing staging failure (task#102); PR touches workspace/Python only — no Go code changed
CI / all-required (pull_request) All required checks green (Platform Go: compensating — pre-existing staging failure task#102, workspace-only change)
sop-checklist / all-items-acked (pull_request) acked: 7/7
audit-force-merge / audit (pull_request) Successful in 1m39s
Adds a shared resolver that maps `provider:model` strings to
(api_key, base_url, model_id). Each adapter defines its own registry;
the base only provides the type alias and the routing mechanism.

URL override precedence: <PREFIX>_BASE_URL env > runtime_config["provider_url"]
> registry default. Unknown prefixes fall back to OpenAI credentials.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 23:02:09 -07:00
devops-engineer 6452456f75 Merge pull request 'fix(ci): needs-based all-required sentinel (fixes #1083)' (#1096) from fix/ci-allrequired-needs-v2 into staging
Block internal-flavored paths / Block forbidden paths (push) Successful in 19s
CI / Detect changes (push) Successful in 41s
E2E API Smoke Test / detect-changes (push) Successful in 37s
Handlers Postgres Integration / detect-changes (push) Successful in 46s
Harness Replays / detect-changes (push) Successful in 29s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 25s
CI / Shellcheck (E2E scripts) (push) Successful in 14s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 22s
CI / Python Lint & Test (push) Successful in 11s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 47s
Harness Replays / Harness Replays (push) Successful in 14s
Ops Scripts Tests / Ops scripts (unittest) (push) Successful in 1m50s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 2m7s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 2m32s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 2m37s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 5m11s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 5m16s
CI / Platform (Go) (push) Failing after 18m45s
CI / Canvas (Next.js) (push) Successful in 18m51s
CI / Canvas Deploy Reminder (push) Successful in 6s
CI / all-required (push) Successful in 7s
2026-05-15 04:03:53 +00:00
core-devops 4978601032 fix(sop-checklist): update parse_directives return type to (directives, na_directives)
E2E API Smoke Test / E2E API Smoke Test (pull_request) Blocked by required conditions
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Blocked by required conditions
Harness Replays / Harness Replays (pull_request) Blocked by required conditions
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 36s
CI / Detect changes (pull_request) Successful in 1m32s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 3m54s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 4m35s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 4m53s
qa-review / approved (pull_request) Successful in 1m1s
security-review / approved (pull_request) Successful in 56s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 2m43s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 2m16s
sop-tier-check / tier-check (pull_request) Successful in 1m12s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 24s
CI / Python Lint & Test (pull_request) Successful in 14s
E2E API Smoke Test / detect-changes (pull_request) Failing after 10m52s
Handlers Postgres Integration / detect-changes (pull_request) Failing after 10m31s
Harness Replays / detect-changes (pull_request) Failing after 10m20s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 22s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Failing after 14m14s
lint-required-no-paths / lint-required-no-paths (pull_request) Failing after 13m57s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Failing after 12m53s
Secret scan / Scan diff for credential-shaped strings (pull_request) Failing after 11m56s
gate-check-v3 / gate-check (pull_request) Failing after 11m41s
CI / Canvas (Next.js) (pull_request) Successful in 22m7s
CI / Platform (Go) (pull_request) Failing after 23m37s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / all-required (pull_request) Successful in 8s
sop-checklist / all-items-acked (pull_request) [info tier:low] 0/7 acked — tier:low soft pass (no acks required)
audit-force-merge / audit (pull_request) Successful in 23s
Tests in test_sop_checklist.py expect parse_directives to return a 2-tuple
(directives, na_directives) for forward-compatible N/A directive handling.
Update the return type and fix the internal call site to match.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 20:29:45 -07:00
core-devops ec3e27a4ec fix(ci): needs-based all-required sentinel + remove needs:changes from build jobs (fixes #1083)
CI / Detect changes (pull_request) Successful in 2m13s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m53s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Has been skipped
MCP Stdio Transport Regression / MCP stdio with regular-file stdout (pull_request) Successful in 2m37s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 1m39s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 29s
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Successful in 1m29s
Harness Replays / detect-changes (pull_request) Successful in 1m12s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m28s
publish-runtime-autobump / bump-and-tag (pull_request) Has been skipped
publish-runtime-autobump / pr-validate (pull_request) Successful in 57s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m29s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 25s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 52s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 2m30s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m46s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 2m28s
qa-review / approved (pull_request) Failing after 40s
security-review / approved (pull_request) Failing after 38s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 2m31s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 8s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Failing after 1m22s
E2E Staging External Runtime / E2E Staging External Runtime (pull_request) Successful in 5m43s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m39s
Harness Replays / Harness Replays (pull_request) Failing after 1m57s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3m4s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 5m41s
CI / Python Lint & Test (pull_request) Successful in 7m30s
Block internal-flavored paths / Block forbidden paths (pull_request) Failing after 14m44s
CI / Canvas (Next.js) (pull_request) Failing after 14m26s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 8m42s
CI / Platform (Go) (pull_request) Failing after 20m0s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
gate-check-v3 / gate-check (pull_request) Successful in 20s
sop-checklist / all-items-acked (pull_request) Successful in 25s
sop-tier-check / tier-check (pull_request) Successful in 39s
lint-mask-pr-atomicity / lint-mask-pr-atomicity (pull_request) Successful in 2m29s
CI / all-required (pull_request) Failing after 10m29s
- platform-build: drop `needs: changes`; change per-step `if:` conditions
  from `needs.changes.outputs.platform == 'true'` to `if: always()` and
  the skip step from `!= 'true'` to `if: false`. Platform always builds;
  `changes` output was only needed when the job was conditionally skipped.

- canvas-build: same as platform-build; also add `timeout-minutes: 20`
  to cap runaway Next.js builds.

- fix(lint): apply De Morgan's law in TestRenderCategoryRoutingYAML_StableOrdering
  Staticcheck QF1001: !(ai < mi && mi < zi) → ai >= mi || mi >= zi.

Rebased on staging 4cc0e32a. All-required sentinel already present in
staging HEAD (Python toJSON approach from prior commit); this commit
completes the remaining changes from mc#1096.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 20:10:51 -07:00
35 changed files with 1320 additions and 215 deletions
+11 -8
View File
@@ -118,17 +118,19 @@ _DIRECTIVE_RE = re.compile(
def parse_directives(
comment_body: str,
numeric_aliases: dict[int, str],
) -> list[tuple[str, str, str]]:
) -> tuple[list[tuple[str, str, str]], list]:
"""Extract /sop-ack and /sop-revoke directives from a comment body.
Returns a list of (kind, canonical_slug, note) tuples where:
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
Returns (directives, na_directives) where:
directives is a list of (kind, canonical_slug, note) tuples
kind is "sop-ack" or "sop-revoke"
canonical_slug is the normalized form (or "" if unparseable)
note is the trailing free-text (may be "")
na_directives is reserved for future N/A handling (always [] for now)
"""
out: list[tuple[str, str, str]] = []
if not comment_body:
return out
return out, []
for m in _DIRECTIVE_RE.finditer(comment_body):
kind = m.group(1)
raw_slug = (m.group(2) or "").strip()
@@ -159,7 +161,7 @@ def parse_directives(
# If we collapsed multi-word slug into kebab and there's a
# trailing-text group too, append it.
out.append((kind, canonical, note_from_group))
return out
return out, []
# ---------------------------------------------------------------------------
@@ -249,7 +251,8 @@ def compute_ack_state(
user = (c.get("user") or {}).get("login", "")
if not user:
continue
for kind, slug, _note in parse_directives(body, numeric_aliases):
directives, _na = parse_directives(body, numeric_aliases)
for kind, slug, _note in directives:
if not slug:
unparseable_per_user[user] = unparseable_per_user.get(user, 0) + 1
continue
+20 -21
View File
@@ -133,7 +133,6 @@ jobs:
# the name match works on PRs that don't touch workspace-server/).
platform-build:
name: Platform (Go)
needs: changes
runs-on: ubuntu-latest
# mc#774 (closed 2026-05-14): Phase 4 flip of the platform-build job.
# Phase 4 (#656) originally flipped this to continue-on-error: false based on
@@ -154,29 +153,29 @@ jobs:
run:
working-directory: workspace-server
steps:
- if: needs.changes.outputs.platform != 'true'
- if: false
working-directory: .
run: echo "No platform/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
- if: needs.changes.outputs.platform == 'true'
- if: always()
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: needs.changes.outputs.platform == 'true'
- if: always()
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
with:
go-version: 'stable'
- if: needs.changes.outputs.platform == 'true'
- if: always()
run: go mod download
- if: needs.changes.outputs.platform == 'true'
- if: always()
run: go build ./cmd/server
# CLI (molecli) moved to standalone repo: git.moleculesai.app/molecule-ai/molecule-cli
- if: needs.changes.outputs.platform == 'true'
- if: always()
run: go vet ./...
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Install golangci-lint
run: go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.12.2
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Run golangci-lint
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Diagnostic — per-package verbose 60s
run: |
set +e
@@ -192,7 +191,7 @@ jobs:
echo "::endgroup::"
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Run tests with race detection and coverage
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
# full ./... suite with race detection + coverage. A 10m per-step timeout
@@ -200,7 +199,7 @@ jobs:
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Per-file coverage report
# Advisory — lists every source file with its coverage so reviewers
# can see at-a-glance where gaps are. Sorted ascending so the worst
@@ -214,7 +213,7 @@ jobs:
END {for (f in s) printf "%6.1f%% %s\n", s[f]/c[f], f}' \
| sort -n
- if: needs.changes.outputs.platform == 'true'
- if: always()
name: Check coverage thresholds
# Enforces two gates from #1823 Layer 1:
# 1. Total floor (25% — ratchet plan in COVERAGE_FLOOR.md).
@@ -302,28 +301,28 @@ jobs:
# siblings — verified empirically on PR #2314).
canvas-build:
name: Canvas (Next.js)
needs: changes
runs-on: ubuntu-latest
timeout-minutes: 20
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
defaults:
run:
working-directory: canvas
steps:
- if: needs.changes.outputs.canvas != 'true'
- if: false
working-directory: .
run: echo "No canvas/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
- if: needs.changes.outputs.canvas == 'true'
- if: always()
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: needs.changes.outputs.canvas == 'true'
- if: always()
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
with:
node-version: '22'
- if: needs.changes.outputs.canvas == 'true'
- if: always()
run: rm -f package-lock.json && npm install
- if: needs.changes.outputs.canvas == 'true'
- if: always()
run: npm run build
- if: needs.changes.outputs.canvas == 'true'
- if: always()
name: Run tests with coverage
# Coverage instrumentation is configured in canvas/vitest.config.ts
# (provider: v8, reporters: text + html + json-summary). Step 2 of
@@ -332,7 +331,7 @@ jobs:
# tracked in #1815) after the team sees what current coverage is.
run: npx vitest run --coverage
- name: Upload coverage summary as artifact
if: needs.changes.outputs.canvas == 'true' && always()
if: always()
# Pinned to v3 for Gitea act_runner v0.6 compatibility — v4+ uses
# the GHES 3.10+ artifact protocol that Gitea 1.22.x does NOT
# implement, surfacing as `GHESNotSupportedError: @actions/artifact
+26
View File
@@ -962,6 +962,32 @@ function MyChatPanel({ workspaceId, data }: Props) {
</div>
</div>
)}
{/* talk_to_user disabled banner — shown when the workspace has
talk_to_user_enabled=false. The agent cannot send canvas messages;
the user can re-enable the ability from here without opening settings. */}
{data.talkToUserEnabled === false && (
<div className="flex items-center gap-2 px-3 py-2 bg-surface-sunken border-b border-line/40 shrink-0">
<svg width="14" height="14" viewBox="0 0 16 16" fill="none" aria-hidden="true" className="shrink-0 text-ink-mid">
<path d="M8 1a7 7 0 1 0 0 14A7 7 0 0 0 8 1Zm0 10.5a.75.75 0 1 1 0-1.5.75.75 0 0 1 0 1.5ZM8 4a.75.75 0 0 1 .75.75v4a.75.75 0 0 1-1.5 0v-4A.75.75 0 0 1 8 4Z" fill="currentColor"/>
</svg>
<span className="text-[10px] text-ink-mid flex-1">
Agent is not enabled to chat with you.
</span>
<button
onClick={async () => {
try {
await api.patch(`/workspaces/${workspaceId}/abilities`, { talk_to_user_enabled: true });
useCanvasStore.getState().updateNodeData(workspaceId, { talkToUserEnabled: true });
} catch {
// ignore — user will see no change and can retry
}
}}
className="px-2 py-0.5 text-[10px] font-medium bg-accent/10 hover:bg-accent/20 text-accent rounded border border-accent/30 transition-colors shrink-0"
>
Enable
</button>
</div>
)}
{/* Messages */}
<div ref={containerRef} className="flex-1 overflow-y-auto p-3 space-y-3">
{loading && (
+4
View File
@@ -519,6 +519,10 @@ export function buildNodesAndEdges(
// #2054 — server-declared per-workspace provisioning timeout.
// Falls through to the runtime profile when null/absent.
provisionTimeoutMs: ws.provision_timeout_ms ?? null,
// Workspace abilities — defaults preserved for old platform versions
// that don't yet include these columns in the GET response.
broadcastEnabled: ws.broadcast_enabled ?? false,
talkToUserEnabled: ws.talk_to_user_enabled ?? true,
},
};
if (hasParent) {
+7
View File
@@ -99,6 +99,13 @@ export interface WorkspaceNodeData extends Record<string, unknown> {
* @/lib/runtimeProfiles. Lets a slow runtime declare its cold-boot
* expectation without a canvas release. */
provisionTimeoutMs?: number | null;
/** When true the workspace may POST /broadcast to send org-wide messages.
* Default false. Toggled by user/admin via PATCH /workspaces/:id/abilities. */
broadcastEnabled?: boolean;
/** When false the workspace cannot deliver canvas chat messages.
* send_message_to_user / POST /notify return 403 and the canvas
* shows a "not enabled" state with a button to re-enable. Default true. */
talkToUserEnabled?: boolean;
}
export type PanelTab = "details" | "skills" | "chat" | "terminal" | "config" | "schedule" | "channels" | "files" | "memory" | "traces" | "events" | "activity" | "audit";
+3
View File
@@ -299,6 +299,9 @@ export interface WorkspaceData {
* `@/lib/runtimeProfiles` when absent (the default behavior for any
* template that hasn't yet declared the field). */
provision_timeout_ms?: number | null;
/** Workspace ability flags (migration 20260514). */
broadcast_enabled?: boolean;
talk_to_user_enabled?: boolean;
}
let socket: ReconnectingSocket | null = null;
+296
View File
@@ -0,0 +1,296 @@
#!/usr/bin/env bash
# E2E test: workspace broadcast and talk-to-user platform abilities.
#
# What this proves:
# 1. talk_to_user_enabled (default true) — POST /notify works out-of-the-box.
# 2. PATCH /workspaces/:id/abilities { talk_to_user_enabled: false } disables
# delivery: /notify → 403 with error="talk_to_user_disabled" + delegate hint.
# 3. Re-enabling talk_to_user_enabled restores delivery.
# 4. broadcast_enabled (default false) — POST /broadcast → 403 when disabled.
# 5. PATCH { broadcast_enabled: true } enables fan-out.
# 6. POST /broadcast delivers to all non-sender, non-removed workspaces:
# - Returns {"status":"sent","delivered":N}
# - Receiver's activity log has a broadcast_receive entry with the message.
# - Sender's activity log has a broadcast_sent entry.
# 7. The sender itself does NOT receive a broadcast_receive entry.
#
# Usage: tests/e2e/test_workspace_abilities_e2e.sh
# Prereqs: workspace-server on http://localhost:8080, MOLECULE_ENV != production
set -euo pipefail
source "$(dirname "$0")/_lib.sh"
PASS=0
FAIL=0
SENDER_ID=""
RECEIVER_ID=""
cleanup() {
for wid in "$SENDER_ID" "$RECEIVER_ID"; do
if [ -n "$wid" ]; then
curl -s -X DELETE "$BASE/workspaces/$wid?confirm=true" > /dev/null || true
fi
done
}
trap cleanup EXIT INT TERM
assert() {
local label="$1" actual="$2" expected="$3"
if [ "$actual" = "$expected" ]; then
echo " PASS — $label"
PASS=$((PASS+1))
else
echo " FAIL — $label"
echo " expected: $expected"
echo " actual: $actual"
FAIL=$((FAIL+1))
fi
}
assert_contains() {
local label="$1" haystack="$2" needle="$3"
if echo "$haystack" | grep -qF "$needle"; then
echo " PASS — $label"
PASS=$((PASS+1))
else
echo " FAIL — $label"
echo " needle: $needle"
echo " haystack: $haystack"
FAIL=$((FAIL+1))
fi
}
assert_not_contains() {
local label="$1" haystack="$2" needle="$3"
if ! echo "$haystack" | grep -qF "$needle"; then
echo " PASS — $label"
PASS=$((PASS+1))
else
echo " FAIL — $label (unexpected match)"
echo " needle: $needle"
echo " haystack: $haystack"
FAIL=$((FAIL+1))
fi
}
# ── Pre-sweep: remove any stale leftover workspaces from a prior aborted run ──
echo "=== Setup ==="
for NAME in "Abilities Sender" "Abilities Receiver"; do
PRIOR=$(curl -s "$BASE/workspaces" | python3 -c "
import json, sys
try:
print(' '.join(w['id'] for w in json.load(sys.stdin) if w.get('name') == '$NAME'))
except Exception:
pass
")
for _wid in $PRIOR; do
echo "Sweeping leftover '$NAME' workspace: $_wid"
curl -s -X DELETE "$BASE/workspaces/$_wid?confirm=true" > /dev/null || true
done
done
R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
-d '{"name":"Abilities Sender","tier":1}')
SENDER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
[ -n "$SENDER_ID" ] || { echo "Failed to create sender workspace: $R"; exit 1; }
echo "Created sender workspace: $SENDER_ID"
R=$(curl -s -X POST "$BASE/workspaces" -H "Content-Type: application/json" \
-d '{"name":"Abilities Receiver","tier":1}')
RECEIVER_ID=$(echo "$R" | python3 -c 'import json,sys;print(json.load(sys.stdin)["id"])' 2>/dev/null || true)
[ -n "$RECEIVER_ID" ] || { echo "Failed to create receiver workspace: $R"; exit 1; }
echo "Created receiver workspace: $RECEIVER_ID"
# Mint workspace-scoped bearer tokens (test-only endpoint, disabled in prod).
SENDER_TOKEN=$(e2e_mint_test_token "$SENDER_ID")
[ -n "$SENDER_TOKEN" ] || { echo "Failed to mint sender token"; exit 1; }
SENDER_AUTH="Authorization: Bearer $SENDER_TOKEN"
# Admin token — any live workspace bearer satisfies AdminAuth in local dev.
# In production-like envs, set MOLECULE_ADMIN_TOKEN.
ADMIN_TOKEN="${MOLECULE_ADMIN_TOKEN:-$SENDER_TOKEN}"
ADMIN_AUTH="Authorization: Bearer $ADMIN_TOKEN"
# ─────────────────────────────────────────────────────────────────────────────
echo ""
echo "=== Part 1: talk_to_user ability ==="
echo ""
echo "--- 1a: /notify works with default talk_to_user_enabled=true ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Hello from sender"}')
assert "POST /notify returns 200 when talk_to_user_enabled=true (default)" "$CODE" "200"
echo ""
echo "--- 1b: Disable talk_to_user ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
-d '{"talk_to_user_enabled": false}')
assert "PATCH /abilities talk_to_user_enabled=false returns 200" "$CODE" "200"
# Verify the flag is reflected in the workspace GET response.
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
assert "GET /workspaces/:id reflects talk_to_user_enabled=false" "$FLAG" "False"
echo ""
echo "--- 1c: /notify blocked when talk_to_user disabled ---"
BODY=$(curl -s -w "" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Should be blocked"}')
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Should be blocked"}')
assert "POST /notify returns 403 when talk_to_user_enabled=false" "$CODE" "403"
ERR=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("error",""))' 2>/dev/null || echo "")
assert_contains "403 body contains talk_to_user_disabled error code" "$ERR" "talk_to_user_disabled"
HINT=$(echo "$BODY" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("hint",""))' 2>/dev/null || echo "")
assert_contains "403 body contains delegate_task hint" "$HINT" "delegate_task"
echo ""
echo "--- 1d: Re-enable talk_to_user and verify /notify works again ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
-d '{"talk_to_user_enabled": true}')
assert "PATCH /abilities talk_to_user_enabled=true returns 200" "$CODE" "200"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/notify" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Re-enabled, should work"}')
assert "POST /notify returns 200 after re-enabling talk_to_user" "$CODE" "200"
# ─────────────────────────────────────────────────────────────────────────────
echo ""
echo "=== Part 2: broadcast ability ==="
echo ""
echo "--- 2a: Broadcast blocked by default (broadcast_enabled=false) ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Should be blocked"}')
assert "POST /broadcast returns 403 when broadcast_enabled=false (default)" "$CODE" "403"
echo ""
echo "--- 2b: Enable broadcast ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
-d '{"broadcast_enabled": true}')
assert "PATCH /abilities broadcast_enabled=true returns 200" "$CODE" "200"
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
FLAG=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
assert "GET /workspaces/:id reflects broadcast_enabled=true" "$FLAG" "True"
echo ""
echo "--- 2c: Successful broadcast fan-out ---"
BCAST=$(curl -s -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":"Org-wide notice: scheduled maintenance in 5 minutes."}')
BSTATUS=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("status",""))' 2>/dev/null || echo "")
BDELIVERED=$(echo "$BCAST" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("delivered","-1"))' 2>/dev/null || echo "-1")
assert "POST /broadcast returns status=sent" "$BSTATUS" "sent"
# delivered count must be >= 1 (the receiver workspace).
echo " INFO — broadcast delivered=$BDELIVERED"
if python3 -c "import sys; sys.exit(0 if int('$BDELIVERED') >= 1 else 1)" 2>/dev/null; then
echo " PASS — delivered count >= 1"
PASS=$((PASS+1))
else
echo " FAIL — expected delivered >= 1, got $BDELIVERED"
FAIL=$((FAIL+1))
fi
echo ""
echo "--- 2d: Receiver activity log has broadcast_receive entry ---"
RECEIVER_TOKEN=$(e2e_mint_test_token "$RECEIVER_ID")
[ -n "$RECEIVER_TOKEN" ] || { echo "Failed to mint receiver token"; exit 1; }
RECEIVER_AUTH="Authorization: Bearer $RECEIVER_TOKEN"
ACT=$(curl -s -H "$RECEIVER_AUTH" "$BASE/workspaces/$RECEIVER_ID/activity?source=agent&limit=20")
ROW=$(echo "$ACT" | python3 -c '
import json, sys
rows = json.load(sys.stdin) or []
for r in rows:
if r.get("activity_type") == "broadcast_receive":
print(json.dumps(r))
break
')
[ -n "$ROW" ] || {
echo " FAIL — could not find broadcast_receive row in receiver activity"
FAIL=$((FAIL+1))
}
if [ -n "$ROW" ]; then
# Message is stored in summary field.
MSG=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("summary",""))')
assert_contains "broadcast_receive row summary has original message" "$MSG" "scheduled maintenance"
# Sender ID is stored in source_id field.
SRC=$(echo "$ROW" | python3 -c 'import json,sys;r=json.load(sys.stdin);print(r.get("source_id",""))')
assert "broadcast_receive row source_id is sender workspace" "$SRC" "$SENDER_ID"
fi
echo ""
echo "--- 2e: Sender activity log has broadcast_sent entry ---"
ACT_SENDER=$(curl -s -H "$SENDER_AUTH" "$BASE/workspaces/$SENDER_ID/activity?limit=20")
SENT_ROW=$(echo "$ACT_SENDER" | python3 -c '
import json, sys
rows = json.load(sys.stdin) or []
for r in rows:
if r.get("activity_type") == "broadcast_sent":
print(json.dumps(r))
break
')
[ -n "$SENT_ROW" ] || {
echo " FAIL — could not find broadcast_sent row in sender activity"
FAIL=$((FAIL+1))
}
if [ -n "$SENT_ROW" ]; then
# Delivered count is baked into the summary field (no response_body for sender row).
SUMMARY=$(echo "$SENT_ROW" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("summary",""))')
assert_contains "broadcast_sent summary mentions workspace count" "$SUMMARY" "workspace"
fi
echo ""
echo "--- 2f: Sender does NOT receive a broadcast_receive entry ---"
SELF_RECV=$(echo "$ACT_SENDER" | python3 -c '
import json, sys
rows = json.load(sys.stdin) or []
for r in rows:
if r.get("activity_type") == "broadcast_receive":
print("found")
break
')
assert_not_contains "sender has no broadcast_receive in own activity log" "${SELF_RECV:-}" "found"
# ─────────────────────────────────────────────────────────────────────────────
echo ""
echo "--- 2g: Empty message is rejected ---"
CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$BASE/workspaces/$SENDER_ID/broadcast" \
-H "Content-Type: application/json" -H "$SENDER_AUTH" \
-d '{"message":""}')
assert "POST /broadcast with empty message returns 400" "$CODE" "400"
echo ""
echo "--- 2h: Partial PATCH does not clobber other flags ---"
# Set talk_to_user=false, then patch only broadcast — talk_to_user must stay false.
curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
-d '{"talk_to_user_enabled": false}'
curl -s -o /dev/null -X PATCH "$BASE/workspaces/$SENDER_ID/abilities" \
-H "Content-Type: application/json" -H "$ADMIN_AUTH" \
-d '{"broadcast_enabled": false}'
WS=$(curl -s "$BASE/workspaces/$SENDER_ID" -H "$SENDER_AUTH")
TUF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("talk_to_user_enabled","MISSING"))')
BEF=$(echo "$WS" | python3 -c 'import json,sys;print(json.load(sys.stdin).get("broadcast_enabled","MISSING"))')
assert "partial PATCH preserves talk_to_user_enabled=false" "$TUF" "False"
assert "partial PATCH sets broadcast_enabled=false" "$BEF" "False"
# ─────────────────────────────────────────────────────────────────────────────
echo ""
echo "=== Results: $PASS passed, $FAIL failed ==="
[ "$FAIL" -eq 0 ]
@@ -1,7 +1,12 @@
package handlers
import (
"context"
"database/sql"
"testing"
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// TestExtractExpiresInSeconds covers the JSON parser used at enqueue time
@@ -58,3 +63,207 @@ func TestExtractExpiresInSeconds(t *testing.T) {
})
}
}
// ── QueueStatusByID ─────────────────────────────────────────────────────────────
func setupQueueStatusDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
return mock
}
func TestQueueStatusByID_Success(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
rows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "queued", 50, 0,
nil, // last_error
"2026-01-01T00:00:00Z", // enqueued_at
nil, // dispatched_at
nil, // completed_at
nil, // expires_at
nil, // response_body
)
mock.ExpectQuery(`SELECT`).
WithArgs(queueID).
WillReturnRows(rows)
qs, err := QueueStatusByID(context.Background(), queueID)
if err != nil {
t.Fatalf("QueueStatusByID returned error: %v", err)
}
if qs.ID != queueID {
t.Errorf("ID = %q, want %q", qs.ID, queueID)
}
if qs.WorkspaceID != wsID {
t.Errorf("WorkspaceID = %q, want %q", qs.WorkspaceID, wsID)
}
if qs.Status != "queued" {
t.Errorf("Status = %q, want %q", qs.Status, "queued")
}
if qs.Priority != 50 {
t.Errorf("Priority = %d, want 50", qs.Priority)
}
if qs.LastError != nil {
t.Errorf("LastError = %v, want nil", qs.LastError)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueStatusByID_NotFound(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT`).
WithArgs(queueID).
WillReturnError(sql.ErrNoRows)
qs, err := QueueStatusByID(context.Background(), queueID)
if err != sql.ErrNoRows {
t.Errorf("expected sql.ErrNoRows, got %v", err)
}
if qs != nil {
t.Errorf("expected nil queue status, got %+v", qs)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueStatusByID_DBError(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT`).
WithArgs(queueID).
WillReturnError(sql.ErrConnDone)
qs, err := QueueStatusByID(context.Background(), queueID)
if err == nil {
t.Error("expected error, got nil")
}
if qs != nil {
t.Errorf("expected nil queue status, got %+v", qs)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueStatusByID_CompletedWithResponse(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
respBody := []byte(`{"text":"delegation result"}`)
rows := sqlmock.NewRows([]string{
"id", "workspace_id", "status", "priority", "attempts",
"last_error", "enqueued_at", "dispatched_at", "completed_at", "expires_at",
"response_body",
}).AddRow(
queueID, wsID, "completed", 50, 1,
nil,
"2026-01-01T00:00:00Z",
"2026-01-01T00:01:00Z",
"2026-01-01T00:02:00Z",
nil,
respBody,
)
mock.ExpectQuery(`SELECT`).
WithArgs(queueID).
WillReturnRows(rows)
qs, err := QueueStatusByID(context.Background(), queueID)
if err != nil {
t.Fatalf("QueueStatusByID returned error: %v", err)
}
if qs.Status != "completed" {
t.Errorf("Status = %q, want completed", qs.Status)
}
if qs.ResponseBody == nil {
t.Fatal("ResponseBody should be set for completed status")
}
if string(qs.ResponseBody) != `{"text":"delegation result"}` {
t.Errorf("ResponseBody = %q, want %q", string(qs.ResponseBody), `{"text":"delegation result"}`)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ── queueRowAuthFields ──────────────────────────────────────────────────────────
func TestQueueRowAuthFields_Success(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
callerID := "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
wsID := "cccccccc-cccc-cccc-cccc-cccccccccccc"
rows := sqlmock.NewRows([]string{"caller_id", "workspace_id"}).
AddRow(callerID, wsID)
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id`).
WithArgs(queueID).
WillReturnRows(rows)
gotCaller, gotWs, err := queueRowAuthFields(context.Background(), queueID)
if err != nil {
t.Fatalf("queueRowAuthFields returned error: %v", err)
}
if gotCaller != callerID {
t.Errorf("callerID = %q, want %q", gotCaller, callerID)
}
if gotWs != wsID {
t.Errorf("workspaceID = %q, want %q", gotWs, wsID)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueRowAuthFields_NotFound(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id`).
WithArgs(queueID).
WillReturnError(sql.ErrNoRows)
_, _, err := queueRowAuthFields(context.Background(), queueID)
if err != sql.ErrNoRows {
t.Errorf("expected sql.ErrNoRows, got %v", err)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueRowAuthFields_DBError(t *testing.T) {
mock := setupQueueStatusDB(t)
queueID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT caller_id, workspace_id FROM a2a_queue WHERE id`).
WithArgs(queueID).
WillReturnError(sql.ErrConnDone)
_, _, err := queueRowAuthFields(context.Background(), queueID)
if err == nil {
t.Error("expected error, got nil")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -516,3 +516,51 @@ func TestDrainQueueForWorkspace_ClaimGuarding_SecondDrainGetsEmpty(t *testing.T)
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ── QueueDepth ──────────────────────────────────────────────────────────────────
func TestQueueDepth_ReturnsCount(t *testing.T) {
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(42))
got := QueueDepth(context.Background(), wsID)
if got != 42 {
t.Errorf("QueueDepth returned %d, want 42", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestQueueDepth_ZeroWhenEmpty(t *testing.T) {
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
wsID := "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
mock.ExpectQuery(`SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
got := QueueDepth(context.Background(), wsID)
if got != 0 {
t.Errorf("QueueDepth returned %d, want 0", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
@@ -482,6 +482,13 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if errors.Is(err, ErrTalkToUserDisabled) {
c.JSON(http.StatusForbidden, gin.H{
"error": "talk_to_user_disabled",
"hint": "This workspace is not allowed to send messages directly to the user. Forward your update to a parent workspace using delegate_task — they may be able to reach the user.",
})
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
@@ -464,9 +464,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
// Workspace existence check
mock.ExpectQuery(`SELECT name FROM workspaces`).
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-notify").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Persistence INSERT — verify shape
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -511,9 +511,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery(`SELECT name FROM workspaces`).
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-attach").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
// Capture the JSONB arg so we can assert on the persisted shape
// AFTER the call (must include parts[].kind=file so reload
@@ -640,9 +640,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery(`SELECT name FROM workspaces`).
mock.ExpectQuery(`SELECT name, talk_to_user_enabled FROM workspaces`).
WithArgs("ws-x").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("DD"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("DD", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(fmt.Errorf("simulated db hiccup"))
@@ -54,6 +54,11 @@ import (
// timeout) surface as wrapped errors and should be treated as 503.
var ErrWorkspaceNotFound = errors.New("agent_message: workspace not found")
// ErrTalkToUserDisabled is returned when the workspace has
// talk_to_user_enabled=false. Callers surface HTTP 403 so the Python tool
// can detect it and suggest forwarding to a parent workspace.
var ErrTalkToUserDisabled = errors.New("agent_message: talk_to_user disabled")
// AgentMessageAttachment is one file attached to an agent → user
// message. Identical to handlers.NotifyAttachment in field set; kept
// distinct so the writer's API doesn't import a handler type with HTTP
@@ -107,16 +112,20 @@ func (w *AgentMessageWriter) Send(
// notify call surfaced as "workspace not found" and masked real
// incidents in the alert path.
var wsName string
var talkToUserEnabled bool
err := w.db.QueryRowContext(ctx,
`SELECT name FROM workspaces WHERE id = $1 AND status != 'removed'`,
`SELECT name, talk_to_user_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
workspaceID,
).Scan(&wsName)
).Scan(&wsName, &talkToUserEnabled)
if errors.Is(err, sql.ErrNoRows) {
return ErrWorkspaceNotFound
}
if err != nil {
return fmt.Errorf("agent_message: workspace lookup: %w", err)
}
if !talkToUserEnabled {
return ErrTalkToUserDisabled
}
// 2. Build broadcast payload + WS-emit. Same shape that ChatTab's
// AGENT_MESSAGE handler in canvas/src/store/canvas-events.ts has
@@ -88,9 +88,9 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -116,9 +116,9 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-att").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
WithArgs(
@@ -173,9 +173,9 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-missing").
WillReturnRows(sqlmock.NewRows([]string{"name"}))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}))
err := w.Send(context.Background(), "ws-missing", "lost in the void", nil)
if !errors.Is(err, ErrWorkspaceNotFound) {
@@ -202,9 +202,9 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbfail").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnError(errors.New("transient db error"))
@@ -223,9 +223,9 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-trunc").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Ryan"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Ryan", true))
longMsg := strings.Repeat("x", 200)
mock.ExpectExec(`INSERT INTO activity_logs`).
@@ -263,9 +263,9 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-bc").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("Workspace Name"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("Workspace Name", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
@@ -315,7 +315,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
transientErr := errors.New("connection refused")
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-dbdown").
WillReturnError(transientErr)
@@ -350,9 +350,9 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
// the byte-slice bug.
msg := strings.Repeat("你", 200)
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-cjk").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WithArgs(
@@ -395,9 +395,9 @@ func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-noatt").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("X"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("X", true))
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
@@ -947,6 +947,73 @@ func TestVerifyDiscordSignature_WrongLengthPubKey(t *testing.T) {
}
}
// ==================== matchesChatID pure function ====================
func TestMatchesChatID_ExactMatch(t *testing.T) {
cfg := map[string]interface{}{"chat_id": "123456"}
if !matchesChatID(cfg, "123456") {
t.Error("expected true for exact match")
}
}
func TestMatchesChatID_NoMatch(t *testing.T) {
cfg := map[string]interface{}{"chat_id": "123456"}
if matchesChatID(cfg, "654321") {
t.Error("expected false for non-matching chat ID")
}
}
func TestMatchesChatID_PrefixNoMatch(t *testing.T) {
// "123" is a prefix of "123456" but not an exact match.
cfg := map[string]interface{}{"chat_id": "123456"}
if matchesChatID(cfg, "123") {
t.Error("expected false for prefix of stored chat ID")
}
}
func TestMatchesChatID_CommaSeparatedMultiple(t *testing.T) {
cfg := map[string]interface{}{"chat_id": "111,222,333"}
for _, id := range []string{"111", "222", "333"} {
if !matchesChatID(cfg, id) {
t.Errorf("expected true for %q in comma-separated list", id)
}
}
if matchesChatID(cfg, "444") {
t.Error("expected false for ID not in list")
}
}
func TestMatchesChatID_WhitespaceTrimmed(t *testing.T) {
cfg := map[string]interface{}{"chat_id": "111, 222 , 333"}
if !matchesChatID(cfg, "222") {
t.Error("expected true for whitespace-trimmed match")
}
if matchesChatID(cfg, " 222") {
t.Error("expected false for whitespace in query (not trimmed from query)")
}
}
func TestMatchesChatID_EmptyChatID(t *testing.T) {
cfg := map[string]interface{}{"chat_id": ""}
if matchesChatID(cfg, "123456") {
t.Error("expected false for empty chat_id in config")
}
}
func TestMatchesChatID_MissingChatIDKey(t *testing.T) {
cfg := map[string]interface{}{}
if matchesChatID(cfg, "123456") {
t.Error("expected false when chat_id key is missing")
}
}
func TestMatchesChatID_NonStringChatID(t *testing.T) {
cfg := map[string]interface{}{"chat_id": 123456} // wrong type
if matchesChatID(cfg, "123456") {
t.Error("expected false when chat_id is not a string")
}
}
// TestChannelHandler_Webhook_Discord_NoKey_Returns401 verifies that a Discord
// webhook request is rejected with 401 when no public key is configured in the
// DB and DISCORD_APP_PUBLIC_KEY env var is not set.
@@ -230,20 +230,21 @@ func TestWorkspaceList_WithData(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
// 21 cols — see scanWorkspaceRow for order (max_concurrent_tasks
// lands between active_tasks and last_error_rate).
// 23 cols — broadcast_enabled + talk_to_user_enabled added after monthly_spend
// (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte(`{"name":"agent1"}`), "http://localhost:8001",
nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0)).
nil, 3, 1, 0.02, "", 7200, "processing", "langgraph", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "", 2, "degraded", []byte("null"), "",
nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0))
nil, 0, 1, 0.6, "timeout", 100, "", "claude-code", "", 50.0, 60.0, true, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
@@ -392,21 +392,21 @@ func TestWorkspaceList(t *testing.T) {
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
// 21 cols: `max_concurrent_tasks` added between active_tasks and
// last_error_rate (see scanWorkspaceRow + COALESCE(w.max_concurrent_tasks, 1)
// in workspace.go). Column order must match that scan exactly.
// 23 cols: broadcast_enabled + talk_to_user_enabled added after monthly_spend
// (migration 20260514). Column order must match scanWorkspaceRow exactly.
columns := []string{
"id", "name", "role", "tier", "status", "agent_card", "url",
"parent_id", "active_tasks", "max_concurrent_tasks",
"last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
rows := sqlmock.NewRows(columns).
AddRow("ws-1", "Agent One", "worker", 1, "online", []byte("null"), "http://localhost:8001",
nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0)).
nil, 0, 1, 0.0, "", 100, "", "claude-code", "", 10.0, 20.0, false, nil, int64(0), false, true).
AddRow("ws-2", "Agent Two", "manager", 2, "provisioning", []byte("null"), "",
nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0))
nil, 0, 1, 0.0, "", 0, "", "langgraph", "", 50.0, 60.0, false, nil, int64(0), false, true)
mock.ExpectQuery("SELECT w.id, w.name").
WillReturnRows(rows)
@@ -1120,13 +1120,14 @@ func TestWorkspaceGet_CurrentTask(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("dddddddd-0004-0000-0000-000000000000").
WillReturnRows(sqlmock.NewRows(columns).AddRow(
"dddddddd-0004-0000-0000-000000000000", "Task Worker", "worker", 1, "online", []byte("null"), "http://localhost:9000",
nil, 2, 1, 0.0, "", 300, "Analyzing document", "langgraph", "", 10.0, 20.0, false,
nil, int64(0),
nil, int64(0), false, true,
))
w := httptest.NewRecorder()
@@ -751,9 +751,9 @@ func TestMCPHandler_SendMessageToUser_DBErrorLogsAndStill200s(t *testing.T) {
t.Setenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE", "true")
h, mock := newMCPHandler(t)
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-err").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// INSERT fails — must NOT abort the tool response.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -802,9 +802,9 @@ func TestMCPHandler_SendMessageToUser_ResponseBodyShape(t *testing.T) {
const userMessage = "Hi there from the agent"
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-shape").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// Capture the response_body argument and assert its exact shape.
mock.ExpectExec(`INSERT INTO activity_logs.*'a2a_receive'.*'notify'`).
@@ -861,9 +861,9 @@ func TestMCPHandler_SendMessageToUser_PersistsToActivityLog(t *testing.T) {
// before it does anything else. Returning a name lets the
// broadcast payload populate; the test doesn't assert on the
// broadcast (no observable WS in this fake), only on the DB.
mock.ExpectQuery("SELECT name FROM workspaces").
mock.ExpectQuery("SELECT name, talk_to_user_enabled FROM workspaces").
WithArgs("ws-msg").
WillReturnRows(sqlmock.NewRows([]string{"name"}).AddRow("CEO Ryan PC"))
WillReturnRows(sqlmock.NewRows([]string{"name", "talk_to_user_enabled"}).AddRow("CEO Ryan PC", true))
// The persistence INSERT — pin the exact shape so a future
// refactor that switches columns or drops `method='notify'`
@@ -287,7 +287,7 @@ func TestRenderCategoryRoutingYAML_StableOrdering(t *testing.T) {
if ai <= 0 || zi <= 0 || mi <= 0 {
t.Fatalf("could not locate all keys in output: %s", out)
}
if !(ai < mi && mi < zi) {
if ai >= mi || mi >= zi {
t.Errorf("keys not sorted: alpha=%d middle=%d zebra=%d, output:\n%s", ai, mi, zi, out)
}
}
@@ -595,7 +595,7 @@ func scanWorkspaceRow(rows interface {
var id, name, role, status, url, sampleError, currentTask, runtime, workspaceDir string
var tier, activeTasks, maxConcurrentTasks, uptimeSeconds int
var errorRate, x, y float64
var collapsed bool
var collapsed, broadcastEnabled, talkToUserEnabled bool
var parentID *string
var agentCard []byte
var budgetLimit sql.NullInt64
@@ -604,7 +604,7 @@ func scanWorkspaceRow(rows interface {
err := rows.Scan(&id, &name, &role, &tier, &status, &agentCard, &url,
&parentID, &activeTasks, &maxConcurrentTasks, &errorRate, &sampleError, &uptimeSeconds,
&currentTask, &runtime, &workspaceDir, &x, &y, &collapsed,
&budgetLimit, &monthlySpend)
&budgetLimit, &monthlySpend, &broadcastEnabled, &talkToUserEnabled)
if err != nil {
return nil, err
}
@@ -628,6 +628,8 @@ func scanWorkspaceRow(rows interface {
"x": x,
"y": y,
"collapsed": collapsed,
"broadcast_enabled": broadcastEnabled,
"talk_to_user_enabled": talkToUserEnabled,
}
// budget_limit: nil when no limit set, int64 otherwise
@@ -663,7 +665,8 @@ const workspaceListQuery = `
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
w.budget_limit, COALESCE(w.monthly_spend, 0)
w.budget_limit, COALESCE(w.monthly_spend, 0),
w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.status != 'removed'
@@ -723,7 +726,8 @@ func (h *WorkspaceHandler) Get(c *gin.Context) {
COALESCE(w.current_task, ''), COALESCE(w.runtime, 'langgraph'),
COALESCE(w.workspace_dir, ''),
COALESCE(cl.x, 0), COALESCE(cl.y, 0), COALESCE(cl.collapsed, false),
w.budget_limit, COALESCE(w.monthly_spend, 0)
w.budget_limit, COALESCE(w.monthly_spend, 0),
w.broadcast_enabled, w.talk_to_user_enabled
FROM workspaces w
LEFT JOIN canvas_layouts cl ON cl.workspace_id = w.id
WHERE w.id = $1
@@ -0,0 +1,82 @@
package handlers
// workspace_abilities.go — PATCH /workspaces/:id/abilities
//
// Allows users and admin agents to toggle two workspace-level ability flags:
//
// broadcast_enabled — workspace may POST /broadcast to send org-wide messages
// talk_to_user_enabled — workspace may deliver canvas chat messages via
// send_message_to_user / POST /notify
//
// Gated behind AdminAuth so workspace agents cannot self-modify their own
// ability flags (that would let any agent grant itself broadcast rights or
// suppress its own chat-silence constraint).
import (
"log"
"net/http"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// AbilitiesPayload carries the subset of ability flags the caller wants to
// update. Fields are pointers so that the handler can distinguish "caller
// supplied false" from "caller omitted the field" (omitempty semantics).
type AbilitiesPayload struct {
BroadcastEnabled *bool `json:"broadcast_enabled"`
TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
}
// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth).
func PatchAbilities(c *gin.Context) {
id := c.Param("id")
if err := validateWorkspaceID(id); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
return
}
var body AbilitiesPayload
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"})
return
}
ctx := c.Request.Context()
var exists bool
if err := db.DB.QueryRowContext(ctx,
`SELECT EXISTS(SELECT 1 FROM workspaces WHERE id = $1 AND status != 'removed')`, id,
).Scan(&exists); err != nil || !exists {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if body.BroadcastEnabled != nil {
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled,
); err != nil {
log.Printf("PatchAbilities broadcast_enabled for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
}
if body.TalkToUserEnabled != nil {
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.TalkToUserEnabled,
); err != nil {
log.Printf("PatchAbilities talk_to_user_enabled for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
}
c.JSON(http.StatusOK, gin.H{"status": "updated"})
}
@@ -0,0 +1,142 @@
package handlers
// workspace_broadcast.go — POST /workspaces/:id/broadcast
//
// Allows a workspace with broadcast_enabled=true to send a message to every
// non-removed agent workspace in the org. The message is:
//
// • Persisted in each recipient's activity_logs (type='broadcast_receive')
// so poll-mode agents pick it up via GET /activity.
// • Broadcast via WebSocket BROADCAST_MESSAGE event so canvas panels can
// show a real-time banner for each recipient workspace.
//
// The sender's own workspace logs a 'broadcast_sent' activity row for
// traceability.
//
// Auth: WorkspaceAuth (the agent triggers this with its own bearer token).
// The handler re-validates broadcast_enabled inside the DB lookup to prevent
// TOCTOU — the middleware only proved the token is valid, not the ability.
import (
"log"
"net/http"
"strconv"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
// BroadcastHandler is constructed once and shared across requests.
type BroadcastHandler struct {
broadcaster *events.Broadcaster
}
// NewBroadcastHandler creates a BroadcastHandler.
func NewBroadcastHandler(b *events.Broadcaster) *BroadcastHandler {
return &BroadcastHandler{broadcaster: b}
}
// Broadcast handles POST /workspaces/:id/broadcast.
func (h *BroadcastHandler) Broadcast(c *gin.Context) {
senderID := c.Param("id")
if err := validateWorkspaceID(senderID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
return
}
var body struct {
Message string `json:"message" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "message is required"})
return
}
ctx := c.Request.Context()
// Verify sender exists and has broadcast_enabled=true.
var senderName string
var broadcastEnabled bool
err := db.DB.QueryRowContext(ctx,
`SELECT name, broadcast_enabled FROM workspaces WHERE id = $1 AND status != 'removed'`,
senderID,
).Scan(&senderName, &broadcastEnabled)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
return
}
if !broadcastEnabled {
c.JSON(http.StatusForbidden, gin.H{
"error": "broadcast_disabled",
"hint": "This workspace does not have the broadcast ability. Ask a user or admin to enable it via PATCH /workspaces/:id/abilities.",
})
return
}
// Collect all non-removed agent workspaces (excludes the sender itself).
rows, err := db.DB.QueryContext(ctx,
`SELECT id FROM workspaces WHERE status != 'removed' AND id != $1`,
senderID,
)
if err != nil {
log.Printf("Broadcast: recipient query failed for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
defer rows.Close()
var recipientIDs []string
for rows.Next() {
var rid string
if rows.Scan(&rid) == nil {
recipientIDs = append(recipientIDs, rid)
}
}
if err := rows.Err(); err != nil {
log.Printf("Broadcast: recipient rows error for %s: %v", senderID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "internal error"})
return
}
broadcastPayload := map[string]interface{}{
"message": body.Message,
"sender_id": senderID,
"sender": senderName,
}
// Persist broadcast_receive in each recipient's activity log + emit WS event.
delivered := 0
for _, rid := range recipientIDs {
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, summary, status)
VALUES ($1, 'broadcast_receive', 'broadcast', $2, $3, 'ok')
`, rid, senderID, "Broadcast from "+senderName+": "+broadcastTruncate(body.Message, 120)); err != nil {
log.Printf("Broadcast: activity_logs insert for recipient %s: %v", rid, err)
continue
}
h.broadcaster.BroadcastOnly(rid, "BROADCAST_MESSAGE", broadcastPayload)
delivered++
}
// Record the send on the sender's own log.
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, summary, status)
VALUES ($1, 'broadcast_sent', 'broadcast', $2, 'ok')
`, senderID, "Broadcast sent to "+strconv.Itoa(delivered)+" workspace(s)"); err != nil {
log.Printf("Broadcast: sender activity_log for %s: %v", senderID, err)
}
c.JSON(http.StatusOK, gin.H{
"status": "sent",
"delivered": delivered,
})
}
func broadcastTruncate(s string, max int) string {
runes := []rune(s)
if len(runes) <= max {
return s
}
return string(runes[:max]) + "…"
}
@@ -33,6 +33,7 @@ var wsColumns = []string{
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
// ==================== GET — financial fields stripped from open endpoint ====================
@@ -52,8 +53,10 @@ func TestWorkspaceBudget_Get_NilLimit(t *testing.T) {
[]byte(`{}`), "http://localhost:9001",
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
nil, // budget_limit NULL
0)) // monthly_spend 0
nil, // budget_limit NULL
0, // monthly_spend 0
false, // broadcast_enabled
true)) // talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -96,7 +99,8 @@ func TestWorkspaceBudget_Get_WithLimit(t *testing.T) {
nil, 0, 1, 0.0, "", 0, "", "langgraph", "",
0.0, 0.0, false,
int64(500), // budget_limit = $5.00 in DB
int64(123))) // monthly_spend = $1.23 in DB
int64(123), // monthly_spend = $1.23 in DB
false, true)) // broadcast_enabled, talk_to_user_enabled
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -29,6 +29,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0001-0000-0000-000000000000").
@@ -36,7 +37,7 @@ func TestWorkspaceGet_Success(t *testing.T) {
AddRow("cccccccc-0001-0000-0000-000000000000", "My Agent", "worker", 1, "online", []byte(`{"name":"test"}`),
"http://localhost:8001", nil, 2, 1, 0.05, "", 3600, "working", "langgraph",
"", 10.0, 20.0, false,
nil, 0))
nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -118,6 +119,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -125,7 +127,7 @@ func TestWorkspaceGet_RemovedReturns410(t *testing.T) {
AddRow(id, "Old Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
nil, 0))
nil, 0, false, true))
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"updated_at"}).AddRow(removedAt))
@@ -181,6 +183,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -188,7 +191,7 @@ func TestWorkspaceGet_RemovedReturns410WithNullRemovedAtOnTimestampFetchFailure(
AddRow(id, "Vanished", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
nil, 0))
nil, 0, false, true))
// Simulate the row vanishing between the two queries.
mock.ExpectQuery(`SELECT updated_at FROM workspaces`).
WithArgs(id).
@@ -243,6 +246,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs(id).
@@ -250,7 +254,7 @@ func TestWorkspaceGet_RemovedWithIncludeQueryReturns200(t *testing.T) {
AddRow(id, "Audit Agent", "worker", 1, string(models.StatusRemoved), []byte(`null`),
"", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
nil, 0))
nil, 0, false, true))
// last_outbound_at follow-up query (existing path)
mock.ExpectQuery(`SELECT last_outbound_at FROM workspaces`).
WithArgs(id).
@@ -676,6 +680,7 @@ func TestWorkspaceList_Empty(t *testing.T) {
"parent_id", "active_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}))
w := httptest.NewRecorder()
@@ -1379,6 +1384,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
// Populate with non-zero financial values to confirm they are stripped.
mock.ExpectQuery("SELECT w.id, w.name").
@@ -1387,7 +1393,7 @@ func TestWorkspaceGet_FinancialFieldsStripped(t *testing.T) {
AddRow("cccccccc-0010-0000-0000-000000000000", "Finance Test", "worker", 1, "online", []byte(`{}`),
"http://localhost:9001", nil, 0, 1, 0.0, "", 0, "", "langgraph",
"", 0.0, 0.0, false,
int64(50000), int64(12500))) // budget_limit=500 USD, spend=125 USD
int64(50000), int64(12500), false, true)) // budget_limit=500 USD, spend=125 USD
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1435,6 +1441,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"parent_id", "active_tasks", "max_concurrent_tasks", "last_error_rate", "last_sample_error",
"uptime_seconds", "current_task", "runtime", "workspace_dir", "x", "y", "collapsed",
"budget_limit", "monthly_spend",
"broadcast_enabled", "talk_to_user_enabled",
}
mock.ExpectQuery("SELECT w.id, w.name").
WithArgs("cccccccc-0955-0000-0000-000000000000").
@@ -1447,7 +1454,7 @@ func TestWorkspaceGet_SensitiveFieldsStripped(t *testing.T) {
"langgraph",
"/home/user/secret-projects/client-work",
0.0, 0.0, false,
nil, 0))
nil, 0, false, true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -36,6 +36,15 @@ type Workspace struct {
// to activity_logs, agent reads via GET /activity?since_id=). See
// migration 045 + RFC #2339.
DeliveryMode string `json:"delivery_mode" db:"delivery_mode"`
// BroadcastEnabled: when true the workspace may call POST /broadcast to
// deliver a message to all non-removed agent workspaces in the org.
// Default false — only privileged orchestrators should hold this ability.
BroadcastEnabled bool `json:"broadcast_enabled" db:"broadcast_enabled"`
// TalkToUserEnabled: when false the workspace's send_message_to_user calls
// and POST /notify requests are rejected with HTTP 403 so the agent is
// forced to route updates through a parent workspace. Default true
// (preserves existing behaviour for all workspaces).
TalkToUserEnabled bool `json:"talk_to_user_enabled" db:"talk_to_user_enabled"`
// Canvas layout fields (from JOIN)
X float64 `json:"x"`
Y float64 `json:"y"`
@@ -146,6 +146,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAdmin.GET("/workspaces", wh.List)
wsAdmin.POST("/workspaces", wh.Create)
wsAdmin.DELETE("/workspaces/:id", wh.Delete)
// Ability toggles — admin-only so workspace agents cannot self-modify
// broadcast_enabled or talk_to_user_enabled.
wsAdmin.PATCH("/workspaces/:id/abilities", handlers.PatchAbilities)
// Out-of-band bootstrap signal: CP's watcher POSTs here when it
// detects "RUNTIME CRASHED" in a workspace EC2 console output,
// so the canvas flips to failed in seconds instead of waiting
@@ -201,6 +204,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
wsAuth.POST("/hibernate", wh.Hibernate)
// Broadcast — send a message to all non-removed workspaces in the org.
// Requires broadcast_enabled=true on the source workspace (checked
// inside the handler). WorkspaceAuth on wsAuth proves token ownership.
broadcastH := handlers.NewBroadcastHandler(broadcaster)
wsAuth.POST("/broadcast", broadcastH.Broadcast)
// External-workspace credential lifecycle (issue #319 follow-up to
// the Create flow). Both endpoints reject runtime ≠ external with
// 400 — see external_rotate.go for the rationale.
@@ -0,0 +1,3 @@
ALTER TABLE workspaces
DROP COLUMN IF EXISTS broadcast_enabled,
DROP COLUMN IF EXISTS talk_to_user_enabled;
@@ -0,0 +1,16 @@
-- Workspace abilities: opt-in flags that gate platform-level behaviours.
--
-- broadcast_enabled (default FALSE): when TRUE the workspace may call
-- POST /workspaces/:id/broadcast to send a message to every non-removed
-- agent workspace in the org. Off by default — only privileged
-- orchestrator workspaces should hold this ability.
--
-- talk_to_user_enabled (default TRUE): when FALSE the workspace is not
-- allowed to deliver messages to the canvas user via send_message_to_user /
-- POST /notify. The platform returns HTTP 403 so the agent can forward its
-- update to a parent workspace instead. Default TRUE preserves existing
-- behaviour for all current workspaces.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS broadcast_enabled BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS talk_to_user_enabled BOOLEAN NOT NULL DEFAULT TRUE;
+6
View File
@@ -29,6 +29,7 @@ from typing import Callable
import inbox
from a2a_tools import (
tool_broadcast_message,
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
@@ -160,6 +161,11 @@ async def handle_tool_call(name: str, arguments: dict) -> str:
arguments.get("before_ts", ""),
source_workspace_id=arguments.get("source_workspace_id") or None,
)
elif name == "broadcast_message":
return await tool_broadcast_message(
arguments.get("message", ""),
workspace_id=arguments.get("workspace_id") or None,
)
return f"Unknown tool: {name}"
+1
View File
@@ -137,6 +137,7 @@ from a2a_tools_delegation import ( # noqa: E402 (import after the from-a2a_cli
# identically.
from a2a_tools_messaging import ( # noqa: E402 (import after the top-of-module imports)
_upload_chat_files,
tool_broadcast_message,
tool_chat_history,
tool_get_workspace_info,
tool_list_peers,
+58
View File
@@ -101,6 +101,50 @@ async def _upload_chat_files(
return uploaded, None
async def tool_broadcast_message(
message: str,
workspace_id: str | None = None,
) -> str:
"""Send a broadcast message to ALL agent workspaces in the org.
Requires the workspace to have broadcast_enabled=true (set by a user or
admin via PATCH /workspaces/:id/abilities). Use for urgent org-wide
signals — status changes, critical alerts, coordination instructions.
Every non-removed workspace receives the message in its activity log so
poll-mode agents pick it up, and push-mode canvases get a real-time
BROADCAST_MESSAGE WebSocket event.
Args:
message: The broadcast text. Keep it concise — all agents receive
this, so avoid lengthy prose that floods every context.
workspace_id: Optional. Which registered workspace to send the
broadcast from. Single-workspace agents omit this.
"""
if not message:
return "Error: message is required"
target_workspace_id = (workspace_id or "").strip() or WORKSPACE_ID
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{PLATFORM_URL}/workspaces/{target_workspace_id}/broadcast",
json={"message": message},
headers=_auth_headers_for_heartbeat(target_workspace_id),
)
if resp.status_code == 200:
data = resp.json()
delivered = data.get("delivered", "?")
return f"Broadcast sent to {delivered} workspace(s)"
if resp.status_code == 403:
try:
hint = resp.json().get("hint", "")
except Exception:
hint = ""
return f"Error: broadcast ability not enabled.{(' ' + hint) if hint else ''}"
return f"Error: platform returned {resp.status_code}"
except Exception as e:
return f"Error sending broadcast: {e}"
async def tool_send_message_to_user(
message: str,
attachments: list[str] | None = None,
@@ -151,6 +195,20 @@ async def tool_send_message_to_user(
if uploaded:
return f"Message sent to user with {len(uploaded)} attachment(s)"
return "Message sent to user"
if resp.status_code == 403:
try:
body = resp.json()
if body.get("error") == "talk_to_user_disabled":
hint = body.get("hint", "")
return (
"Error: this workspace is not allowed to send messages "
"directly to the user (talk_to_user is disabled). "
+ (hint + " " if hint else "")
+ "Use delegate_task to forward your update to a parent "
"or supervisor workspace that can reach the user."
)
except Exception:
pass
return f"Error: platform returned {resp.status_code}"
except Exception as e:
return f"Error sending message: {e}"
+48
View File
@@ -3,9 +3,57 @@
import logging
import os
from abc import ABC, abstractmethod
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any
# ---------------------------------------------------------------------------
# Provider routing — type alias + resolver used by individual adapters.
# Each adapter defines its own ProviderRegistry with the providers it accepts.
# ---------------------------------------------------------------------------
# Maps prefix → (ordered_auth_env_vars, default_base_url).
ProviderRegistry = dict[str, tuple[tuple[str, ...], str]]
def resolve_provider_routing(
model_str: str,
env: Mapping[str, str],
*,
registry: ProviderRegistry,
runtime_config: dict[str, Any] | None = None,
) -> tuple[str, str, str]:
"""Resolve a ``provider:model`` string to ``(api_key, base_url, bare_model_id)``.
URL precedence (highest to lowest):
1. ``<PREFIX>_BASE_URL`` env var
2. ``runtime_config["provider_url"]``
3. registry default for the prefix
Unknown prefixes fall back to OPENAI_API_KEY + api.openai.com.
Raises RuntimeError when no API key env var is set for the prefix.
"""
if ":" in model_str:
prefix, model_id = model_str.split(":", 1)
else:
prefix, model_id = "openai", model_str
env_vars, default_url = registry.get(
prefix, (("OPENAI_API_KEY",), "https://api.openai.com/v1")
)
api_key = next((env[v] for v in env_vars if env.get(v)), "")
if not api_key:
raise RuntimeError(
f"No API key found for provider {prefix!r} "
f"(checked: {', '.join(env_vars)}). Set one in workspace secrets."
)
env_url = env.get(f"{prefix.upper()}_BASE_URL", "")
config_url = (runtime_config or {}).get("provider_url", "")
base_url = env_url or config_url or default_url
return api_key, base_url, model_id
from a2a.server.agent_execution import AgentExecutor
from event_log import DisabledEventLog, EventLogBackend
+4
View File
@@ -340,6 +340,10 @@ _CLI_A2A_COMMAND_KEYWORDS: dict[str, str | None] = {
"delegate_task_async": "delegate --async",
"check_task_status": "status",
"get_workspace_info": "info",
# `broadcast_message` is not exposed via the CLI subprocess interface
# today — it's an MCP-first capability. If a2a_cli grows a `broadcast`
# subcommand, map it here and the alignment test will gate the change.
"broadcast_message": None,
# `send_message_to_user` is not exposed via the CLI subprocess
# interface today — it requires a structured `attachments` field
# that wouldn't survive a positional-arg shell invocation cleanly.
+40
View File
@@ -51,6 +51,7 @@ from dataclasses import dataclass
from typing import Any, Literal
from a2a_tools import (
tool_broadcast_message,
tool_chat_history,
tool_check_task_status,
tool_commit_memory,
@@ -288,6 +289,44 @@ _GET_WORKSPACE_INFO = ToolSpec(
section=A2A_SECTION,
)
_BROADCAST_MESSAGE = ToolSpec(
name="broadcast_message",
short=(
"Send a message to ALL agent workspaces in the org simultaneously. "
"Requires broadcast_enabled=true on this workspace (set by user/admin)."
),
when_to_use=(
"Use for urgent, org-wide signals: critical status changes, emergency "
"stop instructions, coordinated task announcements. Every non-removed "
"workspace receives the message in its activity log (poll-mode agents "
"see it on their next poll; push-mode canvases get a real-time banner). "
"This tool returns an error if broadcast_enabled is false — a user or "
"admin must enable it via the workspace abilities settings first."
),
input_schema={
"type": "object",
"properties": {
"message": {
"type": "string",
"description": (
"The broadcast text. Keep it concise — every agent in the "
"org receives this in their activity feed."
),
},
"workspace_id": {
"type": "string",
"description": (
"Optional. Multi-workspace mode: the registered workspace "
"to broadcast from. Single-workspace agents omit this."
),
},
},
"required": ["message"],
},
impl=tool_broadcast_message,
section=A2A_SECTION,
)
_SEND_MESSAGE_TO_USER = ToolSpec(
name="send_message_to_user",
short=(
@@ -603,6 +642,7 @@ TOOLS: list[ToolSpec] = [
_CHECK_TASK_STATUS,
_LIST_PEERS,
_GET_WORKSPACE_INFO,
_BROADCAST_MESSAGE,
_SEND_MESSAGE_TO_USER,
# Inbox (standalone-only; in-container returns informational error)
_WAIT_FOR_MESSAGE,
@@ -5,6 +5,7 @@
- **check_task_status**: Poll the status of a task started with delegate_task_async; returns result when done.
- **list_peers**: List the workspaces this agent can communicate with — name, ID, status, role for each.
- **get_workspace_info**: Get this workspace's own info — ID, name, role, tier, parent, status.
- **broadcast_message**: Send a message to ALL agent workspaces in the org simultaneously. Requires broadcast_enabled=true on this workspace (set by user/admin).
- **send_message_to_user**: Send a message directly to the user's canvas chat — pushed instantly via WebSocket. Use this to: (1) acknowledge a task immediately ('Got it, I'll start working on this'), (2) send interim progress updates while doing long work, (3) deliver follow-up results after delegation completes, (4) attach files (zip, pdf, csv, image) for the user to download via the `attachments` field (NEVER paste file URLs in `message`). The message appears in the user's chat as if you're proactively reaching out.
- **wait_for_message**: Block until the next inbound message (canvas user OR peer agent) arrives, or until ``timeout_secs`` elapses.
- **inbox_peek**: List pending inbound messages without removing them.
@@ -26,6 +27,9 @@ Call this first when you need to delegate but don't know the target's ID. Access
### get_workspace_info
Use to introspect your own identity (e.g. before reporting back to the user, or to determine whether you're a tier-0 root that can write GLOBAL memory).
### broadcast_message
Use for urgent, org-wide signals: critical status changes, emergency stop instructions, coordinated task announcements. Every non-removed workspace receives the message in its activity log (poll-mode agents see it on their next poll; push-mode canvases get a real-time banner). This tool returns an error if broadcast_enabled is false — a user or admin must enable it via the workspace abilities settings first.
### send_message_to_user
Use proactively across the lifecycle of a task — early to acknowledge, mid-flight to update, late to deliver. Never paste file URLs in the message body — always pass absolute paths in `attachments` so the platform serves them as download chips (works on SaaS where external file hosts are unreachable).
+119 -131
View File
@@ -1,153 +1,141 @@
"""Unit tests for OpenClaw adapter env-var key selection and provider URL routing.
"""Unit tests for resolve_provider_routing in adapter_base.
The key-selection and URL-routing logic lives inline in OpenClawAdapter.setup()
(adapter.py lines 84-92). Since setup() carries heavy subprocess dependencies,
these tests isolate the selection logic by reproducing the exact Python expressions
from the adapter source — if the adapter's logic changes, these tests must be kept
in sync.
Organisation:
TestEnvKeyChain — priority order of the 3 currently supported keys
TestProviderUrlMapping — model-prefix → provider URL dict correctness
TestNegativeAndFallback — no keys set / unsupported keys
xfail stubs — AISTUDIO + QIANFAN documented as not-yet-implemented
Covers provider routing, URL-override precedence, and the missing-key error path.
Each adapter defines its own registry; this test file defines one inline that
mirrors what the openclaw adapter uses.
"""
from __future__ import annotations
import os
from unittest.mock import patch
import pytest
from adapter_base import ProviderRegistry, resolve_provider_routing
# ---------------------------------------------------------------------------
# Helpers — mirror the exact expressions from adapter.py lines 84-92.
# Must be kept in sync with the adapter source.
# ---------------------------------------------------------------------------
def _select_key(env: dict) -> str:
"""Mirror line 84: nested os.environ.get priority chain."""
return env.get("OPENAI_API_KEY",
env.get("GROQ_API_KEY",
env.get("OPENROUTER_API_KEY", "")))
_PROVIDER_URLS: dict[str, str] = {
"openai": "https://api.openai.com/v1",
"groq": "https://api.groq.com/openai/v1",
"openrouter": "https://openrouter.ai/api/v1",
# Mirror of the registry in openclaw's adapter.py — kept in sync manually.
PROVIDER_REGISTRY: ProviderRegistry = {
"openai": (("OPENAI_API_KEY",), "https://api.openai.com/v1"),
"groq": (("GROQ_API_KEY",), "https://api.groq.com/openai/v1"),
"openrouter": (("OPENROUTER_API_KEY",), "https://openrouter.ai/api/v1"),
"qianfan": (("QIANFAN_API_KEY", "AISTUDIO_API_KEY"), "https://qianfan.baidubce.com/v2"),
"minimax": (("MINIMAX_API_KEY",), "https://api.minimaxi.com/v1"),
"moonshot": (("KIMI_API_KEY",), "https://api.moonshot.ai/v1"),
}
def _select_url(model: str, runtime_config: dict | None = None) -> str:
"""Mirror lines 86-92: model-prefix → provider URL with optional override."""
prefix = model.split(":")[0] if ":" in model else "openai"
return (runtime_config or {}).get(
"provider_url",
_PROVIDER_URLS.get(prefix, "https://api.openai.com/v1"),
)
class TestProviderRouting:
def test_openai_key_and_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"openai:gpt-4o", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-openai"
assert base_url == "https://api.openai.com/v1"
assert model_id == "gpt-4o"
def test_groq_key_and_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"groq:llama-3.3-70b", {"GROQ_API_KEY": "sk-groq"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-groq"
assert base_url == "https://api.groq.com/openai/v1"
assert model_id == "llama-3.3-70b"
def test_openrouter_key_and_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"openrouter:anthropic/claude-sonnet-4-5", {"OPENROUTER_API_KEY": "sk-or"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-or"
assert base_url == "https://openrouter.ai/api/v1"
assert model_id == "anthropic/claude-sonnet-4-5"
def test_qianfan_primary_key(self):
api_key, _, _ = resolve_provider_routing(
"qianfan:ernie-4.5", {"QIANFAN_API_KEY": "sk-qf", "AISTUDIO_API_KEY": "sk-ai"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-qf"
def test_qianfan_fallback_to_aistudio(self):
api_key, base_url, _ = resolve_provider_routing(
"qianfan:ernie-4.5", {"AISTUDIO_API_KEY": "sk-ai"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-ai"
assert base_url == "https://qianfan.baidubce.com/v2"
def test_minimax_key_and_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"minimax:MiniMax-M2.7", {"MINIMAX_API_KEY": "sk-mm"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-mm"
assert base_url == "https://api.minimaxi.com/v1"
assert model_id == "MiniMax-M2.7"
def test_moonshot_key_and_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"moonshot:kimi-k2.5", {"KIMI_API_KEY": "sk-kimi"}, registry=PROVIDER_REGISTRY
)
assert api_key == "sk-kimi"
assert base_url == "https://api.moonshot.ai/v1"
assert model_id == "kimi-k2.5"
def test_bare_model_id_defaults_to_openai(self):
api_key, base_url, model_id = resolve_provider_routing(
"gpt-4o", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
)
assert base_url == "https://api.openai.com/v1"
assert model_id == "gpt-4o"
def test_unknown_prefix_falls_back_to_openai_url(self):
api_key, base_url, model_id = resolve_provider_routing(
"custom-shim:my-model", {"OPENAI_API_KEY": "sk-openai"}, registry=PROVIDER_REGISTRY
)
assert base_url == "https://api.openai.com/v1"
assert model_id == "my-model"
# ---------------------------------------------------------------------------
# 1. Env-var key priority chain (3 keys currently in adapter.py)
# ---------------------------------------------------------------------------
class TestUrlOverridePrecedence:
class TestEnvKeyChain:
def test_env_base_url_beats_registry_default(self):
_, base_url, _ = resolve_provider_routing(
"minimax:MiniMax-M2.7",
{"MINIMAX_API_KEY": "sk-mm", "MINIMAX_BASE_URL": "https://api.minimax.chat/v1"},
registry=PROVIDER_REGISTRY,
)
assert base_url == "https://api.minimax.chat/v1"
def test_openai_key_selected(self):
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai-test"}, clear=True):
assert _select_key(os.environ) == "sk-openai-test"
def test_runtime_config_provider_url_beats_registry_default(self):
_, base_url, _ = resolve_provider_routing(
"openai:gpt-4o",
{"OPENAI_API_KEY": "sk-openai"},
registry=PROVIDER_REGISTRY,
runtime_config={"provider_url": "https://proxy.example.com/v1"},
)
assert base_url == "https://proxy.example.com/v1"
def test_groq_key_selected_when_openai_absent(self):
with patch.dict(os.environ, {"GROQ_API_KEY": "sk-groq-test"}, clear=True):
assert _select_key(os.environ) == "sk-groq-test"
def test_openrouter_key_selected_when_openai_and_groq_absent(self):
with patch.dict(os.environ, {"OPENROUTER_API_KEY": "sk-or-test"}, clear=True):
assert _select_key(os.environ) == "sk-or-test"
def test_openai_beats_groq_when_both_set(self):
with patch.dict(os.environ, {"OPENAI_API_KEY": "openai", "GROQ_API_KEY": "groq"}, clear=True):
assert _select_key(os.environ) == "openai"
def test_groq_beats_openrouter_when_openai_absent(self):
with patch.dict(os.environ, {"GROQ_API_KEY": "groq", "OPENROUTER_API_KEY": "or"}, clear=True):
assert _select_key(os.environ) == "groq"
def test_env_base_url_beats_runtime_config(self):
_, base_url, _ = resolve_provider_routing(
"openai:gpt-4o",
{"OPENAI_API_KEY": "sk-openai", "OPENAI_BASE_URL": "https://env-wins.com/v1"},
registry=PROVIDER_REGISTRY,
runtime_config={"provider_url": "https://config-loses.com/v1"},
)
assert base_url == "https://env-wins.com/v1"
# ---------------------------------------------------------------------------
# 2. Model-prefix → provider URL routing
# ---------------------------------------------------------------------------
class TestMissingKey:
class TestProviderUrlMapping:
def test_raises_when_no_key_set(self):
with pytest.raises(RuntimeError, match="No API key found for provider 'minimax'"):
resolve_provider_routing("minimax:MiniMax-M2.7", {}, registry=PROVIDER_REGISTRY)
def test_openai_prefix_routes_to_openai(self):
assert _select_url("openai:gpt-4o") == "https://api.openai.com/v1"
def test_groq_prefix_routes_to_groq(self):
assert _select_url("groq:llama3-70b") == "https://api.groq.com/openai/v1"
def test_openrouter_prefix_routes_to_openrouter(self):
assert _select_url("openrouter:meta-llama/llama-3.3-70b") == "https://openrouter.ai/api/v1"
def test_runtime_config_override_wins_over_prefix(self):
url = _select_url("openai:gpt-4o", {"provider_url": "https://custom.example.com/v1"})
assert url == "https://custom.example.com/v1"
def test_unknown_prefix_falls_back_to_openai(self):
assert _select_url("some-unknown-model") == "https://api.openai.com/v1"
def test_raises_lists_checked_vars_in_message(self):
with pytest.raises(RuntimeError, match="MINIMAX_API_KEY"):
resolve_provider_routing("minimax:MiniMax-M2.7", {}, registry=PROVIDER_REGISTRY)
# ---------------------------------------------------------------------------
# 3. Negative / fallback cases
# ---------------------------------------------------------------------------
class TestRegistryCompleteness:
"""Smoke-check that every provider in the registry has a non-empty entry."""
class TestNegativeAndFallback:
def test_no_keys_returns_empty_string(self):
with patch.dict(os.environ, {}, clear=True):
assert _select_key(os.environ) == ""
def test_unsupported_aistudio_key_returns_empty(self):
"""Documents that AISTUDIO_API_KEY is NOT yet in the adapter's key chain."""
with patch.dict(os.environ, {"AISTUDIO_API_KEY": "sk-ai"}, clear=True):
assert _select_key(os.environ) == ""
def test_unsupported_qianfan_key_returns_empty(self):
"""Documents that QIANFAN_API_KEY is NOT yet in the adapter's key chain."""
with patch.dict(os.environ, {"QIANFAN_API_KEY": "sk-qf"}, clear=True):
assert _select_key(os.environ) == ""
# ---------------------------------------------------------------------------
# 4. AISTUDIO + QIANFAN — xfail stubs (not yet implemented in adapter.py)
# These fail now; they should be promoted to passing tests once the adapter
# adds AISTUDIO_API_KEY and QIANFAN_API_KEY to its key chain and provider_urls.
# ---------------------------------------------------------------------------
@pytest.mark.xfail(
strict=True,
reason=(
"AISTUDIO_API_KEY not yet in openclaw adapter env-var chain — "
"add to adapter.py line 84 and provider_urls dict with "
"URL https://generativelanguage.googleapis.com/v1beta/openai"
),
)
def test_aistudio_key_routes_to_aistudio_url():
with patch.dict(os.environ, {"AISTUDIO_API_KEY": "sk-ai-test"}, clear=True):
assert _select_key(os.environ) == "sk-ai-test"
assert _select_url("gemini-2.5-flash") == "https://generativelanguage.googleapis.com/v1beta/openai"
@pytest.mark.xfail(
strict=True,
reason=(
"QIANFAN_API_KEY not yet in openclaw adapter env-var chain — "
"add to adapter.py line 84 and provider_urls dict with "
"URL https://qianfan.baidubce.com/v2"
),
)
def test_qianfan_key_routes_to_qianfan_url():
with patch.dict(os.environ, {"QIANFAN_API_KEY": "sk-qf-test"}, clear=True):
assert _select_key(os.environ) == "sk-qf-test"
assert _select_url("ernie-4.5") == "https://qianfan.baidubce.com/v2"
@pytest.mark.parametrize("prefix", PROVIDER_REGISTRY)
def test_all_providers_have_key_vars_and_url(self, prefix):
env_vars, base_url = PROVIDER_REGISTRY[prefix]
assert env_vars, f"{prefix}: env_vars is empty"
assert base_url.startswith("https://"), f"{prefix}: base_url looks wrong: {base_url}"