Compare commits

..

3 Commits

Author SHA1 Message Date
fullstack-engineer 9edc0036a3 channels: add SendAdapter injection + handler test coverage for Test and Send
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 9s
Harness Replays / detect-changes (pull_request) Successful in 7s
CI / Detect changes (pull_request) Successful in 16s
E2E API Smoke Test / detect-changes (pull_request) Successful in 17s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 17s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
qa-review / approved (pull_request) Successful in 8s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 16s
security-review / approved (pull_request) Successful in 7s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m8s
CI / Canvas (Next.js) (pull_request) Successful in 5s
Harness Replays / Harness Replays (pull_request) Successful in 4s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 3s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
CI / Canvas Deploy Reminder (pull_request) Successful in 4s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m28s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 3m16s
CI / Platform (Go) (pull_request) Failing after 5m30s
CI / all-required (pull_request) Successful in 3s
gate-check-v3 / gate-check (pull_request) Successful in 16s
sop-tier-check / tier-check (pull_request) Successful in 14s
sop-checklist / na-declarations (pull_request) awaiting /sop-n/a declaration for: qa-review, security-review
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 5/7 — missing: root-cause, no-backwards-compat — body-unfilled: five-axis-review, no-backwards-compat, memory-consult
audit-force-merge / audit (pull_request) Successful in 18s
- Extract SendAdapter interface (SendMessage only) from ChannelAdapter so
  tests can inject a MockSendAdapter without hitting real Telegram/Slack APIs
- Make GetSendAdapter a package-level var (default: real adapters; tests
  override via SetGetSendAdapter from channels/testing.go)
- Wire GetSendAdapter into Manager.SendOutbound (was GetAdapter → ChannelAdapter)
- Add 4 handler tests in handlers/channels_test.go:
    TestChannelHandler_Test_Success         — full send-outbound success path
    TestChannelHandler_Test_ChannelNotFound — loadChannel error → 500
    TestChannelHandler_Send_Success         — budget pass → send → 200
    TestChannelHandler_Send_ChannelNotFound — loadChannel error → 500

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-14 09:30:11 +00:00
devops-engineer b25b4fb6ac Merge pull request '[core-devops-agent] chore: promote main→staging v5 (test panic fix)' (#972) from promote/main-to-staging-v5 into staging
Block internal-flavored paths / Block forbidden paths (push) Successful in 16s
CI / Detect changes (push) Successful in 47s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 20s
Runtime PR-Built Compatibility / detect-changes (push) Successful in 35s
Handlers Postgres Integration / detect-changes (push) Successful in 38s
CI / Platform (Go) (push) Successful in 13s
CI / Shellcheck (E2E scripts) (push) Successful in 8s
CI / Canvas (Next.js) (push) Successful in 10s
CI / Python Lint & Test (push) Successful in 8s
CI / Canvas Deploy Reminder (push) Successful in 4s
CI / all-required (push) Successful in 4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (push) Successful in 2m21s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 3m45s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 22s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 31s
gate-check-v3 / gate-check (pull_request) Successful in 31s
qa-review / approved (pull_request) Successful in 35s
security-review / approved (pull_request) Successful in 33s
sop-checklist / na-declarations (pull_request) awaiting /sop-n/a declaration for: qa-review, security-review
sop-tier-check / tier-check (pull_request) Successful in 38s
sop-checklist / all-items-acked (pull_request) Successful in 43s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m42s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m51s
CI / Detect changes (pull_request) Successful in 1m54s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m49s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m54s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 12s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 12s
CI / Canvas (Next.js) (pull_request) Successful in 13s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 12s
CI / Python Lint & Test (pull_request) Successful in 13s
CI / Platform (Go) (pull_request) Successful in 24s
CI / Canvas Deploy Reminder (pull_request) Successful in 4s
CI / all-required (pull_request) Successful in 4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 27s
audit-force-merge / audit (pull_request) Has been skipped
2026-05-14 05:37:47 +00:00
molecule-operator 956c2480d6 chore: promote main→staging v5 (test panic fix + t.Fatal improvements)
sop-checklist / na-declarations (pull_request) awaiting /sop-n/a declaration for: qa-review, security-review
CI / Detect changes (pull_request) Successful in 1m29s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 24s
E2E API Smoke Test / detect-changes (pull_request) Successful in 1m37s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 1m15s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 1m41s
qa-review / approved (pull_request) Successful in 26s
security-review / approved (pull_request) Successful in 25s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 1m39s
sop-tier-check / tier-check (pull_request) Successful in 24s
gate-check-v3 / gate-check (pull_request) Successful in 21s
audit-force-merge / audit (pull_request) Successful in 24s
sop-checklist / all-items-acked (pull_request) Successful in 18s
CI / Platform (Go) (pull_request) Successful in 9s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Canvas (Next.js) (pull_request) Successful in 10s
CI / Python Lint & Test (pull_request) Successful in 7s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 9s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 10s
CI / Canvas Deploy Reminder (pull_request) Successful in 5s
CI / all-required (pull_request) Successful in 3s
Block internal-flavored paths / Block forbidden paths (pull_request) Failing after 13m18s
Resolve merge conflict in org_helpers_security_test.go:
- Keep staging t.TempDir() fix for TestResolveInsideRoot_DotDotWithIntermediate
  (a/b/../../c normalizes to c within root — test correctly expects success)
- t.Fatal vs t.Fatalf are equivalent; staging version retained
2026-05-14 05:34:35 +00:00
88 changed files with 1610 additions and 3750 deletions
-1
View File
@@ -1 +0,0 @@
refire:1778784369
+4 -11
View File
@@ -203,17 +203,12 @@ def ci_jobs_all(ci_doc: dict) -> set[str]:
def ci_job_names(ci_doc: dict) -> set[str]:
"""Set of job keys in ci.yml MINUS the sentinel itself MINUS jobs
whose `if:` gates on `github.event_name` or `github.ref` (those are
event-scoped and can legitimately be `skipped` for a given trigger;
if we required them under the sentinel `needs:`, every PR-only job
whose `if:` gates on `github.event_name` (those are event-scoped
and can legitimately be `skipped` for a given trigger; if we
required them under the sentinel `needs:`, every PR-only job
would be `skipped` on push and the sentinel would interpret
`skipped != success` as failure). RFC §4 spec.
`github.ref` is the companion gate for jobs that run only on direct
pushes to specific branches (e.g. `github.ref == 'refs/heads/main'`).
These never execute in a PR context, so flagging them as missing
from `all-required.needs:` is a false positive (mc#958 / mc#959).
Used for F1 (jobs missing from sentinel needs). NOT used for F1b
(typos in needs) — see `ci_jobs_all` for that."""
jobs = ci_doc.get("jobs")
@@ -226,9 +221,7 @@ def ci_job_names(ci_doc: dict) -> set[str]:
continue
if isinstance(v, dict):
gate = v.get("if")
if isinstance(gate, str) and (
"github.event_name" in gate or "github.ref" in gate
):
if isinstance(gate, str) and "github.event_name" in gate:
continue
names.add(k)
return names
+16 -85
View File
@@ -47,15 +47,6 @@ REQUIRED_CONTEXTS_RAW = _env(
"sop-checklist / all-items-acked (pull_request)"
),
)
# Required contexts for push (main/staging) runs. The push CI uses the same
# aggregator names with " (push)" suffix. Checking these explicitly instead of
# the combined state avoids false-pause when non-blocking jobs (e.g. Platform
# Go with continue-on-error: true due to mc#774) have failed — their failures
# pollute the combined state but do not block merges.
PUSH_REQUIRED_CONTEXTS_RAW = _env(
"PUSH_REQUIRED_CONTEXTS",
default="CI / all-required (push)",
)
OWNER, NAME = (REPO.split("/", 1) + [""])[:2] if REPO else ("", "")
API = f"https://{GITEA_HOST}/api/v1" if GITEA_HOST else ""
@@ -127,24 +118,16 @@ def required_contexts(raw: str) -> list[str]:
return [part.strip() for part in raw.split(",") if part.strip()]
def push_required_contexts() -> list[str]:
"""Required contexts for push (branch) CI runs. See PUSH_REQUIRED_CONTEXTS_RAW."""
return required_contexts(PUSH_REQUIRED_CONTEXTS_RAW)
def status_state(status: dict) -> str:
return str(status.get("status") or status.get("state") or "").lower()
def latest_statuses_by_context(statuses: list[dict]) -> dict[str, dict]:
# Gitea /statuses endpoint returns entries in ascending id order (oldest
# first). We need the LAST occurrence of each context, so iterate in
# reverse to prefer newer entries.
latest: dict[str, dict] = {}
for status in reversed(statuses):
for status in statuses:
context = status.get("context")
if isinstance(context, str):
latest[context] = status # overwrite: reverse order → newest wins
if isinstance(context, str) and context not in latest:
latest[context] = status
return latest
@@ -210,23 +193,16 @@ def evaluate_merge_readiness(
required_contexts: list[str],
pr_has_current_base: bool,
) -> MergeDecision:
# Check push-required contexts explicitly instead of combined state.
# Combined state can be "failure" due to non-blocking jobs
# (continue-on-error: true) that don't actually gate merges.
# CI / all-required (push) is the authoritative gate — it respects
# continue-on-error and correctly aggregates all blocking failures.
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
if not main_ok:
return MergeDecision(False, "pause", "main required contexts not green: " + ", ".join(main_bad))
main_state = str(main_status.get("state") or "").lower()
if main_state != "success":
return MergeDecision(False, "pause", f"main status is {main_state or 'missing'}")
if not pr_has_current_base:
return MergeDecision(False, "update", "PR head does not contain current main")
# Check explicit required contexts instead of combined state. Combined state
# can be "failure" due to non-blocking jobs with continue-on-error: true
# (e.g. publish-runtime-autobump/pr-validate, qa-review on stale tokens).
# The required_contexts list is the authoritative gate — it includes only
# the checks that actually block merges.
pr_state = str(pr_status.get("state") or "").lower()
if pr_state != "success":
return MergeDecision(False, "wait", f"PR combined status is {pr_state or 'missing'}")
latest = latest_statuses_by_context(pr_status.get("statuses") or [])
ok, missing_or_bad = required_contexts_green(latest, required_contexts)
if not ok:
@@ -244,37 +220,10 @@ def get_branch_head(branch: str) -> str:
def get_combined_status(sha: str) -> dict:
"""Combined status + all individual statuses for `sha`.
The /status endpoint caps the `statuses` array at 30 entries (Gitea
default page size), so we fetch the full list via /statuses with a
higher limit. The combined `state` still comes from /status.
"""
_, combined = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(combined, dict):
_, body = api("GET", f"/repos/{OWNER}/{NAME}/commits/{sha}/status")
if not isinstance(body, dict):
raise ApiError(f"status for {sha} response not object")
# Fetch full statuses list; 200 covers >99% of real-world runs.
# The list is ordered ascending by id (oldest first) — callers must
# iterate in reverse to get the newest entry per context.
# Best-effort: large repos (main with 550+ statuses) may time out.
# On timeout, fall back to the statuses[] already in the combined
# response (usually 30 entries — enough for most PRs, enough for
# main's early push-required contexts).
try:
_, all_statuses = api(
"GET",
f"/repos/{OWNER}/{NAME}/commits/{sha}/statuses",
query={"limit": "50"},
)
if isinstance(all_statuses, list):
combined["statuses"] = all_statuses
except (ApiError, urllib.error.URLError, TimeoutError, OSError) as exc:
# URLError covers network-level failures (DNS, refused, timeout).
# TimeoutError and OSError cover socket-level timeouts.
sys.stderr.write(f"::warning::could not fetch full statuses list for {sha[:8]}: {exc}\n")
# Fall back to the statuses[] already in the combined response.
pass
return combined
return body
def list_queued_issues() -> list[dict]:
@@ -345,12 +294,8 @@ def process_once(*, dry_run: bool = False) -> int:
contexts = required_contexts(REQUIRED_CONTEXTS_RAW)
main_sha = get_branch_head(WATCH_BRANCH)
main_status = get_combined_status(main_sha)
# Check push-required contexts explicitly instead of combined state.
# See evaluate_merge_readiness for rationale.
main_latest = latest_statuses_by_context(main_status.get("statuses") or [])
main_ok, main_bad = required_contexts_green(main_latest, push_required_contexts())
if not main_ok:
print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} required contexts not green: {', '.join(main_bad)}")
if str(main_status.get("state") or "").lower() != "success":
print(f"::notice::queue paused: {WATCH_BRANCH}@{main_sha[:8]} is not green")
return 0
issue = choose_next_queued_issue(
@@ -417,21 +362,7 @@ def main() -> int:
parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
_require_runtime_env()
try:
return process_once(dry_run=args.dry_run)
except ApiError as exc:
# API errors (401/403/404/500) are transient for a queue tick —
# log and exit 0 so the workflow is not marked failed and the next
# tick can retry. Returning non-zero would permanently fail the
# workflow run, blocking future ticks.
sys.stderr.write(f"::error::queue API error: {exc}\n")
return 0
except urllib.error.URLError as exc:
sys.stderr.write(f"::error::queue network error: {exc}\n")
return 0
except TimeoutError as exc:
sys.stderr.write(f"::error::queue timeout: {exc}\n")
return 0
return process_once(dry_run=args.dry_run)
if __name__ == "__main__":
@@ -85,10 +85,7 @@ def test_pr_needs_update_when_base_sha_absent_from_commits():
def test_merge_decision_requires_main_green_pr_green_and_current_base():
required = ["CI / all-required (pull_request)"]
main_status = {
"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}],
}
main_status = {"state": "success", "statuses": []}
pr_status = {
"state": "success",
"statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}],
@@ -107,10 +104,7 @@ def test_merge_decision_requires_main_green_pr_green_and_current_base():
def test_merge_decision_updates_stale_pr_before_merge():
decision = mq.evaluate_merge_readiness(
main_status={
"state": "success",
"statuses": [{"context": "CI / all-required (push)", "status": "success"}],
},
main_status={"state": "success", "statuses": []},
pr_status={"state": "success", "statuses": [{"context": "CI / all-required (pull_request)", "status": "success"}]},
required_contexts=["CI / all-required (pull_request)"],
pr_has_current_base=False,
+119 -141
View File
@@ -133,6 +133,7 @@ jobs:
# the name match works on PRs that don't touch workspace-server/).
platform-build:
name: Platform (Go)
needs: changes
runs-on: ubuntu-latest
# mc#774 (closed 2026-05-14): Phase 4 flip of the platform-build job.
# Phase 4 (#656) originally flipped this to continue-on-error: false based on
@@ -145,37 +146,33 @@ jobs:
# the diagnostic step with its own continue-on-error: true (line 203).
# Flip confirmed by CI / Platform (Go) status = success on main HEAD 363905d3.
continue-on-error: false
# Job-level ceiling. The go test step below runs with a per-step 10m timeout;
# this cap catches any step that leaks past that. Set well above 10m so
# the per-step timeout is the active constraint.
timeout-minutes: 15
defaults:
run:
working-directory: workspace-server
steps:
- if: false
- if: needs.changes.outputs.platform != 'true'
working-directory: .
run: echo "No platform/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.platform == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.platform == 'true'
uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
with:
go-version: 'stable'
- if: always()
- if: needs.changes.outputs.platform == 'true'
run: go mod download
- if: always()
- if: needs.changes.outputs.platform == 'true'
run: go build ./cmd/server
# CLI (molecli) moved to standalone repo: git.moleculesai.app/molecule-ai/molecule-cli
- if: always()
- if: needs.changes.outputs.platform == 'true'
run: go vet ./...
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Install golangci-lint
run: go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@v2.12.2
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Run golangci-lint
run: $(go env GOPATH)/bin/golangci-lint run --timeout 3m ./...
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Diagnostic — per-package verbose 60s
run: |
set +e
@@ -191,15 +188,11 @@ jobs:
echo "::endgroup::"
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Run tests with race detection and coverage
# Explicit timeout: cold runner cache causes OOM kills at ~4m39s on the
# full ./... suite with race detection + coverage. A 10m per-step timeout
# lets the suite complete on cold cache (~5-7m) while failing cleanly
# instead of OOM-killing. The job-level timeout (15m) is a backstop.
run: go test -race -timeout 10m -coverprofile=coverage.out ./...
run: go test -race -coverprofile=coverage.out ./...
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Per-file coverage report
# Advisory — lists every source file with its coverage so reviewers
# can see at-a-glance where gaps are. Sorted ascending so the worst
@@ -213,7 +206,7 @@ jobs:
END {for (f in s) printf "%6.1f%% %s\n", s[f]/c[f], f}' \
| sort -n
- if: always()
- if: needs.changes.outputs.platform == 'true'
name: Check coverage thresholds
# Enforces two gates from #1823 Layer 1:
# 1. Total floor (25% — ratchet plan in COVERAGE_FLOOR.md).
@@ -301,28 +294,28 @@ jobs:
# siblings — verified empirically on PR #2314).
canvas-build:
name: Canvas (Next.js)
needs: changes
runs-on: ubuntu-latest
timeout-minutes: 20
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
defaults:
run:
working-directory: canvas
steps:
- if: false
- if: needs.changes.outputs.canvas != 'true'
working-directory: .
run: echo "No canvas/** changes — skipping real build steps; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.canvas == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.canvas == 'true'
uses: actions/setup-node@48b55a011bda9f5d6aeb4c2d9c7362e8dae4041e # v6.4.0
with:
node-version: '22'
- if: always()
- if: needs.changes.outputs.canvas == 'true'
run: rm -f package-lock.json && npm install
- if: always()
- if: needs.changes.outputs.canvas == 'true'
run: npm run build
- if: always()
- if: needs.changes.outputs.canvas == 'true'
name: Run tests with coverage
# Coverage instrumentation is configured in canvas/vitest.config.ts
# (provider: v8, reporters: text + html + json-summary). Step 2 of
@@ -331,7 +324,7 @@ jobs:
# tracked in #1815) after the team sees what current coverage is.
run: npx vitest run --coverage
- name: Upload coverage summary as artifact
if: always()
if: needs.changes.outputs.canvas == 'true' && always()
# Pinned to v3 for Gitea act_runner v0.6 compatibility — v4+ uses
# the GHES 3.10+ artifact protocol that Gitea 1.22.x does NOT
# implement, surfacing as `GHESNotSupportedError: @actions/artifact
@@ -348,15 +341,16 @@ jobs:
# Shellcheck (E2E scripts) — required check, always runs.
shellcheck:
name: Shellcheck (E2E scripts)
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
steps:
- if: false
- if: needs.changes.outputs.scripts != 'true'
run: echo "No tests/e2e/ or infra/scripts/ changes — skipping real shellcheck; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.scripts == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Run shellcheck on tests/e2e/*.sh and infra/scripts/*.sh
# shellcheck is pre-installed on ubuntu-latest runners (via apt).
# infra/scripts/ is included because setup.sh + nuke.sh gate the
@@ -367,16 +361,16 @@ jobs:
find tests/e2e infra/scripts -type f -name '*.sh' -print0 \
| xargs -0 shellcheck --severity=warning
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Lint cleanup-trap hygiene (RFC #2873)
run: bash tests/e2e/lint_cleanup_traps.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Run E2E bash unit tests (no live infra)
run: |
bash tests/e2e/test_model_slug.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Test ECR promote-tenant-image script (mock-driven, no live infra)
# Covers scripts/promote-tenant-image.sh — the codified
# :staging-latest → :latest ECR promote + tenant fleet redeploy
@@ -386,7 +380,7 @@ jobs:
run: |
bash scripts/test-promote-tenant-image.sh
- if: always()
- if: needs.changes.outputs.scripts == 'true'
name: Shellcheck promote-tenant-image script
# scripts/ is excluded from the bulk shellcheck pass above (legacy
# SC3040/SC3043 cleanup pending). Run shellcheck explicitly on
@@ -400,15 +394,17 @@ jobs:
canvas-deploy-reminder:
name: Canvas Deploy Reminder
runs-on: ubuntu-latest
# This job must run on PRs because all-required needs it. The step exits
# 0 when it is not a main push, giving branch protection a green no-op
# instead of a skipped/missing required dependency.
needs: canvas-build
# mc#774: pre-existing continue-on-error mask; root-fix and remove, do not renew silently.
continue-on-error: true
needs: [changes, canvas-build]
# Keep the job itself always runnable. Gitea 1.22.6 leaves job-level
# event/ref `if:` gates as pending on PRs, which blocks the combined
# status even though this reminder is intentionally non-required.
steps:
- name: Write deploy reminder to step summary
env:
COMMIT_SHA: ${{ github.sha }}
CANVAS_CHANGED: "true"
CANVAS_CHANGED: ${{ needs.changes.outputs.canvas }}
EVENT_NAME: ${{ github.event_name }}
REF_NAME: ${{ github.ref }}
# github.server_url resolves via the workflow-level env override
@@ -453,6 +449,7 @@ jobs:
# Python Lint & Test — required check, always runs.
python-lint:
name: Python Lint & Test
needs: changes
runs-on: ubuntu-latest
# Phase 4 (RFC #219 §1): confirmed green on main 2026-05-12.
continue-on-error: false
@@ -462,25 +459,25 @@ jobs:
run:
working-directory: workspace
steps:
- if: false
- if: needs.changes.outputs.python != 'true'
working-directory: .
run: echo "No workspace/** changes — skipping real lint+test; this job always runs to satisfy the required-check name on branch protection."
- if: always()
- if: needs.changes.outputs.python == 'true'
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- if: always()
- if: needs.changes.outputs.python == 'true'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: '3.11'
cache: pip
cache-dependency-path: workspace/requirements.txt
- if: always()
- if: needs.changes.outputs.python == 'true'
run: pip install -r requirements.txt pytest pytest-asyncio pytest-cov sqlalchemy>=2.0.0
# Coverage flags + fail-under floor moved into workspace/pytest.ini
# (issue #1817) so local `pytest` and CI use identical config.
- if: always()
- if: needs.changes.outputs.python == 'true'
run: python -m pytest --tb=short
- if: always()
- if: needs.changes.outputs.python == 'true'
name: Per-file critical-path coverage (MCP / inbox / auth)
# MCP-critical Python files have a per-file floor on top of the
# 86% total floor in pytest.ini. See issue #2790 for full rationale.
@@ -545,104 +542,85 @@ jobs:
# red silently merged through. See internal#286 for the three concrete
# tonight-of-2026-05-11 incidents that prompted the emergency bump.
#
# This job deliberately has no `needs:`. Gitea 1.22/act_runner can mark a
# job-level `if: always()` + `needs:` sentinel as skipped before upstream
# jobs settle, leaving branch protection with a permanent pending
# `CI / all-required` context. Instead, this independent sentinel polls the
# required commit-status contexts for this SHA and fails if any fail, skip,
# or never emit.
# Three properties of this job each close a failure mode:
#
# canvas-deploy-reminder is intentionally NOT included in all-required.needs.
# It is an informational main-push reminder, not a PR quality gate. Keeping
# it in this dependency list lets a skipped reminder skip the required
# sentinel before the `always()` guard can emit a branch-protection status.
# 1. `if: always()` — runs even when an upstream fails. Without it the
# sentinel is `skipped` and protection treats that as missing → merge
# ungated.
#
# 2. Assertion is `result == "success"` per dep, NOT `!= "failure"`.
# A `skipped` upstream (job gated by `if:` evaluating false, matrix
# entry that couldn't run) must NOT silently pass through.
# `skipped`-as-green is exactly the failure mode this gate closes.
#
# 3. `needs:` is the canonical list of "what counts as required."
# status_check_contexts will reference only `ci/all-required` (Step 5
# follow-up — branch-protection PATCH is Owners-tier per
# `feedback_never_admin_merge_bypass`, separate PR); a new job is
# added simply by listing it in `needs:` here.
# `.gitea/workflows/ci-required-drift.yml` files a [ci-drift] issue
# hourly if this list diverges from status_check_contexts or from
# audit-force-merge.yml's REQUIRED_CHECKS env (RFC §4 + §6).
#
# canvas-deploy-reminder is intentionally excluded from all-required.needs:
# it needs canvas-build, which is skipped on CI-only PRs (canvas=false).
# Including it in all-required.needs causes all-required to hang on
# every CI-only PR. Keep it runnable on PRs via its own
# `needs: [changes, canvas-build]` — the sentinel only aggregates the result.
#
# Phase 3 (RFC #219 §1) safety: underlying build jobs carry
# continue-on-error: true so their failures are masked to null (2026-05-12: re-enabled mc#774 interim)
# (Gitea suppresses status reporting for CoE jobs). This sentinel
# runs with continue-on-error: false so it always reports its
# result to the API — without this, the required-status entry
# (CI / all-required (pull_request)) is never created, which
# blocks PR merges. When Phase 3 ends, flip underlying jobs to
# continue-on-error: false; this sentinel can then be flipped to
# continue-on-error: true if a Phase-4 regression requires it.
continue-on-error: false
runs-on: ubuntu-latest
timeout-minutes: 45
timeout-minutes: 1
needs:
- changes
- platform-build
- canvas-build
- shellcheck
- python-lint
if: ${{ always() }}
steps:
- name: Wait for required CI contexts
env:
GITEA_TOKEN: ${{ secrets.GITHUB_TOKEN }}
API_ROOT: ${{ github.server_url }}/api/v1
REPOSITORY: ${{ github.repository }}
COMMIT_SHA: ${{ github.sha }}
EVENT_NAME: ${{ github.event_name }}
- name: Assert every required dependency succeeded
run: |
set -euo pipefail
python3 - <<'PY'
import json
import os
import sys
import time
import urllib.error
import urllib.request
token = os.environ["GITEA_TOKEN"]
api_root = os.environ["API_ROOT"].rstrip("/")
repo = os.environ["REPOSITORY"]
sha = os.environ["COMMIT_SHA"]
event = os.environ["EVENT_NAME"]
required = [
f"CI / Detect changes ({event})",
f"CI / Platform (Go) ({event})",
f"CI / Canvas (Next.js) ({event})",
f"CI / Shellcheck (E2E scripts) ({event})",
f"CI / Python Lint & Test ({event})",
]
terminal_bad = {"failure", "error"}
deadline = time.time() + 40 * 60
last_summary = None
def fetch_statuses():
statuses = []
for page in range(1, 6):
url = f"{api_root}/repos/{repo}/commits/{sha}/statuses?page={page}&limit=100"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
with urllib.request.urlopen(req, timeout=10) as resp:
chunk = json.load(resp)
if not chunk:
break
statuses.extend(chunk)
latest = {}
for item in statuses:
ctx = item.get("context")
if not ctx:
continue
prev = latest.get(ctx)
if prev is None or (item.get("updated_at") or item.get("created_at") or "") >= (prev.get("updated_at") or prev.get("created_at") or ""):
latest[ctx] = item
return latest
while True:
try:
latest = fetch_statuses()
except (TimeoutError, OSError, urllib.error.URLError) as exc:
if time.time() >= deadline:
print(f"FAIL: status polling did not recover before deadline: {exc}", file=sys.stderr)
sys.exit(1)
print(f"WARN: status poll failed, retrying: {exc}", flush=True)
time.sleep(15)
continue
states = {ctx: (latest.get(ctx) or {}).get("status") or (latest.get(ctx) or {}).get("state") or "missing" for ctx in required}
summary = ", ".join(f"{ctx}={state}" for ctx, state in states.items())
if summary != last_summary:
print(summary, flush=True)
last_summary = summary
bad = {ctx: state for ctx, state in states.items() if state in terminal_bad}
if bad:
print("FAIL: required CI context failed:", file=sys.stderr)
for ctx, state in bad.items():
desc = (latest.get(ctx) or {}).get("description") or ""
print(f" - {ctx}: {state} {desc}", file=sys.stderr)
sys.exit(1)
if all(state == "success" for state in states.values()):
print(f"OK: all {len(required)} required CI contexts succeeded")
sys.exit(0)
if time.time() >= deadline:
print("FAIL: timed out waiting for required CI contexts:", file=sys.stderr)
for ctx, state in states.items():
print(f" - {ctx}: {state}", file=sys.stderr)
sys.exit(1)
time.sleep(15)
PY
# `needs.*.result` is one of: success | failure | cancelled | skipped | null.
# We assert success per dep (not != failure) — see RFC §2 reasoning above.
# Null results are skipped: they come from Phase 3 (continue-on-error: true
# suppresses status) or from jobs still in-flight. The sentinel succeeds
# rather than blocking PRs on Phase 3 noise.
results='${{ toJSON(needs) }}'
echo "$results"
echo "$results" | python3 -c '
import json, sys
ns = json.load(sys.stdin)
# Phase 3 masked: jobs with continue-on-error: true may report "failure"
# Remove when mc#774 handler test failures are resolved.
PHASE3_MASKED = {"platform-build"}
# Exclude null (Phase 3 suppressed / in-flight) from the bad list.
bad = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") not in ("success", None, "cancelled", "skipped") and k not in PHASE3_MASKED]
if bad:
print(f"FAIL: jobs not green:", file=sys.stderr)
for k, r in bad:
print(f" - {k}: {r}", file=sys.stderr)
sys.exit(1)
pending = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") is None]
cancelled = [(k, v.get("result")) for k, v in ns.items()
if v.get("result") == "cancelled"]
if pending:
print(f"WARN: {len(pending)} job(s) still in-flight (result=null): " +
", ".join(k for k, _ in pending), file=sys.stderr)
if cancelled:
print(f"INFO: {len(cancelled)} job(s) masked by continue-on-error: " +
", ".join(k for k, _ in cancelled), file=sys.stderr)
print(f"OK: all {len(ns)} required jobs succeeded (or Phase-3 suppressed)")
'
-37
View File
@@ -69,13 +69,6 @@ name: E2E API Smoke Test
# 2318) shows Postgres ready in 3s, Redis in 1s, Platform in 1s when
# they DO come up. Timeouts are not the bottleneck; not bumped.
#
# Item #1046 (fixed 2026-05-14): Stale platform-server from cancelled runs
# lingers on :8080 after "Stop platform" step is skipped (workflow cancelled
# before reaching line 335). Added a pre-start "Kill stale platform-server"
# step (line 286) that scans /proc for zombie platform-server processes
# and kills them before the port probe or bind. Makes the ephemeral port
# probe + start sequence deterministic.
#
# Item explicitly NOT fixed here: failing test `Status back online`
# fails because the platform's langgraph workspace template image
# (ghcr.io/molecule-ai/workspace-template-langgraph:latest) returns
@@ -290,35 +283,6 @@ jobs:
echo "PORT=${PLATFORM_PORT}" >> "$GITHUB_ENV"
echo "BASE=http://127.0.0.1:${PLATFORM_PORT}" >> "$GITHUB_ENV"
echo "Platform host port: ${PLATFORM_PORT}"
- name: Kill stale platform-server before start (issue #1046)
if: needs.detect-changes.outputs.api == 'true'
run: |
# Concurrent runs on the same host-network act_runner can leave a
# zombie platform-server from a cancelled/timeout run. Cancelled
# runs never reach the "Stop platform" step (line 335), so the
# old process lingers. Kill it before the ephemeral port probe
# or start so the port is definitively free.
#
# /proc scan — works on any Linux without pkill/lsof/ss.
# comm field is truncated to 15 chars: "platform-serve" matches
# "platform-server". Verify with cmdline to avoid false positives.
killed=0
for pid in $(grep -l "platform-serve" /proc/[0-9]*/comm 2>/dev/null); do
kpid="${pid%/comm}"
kpid="${kpid##*/}"
cmdline=$(cat "/proc/${kpid}/cmdline" 2>/dev/null | tr '\0' ' ')
if echo "$cmdline" | grep -q "platform-server"; then
echo "Killing stale platform-server pid ${kpid}: ${cmdline}"
kill "$kpid" 2>/dev/null || true
killed=$((killed + 1))
fi
done
if [ "$killed" -gt 0 ]; then
sleep 2
echo "Killed $killed stale process(es); port(s) released."
else
echo "No stale platform-server found."
fi
- name: Start platform (background)
if: needs.detect-changes.outputs.api == 'true'
working-directory: workspace-server
@@ -382,4 +346,3 @@ jobs:
run: |
docker rm -f "$PG_CONTAINER" 2>/dev/null || true
docker rm -f "$REDIS_CONTAINER" 2>/dev/null || true
-5
View File
@@ -48,9 +48,4 @@ jobs:
REQUIRED_CONTEXTS: >-
CI / all-required (pull_request),
sop-checklist / all-items-acked (pull_request)
# Push-side required contexts. Checking CI / all-required (push)
# explicitly instead of the combined state avoids false-pause when
# non-blocking jobs (continue-on-error: true) have failed — those
# failures pollute combined state but do not gate merges.
PUSH_REQUIRED_CONTEXTS: CI / all-required (push)
run: python3 .gitea/scripts/gitea-merge-queue.py
+1 -1
View File
@@ -1 +1 @@
staging trigger 2026-05-14T17:35:02Z
staging trigger
-1
View File
@@ -1 +0,0 @@
trigger
+2 -11
View File
@@ -65,18 +65,9 @@ export function ThemeToggle({ className = "" }: { className?: string }) {
// Use direct-child query to scope strictly to this radiogroup's buttons
// and avoid accidentally focusing unrelated [role=radio] elements
// elsewhere in the DOM (e.g. React Flow canvas nodes).
// Guard: skip focus if the current target is no longer in the document
// (e.g. React StrictMode double-invokes handlers during re-render).
if (!e.currentTarget.isConnected) return;
const radiogroup = e.currentTarget.closest("[role=radiogroup]") as HTMLElement | null;
if (!radiogroup) return;
// Use children[] instead of querySelectorAll("> [role=radio]") to avoid
// jsdom's child-combinator selector parsing issues in test environments.
const btns = Array.from(radiogroup.children).filter(
(el): el is HTMLButtonElement =>
el.tagName === "BUTTON" && el.getAttribute("role") === "radio"
);
if (next < btns.length) btns[next]?.focus();
const btns = radiogroup?.querySelectorAll<HTMLButtonElement>("> [role=radio]");
btns?.[next]?.focus();
},
[]
);
@@ -24,12 +24,8 @@ vi.mock("@/lib/theme-provider", () => ({
})),
}));
// Wrap cleanup in act() so any pending React state updates (e.g. from
// keyDown handlers that call setTheme) flush before DOM unmount. Without
// this, cleanup() can race against pending renders and cause INDEX_SIZE_ERR
// when the handleKeyDown callback tries to query the DOM mid-teardown.
afterEach(() => {
act(() => { cleanup(); });
cleanup();
vi.clearAllMocks();
});
@@ -150,7 +146,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// dark (index 2) is current; ArrowRight should wrap to light (index 0)
act(() => { radios[2].focus(); });
act(() => { fireEvent.keyDown(radios[2], { key: "ArrowRight" }); });
fireEvent.keyDown(radios[2], { key: "ArrowRight" });
expect(mockSetTheme).toHaveBeenCalledWith("light");
});
@@ -164,7 +160,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// light (index 0) is current; ArrowLeft should go to dark (index 2)
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowLeft" }); });
fireEvent.keyDown(radios[0], { key: "ArrowLeft" });
expect(mockSetTheme).toHaveBeenCalledWith("dark");
});
@@ -178,7 +174,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
const radios = screen.getAllByRole("radio");
// light (index 0) is current; ArrowDown should go to system (index 1)
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "ArrowDown" }); });
fireEvent.keyDown(radios[0], { key: "ArrowDown" });
expect(mockSetTheme).toHaveBeenCalledWith("system");
});
@@ -191,7 +187,7 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { radios[2].focus(); });
act(() => { fireEvent.keyDown(radios[2], { key: "Home" }); });
fireEvent.keyDown(radios[2], { key: "Home" });
expect(mockSetTheme).toHaveBeenCalledWith("light");
});
@@ -204,14 +200,14 @@ describe("ThemeToggle — keyboard navigation (WCAG 2.1.1 / ARIA radiogroup)", (
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { radios[0].focus(); });
act(() => { fireEvent.keyDown(radios[0], { key: "End" }); });
fireEvent.keyDown(radios[0], { key: "End" });
expect(mockSetTheme).toHaveBeenCalledWith("dark");
});
it("does nothing on unrelated keys", () => {
render(<ThemeToggle />);
const radios = screen.getAllByRole("radio");
act(() => { fireEvent.keyDown(radios[0], { key: "Enter" }); });
fireEvent.keyDown(radios[0], { key: "Enter" });
expect(mockSetTheme).not.toHaveBeenCalled();
});
});
+95 -322
View File
@@ -6,21 +6,21 @@
// attachments, no A2A topology overlay, no conversation tracing.
import { useEffect, useRef, useState } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore } from "@/store/canvas";
import { type ChatAttachment, type ChatMessage, createMessage } from "@/components/tabs/chat/types";
import {
useChatHistory,
useChatSend,
useChatSocket,
} from "@/components/tabs/chat/hooks";
import { toMobileAgent } from "./components";
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, usePalette } from "./palette";
import { Icons, StatusDot, TierChip } from "./primitives";
interface ChatMessage {
id: string;
role: "user" | "agent" | "system";
text: string;
ts: string;
}
const formatStoredTimestamp = (iso: string): string => {
const d = new Date(iso);
if (isNaN(d.getTime())) return "";
@@ -29,171 +29,16 @@ const formatStoredTimestamp = (iso: string): string => {
type SubTab = "my" | "a2a";
function MarkdownBubble({
children,
dark,
accent,
}: {
children: string;
dark: boolean;
accent: string;
}) {
const codeBg = dark ? "rgba(255,255,255,0.08)" : "rgba(0,0,0,0.06)";
const codeBlockBg = dark ? "#1a1a1a" : "#f5f5f0";
const linkColor = accent;
const quoteBorder = dark ? "rgba(255,250,240,0.15)" : "rgba(40,30,20,0.15)";
return (
<ReactMarkdown
remarkPlugins={[remarkGfm]}
components={{
p: ({ children }) => (
<div style={{ margin: "2px 0", lineHeight: "inherit" }}>{children}</div>
),
a: ({ href, children }) => (
<a
href={href}
target="_blank"
rel="noopener noreferrer"
style={{ color: linkColor, textDecoration: "underline" }}
>
{children}
</a>
),
pre: ({ children }) => (
<pre
style={{
background: codeBlockBg,
padding: "8px 10px",
borderRadius: 8,
overflow: "auto",
fontSize: 12,
lineHeight: 1.5,
fontFamily: MOBILE_FONT_MONO,
margin: "4px 0",
}}
>
{children}
</pre>
),
code: ({ children, className }) => {
const isBlock = className != null && String(className).length > 0;
if (isBlock) {
return (
<code style={{ fontFamily: MOBILE_FONT_MONO, fontSize: 12 }}>
{children}
</code>
);
}
return (
<code
style={{
background: codeBg,
padding: "1px 4px",
borderRadius: 4,
fontSize: 13,
fontFamily: MOBILE_FONT_MONO,
}}
>
{children}
</code>
);
},
ul: ({ children }) => (
<ul style={{ margin: "4px 0", paddingLeft: 18, listStyle: "disc" }}>
{children}
</ul>
),
ol: ({ children }) => (
<ol style={{ margin: "4px 0", paddingLeft: 18, listStyle: "decimal" }}>
{children}
</ol>
),
li: ({ children }) => <li style={{ margin: "2px 0" }}>{children}</li>,
strong: ({ children }) => (
<strong style={{ fontWeight: 600 }}>{children}</strong>
),
em: ({ children }) => <em style={{ fontStyle: "italic" }}>{children}</em>,
h1: ({ children }) => (
<div style={{ fontSize: 16, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h2: ({ children }) => (
<div style={{ fontSize: 15, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h3: ({ children }) => (
<div style={{ fontSize: 14, fontWeight: 700, margin: "4px 0" }}>{children}</div>
),
h4: ({ children }) => (
<div style={{ fontSize: 14, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
h5: ({ children }) => (
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
h6: ({ children }) => (
<div style={{ fontSize: 13, fontWeight: 600, margin: "4px 0" }}>{children}</div>
),
blockquote: ({ children }) => (
<blockquote
style={{
borderLeft: `2px solid ${quoteBorder}`,
margin: "4px 0",
paddingLeft: 8,
opacity: 0.85,
}}
>
{children}
</blockquote>
),
hr: () => (
<hr
style={{
border: "none",
borderTop: `0.5px solid ${quoteBorder}`,
margin: "6px 0",
}}
/>
),
table: ({ children }) => (
<table
style={{
borderCollapse: "collapse",
fontSize: 13,
margin: "4px 0",
width: "100%",
}}
>
{children}
</table>
),
thead: ({ children }) => <thead style={{ fontWeight: 600 }}>{children}</thead>,
th: ({ children }) => (
<th
style={{
border: `0.5px solid ${quoteBorder}`,
padding: "4px 6px",
textAlign: "left",
}}
>
{children}
</th>
),
td: ({ children }) => (
<td
style={{
border: `0.5px solid ${quoteBorder}`,
padding: "4px 6px",
}}
>
{children}
</td>
),
}}
>
{children}
</ReactMarkdown>
);
interface A2AResponseShape {
result?: {
parts?: Array<{ kind?: string; text?: string }>;
};
error?: { message?: string };
}
const formatTime = (date: Date) =>
date.toLocaleTimeString([], { hour: "numeric", minute: "2-digit" });
export function MobileChat({
agentId,
dark,
@@ -205,37 +50,33 @@ export function MobileChat({
}) {
const p = usePalette(dark);
const node = useCanvasStore((s) => s.nodes.find((n) => n.id === agentId));
// Bootstrap from the canvas store's per-workspace message buffer so the
// user sees their prior thread on entry. The store is updated by the
// socket → ChatTab flows the desktop runs; on mobile we read from the
// same buffer to keep state coherent across viewports.
// NOTE: selector returns undefined (stable) — do NOT use ?? [] here,
// that creates a new [] reference on every store update when the key is
// absent, causing infinite re-render (React error #185).
const storedMessages = useCanvasStore((s) => s.agentMessages[agentId]);
const [messages, setMessages] = useState<ChatMessage[]>(() =>
(storedMessages ?? []).map((m) => ({
id: m.id,
role: "agent",
text: m.content,
ts: formatStoredTimestamp(m.timestamp),
})),
);
const [draft, setDraft] = useState("");
const [tab, setTab] = useState<SubTab>("my");
const [sending, setSending] = useState(false);
const [error, setError] = useState<string | null>(null);
const scrollRef = useRef<HTMLDivElement>(null);
// Synchronous re-entry guard. `setSending(true)` schedules a state
// update but doesn't flush before a second tap can fire send() — a ref
// mirrors the desktop ChatTab pattern (sendInFlightRef) and closes the
// double-send race a stale `sending` lets through.
const sendInFlightRef = useRef(false);
const composerRef = useRef<HTMLTextAreaElement>(null);
const fileInputRef = useRef<HTMLInputElement>(null);
const [pendingFiles, setPendingFiles] = useState<File[]>([]);
const {
messages,
loading: historyLoading,
loadError: historyError,
appendMessageDeduped,
} = useChatHistory(agentId);
const {
sending,
uploading,
sendMessage,
error: sendError,
clearError,
releaseSendGuards,
} = useChatSend(agentId, {
getHistoryMessages: () => messages,
onUserMessage: appendMessageDeduped,
onAgentMessage: appendMessageDeduped,
});
useChatSocket(agentId, {
onAgentMessage: appendMessageDeduped,
onSendComplete: releaseSendGuards,
});
// Auto-grow the textarea: reset height to 'auto' so the scrollHeight
// shrinks when the user deletes text, then size to scrollHeight up to
@@ -254,20 +95,6 @@ export function MobileChat({
}
}, [messages]);
// Consume any agent messages that arrived while history was loading.
const initialConsumeDoneRef = useRef(false);
useEffect(() => {
if (historyLoading || initialConsumeDoneRef.current) return;
initialConsumeDoneRef.current = true;
const consume = useCanvasStore.getState().consumeAgentMessages;
const msgs = consume(agentId);
for (const m of msgs) {
appendMessageDeduped(
createMessage("agent", m.content, m.attachments),
);
}
}, [historyLoading, agentId, appendMessageDeduped]);
if (!node) {
return (
<div
@@ -289,27 +116,54 @@ export function MobileChat({
const a = toMobileAgent(node);
const reachable = a.status === "online" || a.status === "degraded";
const onFilesPicked = (fileList: FileList | null) => {
if (!fileList) return;
const picked = Array.from(fileList);
setPendingFiles((prev) => {
const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`));
return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))];
});
if (fileInputRef.current) fileInputRef.current.value = "";
};
const removePendingFile = (index: number) =>
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
const send = async () => {
const text = draft.trim();
if ((!text && pendingFiles.length === 0) || sending || !reachable) return;
clearError();
if (!text || sending || !reachable) return;
if (sendInFlightRef.current) return;
sendInFlightRef.current = true;
setDraft("");
const files = pendingFiles;
setPendingFiles([]);
await sendMessage(text, files);
setError(null);
setSending(true);
const myMsg: ChatMessage = {
id: crypto.randomUUID(),
role: "user",
text,
ts: formatTime(new Date()),
};
setMessages((m) => [...m, myMsg]);
try {
const res = await api.post<A2AResponseShape>(`/workspaces/${agentId}/a2a`, {
method: "message/send",
params: {
message: {
role: "user",
messageId: crypto.randomUUID(),
parts: [{ kind: "text", text }],
},
},
});
const reply =
res.result?.parts?.find((part) => part.kind === "text")?.text ?? "";
if (reply) {
setMessages((m) => [
...m,
{
id: crypto.randomUUID(),
role: "agent",
text: reply,
ts: formatTime(new Date()),
},
]);
} else if (res.error?.message) {
setError(res.error.message);
}
} catch (e) {
setError(e instanceof Error ? e.message : "Failed to send");
} finally {
setSending(false);
sendInFlightRef.current = false;
}
};
return (
@@ -454,17 +308,7 @@ export function MobileChat({
Agent Comms peer-to-peer A2A traffic surfaces in the Comms tab.
</div>
)}
{tab === "my" && historyLoading && (
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
Loading chat history
</div>
)}
{tab === "my" && !historyLoading && historyError && messages.length === 0 && (
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
{historyError}
</div>
)}
{tab === "my" && !historyLoading && !historyError && messages.length === 0 && (
{tab === "my" && messages.length === 0 && (
<div style={{ padding: "20px 4px", textAlign: "center", color: p.text3, fontSize: 13 }}>
Send a message to start chatting.
</div>
@@ -493,9 +337,7 @@ export function MobileChat({
overflowWrap: "anywhere",
}}
>
<MarkdownBubble dark={dark} accent={p.accent}>
{m.content}
</MarkdownBubble>
{m.text}
<div
style={{
fontSize: 10,
@@ -504,13 +346,13 @@ export function MobileChat({
fontFamily: MOBILE_FONT_MONO,
}}
>
{formatStoredTimestamp(m.timestamp)}
{m.ts}
</div>
</div>
</div>
);
})}
{sendError && (
{error && (
<div
role="alert"
style={{
@@ -522,7 +364,7 @@ export function MobileChat({
fontSize: 12,
}}
>
{sendError}
{error}
</div>
)}
</div>
@@ -553,60 +395,6 @@ export function MobileChat({
backdropFilter: "blur(14px)",
}}
>
{pendingFiles.length > 0 && (
<div
style={{
display: "flex",
flexWrap: "wrap",
gap: 6,
marginBottom: 8,
paddingLeft: 2,
}}
>
{pendingFiles.map((f, i) => (
<div
key={`${f.name}:${f.size}`}
style={{
display: "flex",
alignItems: "center",
gap: 4,
padding: "3px 8px",
borderRadius: 10,
background: dark ? "#2a2823" : "#ece9e0",
fontSize: 12,
color: p.text2,
maxWidth: "100%",
}}
>
<span
style={{
overflow: "hidden",
textOverflow: "ellipsis",
whiteSpace: "nowrap",
}}
>
{f.name}
</span>
<button
type="button"
onClick={() => removePendingFile(i)}
aria-label={`Remove ${f.name}`}
style={{
border: "none",
background: "transparent",
color: p.text3,
cursor: "pointer",
fontSize: 12,
padding: 0,
lineHeight: 1,
}}
>
</button>
</div>
))}
</div>
)}
<div
style={{
display: "flex",
@@ -618,32 +406,21 @@ export function MobileChat({
padding: "6px 6px 6px 12px",
}}
>
<input
ref={fileInputRef}
type="file"
multiple
style={{ display: "none" }}
onChange={(e) => onFilesPicked(e.target.files)}
aria-hidden="true"
/>
<button
type="button"
onClick={() => fileInputRef.current?.click()}
disabled={!reachable || sending || uploading}
aria-label="Attach"
style={{
width: 32,
height: 32,
borderRadius: 999,
border: "none",
cursor: reachable && !sending && !uploading ? "pointer" : "not-allowed",
cursor: "pointer",
background: "transparent",
color: p.text3,
flexShrink: 0,
display: "flex",
alignItems: "center",
justifyContent: "center",
opacity: !reachable || sending || uploading ? 0.4 : 1,
}}
>
{Icons.attach({ size: 16 })}
@@ -689,32 +466,28 @@ export function MobileChat({
<button
type="button"
onClick={send}
disabled={(!draft.trim() && pendingFiles.length === 0) || !reachable || sending || uploading}
disabled={!draft.trim() || !reachable || sending}
aria-label="Send"
style={{
width: 36,
height: 36,
borderRadius: 999,
border: "none",
cursor: (draft.trim() || pendingFiles.length > 0) && !sending && !uploading ? "pointer" : "not-allowed",
cursor: draft.trim() && !sending ? "pointer" : "not-allowed",
flexShrink: 0,
background:
(draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading
draft.trim() && reachable && !sending
? p.accent
: dark
? "#2a2823"
: "#ece9e0",
color: (draft.trim() || pendingFiles.length > 0) && reachable && !sending && !uploading ? "#fff" : p.text3,
color: draft.trim() && reachable && !sending ? "#fff" : p.text3,
display: "flex",
alignItems: "center",
justifyContent: "center",
}}
>
{uploading ? (
<span style={{ fontSize: 10, fontWeight: 600 }}></span>
) : (
Icons.send({ size: 16 })
)}
{Icons.send({ size: 16 })}
</button>
</div>
</div>
+4 -6
View File
@@ -12,7 +12,6 @@ import { useEffect, useState } from "react";
import { api } from "@/lib/api";
import { type Template } from "@/lib/deploy-preflight";
import { isSaaSTenant } from "@/lib/tenant";
import { tierCode } from "./palette";
import { MOBILE_FONT_MONO, MOBILE_FONT_SANS, type MobilePalette, usePalette } from "./palette";
@@ -27,7 +26,6 @@ const TIER_LABEL: Record<"T1" | "T2" | "T3" | "T4", string> = {
export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => void }) {
const p = usePalette(dark);
const isSaaS = isSaaSTenant();
const [templates, setTemplates] = useState<Template[]>([]);
const [loadingTemplates, setLoadingTemplates] = useState(true);
const [tplId, setTplId] = useState<string | null>(null);
@@ -45,7 +43,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
setTemplates(list);
if (list.length > 0) {
setTplId(list[0].id);
setTier(isSaaS ? "T4" : tierCode(list[0].tier));
setTier(tierCode(list[0].tier));
}
})
.catch(() => {
@@ -57,7 +55,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
return () => {
cancelled = true;
};
}, [isSaaS]);
}, []);
const handleSpawn = async () => {
if (busy || !tplId) return;
@@ -69,7 +67,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
await api.post<{ id: string }>("/workspaces", {
name: (name.trim() || chosen.name),
template: chosen.id,
tier: isSaaS ? 4 : Number(tier.slice(1)),
tier: Number(tier.slice(1)),
canvas: {
x: Math.random() * 400 + 100,
y: Math.random() * 300 + 100,
@@ -205,7 +203,7 @@ export function MobileSpawn({ dark, onClose }: { dark: boolean; onClose: () => v
>
{templates.map((t) => {
const on = tplId === t.id;
const tCode = isSaaS ? "T4" : tierCode(t.tier);
const tCode = tierCode(t.tier);
return (
<button
key={t.id}
@@ -8,7 +8,7 @@
* NOTE: No @testing-library/jest-dom — use DOM APIs.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { cleanup, render, waitFor } from "@testing-library/react";
import { cleanup, render } from "@testing-library/react";
import React from "react";
import { MobileChat } from "../MobileChat";
@@ -33,12 +33,7 @@ const mockStoreState = {
vi.mock("@/store/canvas", () => ({
useCanvasStore: Object.assign(
vi.fn((sel) => sel(mockStoreState)),
{
getState: () => ({
...mockStoreState,
consumeAgentMessages: vi.fn(() => []),
}),
},
{ getState: () => mockStoreState },
),
summarizeWorkspaceCapabilities: vi.fn((data: Record<string, unknown>) => {
const agentCard = data.agentCard as Record<string, unknown> | null;
@@ -65,12 +60,8 @@ const { mockApiPost } = vi.hoisted(() => ({
mockApiPost: vi.fn().mockResolvedValue({ result: { parts: [] } }),
}));
const { mockApiGet } = vi.hoisted(() => ({
mockApiGet: vi.fn().mockResolvedValue({ messages: [] }),
}));
vi.mock("@/lib/api", () => ({
api: { get: mockApiGet, post: mockApiPost },
api: { post: mockApiPost },
}));
// ─── Fixtures ────────────────────────────────────────────────────────────────
@@ -157,7 +148,6 @@ function renderChat(agentId: string, dark = false) {
beforeEach(() => {
mockOnBack.mockClear();
mockApiGet.mockClear();
mockStoreState.nodes = [];
mockStoreState.agentMessages = {};
mockApiPost.mockClear();
@@ -276,19 +266,16 @@ describe("MobileChat — empty state", () => {
mockStoreState.nodes = [onlineNode];
});
it('shows "Send a message to start chatting." when no messages', async () => {
it('shows "Send a message to start chatting." when no messages', () => {
const { container } = renderChat(mockAgentId);
await waitFor(() =>
expect(container.textContent ?? "").toContain("Send a message to start chatting."),
);
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
});
it("shows no messages when agentMessages[agentId] is absent (undefined)", async () => {
it("shows no messages when agentMessages[agentId] is absent (undefined)", () => {
// Explicitly set to empty to simulate no stored messages
mockStoreState.agentMessages = {};
const { container } = renderChat(mockAgentId);
await waitFor(() =>
expect(container.textContent ?? "").toContain("Send a message to start chatting."),
);
expect(container.textContent ?? "").toContain("Send a message to start chatting.");
});
});
+696 -96
View File
@@ -3,20 +3,18 @@
import { useState, useRef, useEffect, useCallback, useLayoutEffect } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import { api } from "@/lib/api";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { type ChatMessage, type ChatAttachment, createMessage, appendMessageDeduped } from "./chat/types";
import { downloadChatFile, isPlatformAttachment } from "./chat/uploads";
import { uploadChatFiles, downloadChatFile, isPlatformAttachment } from "./chat/uploads";
import { PendingAttachmentPill } from "./chat/AttachmentViews";
import { AttachmentPreview } from "./chat/AttachmentPreview";
import { extractFilesFromTask } from "./chat/message-parser";
import { AgentCommsPanel } from "./chat/AgentCommsPanel";
import { appendActivityLine } from "./chat/activityLog";
import { runtimeDisplayName } from "@/lib/runtime-names";
import { ConfirmDialog } from "@/components/ConfirmDialog";
import { useChatHistory } from "./chat/hooks/useChatHistory";
import { useChatSend } from "./chat/hooks/useChatSend";
import { useChatSocket } from "./chat/hooks/useChatSocket";
export { extractReplyText } from "./chat/hooks/useChatSend";
interface Props {
workspaceId: string;
@@ -25,6 +23,147 @@ interface Props {
type ChatSubTab = "my-chat" | "agent-comms";
// A2A response shape (subset). The full schema is in @a2a-js/sdk but we only
// need parts/artifacts text + file extraction for the synchronous fallback.
interface A2AFileRef {
name?: string;
mimeType?: string;
uri?: string;
bytes?: string;
size?: number;
}
// Outbound shape matches a2a-sdk's JSON-RPC `SendMessageRequest`
// Pydantic union (TextPart | FilePart | DataPart). The flat
// protobuf shape `{url, filename, mediaType}` is rejected at the
// request boundary with `Field required` errors — keep this
// outbound shape unless a2a-sdk migrates the JSON-RPC schema.
interface A2APart {
kind: string;
text?: string;
file?: A2AFileRef;
}
interface A2AResponse {
result?: {
parts?: A2APart[];
artifacts?: Array<{ parts: A2APart[] }>;
};
}
// Internal-self-message filtering moved server-side in RFC #2945
// PR-C/D — the platform's /chat-history endpoint applies the
// IsInternalSelfMessage predicate before returning rows, so the
// client no longer needs the local backstop on the history path.
// The proper fix is still X-Workspace-ID header (source_id=workspace_id);
// the platform-side prefix filter handles the residual cases.
// extractReplyText pulls the agent's text reply out of an A2A response.
// Concatenates ALL text parts (joined with "\n") rather than returning
// just the first. Claude Code and other runtimes commonly emit multi-
// part text replies for long content (markdown tables, code blocks),
// and the prior "first part wins" implementation silently truncated
// the rest — observed on a 15k-char Wave 1 brief that rendered only
// the table header. Mirrors extractTextsFromParts in message-parser.ts.
//
// Server-side counterpart in workspace-server/internal/channels/
// manager.go has the same single-part bug; fix that too if/when a
// channel-delivered reply (Slack, Lark, etc.) gets truncated.
export function extractReplyText(resp: A2AResponse): string {
const collect = (parts: A2APart[] | undefined): string => {
if (!parts) return "";
return parts
.filter((p) => p.kind === "text")
.map((p) => p.text ?? "")
.filter(Boolean)
.join("\n");
};
const result = resp?.result;
const collected: string[] = [];
const fromParts = collect(result?.parts);
if (fromParts) collected.push(fromParts);
// Walk artifacts even if parts had text — some producers (Hermes
// tool calls) emit a summary in parts AND details in artifacts.
// Returning early on parts dropped the artifact body silently.
if (result?.artifacts) {
for (const a of result.artifacts) {
const t = collect(a.parts);
if (t) collected.push(t);
}
}
return collected.join("\n");
}
// Agent-returned files live on the same response shape as text —
// delegated to extractFilesFromTask in message-parser.ts, which also
// walks status.message.parts (that ChatTab's legacy text extractor
// doesn't). Single source of truth for file-part parsing across
// live chat, activity log replay, and any future consumers.
/** Initial chat history page size. The newest N messages are rendered
* on first paint; older history is fetched on demand via loadOlder()
* when the user scrolls the top sentinel into view. */
const INITIAL_HISTORY_LIMIT = 10;
/** Subsequent older-history batch size. Larger than INITIAL so a long
* scroll-back doesn't fan out into many round-trips. */
const OLDER_HISTORY_BATCH = 20;
/**
* Load chat history from the platform's typed /chat-history endpoint.
*
* Server-side rendering of activity_logs rows into ChatMessage shape
* lives in workspace-server/internal/messagestore/postgres_store.go
* (RFC #2945 PR-C/D). The server already applies the canvas-source
* filter, the internal-self-message predicate, the role decision
* (status=error vs agent-error prefix → system), and the v0/v1
* file-shape extraction. Canvas just renders what it receives.
*
* Wire shape (mirrors ChatMessage exactly, no per-row mapping needed):
*
* GET /workspaces/:id/chat-history?limit=N&before_ts=T
* 200 → {"messages": ChatMessage[], "reached_end": boolean}
*
* Pagination:
* - Pass `limit` to bound the page size (newest-first from server).
* - Pass `beforeTs` (RFC3339) to fetch rows STRICTLY OLDER than that
* timestamp. Combined with limit, this yields the next-older page
* when scrolling backward through history.
*
* `reachedEnd` is propagated from the server. The server computes it
* by comparing rowCount vs limit so a partial last page is correctly
* detected even when the row→bubble fan-out is non-1:1 (each row
* produces 1-2 bubbles).
*/
async function loadMessagesFromDB(
workspaceId: string,
limit: number,
beforeTs?: string,
): Promise<{ messages: ChatMessage[]; error: string | null; reachedEnd: boolean }> {
try {
const params = new URLSearchParams({ limit: String(limit) });
if (beforeTs) params.set("before_ts", beforeTs);
const resp = await api.get<{ messages: ChatMessage[]; reached_end: boolean }>(
`/workspaces/${workspaceId}/chat-history?${params.toString()}`,
);
// Server emits oldest-first within the page (RFC #2945 PR-C-2
// post-fix: server reverses row-aware before returning so the
// wire is display-ready). Canvas appends/prepends without
// reordering — this avoids the pair-flip bug a naive flat
// reverse causes when each row produces a (user, agent) pair
// with the same timestamp.
return {
messages: resp.messages ?? [],
error: null,
reachedEnd: resp.reached_end,
};
} catch (err) {
return {
messages: [],
error: err instanceof Error ? err.message : "Failed to load chat history",
reachedEnd: true,
};
}
}
/**
* ChatTab container — renders sub-tab bar + My Chat or Agent Comms panel.
*/
@@ -108,68 +247,268 @@ export function ChatTab({ workspaceId, data }: Props) {
* MyChatPanel — user↔agent conversation (extracted from original ChatTab).
*/
function MyChatPanel({ workspaceId, data }: Props) {
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [input, setInput] = useState("");
const [pendingFiles, setPendingFiles] = useState<File[]>([]);
const [activityLog, setActivityLog] = useState<string[]>([]);
// `sending` is strictly the "this tab kicked off a send and hasn't
// seen the reply yet" signal. Previously this was initialized from
// data.currentTask to pick up in-flight agent work on mount, but
// that conflated agent-busy (workspace heartbeat) with user-
// in-flight (local send): when the WS dropped a TASK_COMPLETE event,
// currentTask lingered, the component re-mounted with sending=true,
// and the Send button stayed disabled forever even though nothing
// local was in flight. For the "agent is busy, show spinner" UX,
// use data.currentTask directly in the render path.
const [sending, setSending] = useState(false);
const [thinkingElapsed, setThinkingElapsed] = useState(0);
const [activityLog, setActivityLog] = useState<string[]>([]);
const [loading, setLoading] = useState(true);
const [loadError, setLoadError] = useState<string | null>(null);
const currentTaskRef = useRef(data.currentTask);
const sendingFromAPIRef = useRef(false);
const [agentReachable, setAgentReachable] = useState(false);
const [error, setError] = useState<string | null>(null);
const [confirmRestart, setConfirmRestart] = useState(false);
const [dragOver, setDragOver] = useState(false);
const bottomRef = useRef<HTMLDivElement>(null);
// First-mount scroll-to-bottom needs `behavior: "instant"` — long
// conversations smooth-animate for ~300ms which any concurrent
// re-render can interrupt, leaving the user stuck mid-conversation
// when the chat tab opens. Subsequent appends (new agent messages)
// keep `smooth` for the visual "landing" feel. Flipped the first
// time messages.length goes positive, so a workspace switch (which
// remounts ChatTab) gets a fresh instant jump too.
const hasInitialScrollRef = useRef(false);
// Lazy-load older history on scroll-up.
// - containerRef = the scrollable messages viewport
// - topRef = sentinel above the messages list; IO observes it
// and triggers loadOlder() when it enters view
// - hasMore = false once a fetch returns < limit rows; stops IO
// - loadingOlder = drives the "Loading older messages…" UI label
// - inflightRef = synchronous guard against double-entry of loadOlder
// when the IO callback fires twice in the same
// microtask (state-based guard would be stale until
// the next React commit)
// - scrollAnchorRef = saves distance-from-bottom before a prepend
// so the useLayoutEffect below can restore the
// user's exact viewport position. Without this,
// prepending older messages would jump the scroll
// position by the height of the new content.
// - oldestMessageRef / hasMoreRef = let the loadOlder closure read
// the latest values without taking them as deps —
// every live agent push mutates `messages`, and
// having loadOlder depend on `messages` would tear
// down + re-arm the IntersectionObserver on every
// push. Refs decouple the observer lifecycle from
// message-list updates.
const containerRef = useRef<HTMLDivElement>(null);
const topRef = useRef<HTMLDivElement>(null);
const bottomRef = useRef<HTMLDivElement>(null);
const hasInitialScrollRef = useRef(false);
const [hasMore, setHasMore] = useState(true);
const [loadingOlder, setLoadingOlder] = useState(false);
const inflightRef = useRef(false);
// The scroll anchor includes the first-message id as it was BEFORE
// the prepend — see useLayoutEffect below for why. Without this tag,
// a live agent push that appends WHILE loadOlder is in flight would
// run useLayoutEffect against the append (anchor still set), the
// "restore" math would scroll the user to a stale offset, AND the
// append's normal scroll-to-bottom would be swallowed.
const scrollAnchorRef = useRef<
{ savedDistanceFromBottom: number; expectFirstIdNotEqual: string | null } | null
>(null);
const oldestMessageRef = useRef<ChatMessage | null>(null);
const hasMoreRef = useRef(true);
// Monotonic token bumped on workspace switch + on every loadOlder
// entry. Each fetch's .then() captures its own token; if the token
// has moved, the resolved messages belong to a stale workspace or a
// superseded fetch and we silently drop them. Without this guard, a
// workspace switch mid-fetch would have the in-flight promise
// resolve into the new workspace's setMessages — the user sees
// someone else's history briefly.
const fetchTokenRef = useRef(0);
// Files the user has picked but not yet sent. Cleared on send
// (upload success) or by the × on each pill.
const [pendingFiles, setPendingFiles] = useState<File[]>([]);
const [uploading, setUploading] = useState(false);
const fileInputRef = useRef<HTMLInputElement>(null);
const dragDepthRef = useRef(0);
const pasteCounterRef = useRef(0);
// Guard against a double-click during the upload phase: React
// state updates from the click that started the upload haven't
// flushed yet, so the disabled-button logic sees `uploading=false`
// from the closure and lets a second `sendMessage` enter. A ref
// observes the latest value synchronously.
const sendInFlightRef = useRef(false);
// Monotonic token bumped on every sendMessage entry. Each .then()/
// .catch() captures its own token in closure and bails if a newer
// send has superseded it — prevents a late HTTP response for an
// earlier message from clobbering the flags / appending text that
// belong to a newer in-flight send. Race scenario the token closes:
// (1) send msg #1 (2) WS push for msg #1 arrives, releases guards
// (3) user sends msg #2 (4) HTTP for msg #1 finally lands — without
// the token check, .then() sees sendingFromAPIRef=true (set by
// msg #2's send), enters the main body, and processes msg #1's body
// as if it were msg #2's reply.
const sendTokenRef = useRef(0);
const history = useChatHistory(workspaceId, containerRef);
const chatSend = useChatSend(workspaceId, {
getHistoryMessages: () => history.messages,
onUserMessage: (msg) => history.setMessages((prev) => [...prev, msg]),
onAgentMessage: (msg) => history.setMessages((prev) => appendMessageDeduped(prev, msg)),
});
const { sending, uploading, sendMessage, error: sendError, clearError: clearSendError, releaseSendGuards, sendingFromAPIRef } = chatSend;
// Release every in-flight send guard at once. Used by every site
// that ends a send: pendingAgentMsgs WS push, ACTIVITY_LOGGED
// a2a_receive ok/error WS event, HTTP .then() success, and HTTP
// .catch() success. Keep these in lockstep — a future contributor
// adding a new "I saw the reply" path that only clears `sending` +
// `sendingFromAPIRef` (the natural pair) silently re-introduces
// the post-WS Send-button freeze, because the disabled-button
// logic can't see `sendInFlightRef` and so the visible state diverges
// from the synchronous re-entry guard at line 464.
const releaseSendGuards = useCallback(() => {
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
}, []);
const displayError = error || sendError;
// Initial-load fetch — used by the mount effect and the "Retry"
// button below. Single source of truth so the two paths can't drift
// (e.g. INITIAL_HISTORY_LIMIT bumped in the effect but not the
// retry, leading to inconsistent first-paint sizes).
const loadInitial = useCallback(() => {
setLoading(true);
setLoadError(null);
setHasMore(true);
// Bump the token; any in-flight fetch from the previous workspace
// (or a previous retry) will see token != myToken in its .then()
// and silently bail — the late response can't clobber the new
// workspace's state.
fetchTokenRef.current += 1;
const myToken = fetchTokenRef.current;
loadMessagesFromDB(workspaceId, INITIAL_HISTORY_LIMIT).then(
({ messages: msgs, error: fetchErr, reachedEnd }) => {
if (fetchTokenRef.current !== myToken) return;
setMessages(msgs);
setLoadError(fetchErr);
setHasMore(!reachedEnd);
setLoading(false);
},
);
}, [workspaceId]);
useChatSocket(workspaceId, {
onAgentMessage: (msg) => {
history.setMessages((prev) => appendMessageDeduped(prev, msg));
if (sendingFromAPIRef.current) {
releaseSendGuards();
// Load chat history on mount / workspace switch.
// Initial load is bounded to INITIAL_HISTORY_LIMIT (newest 10) — the
// rest streams in as the user scrolls up via loadOlder() below. Pre-
// 2026-05-05 this fetched the newest 50 in one shot; on a long-running
// workspace that meant 50× message-bubble paint + DOM cost on every
// tab-open even when the user only wanted to read the last few.
useEffect(() => {
loadInitial();
}, [loadInitial]);
// Mirror the latest oldest-message + hasMore into refs so loadOlder
// can read them without taking `messages` as a dep. Every live push
// through agentMessages would otherwise recreate loadOlder and tear
// down the IO observer.
useEffect(() => {
oldestMessageRef.current = messages[0] ?? null;
}, [messages]);
useEffect(() => {
hasMoreRef.current = hasMore;
}, [hasMore]);
// Fetch the next-older batch and prepend. Stable identity (deps =
// [workspaceId]) so the IntersectionObserver effect below doesn't
// re-arm on every messages update.
const loadOlder = useCallback(async () => {
// inflightRef is the load-bearing guard — synchronous, set BEFORE
// any await, so two IO callbacks dispatched in the same microtask
// can't both pass. The state checks are defensive secondary
// gates for the slow-scroll case.
if (inflightRef.current || !hasMoreRef.current) return;
const oldest = oldestMessageRef.current;
if (!oldest) return;
const container = containerRef.current;
if (!container) return;
inflightRef.current = true;
// Capture the user's distance-from-bottom BEFORE we prepend so the
// useLayoutEffect can restore it after the new DOM lands. The
// expectFirstIdNotEqual tag is what the layout effect checks
// against `messages[0].id` to disambiguate prepend (id changed) vs
// append (id unchanged → live message landed mid-fetch). Without
// it, an agent push during loadOlder runs the "restore" against a
// stale anchor — user gets yanked + the append's bottom-pin is
// swallowed.
scrollAnchorRef.current = {
savedDistanceFromBottom: container.scrollHeight - container.scrollTop,
expectFirstIdNotEqual: oldest.id,
};
fetchTokenRef.current += 1;
const myToken = fetchTokenRef.current;
setLoadingOlder(true);
try {
const { messages: older, reachedEnd } = await loadMessagesFromDB(
workspaceId,
OLDER_HISTORY_BATCH,
oldest.timestamp,
);
// Workspace switched (or another loadOlder bumped the token)
// mid-fetch — drop these results, they belong to a stale tab.
if (fetchTokenRef.current !== myToken) {
scrollAnchorRef.current = null;
return;
}
},
onActivityLog: (entry) => {
if (!sending) return;
setActivityLog((prev) => appendActivityLine(prev, entry));
},
onSendComplete: () => {
if (sendingFromAPIRef.current) {
releaseSendGuards();
if (older.length > 0) {
setMessages((prev) => [...older, ...prev]);
} else {
// Nothing came back — clear the anchor so the next paint doesn't
// try to "restore" against a no-op prepend.
scrollAnchorRef.current = null;
}
},
onSendError: (err) => {
if (sendingFromAPIRef.current) {
releaseSendGuards();
setError(err);
}
},
});
setHasMore(!reachedEnd);
} finally {
setLoadingOlder(false);
inflightRef.current = false;
}
}, [workspaceId]);
// IntersectionObserver on the top sentinel. Fires loadOlder() the
// moment the user scrolls within 200px of the top. AbortController
// unwires cleanly on workspace switch / unmount; root is the
// scrollable container so we observe only what's visible inside it.
//
// Dependencies:
// - loadOlder — stable per workspaceId (refs decouple it from
// message updates), so this dep is here for the
// workspace-switch case only
// - hasMore — re-run when older history runs out so we
// disconnect cleanly
// - hasMessages — load-bearing: the sentinel JSX is gated on
// `messages.length > 0`, so topRef.current is null
// on the empty-messages render. We re-arm exactly
// once when messages first land. NOT depending on
// `messages.length` (or `messages`) directly so
// each subsequent message append doesn't tear down
// + re-arm the observer.
const hasMessages = messages.length > 0;
useEffect(() => {
const top = topRef.current;
const container = containerRef.current;
if (!top || !container) return;
if (!hasMore) return; // stop observing when no older history exists
const ac = new AbortController();
const io = new IntersectionObserver(
(entries) => {
if (ac.signal.aborted) return;
if (entries[0]?.isIntersecting) loadOlder();
},
{ root: container, rootMargin: "200px 0px 0px 0px", threshold: 0 },
);
io.observe(top);
ac.signal.addEventListener("abort", () => io.disconnect());
return () => ac.abort();
}, [loadOlder, hasMore, hasMessages]);
// Agent reachability
useEffect(() => {
const reachable = data.status === "online" || data.status === "degraded";
setAgentReachable(reachable);
if (reachable) {
setError(null);
clearSendError();
} else {
setError(`Agent is ${data.status}`);
}
}, [data.status, clearSendError]);
setError(reachable ? null : `Agent is ${data.status}`);
}, [data.status]);
useEffect(() => {
currentTaskRef.current = data.currentTask;
}, [data.currentTask]);
// Scroll behavior across messages updates:
// - Prepend (loadOlder landed) → restore the user's saved
@@ -179,24 +518,71 @@ function MyChatPanel({ workspaceId, data }: Props) {
// paint — otherwise the user sees the page jump for one frame.
useLayoutEffect(() => {
const container = containerRef.current;
const anchor = history.scrollAnchorRef.current;
const anchor = scrollAnchorRef.current;
// Only honor the anchor when this messages-update is the prepend
// we expected. messages[0].id is the test:
// - prepend → messages[0] is one of the older rows → id !== expectFirstIdNotEqual
// - append → messages[0] unchanged → id === expectFirstIdNotEqual → fall through
// Without this check, an agent push that lands mid-loadOlder would
// run the restore against the append's update, yank the user's
// scroll, AND swallow the append's bottom-pin.
if (
anchor &&
container &&
history.messages.length > 0 &&
history.messages[0].id !== anchor.expectFirstIdNotEqual
messages.length > 0 &&
messages[0].id !== anchor.expectFirstIdNotEqual
) {
container.scrollTop = container.scrollHeight - anchor.savedDistanceFromBottom;
history.scrollAnchorRef.current = null;
scrollAnchorRef.current = null;
return;
}
if (!hasInitialScrollRef.current && history.messages.length > 0) {
// Instant on first arrival of messages — smooth-scroll on a long
// conversation gets interrupted by concurrent renders and leaves
// the user stuck in the middle. After the first jump, subsequent
// appends animate as before.
if (!hasInitialScrollRef.current && messages.length > 0) {
hasInitialScrollRef.current = true;
bottomRef.current?.scrollIntoView({ behavior: "instant" as ScrollBehavior });
return;
}
bottomRef.current?.scrollIntoView({ behavior: "smooth" });
}, [history.messages, history.scrollAnchorRef]);
}, [messages]);
// Consume agent push messages (send_message_to_user) from global store.
// Runtimes like Claude Code SDK deliver their reply via a WS push rather
// than the /a2a HTTP response — when that happens, the push is the
// authoritative "reply arrived" signal for the UI, so clear `sending`
// here too. The HTTP .then() coordinates through sendingFromAPIRef so
// whichever path clears first wins.
const pendingAgentMsgs = useCanvasStore((s) => s.agentMessages[workspaceId]);
useEffect(() => {
if (!pendingAgentMsgs || pendingAgentMsgs.length === 0) return;
const consume = useCanvasStore.getState().consumeAgentMessages;
const msgs = consume(workspaceId);
for (const m of msgs) {
// Dedupe in case the agent proactively pushed the same text the
// HTTP /a2a response already delivered (observed with the Hermes
// runtime, which emits both a reply body and a send_message_to_user
// push for the same content). Attachments ride along with the
// message so files returned by the A2A_RESPONSE WS path render
// their download chips.
setMessages((prev) => appendMessageDeduped(prev, createMessage("agent", m.content, m.attachments)));
}
if (sendingFromAPIRef.current && msgs.length > 0) {
// Reply arrived via WS push (e.g. claude-code SDK). Release all
// three guards together — without sendInFlightRef the next
// sendMessage() silently no-ops at the synchronous re-entry
// check.
releaseSendGuards();
}
}, [pendingAgentMsgs, workspaceId]);
// Resolve workspace ID → name for activity display
const resolveWorkspaceName = useCallback((id: string) => {
const nodes = useCanvasStore.getState().nodes;
const node = nodes.find((n) => n.id === id);
return (node?.data as WorkspaceNodeData)?.name || id.slice(0, 8);
}, []);
// Elapsed timer while sending
useEffect(() => {
@@ -223,43 +609,211 @@ function MyChatPanel({ workspaceId, data }: Props) {
setActivityLog([`Processing with ${runtimeDisplayName(data.runtime)}...`]);
}, [sending, data.runtime]);
// IntersectionObserver on the top sentinel. Fires loadOlder() the
// moment the user scrolls within 200px of the top. AbortController
// unwires cleanly on workspace switch / unmount; root is the
// scrollable container so we observe only what's visible inside it.
const hasMessages = history.messages.length > 0;
useEffect(() => {
const top = topRef.current;
const container = containerRef.current;
if (!top || !container) return;
if (!history.hasMore) return;
const ac = new AbortController();
const io = new IntersectionObserver(
(entries) => {
if (ac.signal.aborted) return;
if (entries[0]?.isIntersecting) history.loadOlder();
},
{ root: container, rootMargin: "200px 0px 0px 0px", threshold: 0 },
);
io.observe(top);
ac.signal.addEventListener("abort", () => io.disconnect());
return () => ac.abort();
}, [history.loadOlder, history.hasMore, hasMessages]);
// Subscribe to global WS via the singleton ReconnectingSocket (no
// per-component WebSocket — the previous pattern dropped events
// silently on any reconnect because each panel's raw socket had no
// onclose handler).
useSocketEvent((msg) => {
if (!sending) return;
try {
if (msg.event === "ACTIVITY_LOGGED") {
// Filter to events for THIS workspace. The platform's
// BroadcastOnly fires to every connected client, and
// without this guard a sibling workspace's a2a_send would
// surface as "→ Delegating to X..." inside the wrong
// chat panel. (workspace_id on the WS envelope is the
// workspace whose activity_log row we just wrote.)
if (msg.workspace_id !== workspaceId) return;
const handleSend = async () => {
const p = msg.payload || {};
const type = p.activity_type as string;
const method = (p.method as string) || "";
const status = (p.status as string) || "";
const targetId = (p.target_id as string) || "";
const durationMs = p.duration_ms as number | undefined;
const summary = (p.summary as string) || "";
let line = "";
if (type === "a2a_receive" && method === "message/send") {
const targetName = resolveWorkspaceName(targetId || msg.workspace_id);
if (status === "ok" && durationMs) {
const sec = Math.round(durationMs / 1000);
line = `${targetName} responded (${sec}s)`;
// The platform logs a successful a2a_receive once the workspace
// has fully produced its reply. That's the authoritative "done"
// signal for the spinner — clear it even if the reply hasn't
// surfaced through the store yet (it may be delivered shortly
// via pendingAgentMsgs or the HTTP .then()).
const own = (targetId || msg.workspace_id) === workspaceId;
if (own && sendingFromAPIRef.current) {
releaseSendGuards();
}
} else if (status === "error") {
line = `${targetName} error`;
const own = (targetId || msg.workspace_id) === workspaceId;
if (own && sendingFromAPIRef.current) {
releaseSendGuards();
setError("Agent error (Exception) — see workspace logs for details.");
}
}
} else if (type === "a2a_send") {
const targetName = resolveWorkspaceName(targetId);
line = `→ Delegating to ${targetName}...`;
} else if (type === "task_update") {
if (summary) line = `${summary}`;
} else if (type === "agent_log") {
// Per-tool-use telemetry from claude_sdk_executor's
// _report_tool_use. The summary already carries an icon
// + human-readable args (📄 Read /path, ⚡ Bash: …)
// so we render it verbatim. No icon prefix here — the
// emoji at the start of summary is the visual marker.
if (summary) line = summary;
}
if (line) {
setActivityLog((prev) => appendActivityLine(prev, line));
}
} else if (msg.event === "TASK_UPDATED" && msg.workspace_id === workspaceId) {
const task = (msg.payload?.current_task as string) || "";
if (task) {
setActivityLog((prev) => appendActivityLine(prev, `${task}`));
}
}
// A2A_RESPONSE is already consumed by the store and its text is
// appended to messages via the pendingAgentMsgs effect above; we
// don't need to duplicate it here.
} catch { /* ignore */ }
});
const sendMessage = async () => {
const text = input.trim();
const files = pendingFiles;
if ((!text && files.length === 0) || !agentReachable || sending || uploading) return;
const filesToSend = pendingFiles;
// Allow sending if EITHER text OR attachments are present — a user
// can drop a file with no text and the agent still receives it.
if ((!text && filesToSend.length === 0) || !agentReachable || sending || uploading) return;
// Synchronous re-entry guard — see sendInFlightRef comment.
if (sendInFlightRef.current) return;
sendInFlightRef.current = true;
// Upload attachments first so we can include URIs in the A2A
// message parts. Sequential-before-send: a message with references
// to files not yet staged would fail agent-side; staging happens
// synchronously via /chat/uploads before message/send dispatch.
let uploaded: ChatAttachment[] = [];
if (filesToSend.length > 0) {
setUploading(true);
try {
uploaded = await uploadChatFiles(workspaceId, filesToSend);
} catch (e) {
setUploading(false);
sendInFlightRef.current = false;
setError(e instanceof Error ? `Upload failed: ${e.message}` : "Upload failed");
return;
}
setUploading(false);
}
setInput("");
setPendingFiles([]);
clearSendError();
setMessages((prev) => [...prev, createMessage("user", text, uploaded)]);
setSending(true);
sendingFromAPIRef.current = true;
setError(null);
await sendMessage(text, files);
// Capture this send's token so the .then()/.catch() callbacks can
// detect a newer send that may have superseded them. See the
// sendTokenRef declaration for the race scenario this closes.
const myToken = ++sendTokenRef.current;
// Build conversation history from prior messages (last 20)
const history = messages
.filter((m) => m.role === "user" || m.role === "agent")
.slice(-20)
.map((m) => ({
role: m.role === "user" ? "user" : "agent",
parts: [{ kind: "text", text: m.content }],
}));
// A2A parts: text part (if any) + file parts (per attachment). The
// agent sees both in a single turn, matching the A2A spec shape.
// Wire shape is v0 — see A2APart definition above.
const parts: A2APart[] = [];
if (text) parts.push({ kind: "text", text });
for (const att of uploaded) {
parts.push({
kind: "file",
file: {
name: att.name,
mimeType: att.mimeType,
uri: att.uri,
size: att.size,
},
});
}
// A2A calls can legitimately take minutes — LLM latency +
// multi-turn tool use is common on slower providers (Hermes+minimax,
// Claude Code invoking bash/file tools, etc.). The 15s default
// would silently abort the fetch here, leaving the server to
// complete the reply and the user staring at
// "agent may be unreachable". Match the upload timeout (60s × 2)
// for the happy-path ceiling; anything longer is genuinely stuck.
api.post<A2AResponse>(`/workspaces/${workspaceId}/a2a`, {
method: "message/send",
params: {
message: {
role: "user",
messageId: crypto.randomUUID(),
parts,
},
metadata: { history },
},
}, { timeoutMs: 120_000 })
.then((resp) => {
// Bail without touching any flags if a newer sendMessage has
// already run — its myToken bumped sendTokenRef, so this is
// a stale callback for an earlier message. The newer send
// owns the in-flight guards now.
if (sendTokenRef.current !== myToken) return;
// Skip if the WS A2A_RESPONSE event already handled this response.
// Both paths (WS + HTTP) check sendingFromAPIRef — whichever clears
// it first wins, the other becomes a no-op (no duplicate messages).
if (!sendingFromAPIRef.current) {
sendInFlightRef.current = false;
return;
}
const replyText = extractReplyText(resp);
const replyFiles = extractFilesFromTask((resp?.result ?? {}) as Record<string, unknown>);
if (replyText || replyFiles.length > 0) {
setMessages((prev) =>
appendMessageDeduped(prev, createMessage("agent", replyText, replyFiles)),
);
}
releaseSendGuards();
})
.catch(() => {
// Stale-callback guard — same rationale as .then().
if (sendTokenRef.current !== myToken) return;
// Same dedup guard as .then(): if a WS path (pendingAgentMsgs
// or ACTIVITY_LOGGED a2a_receive ok) already delivered the
// reply, sendingFromAPIRef is already false and there's
// nothing to roll back. Surfacing "Failed to send" here would
// contradict the agent reply the user is currently reading —
// exactly the false-positive observed when the HTTP request
// hung up (proxy idle / 502) after WS already won.
if (!sendingFromAPIRef.current) {
sendInFlightRef.current = false;
return;
}
releaseSendGuards();
setError("Failed to send message — agent may be unreachable");
});
};
const onFilesPicked = (fileList: FileList | null) => {
if (!fileList) return;
const picked = Array.from(fileList);
// Deduplicate against current pending set by name+size — user
// picking the same file twice shouldn't append it.
setPendingFiles((prev) => {
const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`));
return [...prev, ...picked.filter((f) => !keyed.has(`${f.name}:${f.size}`))];
@@ -270,7 +824,35 @@ function MyChatPanel({ workspaceId, data }: Props) {
const removePendingFile = (index: number) =>
setPendingFiles((prev) => prev.filter((_, i) => i !== index));
// Monotonic counter so two paste events within the same wall-clock
// second still produce distinct filenames. Without this, on
// Firefox (where pasted images have an empty `file.name`), two
// pastes ~100ms apart could yield identical synthetic names AND
// identical sizes, collapsing into one attachment via the
// `name:size` dedup in onFilesPicked.
const pasteCounterRef = useRef(0);
/** Paste-from-clipboard image attachment.
*
* Browser clipboard image items arrive as `File`s whose `name` is
* often a generic "image.png" (Chrome) or empty (Firefox/Safari),
* so two consecutive screenshot pastes collide on the name+size
* dedup the file-picker uses. Re-tag each pasted image with a
* per-paste unique name so dedup keeps them apart and the upload
* pipeline (which expects a non-empty filename) is happy.
*
* Falls through to onFilesPicked via direct File[] (NOT through
* the DataTransfer constructor — that throws on Safari < 14.1
* and old Edge, silently aborting the paste).
*
* Only intercepts the paste when the clipboard has at least one
* image; text-only pastes fall through to the textarea's default
* behaviour. */
const mimeToExt = (mime: string): string => {
// Avoid raw `mime.split("/")[1]` — that yields `"svg+xml"`,
// `"jpeg"`, `"webp"` etc. which produce ugly filenames and may
// trip server-side extension allowlists. Map known types
// explicitly; unknown falls back to a safe default.
if (mime === "image/svg+xml") return "svg";
if (mime === "image/jpeg") return "jpg";
if (mime === "image/png") return "png";
@@ -291,16 +873,26 @@ function MyChatPanel({ workspaceId, data }: Props) {
const file = item.getAsFile();
if (!file) continue;
const ext = mimeToExt(file.type);
const stamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const stamp = new Date()
.toISOString()
.replace(/[:.]/g, "-")
.slice(0, 19);
const seq = pasteCounterRef.current++;
const fname = `pasted-${stamp}-${seq}-${i}.${ext}`;
imageFiles.push(new File([file], fname, { type: file.type }));
}
if (imageFiles.length === 0) return;
e.preventDefault();
// Reuse the picker path so file-size guards, dedup, and pending-
// list state all run through the same code. Build a synthetic
// FileList-like object to avoid the DataTransfer constructor —
// that's missing on Safari < 14.1 / old Edge and would silently
// throw, leaving the paste a no-op.
addPastedFiles(imageFiles);
};
// Variant of onFilesPicked that accepts a File[] directly, sidestepping
// the DataTransfer-FileList round-trip. Same dedup + state shape.
const addPastedFiles = (files: File[]) => {
setPendingFiles((prev) => {
const keyed = new Set(prev.map((f) => `${f.name}:${f.size}`));
@@ -308,6 +900,11 @@ function MyChatPanel({ workspaceId, data }: Props) {
});
};
// Drag-and-drop staging. dragDepthRef counts enter vs leave events so
// the overlay doesn't flicker when the cursor crosses nested children
// (textarea, buttons) — dragenter/dragleave fire for every boundary.
const [dragOver, setDragOver] = useState(false);
const dragDepthRef = useRef(0);
const dropEnabled = agentReachable && !sending && !uploading;
const isFileDrag = (e: React.DragEvent) =>
Array.from(e.dataTransfer.types || []).includes("Files");
@@ -337,6 +934,9 @@ function MyChatPanel({ workspaceId, data }: Props) {
};
const downloadAttachment = (att: ChatAttachment) => {
// Errors here are rare but user-visible (401 on a revoked token,
// 404 if the agent deleted the file). Surface via the inline
// error banner — the message list itself stays untouched.
downloadChatFile(workspaceId, att).catch((e) => {
setError(e instanceof Error ? `Download failed: ${e.message}` : "Download failed");
});
@@ -364,26 +964,26 @@ function MyChatPanel({ workspaceId, data }: Props) {
)}
{/* Messages */}
<div ref={containerRef} className="flex-1 overflow-y-auto p-3 space-y-3">
{history.loading && (
{loading && (
<div className="text-xs text-ink-mid text-center py-4">Loading chat history...</div>
)}
{!history.loading && history.loadError !== null && history.messages.length === 0 && (
{!loading && loadError !== null && messages.length === 0 && (
<div
role="alert"
className="mx-2 mt-2 rounded-lg border border-red-800/50 bg-red-950/30 px-3 py-2.5"
>
<p className="text-[11px] text-bad mb-1.5">
Failed to load chat history: {history.loadError}
Failed to load chat history: {loadError}
</p>
<button
onClick={history.loadInitial}
onClick={loadInitial}
className="text-[10px] px-2 py-0.5 rounded bg-red-800 text-red-200 hover:bg-red-700 transition-colors"
>
Retry
</button>
</div>
)}
{!history.loading && history.loadError === null && history.messages.length === 0 && (
{!loading && loadError === null && messages.length === 0 && (
<div className="text-xs text-ink-mid text-center py-8">
No messages yet. Send a message to start chatting with this agent.
</div>
@@ -401,12 +1001,12 @@ function MyChatPanel({ workspaceId, data }: Props) {
instead of showing a "no more messages" footer — the user's
scroll resting against the top of the conversation IS the
signal. */}
{history.hasMore && history.messages.length > 0 && (
{hasMore && messages.length > 0 && (
<div ref={topRef} className="text-xs text-ink-mid text-center py-1">
{history.loadingOlder ? "Loading older messages…" : " "}
{loadingOlder ? "Loading older messages…" : " "}
</div>
)}
{history.messages.map((msg) => (
{messages.map((msg) => (
<div key={msg.id} className={`flex ${msg.role === "user" ? "justify-end" : "justify-start"}`}>
<div
className={`max-w-[85%] rounded-lg px-3 py-2 text-xs ${
@@ -566,10 +1166,10 @@ function MyChatPanel({ workspaceId, data }: Props) {
</div>
{/* Error banner */}
{displayError && (
{error && (
<div className="px-3 py-2 bg-red-900/20 border-t border-red-800/30">
<div className="flex items-center justify-between">
<span className="text-[10px] text-red-300">{displayError}</span>
<span className="text-[10px] text-red-300">{error}</span>
{!isOnline && (
<button
onClick={() => setConfirmRestart(true)}
@@ -637,7 +1237,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
e.keyCode !== 229
) {
e.preventDefault();
handleSend();
sendMessage();
}
}}
onPaste={onPasteIntoComposer}
@@ -647,7 +1247,7 @@ function MyChatPanel({ workspaceId, data }: Props) {
className="flex-1 bg-surface-card border border-line rounded-lg px-3 py-2 text-xs text-ink placeholder-ink-soft dark:bg-zinc-800 dark:border-zinc-600 dark:placeholder-zinc-500 focus:outline-none focus:border-accent focus-visible:ring-2 focus-visible:ring-accent/40 resize-none disabled:opacity-50"
/>
<button
onClick={handleSend}
onClick={sendMessage}
disabled={(!input.trim() && pendingFiles.length === 0) || !agentReachable || sending || uploading}
className="px-4 py-2 bg-accent-strong hover:bg-accent text-xs font-medium rounded-lg text-white disabled:opacity-30 transition-colors shrink-0"
>
@@ -1,3 +0,0 @@
export { useChatHistory } from "./useChatHistory";
export { useChatSend } from "./useChatSend";
export { useChatSocket } from "./useChatSocket";
@@ -1,11 +0,0 @@
"use client";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
/** Resolve a workspace ID to its human-readable name.
* Falls back to the first 8 chars of the ID. */
export function resolveWorkspaceName(id: string): string {
const nodes = useCanvasStore.getState().nodes;
const node = nodes.find((n) => n.id === id);
return (node?.data as WorkspaceNodeData)?.name || id.slice(0, 8);
}
@@ -1,134 +0,0 @@
"use client";
import { useCallback, useEffect, useRef, useState } from "react";
import { api } from "@/lib/api";
import { type ChatMessage, appendMessageDeduped as appendMessageDedupedFn } from "../types";
const INITIAL_HISTORY_LIMIT = 10;
const OLDER_HISTORY_BATCH = 20;
async function loadMessagesFromDB(
workspaceId: string,
limit: number,
beforeTs?: string,
): Promise<{ messages: ChatMessage[]; error: string | null; reachedEnd: boolean }> {
try {
const params = new URLSearchParams({ limit: String(limit) });
if (beforeTs) params.set("before_ts", beforeTs);
const resp = await api.get<{ messages: ChatMessage[]; reached_end: boolean }>(
`/workspaces/${workspaceId}/chat-history?${params.toString()}`,
);
return {
messages: resp.messages ?? [],
error: null,
reachedEnd: resp.reached_end,
};
} catch (err) {
return {
messages: [],
error: err instanceof Error ? err.message : "Failed to load chat history",
reachedEnd: true,
};
}
}
export interface ScrollAnchor {
savedDistanceFromBottom: number;
expectFirstIdNotEqual: string | null;
}
export function useChatHistory(
workspaceId: string,
containerRef?: React.RefObject<HTMLDivElement | null>,
) {
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [loading, setLoading] = useState(true);
const [loadError, setLoadError] = useState<string | null>(null);
const [loadingOlder, setLoadingOlder] = useState(false);
const [hasMore, setHasMore] = useState(true);
const fetchTokenRef = useRef(0);
const oldestMessageRef = useRef<ChatMessage | null>(null);
const hasMoreRef = useRef(true);
const inflightRef = useRef(false);
const scrollAnchorRef = useRef<ScrollAnchor | null>(null);
useEffect(() => {
oldestMessageRef.current = messages[0] ?? null;
}, [messages]);
useEffect(() => {
hasMoreRef.current = hasMore;
}, [hasMore]);
const loadInitial = useCallback(() => {
setLoading(true);
setLoadError(null);
setHasMore(true);
fetchTokenRef.current += 1;
const myToken = fetchTokenRef.current;
return loadMessagesFromDB(workspaceId, INITIAL_HISTORY_LIMIT).then(
({ messages: msgs, error: fetchErr, reachedEnd }) => {
if (fetchTokenRef.current !== myToken) return;
setMessages(msgs);
setLoadError(fetchErr);
setHasMore(!reachedEnd);
setLoading(false);
},
);
}, [workspaceId]);
useEffect(() => {
loadInitial();
}, [loadInitial]);
const loadOlder = useCallback(async () => {
if (inflightRef.current || !hasMoreRef.current) return;
const oldest = oldestMessageRef.current;
if (!oldest) return;
const container = containerRef?.current;
if (!container) return;
inflightRef.current = true;
scrollAnchorRef.current = {
savedDistanceFromBottom: container.scrollHeight - container.scrollTop,
expectFirstIdNotEqual: oldest.id,
};
fetchTokenRef.current += 1;
const myToken = fetchTokenRef.current;
setLoadingOlder(true);
try {
const { messages: older, reachedEnd } = await loadMessagesFromDB(
workspaceId,
OLDER_HISTORY_BATCH,
oldest.timestamp,
);
if (fetchTokenRef.current !== myToken) {
scrollAnchorRef.current = null;
return;
}
if (older.length > 0) {
setMessages((prev) => [...older, ...prev]);
} else {
scrollAnchorRef.current = null;
}
setHasMore(!reachedEnd);
} finally {
setLoadingOlder(false);
inflightRef.current = false;
}
}, [workspaceId, containerRef]);
return {
messages,
loading,
loadError,
loadingOlder,
hasMore,
loadInitial,
loadOlder,
appendMessageDeduped: (msg: ChatMessage) =>
setMessages((prev) => appendMessageDedupedFn(prev, msg)),
setMessages,
scrollAnchorRef,
};
}
@@ -1,182 +0,0 @@
"use client";
import { useCallback, useRef, useState } from "react";
import { api } from "@/lib/api";
import { uploadChatFiles } from "../uploads";
import { createMessage, type ChatMessage, type ChatAttachment } from "../types";
import { extractFilesFromTask } from "../message-parser";
interface A2APart {
kind: string;
text?: string;
file?: {
name?: string;
mimeType?: string;
uri?: string;
size?: number;
};
}
interface A2AResponse {
result?: {
parts?: A2APart[];
artifacts?: Array<{ parts: A2APart[] }>;
};
}
export function extractReplyText(resp: A2AResponse): string {
const collect = (parts: A2APart[] | undefined): string => {
if (!parts) return "";
return parts
.filter((p) => p.kind === "text")
.map((p) => p.text ?? "")
.filter(Boolean)
.join("\n");
};
const result = resp?.result;
const collected: string[] = [];
const fromParts = collect(result?.parts);
if (fromParts) collected.push(fromParts);
if (result?.artifacts) {
for (const a of result.artifacts) {
const t = collect(a.parts);
if (t) collected.push(t);
}
}
return collected.join("\n");
}
export interface UseChatSendOptions {
getHistoryMessages: () => ChatMessage[];
onUserMessage?: (msg: ChatMessage) => void;
onAgentMessage?: (msg: ChatMessage) => void;
}
export function useChatSend(workspaceId: string, options: UseChatSendOptions) {
const [sending, setSending] = useState(false);
const [uploading, setUploading] = useState(false);
const [error, setError] = useState<string | null>(null);
const sendInFlightRef = useRef(false);
const sendingFromAPIRef = useRef(false);
const sendTokenRef = useRef(0);
const optionsRef = useRef(options);
optionsRef.current = options;
const releaseSendGuards = useCallback(() => {
setSending(false);
sendingFromAPIRef.current = false;
sendInFlightRef.current = false;
}, []);
const clearError = useCallback(() => setError(null), []);
const sendMessage = useCallback(
async (text: string, files: File[] = []) => {
const trimmed = text.trim();
if ((!trimmed && files.length === 0) || sending || uploading) return;
if (sendInFlightRef.current) return;
sendInFlightRef.current = true;
let uploaded: ChatAttachment[] = [];
if (files.length > 0) {
setUploading(true);
try {
uploaded = await uploadChatFiles(workspaceId, files);
} catch (e) {
setUploading(false);
sendInFlightRef.current = false;
setError(
e instanceof Error ? `Upload failed: ${e.message}` : "Upload failed",
);
return;
}
setUploading(false);
}
const userMsg = createMessage("user", trimmed, uploaded);
optionsRef.current.onUserMessage?.(userMsg);
setSending(true);
sendingFromAPIRef.current = true;
setError(null);
const myToken = ++sendTokenRef.current;
const history = optionsRef.current
.getHistoryMessages()
.filter((m) => m.role === "user" || m.role === "agent")
.slice(-20)
.map((m) => ({
role: m.role === "user" ? "user" : "agent",
parts: [{ kind: "text", text: m.content }],
}));
const parts: A2APart[] = [];
if (trimmed) parts.push({ kind: "text", text: trimmed });
for (const att of uploaded) {
parts.push({
kind: "file",
file: {
name: att.name,
mimeType: att.mimeType,
uri: att.uri,
size: att.size,
},
});
}
api
.post<A2AResponse>(
`/workspaces/${workspaceId}/a2a`,
{
method: "message/send",
params: {
message: {
role: "user",
messageId: crypto.randomUUID(),
parts,
},
metadata: { history },
},
},
{ timeoutMs: 120_000 },
)
.then((resp) => {
if (sendTokenRef.current !== myToken) return;
if (!sendingFromAPIRef.current) {
sendInFlightRef.current = false;
return;
}
const replyText = extractReplyText(resp);
const replyFiles = extractFilesFromTask(
(resp?.result ?? {}) as Record<string, unknown>,
);
if (replyText || replyFiles.length > 0) {
optionsRef.current.onAgentMessage?.(
createMessage("agent", replyText, replyFiles),
);
}
releaseSendGuards();
})
.catch(() => {
if (sendTokenRef.current !== myToken) return;
if (!sendingFromAPIRef.current) {
sendInFlightRef.current = false;
return;
}
releaseSendGuards();
setError("Failed to send message — agent may be unreachable");
});
},
[workspaceId, sending, uploading],
);
return {
sending,
uploading,
sendMessage,
error,
clearError,
releaseSendGuards,
sendingFromAPIRef,
};
}
@@ -1,100 +0,0 @@
"use client";
import { useCallback, useEffect, useRef } from "react";
import { useCanvasStore, type WorkspaceNodeData } from "@/store/canvas";
import { useSocketEvent } from "@/hooks/useSocketEvent";
import { createMessage, type ChatMessage } from "../types";
export interface UseChatSocketCallbacks {
onAgentMessage?: (msg: ChatMessage) => void;
onActivityLog?: (entry: string) => void;
onSendComplete?: () => void;
onSendError?: (error: string) => void;
}
export function useChatSocket(
workspaceId: string,
callbacks: UseChatSocketCallbacks,
): void {
const callbacksRef = useRef(callbacks);
callbacksRef.current = callbacks;
// Agent push messages from global store
const pendingAgentMsgs = useCanvasStore((s) => s.agentMessages[workspaceId]);
useEffect(() => {
if (!pendingAgentMsgs || pendingAgentMsgs.length === 0) return;
const consume = useCanvasStore.getState().consumeAgentMessages;
const msgs = consume(workspaceId);
for (const m of msgs) {
callbacksRef.current.onAgentMessage?.(
createMessage("agent", m.content, m.attachments),
);
}
if (msgs.length > 0) {
callbacksRef.current.onSendComplete?.();
}
}, [pendingAgentMsgs, workspaceId]);
const resolveWorkspaceName = useCallback((id: string) => {
const nodes = useCanvasStore.getState().nodes;
const node = nodes.find((n) => n.id === id);
return (node?.data as WorkspaceNodeData)?.name || id.slice(0, 8);
}, []);
useSocketEvent((msg) => {
try {
if (msg.event === "ACTIVITY_LOGGED") {
if (msg.workspace_id !== workspaceId) return;
const p = msg.payload || {};
const type = p.activity_type as string;
const method = (p.method as string) || "";
const status = (p.status as string) || "";
const targetId = (p.target_id as string) || "";
const durationMs = p.duration_ms as number | undefined;
const summary = (p.summary as string) || "";
let line = "";
if (type === "a2a_receive" && method === "message/send") {
const targetName = resolveWorkspaceName(targetId || msg.workspace_id);
if (status === "ok" && durationMs) {
const sec = Math.round(durationMs / 1000);
line = `${targetName} responded (${sec}s)`;
const own = (targetId || msg.workspace_id) === workspaceId;
if (own) callbacksRef.current.onSendComplete?.();
} else if (status === "error") {
line = `${targetName} error`;
const own = (targetId || msg.workspace_id) === workspaceId;
if (own) {
callbacksRef.current.onSendComplete?.();
callbacksRef.current.onSendError?.(
"Agent error (Exception) — see workspace logs for details.",
);
}
}
} else if (type === "a2a_send") {
const targetName = resolveWorkspaceName(targetId);
line = `→ Delegating to ${targetName}...`;
} else if (type === "task_update") {
if (summary) line = `${summary}`;
} else if (type === "agent_log") {
if (summary) line = summary;
}
if (line) {
callbacksRef.current.onActivityLog?.(line);
}
} else if (
msg.event === "TASK_UPDATED" &&
msg.workspace_id === workspaceId
) {
const task = (msg.payload?.current_task as string) || "";
if (task) {
callbacksRef.current.onActivityLog?.(`${task}`);
}
}
} catch {
/* ignore */
}
});
}
-3
View File
@@ -1,5 +1,2 @@
export { type ChatMessage, createMessage, appendMessageDeduped } from "./types";
export { extractAgentText, extractTextsFromParts, extractResponseText } from "./message-parser";
export { useChatHistory } from "./hooks/useChatHistory";
export { useChatSend } from "./hooks/useChatSend";
export { useChatSocket } from "./hooks/useChatSocket";
+1 -2
View File
@@ -8,7 +8,6 @@ import {
type PreflightResult,
type Template,
} from "@/lib/deploy-preflight";
import { isSaaSTenant } from "@/lib/tenant";
import { MissingKeysModal } from "@/components/MissingKeysModal";
/**
@@ -106,7 +105,7 @@ export function useTemplateDeploy(
const ws = await api.post<{ id: string }>("/workspaces", {
name: template.name,
template: template.id,
tier: isSaaSTenant() ? 4 : template.tier,
tier: template.tier,
canvas: coords,
...(model ? { model } : {}),
});
+2 -2
View File
@@ -21,8 +21,8 @@ export function statusDotClass(status: string): string {
export const TIER_CONFIG: Record<number, { label: string; color: string; border: string }> = {
1: { label: "T1", color: "text-ink-mid bg-surface-card border border-line", border: "text-ink-mid border-line" },
2: { label: "T2", color: "text-white bg-accent border border-accent-strong", border: "text-accent border-accent" },
3: { label: "T3", color: "text-white bg-violet-600 border border-violet-700", border: "text-white border-violet-500" },
4: { label: "T4", color: "text-white bg-warm border border-warm", border: "text-white border-warm" },
3: { label: "T3", color: "text-white bg-violet-600 border border-violet-700", border: "text-violet-600 border-violet-500" },
4: { label: "T4", color: "text-white bg-warm border border-warm", border: "text-warm border-warm" },
};
export const COMM_TYPE_LABELS: Record<string, string> = {
@@ -402,7 +402,7 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
return err
}
adapter, ok := GetAdapter(ch.ChannelType)
adapter, ok := GetSendAdapter(ch.ChannelType)
if !ok {
return fmt.Errorf("no adapter for %s", ch.ChannelType)
}
@@ -1,5 +1,7 @@
package channels
import "context"
// Registry of all available channel adapters.
// To add a new platform: implement ChannelAdapter, register here.
var adapters = map[string]ChannelAdapter{
@@ -9,6 +11,27 @@ var adapters = map[string]ChannelAdapter{
"discord": &DiscordAdapter{},
}
// SendAdapter is the subset of ChannelAdapter needed by SendOutbound.
// Extracted so tests can inject a no-op/mock adapter without hitting real
// platform APIs (Telegram Bot API, Slack API, etc.).
type SendAdapter interface {
SendMessage(ctx context.Context, config map[string]interface{}, chatID string, text string) error
}
// getSendAdapter is the production implementation of GetSendAdapter —
// returns the real registered adapter's SendMessage method.
func getSendAdapter(channelType string) (SendAdapter, bool) {
a, ok := adapters[channelType]
if !ok {
return nil, false
}
return a, true
}
// GetSendAdapter returns the SendAdapter for a channel type.
// Defaults to the real adapter; overridden by SetTestSendAdapter in tests.
var GetSendAdapter = getSendAdapter
// GetAdapter returns the adapter for a channel type.
func GetAdapter(channelType string) (ChannelAdapter, bool) {
a, ok := adapters[channelType]
@@ -0,0 +1,30 @@
package channels
import "context"
// MockSendAdapter implements SendAdapter for handler tests. It records every
// call and returns a configurable error (nil = success, non-nil = failure).
type MockSendAdapter struct {
Calls int
Err error
SentText string
SentChat string
}
func (m *MockSendAdapter) SendMessage(_ context.Context, _ map[string]interface{}, chatID string, text string) error {
m.Calls++
m.SentText = text
m.SentChat = chatID
return m.Err
}
// SetGetSendAdapter replaces the package-level GetSendAdapter variable.
// Tests MUST call ResetSendAdapters() in their t.Cleanup.
func SetGetSendAdapter(fn func(string) (SendAdapter, bool)) {
GetSendAdapter = fn
}
// ResetSendAdapters restores GetSendAdapter to the production implementation.
func ResetSendAdapters() {
GetSendAdapter = getSendAdapter
}
+21 -21
View File
@@ -97,28 +97,28 @@ const maxProxyResponseBody = 10 << 20
//
// Timeout model — three independent budgets, none of which gets in each other's way:
//
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
// the entire request including streamed body reads, and would pre-empt
// legitimate slow cold-start flows (Claude Code first-token over OAuth
// can take 30-60s on boot; long-running agent synthesis can stream
// tokens for minutes). Total-request budget is enforced per-request
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
// 1. Client.Timeout — DELIBERATELY UNSET. Client.Timeout is a hard wall on
// the entire request including streamed body reads, and would pre-empt
// legitimate slow cold-start flows (Claude Code first-token over OAuth
// can take 30-60s on boot; long-running agent synthesis can stream
// tokens for minutes). Total-request budget is enforced per-request
// via context deadline (canvas = idle-only, agent-to-agent = 30 min ceiling).
//
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
// black-holes TCP connects (instance terminated mid-flight, security group
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
// enough that Cloudflare's ~100s edge timeout can fire first and surface
// a generic 502 page to canvas. 10s is well above realistic intra-region
// latencies and well below CF's edge timeout.
// 2. Transport.DialContext — 10s connect timeout. When a workspace's EC2
// black-holes TCP connects (instance terminated mid-flight, security group
// flipped, NACL bug), the OS default is 75s on Linux / 21s on macOS — long
// enough that Cloudflare's ~100s edge timeout can fire first and surface
// a generic 502 page to canvas. 10s is well above realistic intra-region
// latencies and well below CF's edge timeout.
//
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
// to response-headers-start. Configurable via
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
// turns (big context + internal delegate_task round-trips routinely exceed
// the old 60s ceiling). Body streaming after headers is governed by the
// per-request context deadline, NOT this timeout — so multi-minute agent
// responses still work fine.
// 3. Transport.ResponseHeaderTimeout — 180s default. From request-body-end
// to response-headers-start. Configurable via
// A2A_PROXY_RESPONSE_HEADER_TIMEOUT (envx.Duration). Covers cold-start
// first-byte (30-60s OAuth flow above) with enough room for Opus agent
// turns (big context + internal delegate_task round-trips routinely exceed
// the old 60s ceiling). Body streaming after headers is governed by the
// per-request context deadline, NOT this timeout — so multi-minute agent
// responses still work fine.
//
// The point of (2) and (3) is to surface a *structured* 503 from
// handleA2ADispatchError when the workspace agent is unreachable, so canvas
@@ -645,7 +645,7 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri
// the caller can retry once the workspace is back online (~10s).
if status == "hibernated" {
log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
h.goAsync(func() { h.RestartByID(workspaceID) })
go h.RestartByID(workspaceID)
return "", &proxyA2AError{
Status: http.StatusServiceUnavailable,
Headers: map[string]string{"Retry-After": "15"},
@@ -194,7 +194,7 @@ func (h *WorkspaceHandler) maybeMarkContainerDead(ctx context.Context, workspace
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
h.goAsync(func() { h.RestartByID(workspaceID) })
go h.RestartByID(workspaceID)
return true
}
@@ -241,7 +241,7 @@ func (h *WorkspaceHandler) preflightContainerHealth(ctx context.Context, workspa
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceOffline), workspaceID, map[string]interface{}{})
h.goAsync(func() { h.RestartByID(workspaceID) })
go h.RestartByID(workspaceID)
return &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{
@@ -262,8 +262,8 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
errWsName = workspaceID
}
summary := "A2A request to " + errWsName + " failed: " + errMsg
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
go func(parent context.Context) {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
@@ -277,7 +277,7 @@ func (h *WorkspaceHandler) logA2AFailure(ctx context.Context, workspaceID, calle
Status: "error",
ErrorDetail: &errMsg,
})
})
}(ctx)
}
// logA2ASuccess records a successful A2A round-trip and (for canvas-initiated
@@ -298,19 +298,19 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
// silent workspaces. Only update when callerID is a real workspace (not
// canvas, not a system caller) and the target returned 2xx/3xx.
if callerID != "" && !isSystemCaller(callerID) && statusCode < 400 {
h.goAsync(func() {
go func() {
bgCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if _, err := db.DB.ExecContext(bgCtx,
`UPDATE workspaces SET last_outbound_at = NOW() WHERE id = $1`, callerID); err != nil {
log.Printf("last_outbound_at update failed for %s: %v", callerID, err)
}
})
}()
}
summary := a2aMethod + " → " + wsNameForLog
toolTrace := extractToolTrace(respBody)
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
go func(parent context.Context) {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
@@ -325,7 +325,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle
DurationMs: &durationMs,
Status: logStatus,
})
})
}(ctx)
if callerID == "" && statusCode < 400 {
h.broadcaster.BroadcastOnly(workspaceID, string(events.EventA2AResponse), map[string]interface{}{
@@ -510,8 +510,8 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
wsName = workspaceID
}
summary := a2aMethod + " → " + wsName + " (queued for poll)"
h.goAsync(func() {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
go func(parent context.Context) {
logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second)
defer cancel()
LogActivity(logCtx, h.broadcaster, ActivityParams{
WorkspaceID: workspaceID,
@@ -523,7 +523,7 @@ func (h *WorkspaceHandler) logA2AReceiveQueued(ctx context.Context, workspaceID,
RequestBody: json.RawMessage(body),
Status: "ok",
})
})
}(ctx)
}
// readUsageMap extracts input_tokens / output_tokens from the "usage" key of m.
@@ -54,7 +54,6 @@ func TestPreflight_ContainerRunning_ReturnsNil(t *testing.T) {
_ = setupTestDB(t)
stub := &preflightLocalProv{running: true, err: nil}
h := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
h.provisioner = stub
if err := h.preflightContainerHealth(context.Background(), "ws-running-123"); err != nil {
@@ -187,8 +186,8 @@ func TestProxyA2A_Preflight_RoutesThroughProvisionerSSOT(t *testing.T) {
}
var (
callsIsRunning bool
callsContainerInspectRaw bool
callsIsRunning bool
callsContainerInspectRaw bool
callsRunningContainerNameDirect bool
)
ast.Inspect(fn.Body, func(n ast.Node) bool {
@@ -262,7 +262,6 @@ func TestProxyA2A_Upstream502_TriggersContainerDeadCheck(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
@@ -325,7 +324,6 @@ func TestProxyA2A_Upstream502_AliveAgent_PropagatesAsIs(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: true}
handler.SetCPProvisioner(cp)
@@ -515,7 +513,6 @@ func TestProxyA2A_AllowedSelf_SkipsAccessCheck(t *testing.T) {
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
@@ -664,18 +661,18 @@ func TestProxyA2A_CallerIDDerivedFromBearer(t *testing.T) {
// (column order: workspace_id, activity_type, source_id, target_id, ...)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(
"ws-target", // $1 workspace_id
"a2a_receive", // $2 activity_type
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
sqlmock.AnyArg(), // $4 target_id
sqlmock.AnyArg(), // $5 method
sqlmock.AnyArg(), // $6 summary
sqlmock.AnyArg(), // $7 request_body
sqlmock.AnyArg(), // $8 response_body
sqlmock.AnyArg(), // $9 tool_trace
sqlmock.AnyArg(), // $10 duration_ms
sqlmock.AnyArg(), // $11 status
sqlmock.AnyArg(), // $12 error_detail
"ws-target", // $1 workspace_id
"a2a_receive", // $2 activity_type
sqlmock.AnyArg(), // $3 source_id — *string("ws-caller"), checked below
sqlmock.AnyArg(), // $4 target_id
sqlmock.AnyArg(), // $5 method
sqlmock.AnyArg(), // $6 summary
sqlmock.AnyArg(), // $7 request_body
sqlmock.AnyArg(), // $8 response_body
sqlmock.AnyArg(), // $9 tool_trace
sqlmock.AnyArg(), // $10 duration_ms
sqlmock.AnyArg(), // $11 status
sqlmock.AnyArg(), // $12 error_detail
).
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1719,6 +1716,7 @@ func TestDispatchA2A_RejectsUnsafeURL(t *testing.T) {
}
}
// --- handleA2ADispatchError ---
func TestHandleA2ADispatchError_ContextDeadline(t *testing.T) {
@@ -1805,7 +1803,6 @@ func TestMaybeMarkContainerDead_CPOnly_NotRunning(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
cp := &fakeCPProv{running: false}
handler.SetCPProvisioner(cp)
@@ -1958,7 +1955,6 @@ func TestLogA2AFailure_Smoke(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Sync workspace-name lookup (called in the caller goroutine).
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
@@ -1977,7 +1973,6 @@ func TestLogA2AFailure_EmptyNameFallback(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
// Empty name from DB → summary uses the workspaceID as the name.
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
@@ -1994,7 +1989,6 @@ func TestLogA2ASuccess_Smoke(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-ok").
@@ -2011,7 +2005,6 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
mock.ExpectQuery(`SELECT name FROM workspaces WHERE id =`).
WithArgs("ws-err").
@@ -26,19 +26,14 @@ import (
// setupTestDBForQueueTests creates a sqlmock DB using QueryMatcherEqual (exact
// string matching) so that ExpectQuery/ExpectExec patterns are compared verbatim.
// Uses the same global db.DB as setupTestDB so the handler can use it.
//
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
// that tests running after this one are not polluted by a closed mock.
// Same fix as setupTestDB (handlers_test.go); same root cause as mc#975.
func setupTestDBForQueueTests(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -14,18 +14,16 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
type ActivityHandler struct {
broadcaster *events.Broadcaster
notifier *push.Notifier
}
func NewActivityHandler(b *events.Broadcaster, notifier *push.Notifier) *ActivityHandler {
return &ActivityHandler{broadcaster: b, notifier: notifier}
func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
}
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
@@ -478,7 +476,7 @@ func (h *ActivityHandler) Notify(c *gin.Context) {
for _, a := range body.Attachments {
attachments = append(attachments, AgentMessageAttachment(a))
}
writer := NewAgentMessageWriter(db.DB, h.broadcaster, h.notifier)
writer := NewAgentMessageWriter(db.DB, h.broadcaster)
if err := writer.Send(c.Request.Context(), workspaceID, body.Message, attachments); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
@@ -40,7 +40,7 @@ func TestActivityHandler_SinceID_ReturnsNewerASC(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -69,7 +69,7 @@ func TestActivityHandler_SinceID_CursorNotFound_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -101,7 +101,7 @@ func TestActivityHandler_SinceID_CrossWorkspaceCursor_410(t *testing.T) {
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -137,7 +137,7 @@ func TestActivityHandler_SinceID_CombinedWithSinceSecs(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -41,7 +41,7 @@ func TestActivityHandler_SinceSecs_Accepted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -70,7 +70,7 @@ func TestActivityHandler_SinceSecs_ClampedAt30Days(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -106,7 +106,7 @@ func TestActivityHandler_SinceSecs_InvalidRejected(t *testing.T) {
// No DB call expected; bad input must be caught before the query.
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityHandler_SinceSecs_Omitted(t *testing.T) {
WillReturnRows(newActivityRows())
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -22,7 +22,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
rows := sqlmock.NewRows([]string{
"kind", "id", "workspace_id", "label", "content", "method", "status", "request_body", "response_body", "created_at",
@@ -68,7 +68,7 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
func TestActivityList_SourceCanvas(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect query with "source_id IS NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NULL`).
@@ -97,7 +97,7 @@ func TestActivityList_SourceCanvas(t *testing.T) {
func TestActivityList_SourceAgent(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect query with "source_id IS NOT NULL"
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND source_id IS NOT NULL`).
@@ -126,7 +126,7 @@ func TestActivityList_SourceAgent(t *testing.T) {
func TestActivityList_SourceInvalid(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -142,7 +142,7 @@ func TestActivityList_SourceInvalid(t *testing.T) {
func TestActivityList_SourceWithType(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Both type and source filters
mock.ExpectQuery(`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NULL`).
@@ -181,7 +181,7 @@ const testPeerUUID = "11111111-2222-3333-4444-555555555555"
func TestActivityList_PeerIDFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// peer_id binds twice in the query (source_id OR target_id) but is
// added to args once — sqlmock matches positional args, so the
@@ -220,7 +220,7 @@ func TestActivityList_PeerIDComposesWithType(t *testing.T) {
// of the builder can't silently rearrange placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
@@ -258,7 +258,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
// otherwise interpolate the value into the URL or another query.
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
for _, bad := range []string{
"not-a-uuid",
@@ -292,7 +292,7 @@ func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
func TestActivityList_BeforeTSFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -328,7 +328,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
// can't silently drop one filter or reorder placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
cutoff, _ := time.Parse(time.RFC3339, "2026-05-01T00:00:00Z")
mock.ExpectQuery(
@@ -363,7 +363,7 @@ func TestActivityList_BeforeTSComposesWithPeerID(t *testing.T) {
func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) {
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
for _, bad := range []string{
"yesterday",
@@ -388,19 +388,15 @@ func TestActivityList_BeforeTSRejectsInvalidFormat(t *testing.T) {
// ---------- Activity type allowlist (#125: memory_write added) ----------
func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectExec(`INSERT INTO activity_logs`).
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -417,16 +413,12 @@ func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
}
func TestActivityReport_RejectsUnknownType(t *testing.T) {
mockDB, _, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, _, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -455,13 +447,9 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
// - Have source_id NULL (canvas-source filter)
// - Carry the message text in response_body so extractResponseText
// can reconstruct the agent reply on reload
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
// Workspace existence check
mock.ExpectQuery(`SELECT name FROM workspaces`).
@@ -478,7 +466,7 @@ func TestNotify_PersistsToActivityLogsForReloadRecovery(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -503,13 +491,9 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
// download chips after a page reload. Without `parts`, the bubble
// shows up but the attachment chip is silently dropped on every
// refresh.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery(`SELECT name FROM workspaces`).
WithArgs("ws-attach").
@@ -527,7 +511,7 @@ func TestNotify_WithAttachments_PersistsFilePartsForReload(t *testing.T) {
WillReturnResult(sqlmock.NewResult(1, 1))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -581,19 +565,15 @@ func TestNotify_RejectsAttachmentWithEmptyURIOrName(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mockDB, _, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, _, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
// No DB expectations — handler must reject with 400 BEFORE
// reaching SELECT/INSERT. sqlmock will fail "expectations not met"
// only if the handler unexpectedly queries.
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -632,13 +612,9 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
// WebSocket push (which the user is already seeing in their open
// canvas). Pre-fix the WS push always succeeded; we don't want
// the new persistence step to regress that path.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
mockDB, mock, _ := sqlmock.New()
defer mockDB.Close()
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery(`SELECT name FROM workspaces`).
WithArgs("ws-x").
@@ -647,7 +623,7 @@ func TestNotify_DBFailure_StillBroadcastsAnd200(t *testing.T) {
WillReturnError(fmt.Errorf("simulated db hiccup"))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
@@ -44,7 +44,6 @@ import (
"log"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil"
)
@@ -77,14 +76,12 @@ type AgentMessageAttachment struct {
type AgentMessageWriter struct {
db *sql.DB
broadcaster events.EventEmitter
notifier *push.Notifier
}
// NewAgentMessageWriter binds the writer to the platform's DB pool +
// WebSocket broadcaster. notifier may be nil if push notifications are
// not configured.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter, notifier *push.Notifier) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster, notifier: notifier}
// WebSocket broadcaster.
func NewAgentMessageWriter(db *sql.DB, broadcaster events.EventEmitter) *AgentMessageWriter {
return &AgentMessageWriter{db: db, broadcaster: broadcaster}
}
// Send delivers a single agent → user message. Look up + broadcast +
@@ -135,12 +132,7 @@ func (w *AgentMessageWriter) Send(
}
w.broadcaster.BroadcastOnly(workspaceID, string(events.EventAgentMessage), broadcastPayload)
// 3. Send push notifications to mobile devices.
if w.notifier != nil {
w.notifier.NotifyAgentMessage(ctx, workspaceID, wsName, message)
}
// 4. Persist for chat-history hydration. response_body shape MUST stay
// 3. Persist for chat-history hydration. response_body shape MUST stay
// in sync with extractResponseText + extractFilesFromTask in
// canvas/src/components/tabs/chat/historyHydration.ts:
// - extractResponseText reads body.result (string) → renders text
@@ -86,7 +86,7 @@ func (c *capturingEmitter) RecordAndBroadcast(_ context.Context, eventType strin
// path: workspace lookup, broadcast, INSERT, return nil.
func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-1").
@@ -114,7 +114,7 @@ func TestAgentMessageWriter_Send_Success_NoAttachments(t *testing.T) {
// Drift here = chips disappear on chat reload.
func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-att").
@@ -171,7 +171,7 @@ func TestAgentMessageWriter_Send_Success_WithAttachments(t *testing.T) {
func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-missing").
@@ -200,7 +200,7 @@ func TestAgentMessageWriter_Send_WorkspaceNotFound(t *testing.T) {
// broadcast.
func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-dbfail").
@@ -221,7 +221,7 @@ func TestAgentMessageWriter_Send_DBInsertFailureStillReturnsNil(t *testing.T) {
// table doesn't carry multi-KB summaries that bloat list queries.
func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-trunc").
@@ -261,7 +261,7 @@ func TestAgentMessageWriter_Send_PreviewTruncation(t *testing.T) {
func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-bc").
@@ -312,7 +312,7 @@ func TestAgentMessageWriter_Send_BroadcastsAgentMessageEvent(t *testing.T) {
// real incidents in alerting.
func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
transientErr := errors.New("connection refused")
mock.ExpectQuery("SELECT name FROM workspaces").
@@ -344,7 +344,7 @@ func TestAgentMessageWriter_Send_DBErrorOnLookupReturnsWrapped(t *testing.T) {
// coverage. Now it does.
func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
mock := setupTestDB(t)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster(), nil)
w := NewAgentMessageWriter(db.DB, newTestBroadcaster())
// 200-rune CJK message — exceeds the 80-rune cap, would have hit
// the byte-slice bug.
@@ -393,7 +393,7 @@ func TestAgentMessageWriter_Send_NonASCIIMessagePersists(t *testing.T) {
func TestAgentMessageWriter_Send_OmitsAttachmentsKeyWhenEmpty(t *testing.T) {
mock := setupTestDB(t)
emitter := &capturingEmitter{}
w := NewAgentMessageWriter(db.DB, emitter, nil)
w := NewAgentMessageWriter(db.DB, emitter)
mock.ExpectQuery("SELECT name FROM workspaces").
WithArgs("ws-noatt").
@@ -15,7 +15,6 @@ import (
sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/channels"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
@@ -328,6 +327,207 @@ func TestChannelHandler_Send_EmptyText(t *testing.T) {
}
}
// ==================== Test (send outbound) ====================
// TestChannelHandler_Test_Success exercises the /channels/:channelId/test endpoint
// with a mock SendAdapter so the full success path is covered without hitting real
// Telegram/Slack/etc. APIs.
func TestChannelHandler_Test_Success(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewChannelHandler(newTestChannelManager())
mockAdapter := &channels.MockSendAdapter{Err: nil}
channels.SetGetSendAdapter(func(ct string) (channels.SendAdapter, bool) {
if ct == "telegram" {
return mockAdapter, true
}
return channels.GetSendAdapter(ct)
})
t.Cleanup(channels.ResetSendAdapters)
// loadChannel → valid row
mock.ExpectQuery("SELECT .+ FROM workspace_channels WHERE id").
WithArgs("ch-test-ok").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "channel_type", "channel_config",
"enabled", "allowed_users",
}).AddRow("ch-test-ok", "ws-1", "telegram",
`{"bot_token":"123:AAA","chat_id":"-100"}`,
true, `[]`))
// UPDATE message_count + last_message_at
mock.ExpectExec("UPDATE workspace_channels SET last_message_at").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/channels/ch-test-ok/test", nil)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-test-ok"}}
handler.Test(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "ok" {
t.Errorf("expected status 'ok', got %v", resp["status"])
}
if mockAdapter.Calls != 1 {
t.Errorf("expected SendMessage called once, got %d", mockAdapter.Calls)
}
if mockAdapter.SentChat != "-100" {
t.Errorf("expected chat_id '-100', got %q", mockAdapter.SentChat)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestChannelHandler_Test_ChannelNotFound verifies that when loadChannel returns
// no rows, the Test handler returns 500 with a "test message failed" error.
func TestChannelHandler_Test_ChannelNotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewChannelHandler(newTestChannelManager())
// loadChannel → no rows
mock.ExpectQuery("SELECT .+ FROM workspace_channels WHERE id").
WithArgs("ch-missing").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "channel_type", "channel_config",
"enabled", "allowed_users",
}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/channels/ch-missing/test", nil)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-missing"}}
handler.Test(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500 for missing channel, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["error"] != "test message failed" {
t.Errorf("expected error 'test message failed', got %v", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestChannelHandler_Send_Success covers the full outbound send success path:
// budget check passes → loadChannel → mock SendMessage succeeds → UPDATE count → 200.
func TestChannelHandler_Send_Success(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewChannelHandler(newTestChannelManager())
mockAdapter := &channels.MockSendAdapter{Err: nil}
channels.SetGetSendAdapter(func(ct string) (channels.SendAdapter, bool) {
if ct == "telegram" {
return mockAdapter, true
}
return channels.GetSendAdapter(ct)
})
t.Cleanup(channels.ResetSendAdapters)
// Budget check: count=0, no budget limit
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
WithArgs("ch-send-ok").
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
AddRow(0, nil))
// loadChannel → valid row
mock.ExpectQuery("SELECT .+ FROM workspace_channels WHERE id").
WithArgs("ch-send-ok").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "channel_type", "channel_config",
"enabled", "allowed_users",
}).AddRow("ch-send-ok", "ws-1", "telegram",
`{"bot_token":"123:AAA","chat_id":"-100"}`,
true, `[]`))
// UPDATE message_count
mock.ExpectExec("UPDATE workspace_channels SET last_message_at").
WillReturnResult(sqlmock.NewResult(0, 1))
body, _ := json.Marshal(map[string]string{"text": "hello from test"})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/channels/ch-send-ok/send", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-send-ok"}}
handler.Send(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["status"] != "sent" {
t.Errorf("expected status 'sent', got %v", resp["status"])
}
if mockAdapter.Calls != 1 {
t.Errorf("expected SendMessage called once, got %d", mockAdapter.Calls)
}
if mockAdapter.SentText != "hello from test" {
t.Errorf("expected 'hello from test', got %q", mockAdapter.SentText)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// TestChannelHandler_Send_ChannelNotFound verifies that after the budget check
// passes, a missing channel returns 500 (not 404) with "send failed".
func TestChannelHandler_Send_ChannelNotFound(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewChannelHandler(newTestChannelManager())
// Budget check passes (NULL budget → no limit)
mock.ExpectQuery("SELECT message_count, channel_budget FROM workspace_channels WHERE id").
WithArgs("ch-send-missing").
WillReturnRows(sqlmock.NewRows([]string{"message_count", "channel_budget"}).
AddRow(0, nil))
// loadChannel → no rows
mock.ExpectQuery("SELECT .+ FROM workspace_channels WHERE id").
WithArgs("ch-send-missing").
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "channel_type", "channel_config",
"enabled", "allowed_users",
}))
body, _ := json.Marshal(map[string]string{"text": "hello"})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/workspaces/ws-1/channels/ch-send-missing/send", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Params = gin.Params{{Key: "id", Value: "ws-1"}, {Key: "channelId", Value: "ch-send-missing"}}
handler.Send(c)
if w.Code != http.StatusInternalServerError {
t.Errorf("expected 500 for missing channel, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if resp["error"] != "send failed" {
t.Errorf("expected error 'send failed', got %v", resp["error"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations not met: %v", err)
}
}
// ==================== Webhook ====================
func TestChannelHandler_Webhook_UnknownType(t *testing.T) {
@@ -365,20 +565,6 @@ func TestChannelHandler_Discover_MissingToken(t *testing.T) {
}
func TestChannelHandler_Discover_UnsupportedType(t *testing.T) {
// Set up db.DB so PausePollersForToken (called inside Discover) doesn't panic.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
t.Cleanup(func() { mockDB.Close() })
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB })
mock.ExpectQuery(`SELECT id, channel_config FROM workspace_channels WHERE enabled = true AND workspace_id`).
WithArgs("ws-test").
WillReturnRows(sqlmock.NewRows([]string{"id", "channel_config"}))
handler := NewChannelHandler(newTestChannelManager())
// #329: workspace_id required — include so we actually reach the
@@ -402,20 +588,6 @@ func TestChannelHandler_Discover_UnsupportedType(t *testing.T) {
}
func TestChannelHandler_Discover_InvalidBotToken(t *testing.T) {
// Set up db.DB so PausePollersForToken (called inside Discover) doesn't panic.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
t.Cleanup(func() { mockDB.Close() })
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB })
mock.ExpectQuery(`SELECT id, channel_config FROM workspace_channels WHERE enabled = true AND workspace_id`).
WithArgs("ws-test").
WillReturnRows(sqlmock.NewRows([]string{"id", "channel_config"}))
handler := NewChannelHandler(newTestChannelManager())
body, _ := json.Marshal(map[string]interface{}{
@@ -2,7 +2,6 @@ package handlers
import (
"context"
"database/sql"
"encoding/json"
"log"
"net/http"
@@ -263,20 +262,14 @@ func insertDelegationRow(ctx context.Context, c *gin.Context, sourceID string, b
"task": body.Task,
"delegation_id": delegationID,
})
// Store delegation_id in response_body so agent check_delegation_status
// (which reads response_body->>delegation_id) can locate this row even
// when request_body hasn't propagated yet. Fixes mc#984.
respJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": delegationID,
})
var idemArg interface{}
if body.IdempotencyKey != "" {
idemArg = body.IdempotencyKey
}
_, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status, idempotency_key)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'pending', $7)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON), idemArg)
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status, idempotency_key)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending', $6)
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), idemArg)
if err == nil {
// RFC #2829 #318 — mirror to the durable delegations ledger
// (gated by DELEGATION_LEDGER_WRITE; default off → no-op).
@@ -551,15 +544,10 @@ func (h *DelegationHandler) Record(c *gin.Context) {
"task": body.Task,
"delegation_id": body.DelegationID,
})
// Store delegation_id in response_body so agent check_delegation_status
// can locate this row. Fixes mc#984.
respJSON, _ := json.Marshal(map[string]interface{}{
"delegation_id": body.DelegationID,
})
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6::jsonb, 'dispatched')
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON), string(respJSON)); err != nil {
INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status)
VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'dispatched')
`, sourceID, sourceID, body.TargetID, "Delegating to "+body.TargetID, string(taskJSON)); err != nil {
log.Printf("Delegation Record: insert failed for %s: %v", body.DelegationID, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record delegation"})
return
@@ -699,8 +687,7 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
var result []map[string]interface{}
for rows.Next() {
var delegationID, callerID, calleeID, taskPreview, status string
var resultPreview, errorDetail sql.NullString
var delegationID, callerID, calleeID, taskPreview, status, resultPreview, errorDetail string
var lastHeartbeat, deadline, createdAt, updatedAt *time.Time
if err := rows.Scan(
&delegationID, &callerID, &calleeID, &taskPreview,
@@ -719,11 +706,11 @@ func (h *DelegationHandler) listDelegationsFromLedger(ctx context.Context, works
"updated_at": updatedAt,
"_ledger": true, // marker so callers know this row is from the ledger
}
if resultPreview.Valid && resultPreview.String != "" {
entry["response_preview"] = textutil.TruncateBytes(resultPreview.String, 300)
if resultPreview != "" {
entry["response_preview"] = textutil.TruncateBytes(resultPreview, 300)
}
if errorDetail.Valid && errorDetail.String != "" {
entry["error"] = errorDetail.String
if errorDetail != "" {
entry["error"] = errorDetail
}
if lastHeartbeat != nil {
entry["last_heartbeat"] = lastHeartbeat
@@ -1,488 +0,0 @@
package handlers
// delegation_list_test.go — unit tests for listDelegationsFromLedger and
// listDelegationsFromActivityLogs. Both methods are the data-backend of the
// ListDelegations handler; coverage was missing (cf. infra-sre review of PR #942).
import (
"context"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// ---------- listDelegationsFromLedger ----------
func TestListDelegationsFromLedger_EmptyResult(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
})
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if got != nil {
t.Errorf("empty result: expected nil, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_SingleRow(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
// Use time.Time{} for nullable *time.Time columns — sqlmock passes the
// zero value to the handler's scan destination. The handler checks Valid
// before using each nullable field, so zero values are safe.
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
}).AddRow(
"del-1", "ws-1", "ws-2", "summarise the report",
"completed", "the report is about Q1",
"", now, now, now, now,
)
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
e := got[0]
if e["delegation_id"] != "del-1" {
t.Errorf("delegation_id: got %v, want del-1", e["delegation_id"])
}
if e["source_id"] != "ws-1" {
t.Errorf("source_id: got %v, want ws-1", e["source_id"])
}
if e["target_id"] != "ws-2" {
t.Errorf("target_id: got %v, want ws-2", e["target_id"])
}
if e["status"] != "completed" {
t.Errorf("status: got %v, want completed", e["status"])
}
if e["response_preview"] != "the report is about Q1" {
t.Errorf("response_preview: got %v", e["response_preview"])
}
if _, ok := e["error"]; ok {
t.Errorf("error should be absent when empty, got %v", e["error"])
}
if e["_ledger"] != true {
t.Errorf("_ledger marker: got %v, want true", e["_ledger"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_MultipleRows(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
}).
AddRow("del-a", "ws-1", "ws-2", "task a", "in_progress", "", "", now, now, now, now).
AddRow("del-b", "ws-1", "ws-3", "task b", "failed", "", "timeout", now, now, now, now).
AddRow("del-c", "ws-1", "ws-4", "task c", "completed", "result c", "", now, now, now, now)
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if len(got) != 3 {
t.Fatalf("expected 3 entries, got %d", len(got))
}
if got[0]["delegation_id"] != "del-a" || got[1]["delegation_id"] != "del-b" || got[2]["delegation_id"] != "del-c" {
t.Errorf("unexpected order: %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_NullsOmitted(t *testing.T) {
// last_heartbeat, deadline, result_preview, error_detail are all NULL.
// Handler must not panic and must omit those keys from the map.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
now := time.Now()
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
}).
AddRow("del-1", "ws-1", "ws-2", "task", "queued", nil, nil, nil, nil, now, now)
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
e := got[0]
if _, ok := e["last_heartbeat"]; ok {
t.Error("last_heartbeat should be absent when NULL")
}
if _, ok := e["deadline"]; ok {
t.Error("deadline should be absent when NULL")
}
if _, ok := e["response_preview"]; ok {
t.Error("response_preview should be absent when NULL result_preview")
}
if _, ok := e["error"]; ok {
t.Error("error should be absent when NULL error_detail")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_QueryError(t *testing.T) {
// Query failure returns nil — graceful fallback, no panic.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnError(context.DeadlineExceeded)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
if got != nil {
t.Errorf("query error: expected nil, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromLedger_RowsErr(t *testing.T) {
// rows.Err() mid-stream: handler collects partial results and returns them.
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
// RowError(0) before AddRow(0): row 0 is "bad", rows.Next() returns false
// on first call — the row never scans, result stays nil. To get partial
// results (row 0 scanned) with rows.Err() non-nil, we use 2 rows and put
// RowError(1) after AddRow(1): row 0 scans normally, row 1 is bad,
// rows.Err() is error, handler returns partial result.
rows := sqlmock.NewRows([]string{
"delegation_id", "caller_id", "callee_id", "task_preview",
"status", "result_preview", "error_detail",
"last_heartbeat", "deadline", "created_at", "updated_at",
}).
AddRow("del-1", "ws-1", "ws-2", "task", "queued", "", "", now, now, now, now).
AddRow("del-2", "ws-1", "ws-3", "another task", "queued", "", "", now, now, now, now).
RowError(1, context.DeadlineExceeded)
mock.ExpectQuery("SELECT .+ FROM delegations").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromLedger(context.Background(), "ws-1")
// Row 0 scanned and appended; row 1 is bad; rows.Err() is non-nil.
// Handler logs the error but returns result (partial results because result != nil).
if got == nil || len(got) != 1 {
t.Errorf("rows.Err path: expected 1 partial result, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
// TestListDelegationsFromLedger_ScanError is removed.
//
// In Go 1.25 sqlmock.NewRows validates column count at AddRow() time and
// panics when len(values) != len(columns). The old pattern
// sqlmock.NewRows([]string{}).AddRow("only-one-col")
// therefore panics in test SETUP, not inside the handler. The handler has no
// recover(), so a scan panic would propagate out of listDelegationsFromLedger
// and crash the process — this is the correct behaviour (not silently skipping
// a row). The correct way to cover this path is a real-DB integration test.
//
// ---------- listDelegationsFromActivityLogs ----------
func TestListDelegationsFromActivityLogs_EmptyResult(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail",
"response_preview", "delegation_id", "created_at",
})
mock.ExpectQuery("SELECT .+ FROM activity_logs").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
if len(got) != 0 {
t.Errorf("empty result: expected empty slice, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromActivityLogs_SingleDelegateRow(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail",
"response_preview", "delegation_id", "created_at",
}).AddRow(
"act-1", "delegate",
"ws-1", "ws-2",
"analyse Q1 numbers",
"in_progress",
"", "", "",
now,
)
mock.ExpectQuery("SELECT .+ FROM activity_logs").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
e := got[0]
if e["id"] != "act-1" {
t.Errorf("id: got %v, want act-1", e["id"])
}
if e["type"] != "delegate" {
t.Errorf("type: got %v, want delegate", e["type"])
}
if e["source_id"] != "ws-1" {
t.Errorf("source_id: got %v, want ws-1", e["source_id"])
}
if e["target_id"] != "ws-2" {
t.Errorf("target_id: got %v, want ws-2", e["target_id"])
}
if e["summary"] != "analyse Q1 numbers" {
t.Errorf("summary: got %v", e["summary"])
}
if e["status"] != "in_progress" {
t.Errorf("status: got %v", e["status"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromActivityLogs_DelegateResultWithError(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail",
"response_preview", "delegation_id", "created_at",
}).AddRow(
"act-2", "delegate_result",
"ws-1", "ws-2",
"result summary",
"failed",
"Callee workspace not reachable",
`{"text":"the result body text"}`,
"del-abc",
now,
)
mock.ExpectQuery("SELECT .+ FROM activity_logs").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
if len(got) != 1 {
t.Fatalf("expected 1 entry, got %d", len(got))
}
e := got[0]
if e["type"] != "delegate_result" {
t.Errorf("type: got %v", e["type"])
}
if e["error"] != "Callee workspace not reachable" {
t.Errorf("error: got %v", e["error"])
}
if e["response_preview"] != `{"text":"the result body text"}` {
t.Errorf("response_preview: got %v", e["response_preview"])
}
if e["delegation_id"] != "del-abc" {
t.Errorf("delegation_id: got %v", e["delegation_id"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromActivityLogs_QueryError(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
mock.ExpectQuery("SELECT .+ FROM activity_logs").
WithArgs("ws-1").
WillReturnError(context.DeadlineExceeded)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
// Error → returns empty slice, not nil.
if len(got) != 0 {
t.Errorf("query error: expected empty slice, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
func TestListDelegationsFromActivityLogs_RowsErr(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
now := time.Now()
// RowError(0) before AddRow(0): row 0 is "bad", rows.Next() returns false
// on first call — the row never scans, result stays nil. To get partial
// results (row 0 scanned) with rows.Err() non-nil, we use 2 rows and put
// RowError(1) after AddRow(1): row 0 scans normally, row 1 is bad,
// rows.Err() is error, handler returns partial result.
rows := sqlmock.NewRows([]string{
"id", "activity_type", "source_id", "target_id",
"summary", "status", "error_detail",
"response_preview", "delegation_id", "created_at",
}).
AddRow("act-1", "delegate", "ws-1", "ws-2", "task", "queued", "", "", "", now).
AddRow("act-2", "delegate", "ws-1", "ws-3", "another task", "queued", "", "", "", now).
RowError(1, context.DeadlineExceeded)
mock.ExpectQuery("SELECT .+ FROM activity_logs").
WithArgs("ws-1").
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
got := dh.listDelegationsFromActivityLogs(context.Background(), "ws-1")
// Row 0 scanned and appended; row 1 is bad; rows.Err() is non-nil.
// Handler logs the error but returns result (partial results because result != nil).
if got == nil || len(got) != 1 {
t.Errorf("rows.Err path: expected 1 partial result, got %v", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("sqlmock expectations: %v", err)
}
}
@@ -133,9 +133,9 @@ func TestDelegate_Success(t *testing.T) {
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
// Expect INSERT into activity_logs for delegation tracking
// (6th arg is response_body, 7th is idempotency_key — nil here since the request omits it)
// (6th arg is idempotency_key — nil here since the request omits it)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
WillReturnResult(sqlmock.NewResult(0, 1))
// Expect RecordAndBroadcast INSERT into structure_events
@@ -189,9 +189,9 @@ func TestDelegate_DBInsertFails_Still202WithWarning(t *testing.T) {
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
// DB insert fails (6th arg = response_body, 7th = idempotency_key, nil for this test)
// DB insert fails (6th arg = idempotency_key, nil for this test)
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), nil).
WillReturnError(fmt.Errorf("database connection lost"))
// RecordAndBroadcast still fires
@@ -491,7 +491,6 @@ func TestDelegationRecord_InsertsActivityLogRow(t *testing.T) {
"550e8400-e29b-41d4-a716-446655440001", // target_id
"Delegating to 550e8400-e29b-41d4-a716-446655440001", // summary
sqlmock.AnyArg(), // request_body (jsonb)
sqlmock.AnyArg(), // response_body (jsonb) — mc#984 fix
).
WillReturnResult(sqlmock.NewResult(0, 1))
// RecordAndBroadcast INSERT for DELEGATION_SENT
@@ -700,9 +699,9 @@ func TestDelegate_IdempotentFailedRowIsReleasedAndReplaced(t *testing.T) {
mock.ExpectExec("DELETE FROM activity_logs").
WithArgs("ws-source", "retry-key").
WillReturnResult(sqlmock.NewResult(0, 1))
// Fresh insert with the same idempotency key (response_body added as mc#984 fix).
// Fresh insert with the same idempotency key.
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "retry-key").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "retry-key").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -746,9 +745,9 @@ func TestDelegate_IdempotentRaceUniqueViolationReturnsExisting(t *testing.T) {
mock.ExpectQuery("SELECT request_body->>'delegation_id', status, target_id").
WithArgs("ws-source", "race-key").
WillReturnError(fmt.Errorf("sql: no rows in result set"))
// Insert loses the race against a concurrent caller (response_body added as mc#984 fix).
// Insert loses the race against a concurrent caller.
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), "race-key").
WithArgs("ws-source", "ws-source", targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "race-key").
WillReturnError(fmt.Errorf("pq: duplicate key value violates unique constraint \"activity_logs_idempotency_uniq\""))
// Re-query returns the winner.
mock.ExpectQuery("SELECT request_body->>'delegation_id', status").
@@ -29,20 +29,14 @@ func init() {
// setupTestDB creates a sqlmock DB and assigns it to the global db.DB.
// It also disables the SSRF URL check so that httptest.NewServer loopback
// URLs and fake hostnames (*.example) used in tests don't trigger rejections.
//
// IMPORTANT: db.DB is saved before assignment and restored via t.Cleanup so
// that tests running after this one are not polluted by a closed mock.
// This is the single root cause of the systemic CI/Platform (Go) failures on
// main HEAD 8026f020 (mc#975).
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = prevDB; mockDB.Close() })
t.Cleanup(func() { mockDB.Close() })
// Disable SSRF checks for the duration of this test only. Restore
// the previous state via t.Cleanup so that TestIsSafeURL_* tests
@@ -62,11 +56,6 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
return mock
}
func waitForHandlerAsyncBeforeDBCleanup(t *testing.T, h *WorkspaceHandler) {
t.Helper()
t.Cleanup(h.waitAsyncForTest)
}
// setupTestRedis creates a miniredis instance and assigns it to the global db.RDB.
func setupTestRedis(t *testing.T) *miniredis.Miniredis {
t.Helper()
@@ -366,11 +355,6 @@ func TestWorkspaceCreate(t *testing.T) {
}
func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
WithArgs("claude-code").
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", "/tmp/configs")
@@ -382,7 +366,7 @@ func TestBuildProvisionerConfig_IncludesAwarenessSettings(t *testing.T) {
"ws-123",
"/tmp/configs/template",
map[string][]byte{"config.yaml": []byte("name: test")},
models.CreateWorkspacePayload{Tier: 2, Runtime: "claude-code", WorkspaceDir: "/tmp/workspace", WorkspaceAccess: "read_write"},
models.CreateWorkspacePayload{Tier: 2, Runtime: "claude-code"},
map[string]string{"OPENAI_API_KEY": "sk-test"},
"/tmp/plugins",
"workspace:ws-123",
@@ -646,7 +630,7 @@ func TestActivityHandler_List(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -695,7 +679,7 @@ func TestActivityHandler_ListByType(t *testing.T) {
WillReturnRows(rows)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -723,7 +707,7 @@ func TestActivityHandler_Report(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
// Expect the INSERT into activity_logs
mock.ExpectExec("INSERT INTO activity_logs").
@@ -752,7 +736,7 @@ func TestActivityHandler_Report_InvalidType(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -980,7 +964,7 @@ func TestActivityHandler_ListEmpty(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1014,7 +998,7 @@ func TestActivityHandler_ListCustomLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1047,7 +1031,7 @@ func TestActivityHandler_ListMaxLimit(t *testing.T) {
WillReturnRows(sqlmock.NewRows(columns))
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1075,7 +1059,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1106,7 +1090,7 @@ func TestActivityHandler_ReportAllValidTypes(t *testing.T) {
func TestActivityHandler_ReportMissingBody(t *testing.T) {
setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1179,7 +1163,7 @@ func TestActivityHandler_Report_SourceIDSpoofRejected(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -1202,7 +1186,7 @@ func TestActivityHandler_Report_MatchingSourceIDAccepted(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
@@ -1232,7 +1216,7 @@ func TestActivityHandler_Report_SourceIDLogInjection(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster, nil)
handler := NewActivityHandler(broadcaster)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -2,12 +2,10 @@ package handlers
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"regexp"
"testing"
"time"
@@ -82,135 +80,117 @@ func TestInstructionsList_ByWorkspaceID(t *testing.T) {
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 2 {
t.Fatalf("expected 2 instructions, got %d", len(result))
if len(out) != 2 {
t.Errorf("expected 2 instructions, got %d", len(out))
}
if result[0].Scope != "global" || result[1].Scope != "workspace" {
t.Fatalf("expected global then workspace instructions, got %#v", result)
if out[0].Scope != "global" {
t.Errorf("first row scope: expected global, got %s", out[0].Scope)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_WithScopeFilter(t *testing.T) {
func TestInstructionsList_ByScope(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Be kind", "Always be kind", 10, true,
time.Now(), time.Now())
w, c := newGetRequest("/instructions?scope=global")
c.Request = httptest.NewRequest(http.MethodGet, "/instructions?scope=global", nil)
mock.ExpectQuery(regexp.QuoteMeta("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1 AND scope = $1 ORDER BY scope, priority DESC, created_at")).
rows := sqlmock.NewRows(instructionCols).
AddRow("inst-g", "global", nil, "Global Rule", "Follow policy.", 10, true, time.Now(), time.Now())
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WithArgs("global").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions?scope=global", nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 1 {
t.Fatalf("expected 1 instruction, got %d", len(result))
}
if result[0].Scope != "global" {
t.Errorf("expected scope 'global', got %q", result[0].Scope)
if len(out) != 1 || out[0].Scope != "global" {
t.Errorf("unexpected response: %v", out)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_WithWorkspaceID(t *testing.T) {
func TestInstructionsList_AllNoParams(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-test-123"
h := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Global rule", "Stay safe", 5, true,
time.Now(), time.Now()).
AddRow("inst-2", "workspace", &wsID, "WS rule", "Use HTTPS", 10, true,
time.Now(), time.Now())
w, c := newGetRequest("/instructions")
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE enabled = true AND \\(").
WithArgs(wsID).
rows := sqlmock.NewRows(instructionCols)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions?workspace_id="+wsID, nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
var out []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if len(result) != 2 {
t.Fatalf("expected 2 instructions, got %d", len(result))
// Empty slice, not nil
if out == nil {
t.Error("expected empty slice, got nil")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_List_QueryError(t *testing.T) {
func TestInstructionsList_DBError(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
w, c := newGetRequest("/instructions")
c.Request = httptest.NewRequest(http.MethodGet, "/instructions", nil)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnError(context.DeadlineExceeded)
WillReturnError(errors.New("connection refused"))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions", nil)
handler.List(c)
h.List(c)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected 500, got %d", w.Code)
t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// ── Create ──────────────────────────────────────────────────────────────────────
// ── Create ───────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Create_Success(t *testing.T) {
func TestInstructionsCreate_ValidGlobal(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": "Be Helpful",
"content": "Always be helpful to the user.",
"priority": 10,
})
mock.ExpectQuery("INSERT INTO platform_instructions").
WithArgs("global", nil, "Be kind", "Always be kind", 5).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("new-inst-id"))
WithArgs("global", nil, "Be Helpful", "Always be helpful to the user.", 10).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("new-inst-1"))
body, _ := json.Marshal(map[string]interface{}{
"scope": "global",
"title": "Be kind",
"content": "Always be kind",
"priority": 5,
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusCreated {
t.Fatalf("expected 201, got %d: %s", w.Code, w.Body.String())
@@ -219,8 +199,8 @@ func TestInstructionsHandler_Create_Success(t *testing.T) {
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
if out["id"] != "new-inst-id" {
t.Errorf("expected id new-inst-id, got %s", out["id"])
if out["id"] != "new-inst-1" {
t.Errorf("expected id new-inst-1, got %s", out["id"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
@@ -319,65 +299,56 @@ func TestInstructionsCreate_InvalidScope(t *testing.T) {
}
}
func TestInstructionsHandler_Create_WorkspaceScopeMissingScopeTarget(t *testing.T) {
func TestInstructionsCreate_WorkspaceScopeNoTarget(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
body, _ := json.Marshal(map[string]interface{}{
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "workspace",
"title": "Test",
"content": "Test content",
"title": "Missing Target",
"content": "Workspace scope without scope_target.",
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Create_ContentTooLong(t *testing.T) {
func TestInstructionsCreate_ContentTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
longContent := string(bytes.Repeat([]byte("x"), 8193))
body, _ := json.Marshal(map[string]interface{}{
// Build a string longer than maxInstructionContentLen (8192).
longContent := string(make([]byte, maxInstructionContentLen+1))
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": "Test",
"title": "Too Long",
"content": longContent,
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Create_TitleTooLong(t *testing.T) {
func TestInstructionsCreate_TitleTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
longTitle := string(bytes.Repeat([]byte("x"), 201))
body, _ := json.Marshal(map[string]interface{}{
longTitle := string(make([]byte, 201))
w, c := newPostRequest("/instructions", map[string]interface{}{
"scope": "global",
"title": longTitle,
"content": "Short content",
"content": "Short content.",
})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/instructions", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
h.Create(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
@@ -871,250 +842,43 @@ func TestInstructionsResolve_ScopeTransitionOnlyGlobal(t *testing.T) {
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
var out struct {
Instructions string `json:"instructions"`
}
}
func TestInstructionsHandler_Update_NotFound(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("UPDATE platform_instructions SET\n\t\t\t\ttitle = COALESCE($2, title),\n\t\t\t\tcontent = COALESCE($3, content),\n\t\t\t\tpriority = COALESCE($4, priority),\n\t\t\t\tenabled = COALESCE($5, enabled),\n\t\t\t\tupdated_at = NOW()\n\t\t\t\tWHERE id = $1")).
WithArgs("nonexistent", sqlmock.AnyArg(), nil, nil, nil).
WillReturnResult(sqlmock.NewResult(0, 0))
body, _ := json.Marshal(map[string]interface{}{"title": "Updated title"})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "nonexistent"}}
c.Request = httptest.NewRequest("PUT", "/instructions/nonexistent", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
if err := json.Unmarshal(w.Body.Bytes(), &out); err != nil {
t.Fatalf("response not valid JSON: %v", err)
}
// Two global instructions share one section header.
if bytes.Count([]byte(out.Instructions), []byte("Platform-Wide Rules")) != 1 {
t.Error("expect exactly one 'Platform-Wide Rules' header for consecutive global rows")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
t.Errorf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Update_ContentTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
// ─── Update: empty body (all nil — no-op update) ─────────────────────────────
longContent := string(bytes.Repeat([]byte("x"), 8193))
body, _ := json.Marshal(map[string]interface{}{"content": longContent})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("PUT", "/instructions/inst-1", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
func TestInstructionsHandler_Update_TitleTooLong(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
longTitle := string(bytes.Repeat([]byte("x"), 201))
body, _ := json.Marshal(map[string]interface{}{"title": longTitle})
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("PUT", "/instructions/inst-1", bytes.NewReader(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Update(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// ── Delete ─────────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Delete_Success(t *testing.T) {
func TestInstructionsUpdate_EmptyBody(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
h := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM platform_instructions WHERE id = $1")).
WithArgs("inst-1").
instID := "inst-empty-update"
w, c := newPutRequest("/instructions/"+instID, map[string]interface{}{})
c.Params = []gin.Param{{Key: "id", Value: instID}}
// COALESCE(nil, ...) = unchanged; still updates updated_at.
// Args order: ($1=id, $2=title, $3=content, $4=priority, $5=enabled)
mock.ExpectExec("UPDATE platform_instructions SET").
WithArgs(instID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "inst-1"}}
c.Request = httptest.NewRequest("DELETE", "/instructions/inst-1", nil)
handler.Delete(c)
h.Update(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
t.Fatalf("expected 200 for empty body, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Delete_NotFound(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
mock.ExpectExec(regexp.QuoteMeta("DELETE FROM platform_instructions WHERE id = $1")).
WithArgs("nonexistent").
WillReturnResult(sqlmock.NewResult(0, 0))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "nonexistent"}}
c.Request = httptest.NewRequest("DELETE", "/instructions/nonexistent", nil)
handler.Delete(c)
if w.Code != http.StatusNotFound {
t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// ── Resolve ────────────────────────────────────────────────────────────────────
func TestInstructionsHandler_Resolve_Empty(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-resolve-1"
mock.ExpectQuery("SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"scope", "title", "content"}))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
if resp["workspace_id"] != wsID {
t.Errorf("expected workspace_id %q, got %v", wsID, resp["workspace_id"])
}
if resp["instructions"] != "" {
t.Errorf("expected empty instructions, got %q", resp["instructions"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Resolve_WithInstructions(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
wsID := "ws-resolve-2"
rows := sqlmock.NewRows([]string{"scope", "title", "content"}).
AddRow("global", "Be safe", "No SSRF").
AddRow("workspace", "WS Rule", "Use HTTPS")
mock.ExpectQuery("SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND").
WithArgs(wsID).
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+wsID+"/instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
instructions, ok := resp["instructions"].(string)
if !ok {
t.Fatalf("instructions field is not a string: %T", resp["instructions"])
}
if instructions == "" {
t.Fatalf("expected non-empty instructions")
}
// Verify scope headers are present
if !bytes.Contains([]byte(instructions), []byte("Platform-Wide Rules")) {
t.Errorf("expected 'Platform-Wide Rules' header in instructions")
}
if !bytes.Contains([]byte(instructions), []byte("Role-Specific Rules")) {
t.Errorf("expected 'Role-Specific Rules' header in instructions")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestInstructionsHandler_Resolve_MissingWorkspaceID(t *testing.T) {
setupTestDB(t)
handler := NewInstructionsHandler()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: ""}}
c.Request = httptest.NewRequest("GET", "/workspaces//instructions/resolve", nil)
handler.Resolve(c)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected 400, got %d: %s", w.Code, w.Body.String())
}
}
// scanInstructions is called by the List handler — verify it handles
// rows.Err() gracefully without panicking.
func TestInstructionsHandler_List_ScanErrorContinues(t *testing.T) {
mock := setupTestDB(t)
handler := NewInstructionsHandler()
rows := sqlmock.NewRows([]string{
"id", "scope", "scope_target", "title", "content", "priority", "enabled", "created_at", "updated_at",
}).AddRow("inst-1", "global", nil, "Good", "Content here", 5, true, time.Now(), time.Now()).
RowError(1, context.DeadlineExceeded) // error on row 2 (if it existed)
mock.ExpectQuery("SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE 1=1").
WillReturnRows(rows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("GET", "/instructions", nil)
handler.List(c)
// Should still return 200 and the one valid row
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
var result []Instruction
if err := json.Unmarshal(w.Body.Bytes(), &result); err != nil {
t.Fatalf("invalid JSON: %v", err)
}
// The valid row should still be returned (error is logged, not fatal)
if len(result) != 1 {
t.Fatalf("expected 1 instruction despite row error, got %d", len(result))
t.Errorf("unmet expectations: %v", err)
}
}
+2 -5
View File
@@ -34,7 +34,6 @@ import (
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/gin-gonic/gin"
)
@@ -85,7 +84,6 @@ type mcpTool struct {
type MCPHandler struct {
database *sql.DB
broadcaster *events.Broadcaster
notifier *push.Notifier
// memv2 is the v2 memory plugin wiring (RFC #2728). nil-safe:
// every v2 tool calls memoryV2Available() first and returns a
@@ -96,9 +94,8 @@ type MCPHandler struct {
// NewMCPHandler wires the handler to db and broadcaster.
// Pass db.DB and the platform broadcaster at router-setup time.
// notifier may be nil if push notifications are not configured.
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster, notifier *push.Notifier) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster, notifier: notifier}
func NewMCPHandler(database *sql.DB, broadcaster *events.Broadcaster) *MCPHandler {
return &MCPHandler{database: database, broadcaster: broadcaster}
}
// ─────────────────────────────────────────────────────────────────────────────
@@ -26,7 +26,7 @@ import (
func newMCPHandler(t *testing.T) (*MCPHandler, sqlmock.Sqlmock) {
t.Helper()
mock := setupTestDB(t)
h := NewMCPHandler(db.DB, newTestBroadcaster(), nil)
h := NewMCPHandler(db.DB, newTestBroadcaster())
return h, mock
}
@@ -392,7 +392,7 @@ func (h *MCPHandler) toolSendMessageToUser(ctx context.Context, workspaceID stri
// (the tool args don't accept them); pass nil. If a future tool
// schema adds an attachments arg, build []AgentMessageAttachment
// and pass through.
writer := NewAgentMessageWriter(h.database, h.broadcaster, h.notifier)
writer := NewAgentMessageWriter(h.database, h.broadcaster)
if err := writer.Send(ctx, workspaceID, message, nil); err != nil {
if errors.Is(err, ErrWorkspaceNotFound) {
return "", fmt.Errorf("workspace not found")
@@ -15,7 +15,6 @@ import (
"gopkg.in/yaml.v3"
)
// resolvePromptRef reads a prompt body from either an inline string or a
// file ref relative to the workspace's files_dir. Inline always wins when
// both are non-empty (caller-provided inline is more authoritative than a
@@ -79,105 +78,26 @@ func hasUnresolvedVarRef(original, expanded string) bool {
}
// expandWithEnv expands ${VAR} and $VAR references in s using the env map.
// Falls back to the platform process env only when the whole value is a
// single variable reference; embedded process-env expansion is too broad for
// imported org YAML because host variables such as HOME are not template data.
// Falls back to the platform process env if a var isn't in the map.
// Shell variables must start with a letter or '_' per POSIX; invalid identifiers
// are returned literally so that "$100" and "$5" stay as-is.
func expandWithEnv(s string, env map[string]string) string {
if s == "" {
return ""
}
var b strings.Builder
for i := 0; i < len(s); {
if s[i] != '$' {
b.WriteByte(s[i])
i++
continue
return os.Expand(s, func(key string) string {
if len(key) == 0 {
return "$"
}
if i+1 >= len(s) {
b.WriteByte('$')
i++
continue
c := key[0]
if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_') {
return "$" + key // not a valid shell identifier — return literal
}
if s[i+1] == '{' {
end := strings.IndexByte(s[i+2:], '}')
if end < 0 {
b.WriteByte('$')
i++
continue
}
end += i + 2
key := s[i+2 : end]
ref := s[i : end+1]
b.WriteString(expandEnvRef(key, ref, s, env))
i = end + 1
continue
if v, ok := env[key]; ok {
return v
}
if !isEnvIdentStart(s[i+1]) {
b.WriteByte('$')
i++
continue
}
j := i + 2
for j < len(s) && isEnvIdentPart(s[j]) {
j++
}
key := s[i+1 : j]
ref := s[i:j]
b.WriteString(expandEnvRef(key, ref, s, env))
i = j
}
return b.String()
}
func isEnvIdentStart(c byte) bool {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_'
}
func isEnvIdentPart(c byte) bool {
return isEnvIdentStart(c) || (c >= '0' && c <= '9')
}
// expandEnvRef resolves a single variable reference extracted from s.
//
// Guards:
// - Empty key → "$$" escape, return "$"
// - key[0] not POSIX ident start → "$" + partial chars, return "$<chars>"
// - Key in env map → return the mapped value (template override wins)
// - Otherwise → only fall back to os.Getenv if the whole input string IS the
// variable reference (ref == whole).
//
// Bare $VAR format:
// $HOME (alone) → ref==whole → os.Getenv ✓ (host HOME is org-template HOME)
// $HOME/path (partial) → ref!=whole → literal "$HOME" ✓ (CWE-78: prevents host leak)
//
// Braced ${VAR} format:
// ${HOME} (alone) → ref==whole → os.Getenv ✓
// ${ROLE}/admin (partial) → ref!=whole → literal ✓
// "yes and ${NOT_SET}" (embedded) → ref!=whole → literal ✓
//
// This is the CWE-78 fix from commit a3a358f9.
func expandEnvRef(key, ref, whole string, env map[string]string) string {
if key == "" {
return "$"
}
if !isEnvIdentStart(key[0]) {
return "$" + key
}
if v, ok := env[key]; ok {
return v
}
if ref == whole {
return os.Getenv(key)
}
return ref
})
}
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env .env and the workspace-specific .env
// loadWorkspaceEnv reads the org root .env and the workspace-specific .env
// (workspace overrides org root). Used by both secret injection and channel
// config expansion.
//
@@ -104,8 +104,8 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
// documents this design choice; callers who need empty=resolved should
// pre-process the output before calling hasUnresolvedVarRef.
{"${VAR}", "", true},
{"${VAR}", "value", false}, // var replaced
{"$VAR", "value", false}, // bare var replaced
{"${VAR}", "value", false}, // var replaced
{"$VAR", "value", false}, // bare var replaced
{"prefix${VAR}suffix", "prefixvaluesuffix", false},
{"${A}${B}", "ab", false},
// FOO=FOO and BAR=BAR — both vars found and replaced. Expanded output
@@ -125,14 +125,14 @@ func TestHasUnresolvedVarRef_Resolved(t *testing.T) {
func TestHasUnresolvedVarRef_Unresolved(t *testing.T) {
// Expansion left the refs intact → unresolved.
cases := []struct {
orig string
orig string
expanded string
}{
{"${VAR}", "${VAR}"}, // untouched
{"$VAR", "$VAR"}, // bare untouched
{"${VAR}", "${VAR}"}, // untouched
{"$VAR", "$VAR"}, // bare untouched
{"prefix${VAR}suffix", "prefix${VAR}suffix"},
{"${A}${B}", "${A}${B}"}, // both unresolved
{"${FOO}", ""}, // empty result with var ref in original
{"${A}${B}", "${A}${B}"}, // both unresolved
{"${FOO}", ""}, // empty result with var ref in original
}
for _, tc := range cases {
t.Run(tc.orig, func(t *testing.T) {
@@ -205,8 +205,8 @@ func TestMergeCategoryRouting_WorkspaceOverrides(t *testing.T) {
"ui": {"Frontend Engineer"},
}
ws := map[string][]string{
"security": {"SRE Team"}, // narrows
"ui": {}, // drops
"security": {"SRE Team"}, // narrows
"ui": {}, // drops
"infra": {"Platform Team"}, // adds
}
r := mergeCategoryRouting(defaults, ws)
@@ -462,45 +462,8 @@ func TestExpandWithEnv_LiteralDollar(t *testing.T) {
func TestExpandWithEnv_PartiallyPresent(t *testing.T) {
env := map[string]string{"SET": "yes"}
result := expandWithEnv("${SET} and ${NOT_SET}", env)
assert.Equal(t, "yes and ${NOT_SET}", result)
}
func TestExpandWithEnv_EmbeddedMissingProcessEnvStaysLiteral(t *testing.T) {
t.Setenv("MOL_TEST_EMBEDDED_MISSING", "")
result := expandWithEnv("prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", map[string]string{})
assert.Equal(t, "prefix/${MOL_TEST_EMBEDDED_MISSING}/suffix", result)
}
// POSIX identifier guard regression tests (CWE-78 fix).
// Keys not starting with [a-zA-Z_] must not be looked up in env or os.Getenv.
func TestExpandWithEnv_DigitPrefix_NotExpanded(t *testing.T) {
// ${0}, ${5}, ${1VAR} — numeric prefix → not a valid shell identifier.
// Guard must return "$0", "$5", "$1VAR" literally; no env lookup.
cases := []struct {
input string
want string
}{
{"${0}", "$0"},
{"${5}", "$5"},
{"${1VAR}", "$1VAR"},
{"prefix ${0} suffix", "prefix $0 suffix"},
{"$0", "$0"},
{"$5", "$5"},
{"HOME=${HOME}", "HOME=${HOME}"}, // HOME is valid but embedded in larger string
}
for _, tc := range cases {
t.Run(tc.input, func(t *testing.T) {
got := expandWithEnv(tc.input, map[string]string{})
assert.Equal(t, tc.want, got)
})
}
}
func TestExpandWithEnv_EmptyKey_ReturnsDollar(t *testing.T) {
// ${} → "$" (empty key, guard returns "$")
result := expandWithEnv("value=${}", map[string]string{})
assert.Equal(t, "value=$", result)
// ${SET} resolved; ${NOT_SET} -> "" via empty fallback.
assert.Equal(t, "yes and ", result)
}
// mergeCategoryRouting tests — unions defaults with per-workspace routing.
@@ -582,8 +545,8 @@ func TestRenderCategoryRoutingYAML_SingleCategory(t *testing.T) {
func TestRenderCategoryRoutingYAML_MultipleCategoriesSorted(t *testing.T) {
routing := map[string][]string{
"zebra": {"RoleZ"},
"alpha": {"RoleA"},
"zebra": {"RoleZ"},
"alpha": {"RoleA"},
"middleware": {"RoleM"},
}
result, err := renderCategoryRoutingYAML(routing)
@@ -50,19 +50,18 @@ func TestResolveInsideRoot_DotDotTraversal(t *testing.T) {
// but resolveInsideRoot correctly returns nil (the path stays within root).
// The OFFSEC-006 concern is covered by ../../etc/passwd which DOES escape.
func TestResolveInsideRoot_DotDotWithIntermediate(t *testing.T) {
// a/b/../../c normalises to "c" — a valid descendant inside any root.
// Must use t.TempDir() for a real filesystem path so filepath.Abs resolves.
root := t.TempDir()
got, err := resolveInsideRoot(root, "a/b/../../c")
if err != nil {
t.Fatalf("a/b/../../c should resolve within root: %v", err)
t.Fatalf("a/b/../../c should resolve (normalizes to c within root): %v", err)
}
// Verify result is inside root and ends with "c"
if !strings.HasPrefix(got, root+string(filepath.Separator)) {
t.Errorf("result should be inside root %q, got %q", root, got)
}
if got[len(got)-1:] != "c" {
t.Errorf("resolved path should end in 'c', got %q", got)
// Ensure the suffix is "c"
parts := strings.Split(strings.TrimPrefix(got, root), string(filepath.Separator))
if parts[len(parts)-1] != "c" {
t.Errorf("expected filename 'c', got %q", got)
}
}
@@ -144,55 +143,8 @@ func TestResolveInsideRoot_SiblingNotEscaped(t *testing.T) {
}
// ── isSafeRoleName ────────────────────────────────────────────────────────────
func TestIsSafeRoleName_Empty(t *testing.T) {
if isSafeRoleName("") {
t.Error("isSafeRoleName(\"\"): expected false, got true")
}
}
func TestIsSafeRoleName_Dot(t *testing.T) {
if isSafeRoleName(".") {
t.Error("isSafeRoleName(\".\"): expected false, got true")
}
}
func TestIsSafeRoleName_DotDot(t *testing.T) {
if isSafeRoleName("..") {
t.Error("isSafeRoleName(\"..\"): expected false, got true")
}
}
func TestIsSafeRoleName_PathTraversal(t *testing.T) {
unsafe := []string{
"../etc",
"foo/../../../etc",
"foo/../../bar",
}
for _, name := range unsafe {
if isSafeRoleName(name) {
t.Errorf("isSafeRoleName(%q): expected false (path traversal), got true", name)
}
}
}
func TestIsSafeRoleName_SpecialChars(t *testing.T) {
unsafe := []string{
"foo:bar",
"foo bar",
"foo\tbar",
"foo\nbar",
"foo\x00bar",
"foo@bar",
"foo#bar",
"foo$bar",
}
for _, name := range unsafe {
if isSafeRoleName(name) {
t.Errorf("isSafeRoleName(%q): expected false (special char), got true", name)
}
}
}
// isSafeRoleName is tested comprehensively in org_helpers_pure_test.go.
// Only security-critical path-injection cases live here.
// ── mergeCategoryRouting ──────────────────────────────────────────────────────
// Duplicate mergeCategoryRouting tests removed to avoid redeclaration with
@@ -312,121 +264,3 @@ func TestSecureRouting_OriginalMapsUnmodified(t *testing.T) {
t.Error("ws routing should be unmodified after merge")
}
}
// ── expandWithEnv ─────────────────────────────────────────────────────────────
//
// CWE-78 regression tests. The original fix (a3a358f9) ensures that partial
// variable references like $HOME/path are NOT resolved via os.Getenv — the
// host HOME env var must not leak into org template values. Only whole-string
// references ($VAR or ${VAR}) may fall back to the host process environment.
func TestExpandWithEnv_PartialRefDollarHomePath(t *testing.T) {
// $HOME/path must NOT resolve to the host's HOME env var.
// The literal $HOME must be returned as-is.
got := expandWithEnv("$HOME/path", nil)
if got != "$HOME/path" {
t.Errorf("$HOME/path: got %q, want literal $HOME/path", got)
}
}
func TestExpandWithEnv_PartialRefBracedRoleAdmin(t *testing.T) {
// ${ROLE}/admin — ROLE is not in env, so expand to the literal ${ROLE}/admin.
got := expandWithEnv("${ROLE}/admin", nil)
if got != "${ROLE}/admin" {
t.Errorf("${ROLE}/admin: got %q, want literal ${ROLE}/admin", got)
}
}
func TestExpandWithEnv_PartialRefMiddleOfString(t *testing.T) {
// $ROLE in the middle of a string — literal, not os.Getenv.
got := expandWithEnv("prefix/$ROLE/suffix", nil)
if got != "prefix/$ROLE/suffix" {
t.Errorf("prefix/$ROLE/suffix: got %q, want literal", got)
}
}
func TestExpandWithEnv_WholeVarInEnv(t *testing.T) {
// Whole-string $VAR that IS in env — env value wins.
env := map[string]string{"FOO": "barvalue"}
got := expandWithEnv("$FOO", env)
if got != "barvalue" {
t.Errorf("$FOO with FOO=barvalue: got %q, want barvalue", got)
}
}
func TestExpandWithEnv_WholeVarBracedInEnv(t *testing.T) {
// Whole-string ${VAR} that IS in env — env value wins.
env := map[string]string{"FOO": "barvalue"}
got := expandWithEnv("${FOO}", env)
if got != "barvalue" {
t.Errorf("${FOO} with FOO=barvalue: got %q, want barvalue", got)
}
}
func TestExpandWithEnv_WholeVarNotInEnvBare(t *testing.T) {
// Whole-string $VAR not in env — falls back to os.Getenv.
// If the host has the var, we get the host value. If not, empty.
// At minimum, the result must NOT be the literal "$UNDEFINED_VAR_9Z".
got := expandWithEnv("$UNDEFINED_VAR_9Z", nil)
if got == "$UNDEFINED_VAR_9Z" {
t.Errorf("$UNDEFINED_VAR_9Z: should expand (whole-string fallback to os.Getenv), got literal")
}
}
func TestExpandWithEnv_WholeVarNotInEnvBraced(t *testing.T) {
// Whole-string ${VAR} not in env — falls back to os.Getenv.
got := expandWithEnv("${UNDEFINED_VAR_9Z}", nil)
if got == "${UNDEFINED_VAR_9Z}" {
t.Errorf("${UNDEFINED_VAR_9Z}: should expand (whole-string fallback to os.Getenv), got literal")
}
}
func TestExpandWithEnv_EmptyString(t *testing.T) {
got := expandWithEnv("", map[string]string{"FOO": "bar"})
if got != "" {
t.Errorf("empty string: got %q, want empty", got)
}
}
func TestExpandWithEnv_NoVarRefs(t *testing.T) {
got := expandWithEnv("plain string with no vars", map[string]string{"FOO": "bar"})
if got != "plain string with no vars" {
t.Errorf("plain string: got %q, want unchanged", got)
}
}
func TestExpandWithEnv_MultipleVarRefs(t *testing.T) {
// Two vars, both whole — both expand from env.
env := map[string]string{"A": "alpha", "B": "beta"}
got := expandWithEnv("$A and $B and more", env)
if got != "alpha and beta and more" {
t.Errorf("multiple vars: got %q, want alpha and beta and more", got)
}
}
func TestExpandWithEnv_NumericVarRef(t *testing.T) {
// $5 — starts with digit, not a valid identifier start.
// Must return the literal "$5", not expand via os.Getenv.
got := expandWithEnv("$5", map[string]string{"5": "five"})
if got != "$5" {
t.Errorf("$5: got %q, want literal $5", got)
}
}
func TestExpandWithEnv_DollarEscape(t *testing.T) {
// $$ → both $ written literally (each $ is not followed by an identifier char,
// so it is written as-is). No special escape sequence for $$.
got := expandWithEnv("$$", nil)
if got != "$$" {
t.Errorf("$$: got %q, want literal $$", got)
}
}
func TestExpandWithEnv_MixedPartialAndWhole(t *testing.T) {
// $A is in env (whole), $HOME is partial — only $A expands.
env := map[string]string{"A": "alpha"}
got := expandWithEnv("$A at $HOME", env)
if got != "alpha at $HOME" {
t.Errorf("$A at $HOME: got %q, want alpha at $HOME", got)
}
}
@@ -342,11 +342,6 @@ func TestPluginInstall_InstanceLookupError_Returns503(t *testing.T) {
// ---------- dispatch: uninstall ----------
func TestPluginUninstall_SaaS_DispatchesToEIC(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectExec("DELETE FROM workspace_plugins WHERE workspace_id").
WithArgs("ws-1", "browser-automation").
WillReturnResult(sqlmock.NewResult(0, 1))
stubReadPluginManifestViaEIC(t, func(ctx context.Context, instanceID, runtime, pluginName string) ([]byte, error) {
return []byte("name: browser-automation\nskills:\n - browse\n"), nil
})
@@ -629,9 +629,6 @@ func TestPluginInstall_RejectsUnknownScheme(t *testing.T) {
}
func TestPluginInstall_LocalSourceReachesContainerLookup(t *testing.T) {
mock := setupTestDB(t)
expectAllowlistAllowAll(mock)
base := t.TempDir()
pluginDir := filepath.Join(base, "demo")
_ = os.MkdirAll(pluginDir, 0o755)
@@ -958,14 +955,14 @@ func TestLogInstallLimitsOnce(t *testing.T) {
func TestRegexpEscapeForAwk(t *testing.T) {
cases := map[string]string{
"my-plugin": `my-plugin`,
"# Plugin: foo /": `# Plugin: foo \/`,
"# Plugin: a.b /": `# Plugin: a\.b \/`,
"foo[bar]": `foo\[bar\]`,
"a*b+c?": `a\*b\+c\?`,
"path|with|pipes": `path\|with\|pipes`,
`back\slash`: `back\\slash`,
"": ``,
"my-plugin": `my-plugin`,
"# Plugin: foo /": `# Plugin: foo \/`,
"# Plugin: a.b /": `# Plugin: a\.b \/`,
"foo[bar]": `foo\[bar\]`,
"a*b+c?": `a\*b\+c\?`,
"path|with|pipes": `path\|with\|pipes`,
`back\slash`: `back\\slash`,
"": ``,
}
for in, want := range cases {
got := regexpEscapeForAwk(in)
@@ -1250,7 +1247,7 @@ func TestPluginDownload_GithubSchemeStreamsTarball(t *testing.T) {
scheme: "github",
fetchFn: func(_ context.Context, _ string, dst string) (string, error) {
files := map[string]string{
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
"plugin.yaml": "name: remote-plugin\nversion: 1.0.0\n",
"skills/x/SKILL.md": "---\nname: x\n---\n",
"adapters/claude_code.py": "from plugins_registry.builtins import AgentskillsAdaptor as Adaptor\n",
}
@@ -58,7 +58,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
// Non-blocking send — don't stall the restart cycle.
// Run in a detached goroutine so the caller (runRestartCycle) can
// proceed to stopForRestart without waiting.
h.goAsync(func() {
go func() {
signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout)
defer cancel()
@@ -109,7 +109,7 @@ func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID s
} else {
log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode)
}
})
}()
}
// resolveAgentURLForRestartSignal returns the routable URL for the workspace
@@ -271,7 +271,6 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
WorkspaceHandler: newHandlerWithTestDeps(t),
errToReturn: context.DeadlineExceeded,
}
waitForHandlerAsyncBeforeDBCleanup(t, hWrapper.WorkspaceHandler)
hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
time.Sleep(200 * time.Millisecond)
@@ -63,9 +63,6 @@ func (h *SecretsHandler) List(c *gin.Context) {
"updated_at": updatedAt,
})
}
if err := rows.Err(); err != nil {
log.Printf("List secrets rows.Err: %v", err)
}
// 2. Global secrets not overridden at workspace level
globalRows, err := db.DB.QueryContext(ctx,
@@ -94,9 +91,6 @@ func (h *SecretsHandler) List(c *gin.Context) {
"updated_at": updatedAt,
})
}
if err := globalRows.Err(); err != nil {
log.Printf("List secrets (global) rows.Err: %v", err)
}
c.JSON(http.StatusOK, secrets)
}
@@ -180,9 +174,6 @@ func (h *SecretsHandler) Values(c *gin.Context) {
out[k] = string(decrypted)
}
}
if err := globalRows.Err(); err != nil {
log.Printf("secrets.Values globalRows.Err: %v", err)
}
}
wsRows, wErr := db.DB.QueryContext(ctx,
@@ -204,9 +195,6 @@ func (h *SecretsHandler) Values(c *gin.Context) {
out[k] = string(decrypted) // workspace override wins over global
}
}
if err := wsRows.Err(); err != nil {
log.Printf("secrets.Values wsRows.Err: %v", err)
}
}
if len(failedKeys) > 0 {
@@ -336,9 +324,6 @@ func (h *SecretsHandler) ListGlobal(c *gin.Context) {
"scope": "global",
})
}
if err := rows.Err(); err != nil {
log.Printf("ListGlobal rows.Err: %v", err)
}
c.JSON(http.StatusOK, secrets)
}
@@ -415,9 +400,6 @@ func (h *SecretsHandler) restartAllAffectedByGlobalKey(key string) {
ids = append(ids, id)
}
}
if err := rows.Err(); err != nil {
log.Printf("restartAllAffectedByGlobalKey rows.Err: %v", err)
}
if len(ids) == 0 {
return
}
@@ -186,16 +186,11 @@ func (h *TemplatesHandler) List(c *gin.Context) {
model = raw.RuntimeConfig.Model
}
tier := raw.Tier
if h.wh != nil && h.wh.IsSaaS() {
tier = h.wh.DefaultTier()
}
templates = append(templates, templateSummary{
ID: id,
Name: raw.Name,
Description: raw.Description,
Tier: tier,
Tier: raw.Tier,
Runtime: raw.Runtime,
Model: model,
Models: raw.RuntimeConfig.Models,
@@ -345,11 +340,6 @@ func (h *TemplatesHandler) ListFiles(c *gin.Context) {
if err != nil || path == walkRoot {
return nil
}
// Skip symlinks to prevent path traversal via malicious symlinks
// inside the workspace config directory (OFFSEC-010).
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
rel, _ := filepath.Rel(walkRoot, path)
// Enforce depth limit
if strings.Count(rel, string(filepath.Separator))+1 > depth {
@@ -847,58 +847,6 @@ func TestListFiles_FallbackToHost_WithTemplate(t *testing.T) {
}
}
func TestListFiles_FallbackToHost_SkipsSymlinks(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
tmpDir := t.TempDir()
tmplDir := filepath.Join(tmpDir, "test-agent")
if err := os.MkdirAll(tmplDir, 0755); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmplDir, "config.yaml"), []byte("name: Test Agent\n"), 0644); err != nil {
t.Fatal(err)
}
secret := filepath.Join(t.TempDir(), "secret.txt")
if err := os.WriteFile(secret, []byte("do-not-list"), 0600); err != nil {
t.Fatal(err)
}
if err := os.Symlink(secret, filepath.Join(tmplDir, "leaked-secret")); err != nil {
t.Fatal(err)
}
handler := NewTemplatesHandler(tmpDir, nil, nil)
mock.ExpectQuery(`SELECT name, COALESCE\(instance_id, ''\), COALESCE\(runtime, ''\) FROM workspaces WHERE id =`).
WithArgs("ws-tmpl").
WillReturnRows(sqlmock.NewRows([]string{"name", "instance_id", "runtime"}).AddRow("Test Agent", "", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-tmpl"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-tmpl/files", nil)
handler.ListFiles(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatal(err)
}
for _, file := range resp {
if file["path"] == "leaked-secret" {
t.Fatalf("symlink should not be listed: %#v", resp)
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== GET /workspaces/:id/files/*path ====================
func TestReadFile_PathTraversal(t *testing.T) {
@@ -1252,3 +1200,4 @@ func TestCWE78_DeleteFile_TraversalVariants(t *testing.T) {
})
}
}
@@ -340,11 +340,6 @@ func TestSSHCommandCmd_BuildsArgv(t *testing.T) {
// a workspace must still be able to access its own terminal. The CanCommunicate
// fast-path returns true when callerID == targetID.
func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-alice").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
// CanCommunicate fast-path: callerID == targetID → returns true without DB.
prev := canCommunicateCheck
canCommunicateCheck = func(callerID, targetID string) bool { return callerID == targetID }
@@ -372,11 +367,6 @@ func TestTerminalConnect_KI005_AllowsOwnTerminal(t *testing.T) {
// skip the CanCommunicate check entirely and fall through to the Docker auth path.
// We assert they get the nil-docker 503 instead of 403.
func TestTerminalConnect_KI005_SkipsCheckWithoutHeader(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-any").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
h := NewTerminalHandler(nil) // nil docker → 503 if reached
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -449,9 +439,6 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
mock.ExpectExec(`UPDATE workspace_auth_tokens SET last_used_at`).
WithArgs(sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-dev").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
h := NewTerminalHandler(nil)
w := httptest.NewRecorder()
@@ -476,10 +463,7 @@ func TestTerminalConnect_KI005_AllowsSiblingWorkspace(t *testing.T) {
// introduced in GH#1885: internal routing uses org tokens which are not in
// workspace_auth_tokens, so ValidateToken would always fail for them.
func TestKI005_OrgToken_SkipsValidateToken(t *testing.T) {
mock := setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
mock.ExpectQuery("SELECT COALESCE").
WithArgs("ws-target").
WillReturnRows(sqlmock.NewRows([]string{"instance_id"}).AddRow(""))
setupTestDB(t) // no ValidateToken ExpectQuery — none should fire
prev := canCommunicateCheck
canCommunicateCheck = func(callerID, targetID string) bool {
// Simulate platform agent → target workspace (same org).
@@ -560,3 +544,4 @@ func TestSSHCommandCmd_ConnectTimeoutPresent(t *testing.T) {
args)
}
}
@@ -15,7 +15,6 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
@@ -74,19 +73,6 @@ type WorkspaceHandler struct {
// memory plugin). main.go sets this to plugin.DeleteNamespace
// when MEMORY_PLUGIN_URL is configured.
namespaceCleanupFn func(ctx context.Context, workspaceID string)
asyncWG sync.WaitGroup
}
func (h *WorkspaceHandler) goAsync(fn func()) {
h.asyncWG.Add(1)
go func() {
defer h.asyncWG.Done()
fn()
}()
}
func (h *WorkspaceHandler) waitAsyncForTest() {
h.asyncWG.Wait()
}
func NewWorkspaceHandler(b events.EventEmitter, p *provisioner.Provisioner, platformURL, configsDir string) *WorkspaceHandler {
@@ -161,14 +147,15 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
id := uuid.New().String()
awarenessNamespace := workspaceAwarenessNamespace(id)
if h.IsSaaS() {
// SaaS hard gate: every hosted workspace gets its own sibling
// EC2 instance, so T4 is the only meaningful runtime boundary.
// Do not trust stale clients/templates that still send T1/T2/T3.
payload.Tier = 4
} else if payload.Tier == 0 {
// Self-hosted default remains T3. Lower tiers (T1 sandboxed,
// T2 standard) stay explicit opt-ins for low-trust local agents.
if payload.Tier == 0 {
// SaaS-aware default. SaaS → T4 (full host access; each
// workspace runs on its own sibling EC2 so the tier boundary
// is a Docker resource limit on the only container present —
// no neighbour to protect from). Self-hosted → T3 (read-write
// workspace mount + Docker daemon access, most templates'
// baseline). Lower tiers (T1 sandboxed, T2 standard) remain
// explicit opt-ins for low-trust agents. Matches the canvas
// CreateWorkspaceDialog defaults so the API and the UI agree.
payload.Tier = h.DefaultTier()
}
@@ -111,11 +111,11 @@ func (h *WorkspaceHandler) provisionWorkspaceAuto(workspaceID, templatePath stri
"sync": false,
})
if h.cpProv != nil {
h.goAsync(func() { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) })
go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload)
return true
}
if h.provisioner != nil {
h.goAsync(func() { h.provisionWorkspace(workspaceID, templatePath, configFiles, payload) })
go h.provisionWorkspace(workspaceID, templatePath, configFiles, payload)
return true
}
// No backend wired — mark failed so the workspace doesn't linger in
@@ -275,13 +275,13 @@ func (h *WorkspaceHandler) RestartWorkspaceAutoOpts(ctx context.Context, workspa
if h.cpProv != nil {
h.cpStopWithRetry(ctx, workspaceID, "RestartWorkspaceAuto")
// resetClaudeSession is Docker-only — CP has no session state to clear.
h.goAsync(func() { h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload) })
go h.provisionWorkspaceCP(workspaceID, templatePath, configFiles, payload)
return true
}
if h.provisioner != nil {
// Docker.Stop has no retry — see docstring rationale.
h.provisioner.Stop(ctx, workspaceID)
h.goAsync(func() { h.provisionWorkspaceOpts(workspaceID, templatePath, configFiles, payload, resetClaudeSession) })
go h.provisionWorkspaceOpts(workspaceID, templatePath, configFiles, payload, resetClaudeSession)
return true
}
// No backend wired — same shape as provisionWorkspaceAuto's no-backend
@@ -144,7 +144,6 @@ func TestProvisionWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
rec := &trackingCPProv{startErr: errors.New("simulated CP rejection")}
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
h.SetCPProvisioner(rec)
wsID := "ws-routes-to-cp-0123456789abcdef"
@@ -596,7 +595,6 @@ func TestRestartWorkspaceAuto_RoutesToCPWhenSet(t *testing.T) {
// Mock DB so cpStopWithRetry can run without a real Postgres.
mock := setupTestDB(t)
waitForHandlerAsyncBeforeDBCleanup(t, h)
mock.MatchExpectationsInOrder(false)
// provisionWorkspaceCP runs in the goroutine and will hit secrets
// SELECTs + UPDATE workspace as failed (we make CP Start return
@@ -672,7 +670,6 @@ func TestRestartWorkspaceAuto_RoutesToDockerWhenOnlyDocker(t *testing.T) {
bcast := &concurrentSafeBroadcaster{}
h := NewWorkspaceHandler(bcast, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, h)
stub := &stoppingLocalProv{}
h.provisioner = stub
@@ -2,7 +2,6 @@ package handlers
import (
"context"
"database/sql"
"fmt"
"net/http"
"os"
@@ -635,11 +634,6 @@ func TestSeedInitialMemories_EmptyMemoriesNil(t *testing.T) {
// ==================== buildProvisionerConfig ====================
func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
WithArgs("ws-basic").
WillReturnRows(sqlmock.NewRows([]string{"workspace_dir", "workspace_access"}).AddRow("", "none"))
broadcaster := newTestBroadcaster()
tmpDir := t.TempDir()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", tmpDir)
@@ -684,14 +678,6 @@ func TestBuildProvisionerConfig_BasicFields(t *testing.T) {
}
func TestBuildProvisionerConfig_WorkspacePathFromEnv(t *testing.T) {
mock := setupTestDB(t)
mock.ExpectQuery(`SELECT COALESCE\(workspace_dir`).
WithArgs("ws-env").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT digest FROM runtime_image_pins`).
WithArgs("claude-code").
WillReturnError(sql.ErrNoRows)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
@@ -410,44 +410,6 @@ func TestWorkspaceCreate_DefaultsApplied(t *testing.T) {
}
}
func TestWorkspaceCreate_SaaSHardForcesTier4(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
handler.SetCPProvisioner(&trackingCPProv{})
mock.ExpectBegin()
mock.ExpectExec("INSERT INTO workspaces").
WithArgs(sqlmock.AnyArg(), "SaaS External Agent", nil, 4, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
mock.ExpectExec("INSERT INTO canvas_layouts").
WithArgs(sqlmock.AnyArg(), float64(0), float64(0)).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("UPDATE workspaces SET url").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"SaaS External Agent","runtime":"external","external":true,"url":"https://example.com/agent","tier":2}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.Create(c)
if w.Code != http.StatusCreated {
t.Errorf("expected status 201, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// TestWorkspaceCreate_WithSecrets_Persists asserts that secrets in the create
// payload are written to workspace_secrets inside the same transaction as the
// workspace row, and that the handler returns 201.
@@ -207,7 +207,7 @@ func setupSwapEnv(t *testing.T) (*handlers.MCPHandler, *flatPlugin, sqlmock.Sqlm
resolver := namespace.New(db)
// MCPHandler needs a real *sql.DB; pass the sqlmock-backed one.
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
return h, plugin, mock
}
@@ -430,7 +430,7 @@ func TestE2E_PluginUnreachable_AgentSeesClearError(t *testing.T) {
db, _, _ := sqlmock.New()
defer db.Close()
resolver := namespace.New(db)
h := handlers.NewMCPHandler(db, nil, nil).WithMemoryV2(cl, resolver)
h := handlers.NewMCPHandler(db, nil).WithMemoryV2(cl, resolver)
_, err := h.Dispatch(context.Background(), "root-1", "commit_memory_v2", map[string]interface{}{
"content": "x",
@@ -4,14 +4,12 @@ import (
"bytes"
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
@@ -158,7 +156,6 @@ type cpProvisionRequest struct {
Tier int `json:"tier"`
PlatformURL string `json:"platform_url"`
Env map[string]string `json:"env"`
ConfigFiles map[string]string `json:"config_files,omitempty"`
}
type cpProvisionResponse struct {
@@ -182,11 +179,6 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
}
env["ADMIN_TOKEN"] = p.adminToken
}
configFiles, err := collectCPConfigFiles(cfg)
if err != nil {
return "", fmt.Errorf("cp provisioner: collect config files: %w", err)
}
req := cpProvisionRequest{
OrgID: p.orgID,
WorkspaceID: cfg.WorkspaceID,
@@ -194,7 +186,6 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
Tier: cfg.Tier,
PlatformURL: cfg.PlatformURL,
Env: env,
ConfigFiles: configFiles,
}
body, err := json.Marshal(req)
@@ -246,90 +237,6 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
return result.InstanceID, nil
}
const cpConfigFilesMaxBytes = 12 << 10
func isCPTemplateConfigFile(name string) bool {
name = filepath.ToSlash(filepath.Clean(name))
return name == "config.yaml" || strings.HasPrefix(name, "prompts/")
}
func collectCPConfigFiles(cfg WorkspaceConfig) (map[string]string, error) {
files := make(map[string]string)
total := 0
addFile := func(name string, data []byte) error {
name = filepath.ToSlash(filepath.Clean(name))
if name == "." || strings.HasPrefix(name, "../") || strings.HasPrefix(name, "/") || strings.Contains(name, "/../") {
return fmt.Errorf("invalid config file path %q", name)
}
total += len(data)
if total > cpConfigFilesMaxBytes {
return fmt.Errorf("config files exceed %d bytes", cpConfigFilesMaxBytes)
}
files[name] = base64.StdEncoding.EncodeToString(data)
return nil
}
if cfg.TemplatePath != "" {
// Reject symlinks on the root itself — WalkDir follows symlinks,
// so a symlink TemplatePath that escapes the intended root directory
// would bypass the subsequent path-relativization checks below.
rootInfo, err := os.Lstat(cfg.TemplatePath)
if err != nil {
return nil, fmt.Errorf("collectCPConfigFiles: lstat template path: %w", err)
}
if rootInfo.Mode()&os.ModeSymlink != 0 {
return nil, fmt.Errorf("collectCPConfigFiles: template path must not be a symlink")
}
err = filepath.WalkDir(cfg.TemplatePath, func(path string, d os.DirEntry, walkErr error) error {
if walkErr != nil {
return walkErr
}
// Skip symlinks — WalkDir follows them by default, which means
// a symlink inside the template dir pointing to /etc/passwd
// would be traversed even though the resulting relative-path
// check would correctly reject it. Defense-in-depth: don't
// follow symlinks at all. (OFFSEC-010)
if d.Type()&os.ModeSymlink != 0 {
return nil
}
if d.IsDir() {
return nil
}
info, err := d.Info()
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
rel, err := filepath.Rel(cfg.TemplatePath, path)
if err != nil {
return err
}
if !isCPTemplateConfigFile(rel) {
return nil
}
data, err := os.ReadFile(path)
if err != nil {
return err
}
return addFile(rel, data)
})
if err != nil {
return nil, err
}
}
for name, data := range cfg.ConfigFiles {
if err := addFile(name, data); err != nil {
return nil, err
}
}
if len(files) == 0 {
return nil, nil
}
return files, nil
}
// Stop terminates the workspace's EC2 instance via the control plane.
//
// Looks up the actual EC2 instance_id from the workspaces table before
@@ -484,9 +391,7 @@ func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool
// Don't leak the body — upstream errors may echo headers.
return true, fmt.Errorf("cp provisioner: status: unexpected %d", resp.StatusCode)
}
var result struct {
State string `json:"state"`
}
var result struct{ State string `json:"state"` }
// Cap body read at 64 KiB for parity with Start — a misconfigured
// or compromised CP streaming a huge body could otherwise exhaust
// memory in this hot path (called reactively per-request from
@@ -1,15 +1,11 @@
package provisioner
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
@@ -217,59 +213,6 @@ func TestStart_HappyPath(t *testing.T) {
}
}
func TestStart_SendsTemplateAndGeneratedConfigFiles(t *testing.T) {
tmpl := t.TempDir()
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), []byte("name: template\n"), 0o600); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmpl, "adapter.py"), bytes.Repeat([]byte("x"), cpConfigFilesMaxBytes), 0o600); err != nil {
t.Fatal(err)
}
if err := os.Mkdir(filepath.Join(tmpl, "prompts"), 0o700); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(tmpl, "prompts", "system.md"), []byte("hello"), 0o600); err != nil {
t.Fatal(err)
}
var body cpProvisionRequest
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Errorf("decode request: %v", err)
}
w.WriteHeader(http.StatusCreated)
_, _ = io.WriteString(w, `{"instance_id":"i-abc123","state":"pending"}`)
}))
defer srv.Close()
p := &CPProvisioner{baseURL: srv.URL, orgID: "org-1", httpClient: srv.Client()}
_, err := p.Start(context.Background(), WorkspaceConfig{
WorkspaceID: "ws-1",
Runtime: "claude-code",
Tier: 4,
PlatformURL: "http://tenant",
TemplatePath: tmpl,
ConfigFiles: map[string][]byte{
"config.yaml": []byte("name: generated\n"),
},
})
if err != nil {
t.Fatalf("Start: %v", err)
}
wantConfig := base64.StdEncoding.EncodeToString([]byte("name: generated\n"))
if got := body.ConfigFiles["config.yaml"]; got != wantConfig {
t.Errorf("config.yaml payload = %q, want generated override %q", got, wantConfig)
}
wantPrompt := base64.StdEncoding.EncodeToString([]byte("hello"))
if got := body.ConfigFiles["prompts/system.md"]; got != wantPrompt {
t.Errorf("prompt payload = %q, want %q", got, wantPrompt)
}
if _, ok := body.ConfigFiles["adapter.py"]; ok {
t.Error("non-config template file adapter.py must not be sent to CP")
}
}
// TestStart_Non201ReturnsStructuredError — when CP returns 401 with a
// structured {"error":"..."} body, Start surfaces that error message.
// Verifies the defense against log-leaking raw upstream bodies.
@@ -473,9 +416,9 @@ func TestStop_4xxResponseSurfacesError(t *testing.T) {
func TestStop_2xxVariantsAllSucceed(t *testing.T) {
primeInstanceIDLookup(t, map[string]string{"ws-1": "i-ok"})
for _, code := range []int{
http.StatusOK, // 200
http.StatusAccepted, // 202
http.StatusNoContent, // 204
http.StatusOK, // 200
http.StatusAccepted, // 202
http.StatusNoContent, // 204
} {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(code)
@@ -543,11 +486,11 @@ func TestIsRunning_ParsesStateField(t *testing.T) {
_, _ = io.WriteString(w, `{"state":"`+state+`"}`)
}))
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
httpClient: srv.Client(),
}
got, err := p.IsRunning(context.Background(), "ws-1")
srv.Close()
@@ -899,67 +842,3 @@ func TestIsRunning_EmptyInstanceIDReturnsFalse(t *testing.T) {
t.Errorf("IsRunning with empty instance_id should return running=false, got true")
}
}
// TestCollectCPConfigFiles_SkipsSymlinks — WalkDir follows symlinks by default,
// but collectCPConfigFiles must skip them so a symlink inside a template dir
// pointing outside (e.g. ln -s /etc snapshot) cannot be traversed.
// Verifies OFFSEC-010 defense-in-depth fix. (OFFSEC-010)
func TestCollectCPConfigFiles_SkipsSymlinks(t *testing.T) {
tmpl := t.TempDir()
// Write a real file that should be included.
if err := os.WriteFile(filepath.Join(tmpl, "config.yaml"), []byte("name: real\n"), 0o600); err != nil {
t.Fatal(err)
}
// Create a subdir with a file that will be symlinked-outside.
sensitiveDir := t.TempDir()
if err := os.WriteFile(filepath.Join(sensitiveDir, "secret.txt"), []byte("SENSITIVE\n"), 0o600); err != nil {
t.Fatal(err)
}
// Symlink inside template dir pointing to outside path.
symlinkPath := filepath.Join(tmpl, "snapshot")
if err := os.Symlink(sensitiveDir, symlinkPath); err != nil {
t.Fatal(err)
}
files, err := collectCPConfigFiles(WorkspaceConfig{TemplatePath: tmpl})
if err != nil {
t.Fatalf("collectCPConfigFiles: %v", err)
}
if files == nil {
t.Fatal("files should not be nil")
}
// config.yaml must be present.
if _, ok := files["config.yaml"]; !ok {
t.Errorf("config.yaml missing from files")
}
// The symlinked path must NOT be included (even though WalkDir would
// traverse it, the d.Type()&os.ModeSymlink guard skips the entry).
for k := range files {
if strings.Contains(k, "snapshot") || strings.Contains(k, "secret") {
t.Errorf("symlink path %q should not be in files — OFFSEC-010 regression", k)
}
}
}
// TestCollectCPConfigFiles_RejectsRootSymlink — if cfg.TemplatePath itself is
// a symlink, WalkDir would follow it to an arbitrary directory, bypassing the
// cfg.TemplatePath boundary. The function must reject this case explicitly.
// (OFFSEC-010)
func TestCollectCPConfigFiles_RejectsRootSymlink(t *testing.T) {
real := t.TempDir()
if err := os.WriteFile(filepath.Join(real, "config.yaml"), []byte("name: real\n"), 0o600); err != nil {
t.Fatal(err)
}
link := filepath.Join(t.TempDir(), "template-link")
if err := os.Symlink(real, link); err != nil {
t.Fatal(err)
}
_, err := collectCPConfigFiles(WorkspaceConfig{TemplatePath: link})
if err == nil {
t.Error("collectCPConfigFiles with symlink TemplatePath should return error")
}
if err != nil && !strings.Contains(err.Error(), "symlink") {
t.Errorf("expected symlink-related error, got: %v", err)
}
}
@@ -481,22 +481,6 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e
return "", fmt.Errorf("failed to create container: %w", err)
}
// Seed /configs before the entrypoint starts. molecule-runtime reads
// /configs/config.yaml immediately; post-start copy races fast runtimes
// into a FileNotFoundError crash loop.
if cfg.TemplatePath != "" {
if err := p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath); err != nil {
_ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true})
return "", fmt.Errorf("failed to copy template to container %s before start: %w", name, err)
}
}
if len(cfg.ConfigFiles) > 0 {
if err := p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles); err != nil {
_ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true})
return "", fmt.Errorf("failed to write config files to container %s before start: %w", name, err)
}
}
if err := p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil {
// Clean up created container on start failure
_ = p.cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{Force: true})
@@ -512,6 +496,20 @@ func (p *Provisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, e
// /configs and /workspace, then drops to agent via gosu). No per-start
// chown needed here.
// Copy template files into /configs if TemplatePath is set
if cfg.TemplatePath != "" {
if err := p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath); err != nil {
log.Printf("Provisioner: warning — failed to copy template to container %s: %v", name, err)
}
}
// Write generated config files into /configs if ConfigFiles is set
if len(cfg.ConfigFiles) > 0 {
if err := p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles); err != nil {
log.Printf("Provisioner: warning — failed to write config files to container %s: %v", name, err)
}
}
// Resolve the host-mapped port. Retry inspect up to 3 times if Docker hasn't
// bound the ephemeral port yet (rare race under heavy load).
hostURL := InternalURL(cfg.WorkspaceID) // fallback to Docker-internal
@@ -773,15 +771,6 @@ func ApplyTierConfig(hostCfg *container.HostConfig, cfg WorkspaceConfig, configM
// CopyTemplateToContainer copies files from a host directory into /configs in the container.
func (p *Provisioner) CopyTemplateToContainer(ctx context.Context, containerID, templatePath string) error {
buf, err := buildTemplateTar(templatePath)
if err != nil {
return err
}
return p.cli.CopyToContainer(ctx, containerID, "/configs", buf, container.CopyToContainerOptions{})
}
func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
// Resolve symlinks at the root before walking. filepath.Walk does
// NOT follow a symlink that IS the root — it Lstats the path, sees
// a symlink (non-directory), and emits exactly one entry without
@@ -804,15 +793,6 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
if err != nil {
return err
}
// OFFSEC-010: skip symlinks to prevent path traversal via malicious
// template symlinks (e.g. template/.ssh → /root/.ssh). filepath.Walk
// follows symlinks by default, so without this guard a crafted symlink
// inside the template directory could escape to include arbitrary host
// files in the tar archive. We intentionally skip rather than error so
// a broken symlink in an org template is a silent no-op.
if info.Mode()&os.ModeSymlink != 0 {
return nil
}
rel, err := filepath.Rel(templatePath, path)
if err != nil {
return err
@@ -853,13 +833,13 @@ func buildTemplateTar(templatePath string) (*bytes.Buffer, error) {
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to create tar from %s: %w", templatePath, err)
return fmt.Errorf("failed to create tar from %s: %w", templatePath, err)
}
if err := tw.Close(); err != nil {
return nil, fmt.Errorf("failed to close tar writer: %w", err)
return fmt.Errorf("failed to close tar writer: %w", err)
}
return &buf, nil
return p.cli.CopyToContainer(ctx, containerID, "/configs", &buf, container.CopyToContainerOptions{})
}
// WriteFilesToContainer writes in-memory files into /configs in the container.
@@ -1,9 +1,7 @@
package provisioner
import (
"archive/tar"
"errors"
"io"
"os"
"path/filepath"
"strings"
@@ -64,72 +62,6 @@ func TestValidateConfigSource_TemplateIsDirName(t *testing.T) {
}
}
func TestStartSeedsConfigsBeforeContainerStart(t *testing.T) {
src, err := os.ReadFile("provisioner.go")
if err != nil {
t.Fatalf("read provisioner.go: %v", err)
}
text := string(src)
copyTemplate := strings.Index(text, "p.CopyTemplateToContainer(ctx, resp.ID, cfg.TemplatePath)")
writeFiles := strings.Index(text, "p.WriteFilesToContainer(ctx, resp.ID, cfg.ConfigFiles)")
start := strings.Index(text, "p.cli.ContainerStart(ctx, resp.ID, container.StartOptions{})")
if copyTemplate < 0 || writeFiles < 0 || start < 0 {
t.Fatalf("expected Start to copy template, write config files, and start container")
}
if copyTemplate >= start || writeFiles >= start {
t.Fatalf("config seeding must happen before ContainerStart: copyTemplate=%d writeFiles=%d start=%d", copyTemplate, writeFiles, start)
}
}
func TestBuildTemplateTar_SkipsSymlinks(t *testing.T) {
dir := t.TempDir()
if err := os.WriteFile(filepath.Join(dir, "config.yaml"), []byte("name: safe\n"), 0644); err != nil {
t.Fatalf("write config: %v", err)
}
outside := filepath.Join(t.TempDir(), "secret.txt")
if err := os.WriteFile(outside, []byte("do-not-copy\n"), 0644); err != nil {
t.Fatalf("write outside target: %v", err)
}
if err := os.Symlink(outside, filepath.Join(dir, "linked-secret.txt")); err != nil {
t.Fatalf("create symlink: %v", err)
}
buf, err := buildTemplateTar(dir)
if err != nil {
t.Fatalf("buildTemplateTar: %v", err)
}
names := map[string]string{}
tr := tar.NewReader(buf)
for {
hdr, err := tr.Next()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatalf("read tar: %v", err)
}
body, err := io.ReadAll(tr)
if err != nil {
t.Fatalf("read body for %s: %v", hdr.Name, err)
}
names[hdr.Name] = string(body)
}
if got := names["config.yaml"]; got != "name: safe\n" {
t.Fatalf("config.yaml body = %q, want safe config", got)
}
if _, ok := names["linked-secret.txt"]; ok {
t.Fatalf("symlink entry was copied into template tar: %#v", names)
}
for name, body := range names {
if strings.Contains(body, "do-not-copy") {
t.Fatalf("symlink target leaked through %s: %q", name, body)
}
}
}
// baseHostConfig returns a fresh HostConfig with typical pre-tier binds,
// mimicking what Start() builds before calling ApplyTierConfig.
func baseHostConfig(pluginsPath string) *container.HostConfig {
-75
View File
@@ -1,75 +0,0 @@
package push
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
// Handler exposes HTTP endpoints for push-token management.
type Handler struct {
repo *Repo
}
// NewHandler creates a push-token HTTP handler.
func NewHandler(repo *Repo) *Handler {
return &Handler{repo: repo}
}
// RegisterRoutes mounts push-token routes on the given router group.
func (h *Handler) RegisterRoutes(rg *gin.RouterGroup) {
rg.POST("/push-tokens", h.Create)
rg.DELETE("/push-tokens", h.Delete)
}
// Create handles POST /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]", "platform": "ios" | "android" }
func (h *Handler) Create(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
Platform string `json:"platform" binding:"required,oneof=ios android"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.SaveToken(c.Request.Context(), workspaceID, body.Token, body.Platform); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save token"})
return
}
c.Status(http.StatusNoContent)
}
// Delete handles DELETE /push-tokens.
// Body: { "token": "ExponentPushToken[xxx]" }
func (h *Handler) Delete(c *gin.Context) {
workspaceID := c.Param("id")
if _, err := uuid.Parse(workspaceID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace id"})
return
}
var body struct {
Token string `json:"token" binding:"required"`
}
if err := c.ShouldBindJSON(&body); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := h.repo.DeleteToken(c.Request.Context(), workspaceID, body.Token); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete token"})
return
}
c.Status(http.StatusNoContent)
}
-102
View File
@@ -1,102 +0,0 @@
package push
import (
"context"
"database/sql"
"log"
"os"
"time"
)
// Notifier sends push notifications for agent messages.
type Notifier struct {
repo *Repo
sender *Sender
}
// NewNotifier creates a Notifier.
func NewNotifier(db *sql.DB, sender *Sender) *Notifier {
return &Notifier{
repo: NewRepo(db),
sender: sender,
}
}
// NotifyAgentMessage sends a push notification to all registered devices for a
// workspace when an agent sends a message. It runs asynchronously (fire-and-
// forget) so the caller's WebSocket broadcast is never blocked.
func (n *Notifier) NotifyAgentMessage(ctx context.Context, workspaceID, workspaceName, message string) {
if n == nil || n.sender == nil {
return
}
// Capture values for the goroutine.
wsID := workspaceID
wsName := workspaceName
msg := message
go func() {
// Use a fresh context with timeout so a slow Expo API doesn't
// leak the caller's context deadline.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
tokens, err := n.repo.GetTokens(ctx, wsID)
if err != nil {
log.Printf("push: failed to get tokens for workspace %s: %v", wsID, err)
return
}
if len(tokens) == 0 {
return
}
// Expo accepts batches of up to ~100 messages; we cap lower to stay
// well under the limit.
const batchSize = 50
for i := 0; i < len(tokens); i += batchSize {
end := i + batchSize
if end > len(tokens) {
end = len(tokens)
}
batch := tokens[i:end]
messages := make([]Message, 0, len(batch))
for _, t := range batch {
messages = append(messages, Message{
To: t.Token,
Title: wsName,
Body: truncate(msg, 100),
Data: map[string]string{
"type": "agent_message",
"workspaceId": wsID,
"workspaceSlug": os.Getenv("MOLECULE_ORG_SLUG"),
},
Sound: "default",
Priority: "high",
})
}
results, err := n.sender.Send(ctx, messages)
if err != nil {
log.Printf("push: send failed for workspace %s: %v", wsID, err)
continue
}
// Remove invalid tokens.
for j, r := range results {
if ShouldRemoveToken(r) {
if delErr := n.repo.DeleteToken(ctx, wsID, batch[j].Token); delErr != nil {
log.Printf("push: failed to delete invalid token for workspace %s: %v", wsID, delErr)
}
}
}
}
}()
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "…"
}
-159
View File
@@ -1,159 +0,0 @@
package push
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSenderSend(t *testing.T) {
gin.SetMode(gin.TestMode)
expoResponse := map[string]interface{}{
"data": []map[string]interface{}{
{"status": "ok", "id": "abc123"},
{"status": "error", "message": "Invalid token", "details": map[string]string{"error": "DeviceNotRegistered"}},
},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "POST", r.Method)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
var msgs []Message
require.NoError(t, json.NewDecoder(r.Body).Decode(&msgs))
assert.Len(t, msgs, 2)
assert.Equal(t, "ExponentPushToken[test1]", msgs[0].To)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(expoResponse)
}))
defer server.Close()
sender := NewSender("")
sender.apiURL = server.URL
results, err := sender.Send(context.Background(), []Message{
{To: "ExponentPushToken[test1]", Title: "Test", Body: "Hello"},
{To: "ExponentPushToken[test2]", Title: "Test", Body: "World"},
})
require.NoError(t, err)
require.Len(t, results, 2)
assert.Equal(t, "ok", results[0].Status)
assert.Equal(t, "error", results[1].Status)
assert.True(t, ShouldRemoveToken(results[1]))
}
func TestSenderSendEmpty(t *testing.T) {
sender := NewSender("")
results, err := sender.Send(context.Background(), nil)
require.NoError(t, err)
assert.Nil(t, results)
}
func TestHandlerCreate(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("INSERT INTO push_tokens").
WithArgs("11111111-1111-1111-1111-111111111111", "ExponentPushToken[abc]", "ios").
WillReturnResult(sqlmock.NewResult(1, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"ios"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestHandlerCreateInvalidPlatform(t *testing.T) {
gin.SetMode(gin.TestMode)
db, _, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
handler := NewHandler(NewRepo(db))
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[abc]","platform":"windows"}`
req, _ := http.NewRequest("POST", "/workspaces/11111111-1111-1111-1111-111111111111/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code)
}
func TestHandlerDelete(t *testing.T) {
gin.SetMode(gin.TestMode)
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectExec("DELETE FROM push_tokens").
WithArgs("22222222-2222-2222-2222-222222222222", "ExponentPushToken[del]").
WillReturnResult(sqlmock.NewResult(0, 1))
repo := NewRepo(db)
handler := NewHandler(repo)
router := gin.New()
group := router.Group("/workspaces/:id")
handler.RegisterRoutes(group)
w := httptest.NewRecorder()
body := `{"token":"ExponentPushToken[del]"}`
req, _ := http.NewRequest("DELETE", "/workspaces/22222222-2222-2222-2222-222222222222/push-tokens", bytes.NewBufferString(body))
req.Header.Set("Content-Type", "application/json")
router.ServeHTTP(w, req)
assert.Equal(t, http.StatusNoContent, w.Code)
require.NoError(t, mock.ExpectationsWereMet())
}
func TestRepoGetTokens(t *testing.T) {
db, mock, err := sqlmock.New()
require.NoError(t, err)
defer db.Close()
mock.ExpectQuery("SELECT id, workspace_id, token, platform, created_at FROM push_tokens").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "token", "platform", "created_at"}).
AddRow("1", "ws-1", "ExponentPushToken[a]", "ios", "2026-01-01T00:00:00Z").
AddRow("2", "ws-1", "ExponentPushToken[b]", "android", "2026-01-01T00:00:00Z"))
repo := NewRepo(db)
tokens, err := repo.GetTokens(context.Background(), "ws-1")
require.NoError(t, err)
require.Len(t, tokens, 2)
assert.Equal(t, "ExponentPushToken[a]", tokens[0].Token)
assert.Equal(t, "ios", tokens[0].Platform)
assert.Equal(t, "ExponentPushToken[b]", tokens[1].Token)
require.NoError(t, mock.ExpectationsWereMet())
}
-76
View File
@@ -1,76 +0,0 @@
package push
import (
"context"
"database/sql"
"fmt"
)
// Token is one registered push token for a workspace.
type Token struct {
ID string
WorkspaceID string
Token string
Platform string
CreatedAt string
}
// Repo reads and writes push tokens in Postgres.
type Repo struct {
db *sql.DB
}
// NewRepo creates a token repository backed by db.
func NewRepo(db *sql.DB) *Repo {
return &Repo{db: db}
}
// SaveToken registers a push token for a workspace. If the same token already
// exists for the workspace, it updates the timestamp.
func (r *Repo) SaveToken(ctx context.Context, workspaceID, token, platform string) error {
_, err := r.db.ExecContext(ctx, `
INSERT INTO push_tokens (workspace_id, token, platform)
VALUES ($1, $2, $3)
ON CONFLICT (workspace_id, token) DO UPDATE
SET updated_at = now()
`, workspaceID, token, platform)
if err != nil {
return fmt.Errorf("push_tokens: save: %w", err)
}
return nil
}
// DeleteToken removes a push token. Returns nil even if the token did not exist.
func (r *Repo) DeleteToken(ctx context.Context, workspaceID, token string) error {
_, err := r.db.ExecContext(ctx, `
DELETE FROM push_tokens
WHERE workspace_id = $1 AND token = $2
`, workspaceID, token)
if err != nil {
return fmt.Errorf("push_tokens: delete: %w", err)
}
return nil
}
// GetTokens returns all active push tokens for a workspace.
func (r *Repo) GetTokens(ctx context.Context, workspaceID string) ([]Token, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, workspace_id, token, platform, created_at
FROM push_tokens
WHERE workspace_id = $1
`, workspaceID)
if err != nil {
return nil, fmt.Errorf("push_tokens: list: %w", err)
}
defer rows.Close()
var tokens []Token
for rows.Next() {
var t Token
if err := rows.Scan(&t.ID, &t.WorkspaceID, &t.Token, &t.Platform, &t.CreatedAt); err != nil {
return nil, fmt.Errorf("push_tokens: scan: %w", err)
}
tokens = append(tokens, t)
}
return tokens, rows.Err()
}
-104
View File
@@ -1,104 +0,0 @@
package push
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
)
const expoPushAPI = "https://exp.host/--/api/v2/push/send"
// Message is one Expo push notification.
type Message struct {
To string `json:"to"`
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
Data map[string]string `json:"data,omitempty"`
Sound string `json:"sound,omitempty"`
Priority string `json:"priority,omitempty"`
}
// Sender delivers push notifications via the Expo Push Service.
type Sender struct {
apiURL string
httpClient *http.Client
expoToken string // optional Expo access token for authenticated requests
}
// NewSender creates a Sender. expoToken may be empty for unauthenticated
// requests (sufficient for most use cases).
func NewSender(expoToken string) *Sender {
return &Sender{
apiURL: expoPushAPI,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
expoToken: expoToken,
}
}
// SendResult is the per-recipient status from Expo.
type SendResult struct {
Status string `json:"status"`
ID string `json:"id"`
Message string `json:"message,omitempty"`
Details struct {
Error string `json:"error,omitempty"`
} `json:"details,omitempty"`
}
// expoResponse is the wrapper shape returned by the Expo API.
type expoResponse struct {
Data []SendResult `json:"data"`
}
// Send fires a batch of push messages. It returns a slice of results in the
// same order as the input, plus an error only when the HTTP call itself fails.
// Callers should inspect each result's Status field for per-message errors
// (e.g. "DeviceNotRegistered" → token should be deleted).
func (s *Sender) Send(ctx context.Context, messages []Message) ([]SendResult, error) {
if len(messages) == 0 {
return nil, nil
}
body, err := json.Marshal(messages)
if err != nil {
return nil, fmt.Errorf("push: marshal: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.apiURL, bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("push: new request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("Accept-Encoding", "gzip, deflate")
if s.expoToken != "" {
req.Header.Set("Authorization", "Bearer "+s.expoToken)
}
res, err := s.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("push: post: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("push: expo returned %d", res.StatusCode)
}
var resp expoResponse
if err := json.NewDecoder(res.Body).Decode(&resp); err != nil {
return nil, fmt.Errorf("push: decode: %w", err)
}
return resp.Data, nil
}
// ShouldRemoveToken reports whether a SendResult indicates the token is no
// longer valid and should be deleted from the database.
func ShouldRemoveToken(r SendResult) bool {
return r.Status == "error" && r.Details.Error == "DeviceNotRegistered"
}
@@ -14,9 +14,8 @@ func setupMockDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -31,9 +31,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -17,9 +17,8 @@ func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -18,9 +18,8 @@ func setupLivenessTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
+2 -15
View File
@@ -20,7 +20,6 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/push"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/docker/docker/client"
@@ -319,25 +318,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
{
// Push notifications (mobile)
var pushNotifier *push.Notifier
if expoToken := os.Getenv("EXPO_ACCESS_TOKEN"); expoToken != "" {
pushNotifier = push.NewNotifier(db.DB, push.NewSender(expoToken))
}
// Activity Logs
acth := handlers.NewActivityHandler(broadcaster, pushNotifier)
acth := handlers.NewActivityHandler(broadcaster)
wsAuth.GET("/activity", acth.List)
wsAuth.GET("/session-search", acth.SessionSearch)
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Push token registration (mobile)
if pushNotifier != nil {
pushH := push.NewHandler(push.NewRepo(db.DB))
pushH.RegisterRoutes(wsAuth)
}
// Chat history — RFC #2945 PR-C (issue #3017) + PR-D (issue
// #3026). Server-side rendering of activity_logs rows into
// the canonical ChatMessage shape; storage is plugin-shaped
@@ -441,7 +428,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// opencode session cannot saturate the platform.
// C3: commit_memory/recall_memory with scope=GLOBAL → permission error;
// send_message_to_user excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
mcpH := handlers.NewMCPHandler(db.DB, broadcaster, pushNotifier)
mcpH := handlers.NewMCPHandler(db.DB, broadcaster)
if memBundle != nil {
mcpH.WithMemoryV2(memBundle.Plugin, memBundle.Resolver)
}
@@ -24,9 +24,8 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
if err != nil {
t.Fatalf("failed to create sqlmock: %v", err)
}
prevDB := db.DB
db.DB = mockDB
t.Cleanup(func() { mockDB.Close(); db.DB = prevDB })
t.Cleanup(func() { mockDB.Close() })
return mock
}
@@ -1 +0,0 @@
DROP TABLE IF EXISTS push_tokens;
@@ -1,11 +0,0 @@
CREATE TABLE push_tokens (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
token TEXT NOT NULL,
platform TEXT NOT NULL CHECK (platform IN ('ios', 'android')),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE(workspace_id, token)
);
CREATE INDEX idx_push_tokens_workspace ON push_tokens(workspace_id);
+2 -4
View File
@@ -40,8 +40,6 @@ _A2A_BOUNDARY_END = "[/A2A_RESULT_FROM_PEER]"
# inside the trusted zone. Escape BOTH boundary markers in the raw text
# before wrapping so they can never close the boundary early.
# We use "[/ " as the escape prefix — visually distinct from the real marker.
_A2A_BOUNDARY_START_ESCAPED = "[/ A2A_RESULT_FROM_PEER]"
_A2A_BOUNDARY_END_ESCAPED = "[/ /A2A_RESULT_FROM_PEER]"
def _escape_boundary_markers(text: str) -> str:
@@ -52,8 +50,8 @@ def _escape_boundary_markers(text: str) -> str:
the boundary early or inject a fake opener.
"""
return (
text.replace(_A2A_BOUNDARY_START, _A2A_BOUNDARY_START_ESCAPED)
.replace(_A2A_BOUNDARY_END, _A2A_BOUNDARY_END_ESCAPED)
text.replace(_A2A_BOUNDARY_START, "[/ A2A_RESULT_FROM_PEER]")
.replace(_A2A_BOUNDARY_END, "[/ /A2A_RESULT_FROM_PEER]")
)
+3 -7
View File
@@ -686,8 +686,8 @@ def _format_channel_content(
# --- MCP Server (JSON-RPC over stdio) ---
def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Assert that stdio fds are pipe/socket/char-device compatible.
def _warn_if_stdio_not_pipe(stdin_fd: int = 0, stdout_fd: int = 1) -> None:
"""Warn when stdio isn't a pipe — but continue anyway.
The legacy asyncio.connect_read_pipe / connect_write_pipe transport
rejected regular files, PTYs, and sockets with:
@@ -711,10 +711,6 @@ def _assert_stdio_is_pipe_compatible(stdin_fd: int = 0, stdout_fd: int = 1) -> N
)
# Deprecated alias — the canonical name is _assert_stdio_is_pipe_compatible.
_warn_if_stdio_not_pipe = _assert_stdio_is_pipe_compatible
async def main(): # pragma: no cover
"""Run MCP server on stdio — reads JSON-RPC requests, writes responses.
@@ -971,7 +967,7 @@ def cli_main(transport: str = "stdio", port: int = 9100) -> None: # pragma: no
if transport == "http":
asyncio.run(_run_http_server(port))
else:
_assert_stdio_is_pipe_compatible()
_warn_if_stdio_not_pipe()
asyncio.run(main())
+1 -13
View File
@@ -49,9 +49,7 @@ from a2a_client import (
from a2a_tools_rbac import auth_headers_for_heartbeat as _auth_headers_for_heartbeat
from _sanitize_a2a import (
_A2A_BOUNDARY_END,
_A2A_BOUNDARY_END_ESCAPED,
_A2A_BOUNDARY_START,
_A2A_BOUNDARY_START_ESCAPED,
sanitize_a2a_result,
) # noqa: E402
@@ -332,18 +330,8 @@ async def tool_delegate_task(
# markers so the agent can distinguish trusted (own output) from untrusted
# (peer-supplied) content. Explicit wrapping here rather than inside
# sanitize_a2a_result preserves a clean separation of concerns.
#
# Truncate at the closer BEFORE sanitizing so the raw closer (which gets
# lost during escaping) is removed from the content. After truncation,
# sanitize the remaining text and wrap with escaped boundary markers.
if _A2A_BOUNDARY_END in result:
result = result[:result.index(_A2A_BOUNDARY_END)]
escaped = sanitize_a2a_result(result)
return (
f"{_A2A_BOUNDARY_START_ESCAPED}\n"
f"{escaped}\n"
f"{_A2A_BOUNDARY_END_ESCAPED}"
)
return f"{_A2A_BOUNDARY_START}\n{escaped}\n{_A2A_BOUNDARY_END}"
async def tool_delegate_task_async(
+10 -10
View File
@@ -1826,8 +1826,8 @@ def test_inbox_bridge_swallows_closed_loop_runtime_error():
class TestStdioPipeAssertion:
"""Pin _assert_stdio_is_pipe_compatible — the canonical function name.
_warn_if_stdio_not_pipe is a deprecated alias.
"""Pin _warn_if_stdio_not_pipe — the diagnostic warning that replaces
the old fatal _assert_stdio_is_pipe_compatible guard.
The universal stdio transport now works with ANY file descriptor
(pipes, regular files, PTYs, sockets), so the old exit-2 behavior
@@ -1838,12 +1838,12 @@ class TestStdioPipeAssertion:
def test_pipe_pair_passes_silently(self, caplog):
"""Happy path — both fds are pipes. No warning emitted."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
assert "not a pipe" not in caplog.text
finally:
os.close(r)
@@ -1852,14 +1852,14 @@ class TestStdioPipeAssertion:
def test_regular_file_stdout_warns(self, tmp_path, caplog):
"""Reproducer for runtime#61: stdout redirected to a regular file.
Now emits a warning instead of exiting."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, _w = os.pipe()
regular = tmp_path / "captured.log"
f = open(regular, "wb")
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=f.fileno())
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=f.fileno())
assert "stdout" in caplog.text
assert "not a pipe" in caplog.text
finally:
@@ -1868,7 +1868,7 @@ class TestStdioPipeAssertion:
def test_regular_file_stdin_warns(self, tmp_path, caplog):
"""Symmetric case — stdin redirected from a regular file."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
regular = tmp_path / "input.json"
regular.write_bytes(b'{"jsonrpc":"2.0","id":1,"method":"initialize"}\n')
@@ -1876,7 +1876,7 @@ class TestStdioPipeAssertion:
_r, w = os.pipe()
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=f.fileno(), stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=f.fileno(), stdout_fd=w)
assert "stdin" in caplog.text
assert "not a pipe" in caplog.text
finally:
@@ -1886,13 +1886,13 @@ class TestStdioPipeAssertion:
def test_closed_fd_warns_about_stat_error(self, caplog):
"""If stdio is closed, os.fstat raises OSError. Warning is
skipped silently (can't stat the fd)."""
from a2a_mcp_server import _assert_stdio_is_pipe_compatible
from a2a_mcp_server import _warn_if_stdio_not_pipe
r, w = os.pipe()
os.close(w) # Now `w` is a stale fd — fstat will fail.
try:
with caplog.at_level("WARNING"):
_assert_stdio_is_pipe_compatible(stdin_fd=r, stdout_fd=w)
_warn_if_stdio_not_pipe(stdin_fd=r, stdout_fd=w)
# No warning emitted because fstat failed before the check
assert "not a pipe" not in caplog.text
finally:
@@ -17,12 +17,11 @@ Test coverage for:
Issue references: #491 (delegate_task), #537 (builtin_tools/a2a_tools.py sibling)
Key sanitization facts (for test authors):
_escape_boundary_markers: replaces "[A2A_RESULT_FROM_PEER]" with
"[/ A2A_RESULT_FROM_PEER]" and "[/A2A_RESULT_FROM_PEER]" with
"[/ /A2A_RESULT_FROM_PEER]". The escape form is "[/ " (bracket-space).
Assertion pattern: assert "[/ A2A_RESULT_FROM_PEER]" in result.
Defense-in-depth injection escape patterns replace SYSTEM/OVERRIDE/
INSTRUCTIONS/IGNORE ALL/YOU ARE NOW with "[ESCAPED_*]" forms.
_escape_boundary_markers: inserts ZWSP (U+200B) before '[' at line-start.
The substring "[A2A_RESULT_FROM_PEER]" IS STILL in the output (preceded by ZWSP).
Assertion pattern: assert ZWSP in result.
_strip_closed_blocks: removes everything after the closer.
Assertion pattern: assert "hidden content" not in result.
Error path: when peer returns an error-prefixed string (starts with
_A2A_ERROR_PREFIX), the raw error text is included in the user-facing
"DELEGATION FAILED" message. This is intentional errors from peers
@@ -41,8 +40,7 @@ import pytest
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Escape form used by _escape_boundary_markers (primary OFFSEC-003 control)
ESCAPED_START = "[/ A2A_RESULT_FROM_PEER]"
ZWSP = "" # Zero-width space (U+200B) — escape character
MARKER_FROM_PEER = "[A2A_RESULT_FROM_PEER]"
MARKER_ERROR = "[A2A_ERROR]"
@@ -119,8 +117,8 @@ class TestDelegateTaskSanitization:
to the agent via ``sanitize_a2a_result``.
"""
async def test_boundary_marker_escaped(self):
"""Peer response with [A2A_RESULT_FROM_PEER] must be escaped."""
async def test_boundary_marker_escaped_with_zwsp(self):
"""Peer response with [A2A_RESULT_FROM_PEER] must be ZWSP-escaped."""
import a2a_tools
peer = {"id": "peer-1", "url": "http://peer:9000", "name": "Peer", "status": "online"}
@@ -131,7 +129,7 @@ class TestDelegateTaskSanitization:
patch("a2a_tools.report_activity", new=AsyncMock()):
result = await a2a_tools.tool_delegate_task("peer-1", "do it")
assert ESCAPED_START in result, f"Expected escape form in result: {repr(result)}"
assert ZWSP in result, f"Expected ZWSP escape, got: {repr(result)}"
# Raw marker at line boundary must not appear
assert not result.startswith(MARKER_FROM_PEER)
assert f"\n{MARKER_FROM_PEER}" not in result
@@ -152,19 +150,19 @@ class TestDelegateTaskSanitization:
assert "real response" in result
async def test_log_line_breaK_injection_escaped(self):
"""Newline-prefixed boundary marker from peer must be escaped."""
"""Newline-prefixed [A2A_ERROR] from peer must be ZWSP-escaped."""
import a2a_tools
peer = {"id": "peer-1", "url": "http://peer:9000", "name": "Peer", "status": "online"}
injected = f"\n{MARKER_FROM_PEER} malicious log line\n"
injected = f"\n{MARKER_ERROR} malicious log line\n"
with patch("a2a_tools_delegation.discover_peer", return_value=peer), \
patch("a2a_tools_delegation.send_a2a_message", return_value=injected), \
patch("a2a_tools.report_activity", new=AsyncMock()):
result = await a2a_tools.tool_delegate_task("peer-1", "do it")
assert ESCAPED_START in result
assert f"\n{MARKER_FROM_PEER}" not in result
assert ZWSP in result
assert f"\n{MARKER_ERROR}" not in result
async def test_queued_fallback_result_is_sanitized(self, monkeypatch):
"""Poll-mode fallback path must sanitize the delegation result."""
@@ -205,8 +203,8 @@ class TestDelegateTaskSanitization:
result = await a2a_tools.tool_delegate_task("peer-1", "do it")
assert poll_called.get("yes"), "Polling path was not reached"
assert ESCAPED_START in result
assert MARKER_FROM_PEER not in result
assert ZWSP in result
assert MARKER_FROM_PEER not in result or ZWSP in result
# ---------------------------------------------------------------------------
@@ -241,7 +239,7 @@ class TestDelegateSyncViaPollingSanitization:
with patch("a2a_tools_delegation.httpx.AsyncClient", return_value=client):
result = await _delegate_sync_via_polling("peer-1", "do it", "src-ws")
assert ESCAPED_START in result
assert ZWSP in result
assert f"\n{MARKER_FROM_PEER}" not in result
async def test_failed_polling_sanitizes_error_detail(self, monkeypatch):
@@ -254,7 +252,7 @@ class TestDelegateSyncViaPollingSanitization:
{
"delegation_id": "del-fail",
"status": "failed",
"error_detail": MARKER_FROM_PEER + " escalation via error",
"error_detail": MARKER_ERROR + " escalation via error",
}
])
@@ -271,7 +269,7 @@ class TestDelegateSyncViaPollingSanitization:
result = await _delegate_sync_via_polling("peer-1", "do it", "src-ws")
assert result.startswith(_A2A_ERROR_PREFIX)
assert ESCAPED_START in result # boundary marker in error_detail is escaped
assert ZWSP in result # raw error text inside the sentinel block is escaped
# ---------------------------------------------------------------------------
@@ -287,7 +285,7 @@ class TestCheckTaskStatusSanitization:
delegation_data = {
"delegation_id": "del-filter",
"status": "completed",
"summary": MARKER_FROM_PEER + " elevation via summary",
"summary": MARKER_ERROR + " elevation via summary",
"response_preview": "clean preview",
}
client = _make_async_client(get_resp=_http(200, [delegation_data]))
@@ -298,8 +296,8 @@ class TestCheckTaskStatusSanitization:
)
parsed = json.loads(result)
assert ESCAPED_START in parsed["summary"]
assert MARKER_FROM_PEER not in parsed["summary"]
assert ZWSP in parsed["summary"]
assert f"\n{MARKER_ERROR}" not in parsed["summary"]
assert parsed["response_preview"] == "clean preview"
async def test_filtered_sanitizes_response_preview(self):
@@ -320,7 +318,7 @@ class TestCheckTaskStatusSanitization:
)
parsed = json.loads(result)
assert ESCAPED_START in parsed["response_preview"]
assert ZWSP in parsed["response_preview"]
assert f"\n{MARKER_FROM_PEER}" not in parsed["response_preview"]
assert parsed["summary"] == "clean summary"
@@ -333,7 +331,7 @@ class TestCheckTaskStatusSanitization:
"delegation_id": "del-1",
"target_id": "peer-1",
"status": "completed",
"summary": MARKER_FROM_PEER + " from delegation 1",
"summary": MARKER_ERROR + " from delegation 1",
"response_preview": "",
},
{
@@ -354,9 +352,10 @@ class TestCheckTaskStatusSanitization:
parsed = json.loads(result)
summaries = [d["summary"] for d in parsed["delegations"]]
for s in summaries:
assert ESCAPED_START in s, f"Expected escape in summary: {repr(s)}"
assert ZWSP in s, f"Expected ZWSP escape in summary: {repr(s)}"
for s in summaries:
assert MARKER_FROM_PEER not in s
assert f"\n{MARKER_ERROR}" not in s
assert f"\n{MARKER_FROM_PEER}" not in s
async def test_not_found_returns_clean_json(self):
"""task_id given but no match → returns clean not_found JSON."""
@@ -398,7 +397,7 @@ class TestRegression491:
# Must not be returned as-is
assert result != raw_result
# Must be escaped
assert ESCAPED_START in result
assert ZWSP in result
# Must not appear at a line boundary
assert not result.startswith(MARKER_FROM_PEER)
assert f"\n{MARKER_FROM_PEER}" not in result
+2 -3
View File
@@ -218,8 +218,7 @@ class TestPollingPathSanitization:
result = asyncio.run(d.tool_delegate_task("ws-peer", "do it"))
# tool_delegate_task wraps the sanitized text in _A2A_BOUNDARY_START/END
# (NOT _A2A_RESULT_FROM_PEER — that marker is for the messaging path).
# Wrapped in escaped form to prevent raw closer from appearing in output.
assert d._A2A_BOUNDARY_START_ESCAPED in result
assert d._A2A_BOUNDARY_END_ESCAPED in result
assert d._A2A_BOUNDARY_START in result
assert d._A2A_BOUNDARY_END in result
assert "Sanitized peer reply" in result
+3 -3
View File
@@ -277,7 +277,7 @@ class TestToolDelegateTask:
patch("a2a_tools.report_activity", new=AsyncMock()):
result = await a2a_tools.tool_delegate_task("ws-1", "do something")
assert result == "[/ A2A_RESULT_FROM_PEER]\nTask completed!\n[/ /A2A_RESULT_FROM_PEER]"
assert result == "[A2A_RESULT_FROM_PEER]\nTask completed!\n[/A2A_RESULT_FROM_PEER]"
async def test_error_response_returns_delegation_failed_message(self):
"""When send_a2a_message returns _A2A_ERROR_PREFIX text, delegation fails."""
@@ -305,7 +305,7 @@ class TestToolDelegateTask:
patch("a2a_tools.report_activity", new=AsyncMock()):
result = await a2a_tools.tool_delegate_task("ws-cached", "task")
assert result == "[/ A2A_RESULT_FROM_PEER]\ndone\n[/ /A2A_RESULT_FROM_PEER]"
assert result == "[A2A_RESULT_FROM_PEER]\ndone\n[/A2A_RESULT_FROM_PEER]"
async def test_peer_name_falls_back_to_id_prefix(self):
"""When peer has no name and cache is empty, name = first 8 chars of workspace_id."""
@@ -319,7 +319,7 @@ class TestToolDelegateTask:
patch("a2a_tools.report_activity", new=AsyncMock()):
result = await a2a_tools.tool_delegate_task("ws-nona000", "task")
assert result == "[/ A2A_RESULT_FROM_PEER]\nok\n[/ /A2A_RESULT_FROM_PEER]"
assert result == "[A2A_RESULT_FROM_PEER]\nok\n[/A2A_RESULT_FROM_PEER]"
# Cache should now have been set
assert a2a_tools._peer_names.get("ws-nona000") is not None
@@ -69,7 +69,7 @@ class TestFlagOffLegacyPath:
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
send_calls = []
async def fake_send(workspace_id, task, source_workspace_id=None):
@@ -91,8 +91,8 @@ class TestFlagOffLegacyPath:
)
# OFFSEC-003: result is wrapped in boundary markers
assert _A2A_BOUNDARY_START_ESCAPED in result
assert _A2A_BOUNDARY_END_ESCAPED in result
assert _A2A_BOUNDARY_START in result
assert _A2A_BOUNDARY_END in result
assert "legacy ok" in result
assert send_calls == [("ws-target", "task body", "ws-self")]
poll_mock.assert_not_called()
@@ -124,7 +124,7 @@ class TestPollModeAutoFallback:
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
from a2a_client import _A2A_QUEUED_PREFIX
send_calls = []
@@ -159,8 +159,8 @@ class TestPollModeAutoFallback:
assert poll_calls[0] == ("ws-target", "task body", "ws-self")
# Caller sees the real reply, NOT the queued sentinel and NOT
# a DELEGATION FAILED string. Wrapped in OFFSEC-003 boundary markers.
assert _A2A_BOUNDARY_START_ESCAPED in result
assert _A2A_BOUNDARY_END_ESCAPED in result
assert _A2A_BOUNDARY_START in result
assert _A2A_BOUNDARY_END in result
assert "real response from poll-mode peer" in result
async def test_non_queued_send_result_does_not_trigger_fallback(self, monkeypatch):
@@ -169,7 +169,7 @@ class TestPollModeAutoFallback:
monkeypatch.delenv("DELEGATION_SYNC_VIA_INBOX", raising=False)
import a2a_tools
from _sanitize_a2a import _A2A_BOUNDARY_END_ESCAPED, _A2A_BOUNDARY_START_ESCAPED
from _sanitize_a2a import _A2A_BOUNDARY_END, _A2A_BOUNDARY_START
async def fake_send(*_a, **_kw):
return "normal reply"
@@ -189,8 +189,8 @@ class TestPollModeAutoFallback:
)
# OFFSEC-003: wrapped in boundary markers
assert _A2A_BOUNDARY_START_ESCAPED in result
assert _A2A_BOUNDARY_END_ESCAPED in result
assert _A2A_BOUNDARY_START in result
assert _A2A_BOUNDARY_END in result
assert "normal reply" in result
poll_mock.assert_not_called()