Compare commits

...

8 Commits

Author SHA1 Message Date
Molecule AI Dev Engineer A (Kimi) 20249dc735 Merge main into fix/internal-805-cf-auth-drift to pick up e2e-chat curl fix
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Python Lint & Test (pull_request) Successful in 6s
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 2s
E2E API Smoke Test / detect-changes (pull_request) Successful in 13s
E2E Chat / detect-changes (pull_request) Successful in 14s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 12s
Harness Replays / detect-changes (pull_request) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Successful in 3s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 17s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 2s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 28s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 6s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 5s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m11s
gate-check-v3 / gate-check (pull_request_target) Successful in 4s
qa-review / approved (pull_request_target) Failing after 6s
security-review / approved (pull_request_target) Failing after 4s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 59s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m11s
E2E Chat / E2E Chat (pull_request) Successful in 3s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 3s
Harness Replays / Harness Replays (pull_request) Successful in 2s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 2m17s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 2m17s
CI / Canvas (Next.js) (pull_request) Successful in 19s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 28s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m5s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m30s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Successful in 4m32s
CI / all-required (pull_request) Successful in 2s
sop-checklist / review-refire (pull_request_target) Has been skipped
sop-checklist / all-items-acked (pull_request) [info tier:low] acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 4s
sop-tier-check / tier-check (pull_request_target) Failing after 5s
qa-review / approved (pull_request_review) Has been skipped
security-review / approved (pull_request_review) Has been skipped
sop-tier-check / tier-check (pull_request_review) Successful in 6s
audit-force-merge / audit (pull_request_target) Has been skipped
2026-06-04 05:48:12 +00:00
Molecule AI Dev Engineer A (Kimi) 402b899ed8 fix(ci): add CLOUDFLARE_* secret fallback to sweep-cf workflows (internal#805)
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 0s
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 10s
CI / Python Lint & Test (pull_request) Successful in 4s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
CI / Detect changes (pull_request) Successful in 36s
Harness Replays / detect-changes (pull_request) Successful in 4s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Successful in 3s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 8s
E2E Chat / detect-changes (pull_request) Successful in 8s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 7s
Lint forbidden tenant-env keys / Scan for repo-host token write into tenant workspace surface (pull_request) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Successful in 3s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 4s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Failing after 15s
qa-review / approved (pull_request_target) Failing after 16s
gate-check-v3 / gate-check (pull_request_target) Successful in 17s
sop-checklist / all-items-acked (pull_request) acked: 0/7 — missing: comprehensive-testing, local-postgres-e2e, staging-smoke, +4
sop-checklist / na-declarations (pull_request) N/A: (none)
sop-checklist / all-items-acked (pull_request_target) Successful in 8s
sop-checklist / review-refire (pull_request_target) Has been skipped
security-review / approved (pull_request_target) Failing after 9s
sop-tier-check / tier-check (pull_request_target) Successful in 10s
lint-required-no-paths / lint-required-no-paths (pull_request) Successful in 54s
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Successful in 1m1s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Successful in 1m10s
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Successful in 1m11s
Ops Scripts Tests / Ops scripts (unittest) (pull_request) Successful in 1m2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Successful in 1m10s
CI / Canvas (Next.js) (pull_request) Successful in 20s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 17s
Harness Replays / Harness Replays (pull_request) Successful in 5s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 2s
E2E Chat / E2E Chat (pull_request) Successful in 9s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m12s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 1m16s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CI / Platform (Go) (pull_request) Successful in 7m9s
CI / all-required (pull_request) Successful in 5s
The sweep-cf-orphans and sweep-cf-tunnels workflows reference CI-scoped
secret names (CF_API_TOKEN, CF_ZONE_ID, CF_ACCOUNT_ID) while the
operator-host canonical names are CLOUDFLARE_API_TOKEN,
CLOUDFLARE_ZONE_ID, CLOUDFLARE_ACCOUNT_ID. When the CI-scoped duplicates
are missing from the secret store, the scheduled sweeps hard-fail even
though the canonical names are present.

Changes:
- Workflow YAML: `secrets.CF_API_TOKEN || secrets.CLOUDFLARE_API_TOKEN`
  (same pattern for ZONE_ID and ACCOUNT_ID).
- Scripts: add env-var fallback so direct local invocation also works.
- Comments and error messages updated to mention both naming conventions.

Closes internal#805

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 04:08:10 +00:00
Molecule AI Dev Engineer A (Kimi) 59a3366b5c fix(ci-drift): add REQUIRED_CHECKS_JSON variant support (internal#804)
The ci-required-drift parser only looked for REQUIRED_CHECKS while
audit-force-merge.yml switched to REQUIRED_CHECKS_JSON (branch-aware
dict). This caused F3 drift detection to fail on repos using the JSON
variant.

Changes:
- required_checks_env() now detects both REQUIRED_CHECKS_JSON (preferred)
  and REQUIRED_CHECKS (legacy fallback).
- For JSON variant: parse the dict, extract the array for the target
  branch, validate structure, return as a set of context names.
- For legacy variant: unchanged newline-split behavior.
- Error messages updated to mention both env vars.
- render_body() resolution text updated to mention both variants.
- Tests added for JSON precedence, fallback, missing branch, malformed
  JSON, and full drift-class coverage (F3a/F3b/happy-path).

Closes internal#804

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 04:08:10 +00:00
Molecule AI Dev Engineer A (Kimi) ca45964049 fix(integration): avoid invalid-UTF-8 insert into workspace_schedules.prompt
Postgres TEXT columns in a UTF-8 database reject raw bytes like 0x80 and
0xff. The test was trying to insert these into workspace_schedules.prompt
via insertSchedule, which failed with:

  pq: invalid byte sequence for encoding "UTF8": 0x80

Fix: insert a valid prompt into the DB fixture, then call fireSchedule
directly with a scheduleRow whose Prompt field carries the invalid bytes.
This still exercises the #2026 regression path (sanitizeUTF8 before jsonb
INSERT) without tripping Postgres TEXT validation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 04:08:10 +00:00
Molecule AI Dev Engineer A (Kimi) 132409abf8 test(scheduler): fix fixture enum drift — 'active' → 'online' (internal#795)
The workspace_status enum migrated away from 'active' in migration
043_workspace_status_enum.up.sql; valid values are provisioning/online/
offline/degraded/failed/removed/paused/hibernated/awaiting_agent/
hibernating. Inserting 'active' caused all five scheduler integration
tests to fail at fixture setup with:

  invalid input value for enum workspace_status: "active"

Fix: use 'online' (a valid enum member) for runnable fixture workspaces.
Also updates the helper comment to cite enum validity.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-04 04:08:10 +00:00
molecule-code-reviewer 3d9a0b2b84 ci(handlers-pg): run scheduler real-PG integration tests (#2149) 2026-06-04 04:08:10 +00:00
molecule-code-reviewer 53914b5d5d ci(detect-changes): trigger handlers-postgres profile on scheduler pkg (#2149) 2026-06-04 04:08:10 +00:00
molecule-code-reviewer dec102d5f9 test(scheduler): real-PG regression tests for cron firing loop (#2149)
Closes #2149
2026-06-04 04:08:10 +00:00
10 changed files with 885 additions and 41 deletions
+78 -29
View File
@@ -8,7 +8,8 @@ pair diverges.
Sources:
A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set)
B. `status_check_contexts` in branch_protections (the merge gate)
C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env)
C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy)
env in audit-force-merge.yml (the audit env)
Three failure classes:
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
@@ -250,13 +251,21 @@ def sentinel_needs(ci_doc: dict) -> set[str]:
return set(needs)
def required_checks_env(audit_doc: dict) -> set[str]:
"""Pull the REQUIRED_CHECKS env value from audit-force-merge.yml.
def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
"""Pull the required-checks env value from audit-force-merge.yml.
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
NOT grep for `REQUIRED_CHECKS:` — that breaks under reformatting,
NOT grep for env keys — that breaks under reformatting,
multi-job workflows, or a future move of the env to a different
step. Instead, look inside every job's every step's `env:` map."""
found: list[str] = []
step. Instead, look inside every job's every step's `env:` map.
Supports two variants:
- REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name.
We extract the array for the target branch.
- REQUIRED_CHECKS (legacy): newline-separated list of context names.
"""
found_json: list[str] = []
found_legacy: list[str] = []
jobs = audit_doc.get("jobs", {})
if not isinstance(jobs, dict):
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
@@ -268,27 +277,67 @@ def required_checks_env(audit_doc: dict) -> set[str]:
if not isinstance(step, dict):
continue
step_env = step.get("env") or {}
if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found.append(v)
if not found:
sys.stderr.write(
f"::error::REQUIRED_CHECKS env not found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
if len(found) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n"
)
sys.exit(3)
raw = found[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
if isinstance(step_env, dict):
if "REQUIRED_CHECKS_JSON" in step_env:
v = step_env["REQUIRED_CHECKS_JSON"]
if isinstance(v, str):
found_json.append(v)
if "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found_legacy.append(v)
# JSON variant takes precedence.
if found_json:
if len(found_json) > 1:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n"
)
sys.exit(3)
try:
parsed = json.loads(found_json[0])
except json.JSONDecodeError as e:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
)
sys.exit(3)
if not isinstance(parsed, dict):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
)
sys.exit(3)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(3)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
return {str(item).strip() for item in branch_checks if str(item).strip()}
# Legacy variant fallback.
if found_legacy:
if len(found_legacy) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n"
)
sys.exit(3)
raw = found_legacy[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
sys.stderr.write(
f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
# --------------------------------------------------------------------------
@@ -330,7 +379,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc)
env_set = required_checks_env(audit_doc, branch)
# Protection
# api() raises ApiError on non-2xx. Transient 5xx should fail loud.
@@ -524,7 +573,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
"- **F2**: rename the protection context to match an emitter, "
"or remove it from `status_check_contexts` "
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
"- **F3a / F3b**: bring `REQUIRED_CHECKS` env in "
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in "
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
"`status_check_contexts` (single PR, both files).",
"",
+5
View File
@@ -26,6 +26,10 @@ PROFILES: dict[str, dict[str, str]] = {
"handlers": (
r"^workspace-server/internal/handlers/"
r"|^workspace-server/internal/wsauth/"
# #2149: the scheduler real-PG integration tests run in this same
# workflow (they reuse its migrated Postgres), so changes to the
# scheduler package must trigger the job too.
r"|^workspace-server/internal/scheduler/"
r"|^workspace-server/migrations/"
r"|^\.gitea/workflows/handlers-postgres-integration\.yml$"
),
@@ -174,3 +178,4 @@ def main(argv: list[str]) -> int:
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
@@ -1,4 +1,5 @@
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
@@ -36,6 +37,76 @@ def _make_audit_doc(required_checks: list[str]) -> dict:
}
def _make_audit_doc_json(required_checks_json: dict) -> dict:
return {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}}
]
}
}
}
# ---------------------------------------------------------------------------
# required_checks_env — dual-variant parsing
# ---------------------------------------------------------------------------
def test_required_checks_env_prefers_json_over_legacy():
doc = {
"jobs": {
"audit": {
"steps": [
{
"env": {
"REQUIRED_CHECKS_JSON": json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
),
"REQUIRED_CHECKS": "ctx-legacy\nctx-old",
}
}
]
}
}
}
assert drift.required_checks_env(doc, "main") == {"ctx-a"}
assert drift.required_checks_env(doc, "staging") == {"ctx-b"}
def test_required_checks_env_falls_back_to_legacy():
doc = _make_audit_doc(["legacy-ctx"])
assert drift.required_checks_env(doc, "main") == {"legacy-ctx"}
def test_required_checks_env_json_missing_branch_fails():
doc = _make_audit_doc_json({"staging": ["ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_malformed_fails():
doc = {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": "not-json"}}
]
}
}
}
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
# ---------------------------------------------------------------------------
# sentinel_needs
# ---------------------------------------------------------------------------
@@ -243,7 +243,8 @@ jobs:
# MUST exist for the integration tests to be meaningful. Hard-
# fail if any didn't land — that would be a real regression we
# want loud.
for tbl in delegations workspaces activity_logs pending_uploads; do
# workspace_schedules added for the #2149 scheduler integration tests.
for tbl in delegations workspaces activity_logs pending_uploads workspace_schedules; do
if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
| grep -q 1; then
@@ -274,6 +275,16 @@ jobs:
# workflow runs don't fight over a host-net 5432 port.
go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"
- if: needs.detect-changes.outputs.handlers == 'true'
name: Run scheduler integration tests (#2149)
run: |
# #2149: real-PG regression coverage for the scheduler firing loop
# (tick → A2A fire → write-back of last_run_at/next_run_at/run_count/
# activity_logs jsonb incl. invalid-UTF-8 sanitization + sweepPhantomBusy).
# Reuses the same migrated Postgres (workspace_schedules / activity_logs
# / workspaces all landed by the migration replay step above).
go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_"
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
name: Diagnostic dump on failure
env:
+7 -4
View File
@@ -34,8 +34,10 @@ name: Sweep stale Cloudflare DNS records
# scripts/ops/test_sweep_cf_decide.py (#2027) cover the rule
# classifier.
#
# Secrets: CF_API_TOKEN, CF_ZONE_ID, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
# are confirmed existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
# Secrets: CF_API_TOKEN (preferred CI-scoped name) or CLOUDFLARE_API_TOKEN
# (operator-host canonical name) are accepted — the workflow falls back
# automatically. Same for CF_ZONE_ID / CLOUDFLARE_ZONE_ID. Confirmed
# existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
# CP_STAGING_ADMIN_API_TOKEN are unconfirmed — if missing, the verify step
# (schedule → hard-fail, dispatch → soft-skip) surfaces it clearly.
@@ -79,8 +81,8 @@ jobs:
# each individually capped at 10s by the script's curl -m flag.
timeout-minutes: 3
env:
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN }}
CF_ZONE_ID: ${{ secrets.CF_ZONE_ID }}
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN || secrets.CLOUDFLARE_API_TOKEN }}
CF_ZONE_ID: ${{ secrets.CF_ZONE_ID || secrets.CLOUDFLARE_ZONE_ID }}
CP_ADMIN_API_TOKEN: ${{ secrets.CP_ADMIN_API_TOKEN }}
CP_STAGING_ADMIN_API_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
@@ -129,6 +131,7 @@ jobs:
fi
echo "::error::sweep cannot run — required secrets missing: ${missing[*]}"
echo "::error::set them at Settings → Secrets and Variables → Actions, or disable this workflow."
echo "::error::Cloudflare secrets accept either the CI-scoped name (CF_API_TOKEN / CF_ZONE_ID) or the operator-host canonical name (CLOUDFLARE_API_TOKEN / CLOUDFLARE_ZONE_ID)."
echo "::error::a silent skip masked an active CF DNS leak (152/200 zone records) caught only by a manual audit on 2026-04-28; this gate exists to make the gap visible."
exit 1
fi
+8 -6
View File
@@ -29,10 +29,12 @@ name: Sweep stale Cloudflare Tunnels
# the DNS sweep's 50% because tenant-shaped tunnels are mostly
# orphans by design) refuses to nuke past the threshold.
#
# Secrets: CF_API_TOKEN, CF_ACCOUNT_ID are confirmed existing per
# issue #425 §425 audit. CP_ADMIN_API_TOKEN and CP_STAGING_ADMIN_API_TOKEN
# are unconfirmed — if missing, the verify step (schedule → hard-fail,
# dispatch → soft-skip) surfaces it clearly.
# Secrets: CF_API_TOKEN (preferred CI-scoped name) or CLOUDFLARE_API_TOKEN
# (operator-host canonical name) are accepted — the workflow falls back
# automatically. Same for CF_ACCOUNT_ID / CLOUDFLARE_ACCOUNT_ID. Confirmed
# existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
# CP_STAGING_ADMIN_API_TOKEN are unconfirmed — if missing, the verify step
# (schedule → hard-fail, dispatch → soft-skip) surfaces it clearly.
on:
schedule:
@@ -74,8 +76,8 @@ jobs:
# the sweep-cf-orphans companion job).
timeout-minutes: 30
env:
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN }}
CF_ACCOUNT_ID: ${{ secrets.CF_ACCOUNT_ID }}
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN || secrets.CLOUDFLARE_API_TOKEN }}
CF_ACCOUNT_ID: ${{ secrets.CF_ACCOUNT_ID || secrets.CLOUDFLARE_ACCOUNT_ID }}
CP_ADMIN_API_TOKEN: ${{ secrets.CP_ADMIN_API_TOKEN }}
CP_STAGING_ADMIN_API_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
MAX_DELETE_PCT: ${{ github.event.inputs.max_delete_pct || '90' }}
+10 -1
View File
@@ -19,7 +19,10 @@
#
# Env vars required:
# CF_API_TOKEN — Cloudflare token with zone:dns:edit
# (falls back to CLOUDFLARE_API_TOKEN if CF_API_TOKEN is unset;
# the workflow YAML maps both secret names into CF_API_TOKEN)
# CF_ZONE_ID — the zone (moleculesai.app)
# (falls back to CLOUDFLARE_ZONE_ID if CF_ZONE_ID is unset)
# CP_ADMIN_API_TOKEN — CP admin bearer for api.moleculesai.app
# CP_STAGING_ADMIN_API_TOKEN — CP admin bearer for staging-api.moleculesai.app
# AWS_* — standard AWS creds (default region us-east-2)
@@ -56,6 +59,12 @@ need() {
exit 1
fi
}
# Fallback: operator-host canonical names → CI-scoped names.
# The workflow YAML already maps both, but direct script invocation
# (e.g. local ops) may only have the canonical names set.
CF_API_TOKEN="${CF_API_TOKEN:-${CLOUDFLARE_API_TOKEN:-}}"
CF_ZONE_ID="${CF_ZONE_ID:-${CLOUDFLARE_ZONE_ID:-}}"
need CF_API_TOKEN
need CF_ZONE_ID
need CP_ADMIN_API_TOKEN
@@ -121,7 +130,7 @@ if not payload.get("success", False) or not isinstance(payload.get("result"), li
print(f"ERROR: Cloudflare DNS list failed: {detail}", file=sys.stderr)
raise SystemExit(1)
'; then
log "Cloudflare DNS list failed; verify CF_API_TOKEN has Zone:DNS:Edit and CF_ZONE_ID is the moleculesai.app zone."
log "Cloudflare DNS list failed; verify CF_API_TOKEN (or CLOUDFLARE_API_TOKEN) has Zone:DNS:Edit and CF_ZONE_ID (or CLOUDFLARE_ZONE_ID) is the moleculesai.app zone."
exit 1
fi
TOTAL_CF=$(echo "$CF_JSON" | python3 -c "import json,sys; print(len(json.load(sys.stdin)['result']))")
+9
View File
@@ -29,8 +29,11 @@
# account:cloudflare_tunnel:edit scope.
# (Same secret as sweep-cf-orphans, but the
# token must include the tunnel scope.)
# (falls back to CLOUDFLARE_API_TOKEN if CF_API_TOKEN is unset;
# the workflow YAML maps both secret names into CF_API_TOKEN)
# CF_ACCOUNT_ID — the account that owns the tunnels (visible
# in dash.cloudflare.com URL path)
# (falls back to CLOUDFLARE_ACCOUNT_ID if CF_ACCOUNT_ID is unset)
# CP_ADMIN_API_TOKEN — CP admin bearer for api.moleculesai.app
# CP_STAGING_ADMIN_API_TOKEN — CP admin bearer for staging-api.moleculesai.app
#
@@ -70,6 +73,12 @@ need() {
exit 1
fi
}
# Fallback: operator-host canonical names → CI-scoped names.
# The workflow YAML already maps both, but direct script invocation
# (e.g. local ops) may only have the canonical names set.
CF_API_TOKEN="${CF_API_TOKEN:-${CLOUDFLARE_API_TOKEN:-}}"
CF_ACCOUNT_ID="${CF_ACCOUNT_ID:-${CLOUDFLARE_ACCOUNT_ID:-}}"
need CF_API_TOKEN
need CF_ACCOUNT_ID
need CP_ADMIN_API_TOKEN
+127
View File
@@ -18,6 +18,7 @@ No network. No live Gitea calls.
from __future__ import annotations
import importlib.util
import json
import os
import textwrap
from pathlib import Path
@@ -117,6 +118,31 @@ def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
return p
def _write_audit_yaml_json(tmp_path: Path, required_checks_json: dict) -> Path:
"""Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS_JSON env."""
block = json.dumps(required_checks_json, indent=2)
text = textwrap.dedent(
f"""\
name: audit-force-merge
on:
schedule:
- cron: '*/30 * * * *'
jobs:
audit:
runs-on: ubuntu-latest
steps:
- name: Run audit
env:
REQUIRED_CHECKS_JSON: |
{block.replace(chr(10), chr(10) + ' ')}
run: bash .gitea/scripts/audit-force-merge.sh
"""
)
p = tmp_path / "audit-force-merge.yml"
p.write_text(text, encoding="utf-8")
return p
def _make_stub_api(responses: dict):
"""Build a fake `api()` callable.
@@ -363,6 +389,107 @@ def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
assert findings == [], findings
# --------------------------------------------------------------------------
# REQUIRED_CHECKS_JSON variant drift tests
# --------------------------------------------------------------------------
def test_f3a_env_wider_than_protection_json_variant(drift_module, tmp_path, monkeypatch):
"""F3a: REQUIRED_CHECKS_JSON env has a context NOT in protection."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)", "ci / ghost (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3a" in f and "ghost" in f for f in findings), findings
def test_f3b_protection_wider_than_env_json_variant(drift_module, tmp_path, monkeypatch):
"""F3b: protection has a context NOT in REQUIRED_CHECKS_JSON env."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings
def test_happy_path_no_drift_json_variant(drift_module, tmp_path, monkeypatch):
"""Happy path with REQUIRED_CHECKS_JSON: all aligned."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{
"main": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert findings == [], findings
# --------------------------------------------------------------------------
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
# --------------------------------------------------------------------------
@@ -0,0 +1,558 @@
//go:build integration
// +build integration
// scheduler_integration_test.go — REAL Postgres integration tests for the
// workspace-server cron scheduler firing loop. Regression coverage for
// molecule-core issue #2149 (filed under SOP rule internal#765).
//
// Run with:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply every migration up.sql / legacy .sql in lexicographic order
// for f in $(ls workspace-server/migrations/*.sql | grep -v '\.down\.sql$' | sort); do \
// psql "postgres://postgres:test@localhost:55432/molecule?sslmode=disable" -f "$f"; done
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/scheduler/ -run '^TestIntegration_'
//
// CI: .gitea/workflows/handlers-postgres-integration.yml runs these on every
// PR/push that touches workspace-server/internal/scheduler/ (the
// `handlers-postgres` detect-changes profile was extended to include the
// scheduler package + this workflow file).
//
// Why these are NOT the existing sqlmock unit tests (scheduler_test.go)
// --------------------------------------------------------------------
// The strict-sqlmock unit tests pin which SQL statements fire — fast, no DB.
// But sqlmock CANNOT validate:
// - the activity_logs `$3::jsonb` cast (#2026 wedge) — sqlmock never parses
// the payload, so an invalid-UTF-8 jsonb body that wedges a real INSERT
// looks "green" under mock.ExpectExec(`INSERT INTO activity_logs`).
// - the ROW STATE after tick()/fireSchedule run: that last_run_at,
// next_run_at, run_count, last_status actually landed on the row.
// - sweepPhantomBusy's NOT IN (SELECT … activity_logs) subquery semantics
// against real rows — it has no unit test at all (#2149).
//
// A SQL regression here = a fleet-wide silent cron outage (#85 ran 12h before
// detection). These tests boot a real Postgres, insert real rows, run the
// production tick()/sweepPhantomBusy, and SELECT the rows back to assert the
// observable end state — the gap sqlmock structurally cannot cover.
//
// Watch-fail intent: each test is written to FAIL on a regression of the
// behavior under test (e.g. drop the activity_logs INSERT, drop the
// write-back UPDATE, drop the UTF-8 sanitize, or break the phantom-busy
// subquery) and to PASS against the current-correct scheduler.go.
package scheduler
import (
"context"
"database/sql"
"os"
"testing"
"time"
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
_ "github.com/lib/pq"
)
// ── test doubles ──────────────────────────────────────────────────────────
// recordingProxy is an A2AProxy that records each fire and returns a
// configurable response. Used to assert that tick()/fireSchedule actually
// reached the A2A boundary for the due schedule.
type recordingProxy struct {
status int
body []byte
err error
fires int
lastBody []byte
lastCaller string
lastLogFlag bool
lastWSID string
}
func (p *recordingProxy) ProxyA2ARequest(
_ context.Context, workspaceID string, body []byte, callerID string, logActivity bool,
) (int, []byte, error) {
p.fires++
p.lastWSID = workspaceID
p.lastBody = body
p.lastCaller = callerID
p.lastLogFlag = logActivity
if p.err != nil {
return 0, nil, p.err
}
return p.status, p.body, nil
}
// ── connection + fixture helpers ──────────────────────────────────────────
// integrationDB returns the configured integration-test connection or skips
// the test if INTEGRATION_DB_URL is unset. Hot-swaps the package-level
// mdb.DB so the production scheduler helpers (tick, fireSchedule,
// sweepPhantomBusy) operate on this connection; restores it via t.Cleanup.
//
// NOT SAFE FOR t.Parallel(): the package-global swap races across tests.
func integrationDB(t *testing.T) *sql.DB {
t.Helper()
url := os.Getenv("INTEGRATION_DB_URL")
if url == "" {
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
}
conn, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("open: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
t.Fatalf("ping: %v", err)
}
// Clean slate. activity_logs + workspace_schedules cascade off workspaces,
// but we DELETE explicitly (and in FK order) so a partial prior run can't
// leave orphan rows that perturb the next test's assertions.
cctx, ccancel := context.WithTimeout(context.Background(), 10*time.Second)
defer ccancel()
for _, q := range []string{
`DELETE FROM activity_logs`,
`DELETE FROM workspace_schedules`,
`DELETE FROM workspaces`,
} {
if _, err := conn.ExecContext(cctx, q); err != nil {
t.Fatalf("cleanup %q: %v", q, err)
}
}
prev := mdb.DB
mdb.DB = conn
t.Cleanup(func() {
mdb.DB = prev
conn.Close()
})
return conn
}
// insertWorkspace inserts a workspace row and returns its UUID. active is the
// initial active_tasks value; status defaults to 'online' (valid workspace_status enum).
func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ($1, 'online', $2, 1)
RETURNING id
`, name, active).Scan(&id)
if err != nil {
t.Fatalf("insertWorkspace(%s): %v", name, err)
}
return id
}
// insertSchedule inserts an enabled workspace_schedules row whose next_run_at
// is in the past (so tick() picks it up immediately) and returns its UUID.
func insertSchedule(t *testing.T, conn *sql.DB, wsID, name, cronExpr, prompt string) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspace_schedules
(workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
VALUES ($1, $2, $3, 'UTC', $4, true, now() - interval '1 minute', 'runtime')
RETURNING id
`, wsID, name, cronExpr, prompt).Scan(&id)
if err != nil {
t.Fatalf("insertSchedule(%s): %v", name, err)
}
return id
}
type scheduleState struct {
lastRunAt sql.NullTime
nextRunAt sql.NullTime
runCount int
lastStatus string
lastError string
}
func readScheduleState(t *testing.T, conn *sql.DB, id string) scheduleState {
t.Helper()
var st scheduleState
var status, errStr sql.NullString
err := conn.QueryRowContext(context.Background(), `
SELECT last_run_at, next_run_at, run_count, last_status, last_error
FROM workspace_schedules WHERE id = $1
`, id).Scan(&st.lastRunAt, &st.nextRunAt, &st.runCount, &status, &errStr)
if err != nil {
t.Fatalf("readScheduleState(%s): %v", id, err)
}
st.lastStatus = status.String
st.lastError = errStr.String
return st
}
// ── TestIntegration_TickFiresAndWritesBack (#2149 core) ───────────────────
//
// Insert one due schedule, run tick() once, and assert the full firing
// loop landed against a REAL Postgres:
// - the A2A proxy was invoked exactly once for the schedule's workspace
// - the post-fire UPDATE wrote last_run_at (was NULL), advanced next_run_at
// into the future, bumped run_count to 1, set last_status='ok'
// - a cron_run activity_logs row was inserted with VALID jsonb request_body
// (the `$3::jsonb` cast #2026 path) carrying the schedule metadata
//
// Regression watch-fail: if a refactor drops the write-back UPDATE, the
// activity_logs INSERT, or breaks the jsonb cast, this test fails where every
// sqlmock unit test stays green.
func TestIntegration_TickFiresAndWritesBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "cron-fire-ws", 0)
schedID := insertSchedule(t, conn, wsID, "hourly-audit", "0 * * * *", "run the hourly audit")
proxy := &recordingProxy{
status: 200,
body: []byte(`{"jsonrpc":"2.0","result":{"kind":"message","parts":[{"kind":"text","text":"done"}]},"id":"1"}`),
}
s := New(proxy, nil)
s.tick(context.Background())
// 1. A2A boundary reached exactly once for the right workspace.
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1 (tick must fire the one due schedule)", proxy.fires)
}
if proxy.lastWSID != wsID {
t.Errorf("proxy fired for workspace %q, want %q", proxy.lastWSID, wsID)
}
// Empty callerID = canvas-style (bypasses access control); logActivity=true.
if proxy.lastCaller != "" {
t.Errorf("callerID = %q, want empty (canvas-style scheduler fire)", proxy.lastCaller)
}
if !proxy.lastLogFlag {
t.Error("logActivity flag = false, want true")
}
// 2. Row write-back.
st := readScheduleState(t, conn, schedID)
if !st.lastRunAt.Valid {
t.Error("last_run_at is NULL after fire, want set (write-back UPDATE did not land)")
}
if !st.nextRunAt.Valid {
t.Fatal("next_run_at is NULL after fire, want a future timestamp")
}
if !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want a time in the future (schedule would tight-loop otherwise)", st.nextRunAt.Time)
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1", st.runCount)
}
if st.lastStatus != "ok" {
t.Errorf("last_status = %q, want \"ok\"", st.lastStatus)
}
// 3. activity_logs cron_run row with valid jsonb request_body.
var actCount int
var summary, status string
var reqBody []byte
err := conn.QueryRowContext(context.Background(), `
SELECT count(*) OVER (), summary, status, request_body
FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
LIMIT 1
`, wsID).Scan(&actCount, &summary, &status, &reqBody)
if err == sql.ErrNoRows {
t.Fatal("no cron_run activity_logs row inserted after fire (#152/#2026 path missing)")
}
if err != nil {
t.Fatalf("read activity_logs: %v", err)
}
if actCount != 1 {
t.Errorf("cron_run activity_logs rows = %d, want 1", actCount)
}
if status != "ok" {
t.Errorf("activity_logs.status = %q, want \"ok\"", status)
}
// request_body must be valid jsonb carrying the schedule_id — proves the
// `$3::jsonb` cast accepted the payload (the #2026 wedge surface).
var sid string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'schedule_id'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&sid); err != nil {
t.Fatalf("request_body is not queryable jsonb: %v", err)
}
if sid != schedID {
t.Errorf("activity_logs request_body->>'schedule_id' = %q, want %q", sid, schedID)
}
}
// ── TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb (#2026 / #2149) ────
//
// The agent-editable prompt can carry raw invalid-UTF-8 bytes. Postgres jsonb
// columns REJECT invalid UTF-8, which (pre-#2026) wedged the activity_logs
// INSERT and held the transaction open — stalling the whole scheduler.
// fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert.
//
// Postgres TEXT columns (workspace_schedules.prompt) also reject invalid UTF-8
// in a UTF-8 database, so we cannot INSERT the bad bytes through the fixture.
// Instead we insert a valid prompt, then call fireSchedule directly with a
// scheduleRow whose Prompt field contains the invalid bytes — this simulates
// the real regression path (e.g. truncation splitting a multi-byte rune, or
// an agent-edited template arriving via a path that bypasses DB validation).
//
// Assertions:
// - the fire still completed (write-back UPDATE landed)
// - the cron_run activity_logs row was inserted (the jsonb cast accepted
// the SANITIZED payload — the INSERT did not wedge)
// - the stored request_body is queryable jsonb (valid UTF-8 on disk)
//
// Watch-fail: remove the sanitizeUTF8() wrapping around the jsonb payload and
// this test fails on a real Postgres (INSERT errors / row absent), while the
// sqlmock unit test that only checks "an INSERT fired" stays green.
func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "utf8-ws", 0)
// Insert with valid UTF-8 — Postgres TEXT rejects 0x80/0xff.
schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", "valid prompt")
// Prompt with invalid UTF-8: orphan continuation byte + bare 0xff.
badPrompt := "audit \x80 report \xff end"
row := scheduleRow{
ID: schedID,
WorkspaceID: wsID,
Name: "utf8-job",
CronExpr: "0 * * * *",
Timezone: "UTC",
Prompt: badPrompt,
}
proxy := &recordingProxy{
status: 200,
body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`),
}
s := New(proxy, nil)
s.fireSchedule(context.Background(), row)
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1", proxy.fires)
}
// Write-back must have landed despite the bad prompt bytes.
st := readScheduleState(t, conn, schedID)
if st.runCount != 1 || st.lastStatus != "ok" {
t.Errorf("post-fire state run_count=%d last_status=%q, want 1/\"ok\" "+
"(invalid-UTF-8 prompt must not block the fire)", st.runCount, st.lastStatus)
}
// The cron_run activity_logs row MUST exist — proving the `$3::jsonb`
// INSERT accepted the sanitized payload (did not wedge on invalid UTF-8).
var n int
if err := conn.QueryRowContext(context.Background(), `
SELECT count(*) FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
`, wsID).Scan(&n); err != nil {
t.Fatalf("count cron_run rows: %v", err)
}
if n != 1 {
t.Fatalf("cron_run activity_logs rows = %d, want 1 — the jsonb INSERT wedged "+
"on invalid UTF-8 (the #2026 regression)", n)
}
// The stored prompt inside request_body must be queryable + valid UTF-8.
var storedPrompt string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'prompt'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&storedPrompt); err != nil {
t.Fatalf("request_body->>'prompt' not queryable jsonb: %v", err)
}
if storedPrompt == "" {
t.Error("stored prompt is empty, want the sanitized prompt text")
}
// Round-trip through Postgres jsonb guarantees valid UTF-8; assert the
// replacement character replaced the bad bytes rather than them surviving.
for i := 0; i < len(storedPrompt); i++ {
if storedPrompt[i] == 0x80 || storedPrompt[i] == 0xff {
t.Fatalf("stored prompt still contains raw invalid byte 0x%x at %d", storedPrompt[i], i)
}
}
}
// ── TestIntegration_TickErrorStatusWriteBack (#2149) ──────────────────────
//
// When the A2A proxy returns a transport error, fireSchedule must still write
// back: last_status='error', last_error populated, next_run_at advanced (so
// the schedule does not get stuck re-firing), run_count bumped. Verifies the
// error path persists to a real row, not just that "an UPDATE fired".
func TestIntegration_TickErrorStatusWriteBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "err-ws", 0)
schedID := insertSchedule(t, conn, wsID, "err-job", "0 * * * *", "do work")
proxy := &recordingProxy{err: context.DeadlineExceeded}
s := New(proxy, nil)
s.tick(context.Background())
st := readScheduleState(t, conn, schedID)
if st.lastStatus != "error" {
t.Errorf("last_status = %q, want \"error\"", st.lastStatus)
}
if st.lastError == "" {
t.Error("last_error is empty, want the proxy error text persisted (#152)")
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1 (run still counted on error)", st.runCount)
}
if !st.nextRunAt.Valid || !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at not advanced to future on error path (= %v) — schedule would tight-loop", st.nextRunAt)
}
// The error activity_logs row must carry status='error' + error_detail.
var status, errDetail string
if err := conn.QueryRowContext(context.Background(), `
SELECT status, COALESCE(error_detail,'') FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&status, &errDetail); err != nil {
t.Fatalf("read error activity_logs: %v", err)
}
if status != "error" {
t.Errorf("activity_logs.status = %q, want \"error\"", status)
}
if errDetail == "" {
t.Error("activity_logs.error_detail empty on error fire, want the error message (#152)")
}
}
// ── TestIntegration_SweepPhantomBusy (#2149 — no prior test) ──────────────
//
// sweepPhantomBusy resets active_tasks=0 for workspaces stuck busy with NO
// activity_logs row in the last phantomStaleThreshold window, and must LEAVE
// ALONE workspaces that have recent activity. The NOT IN (SELECT DISTINCT
// workspace_id FROM activity_logs WHERE created_at > now() - interval) subquery
// is exactly the kind of set-semantics that sqlmock cannot validate — there is
// no unit test for this method at all (#2149).
//
// Fixture:
// - phantomWS: active_tasks=3, NO recent activity_log → must reset to 0
// - recentWS: active_tasks=2, activity_log 1 min ago → must stay at 2
// - staleWS: active_tasks=1, activity_log 30 min ago → must reset to 0
// - removedWS: active_tasks=4, status='removed', no activity → must stay (status guard)
// - idleWS: active_tasks=0 → untouched (not >0)
//
// Watch-fail: break the subquery (e.g. drop the status!='removed' guard, or
// invert the NOT IN), and the asserted end-state diverges on a real Postgres.
func TestIntegration_SweepPhantomBusy(t *testing.T) {
conn := integrationDB(t)
phantomWS := insertWorkspace(t, conn, "phantom-ws", 3)
recentWS := insertWorkspace(t, conn, "recent-ws", 2)
staleWS := insertWorkspace(t, conn, "stale-ws", 1)
idleWS := insertWorkspace(t, conn, "idle-ws", 0)
// removedWS: busy but status='removed' — the sweep must skip it.
var removedWS string
if err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ('removed-ws', 'removed', 4, 1) RETURNING id
`).Scan(&removedWS); err != nil {
t.Fatalf("insert removedWS: %v", err)
}
// recentWS has a fresh activity_log (1 min ago → inside the 10-min window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '1 minute')
`, recentWS); err != nil {
t.Fatalf("insert recent activity_log: %v", err)
}
// staleWS has only an OLD activity_log (30 min ago → outside the window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '30 minutes')
`, staleWS); err != nil {
t.Fatalf("insert stale activity_log: %v", err)
}
s := New(nil, nil)
s.sweepPhantomBusy(context.Background())
active := func(id string) int {
var n int
if err := conn.QueryRowContext(context.Background(),
`SELECT active_tasks FROM workspaces WHERE id = $1`, id).Scan(&n); err != nil {
t.Fatalf("read active_tasks(%s): %v", id, err)
}
return n
}
if got := active(phantomWS); got != 0 {
t.Errorf("phantomWS active_tasks = %d, want 0 (busy + no recent activity → must be swept)", got)
}
if got := active(staleWS); got != 0 {
t.Errorf("staleWS active_tasks = %d, want 0 (only stale activity → must be swept)", got)
}
if got := active(recentWS); got != 2 {
t.Errorf("recentWS active_tasks = %d, want 2 (recent activity → must NOT be swept)", got)
}
if got := active(removedWS); got != 4 {
t.Errorf("removedWS active_tasks = %d, want 4 (status='removed' → sweep must skip it)", got)
}
if got := active(idleWS); got != 0 {
t.Errorf("idleWS active_tasks = %d, want 0 (was never busy)", got)
}
// The swept rows must also have current_task cleared.
var ct string
if err := conn.QueryRowContext(context.Background(),
`SELECT COALESCE(current_task,'') FROM workspaces WHERE id = $1`, phantomWS).Scan(&ct); err != nil {
t.Fatalf("read current_task: %v", err)
}
if ct != "" {
t.Errorf("phantomWS current_task = %q, want empty after sweep", ct)
}
}
// ── TestIntegration_NativeSchedulerSkipAdvancesNextRunAt (#2149) ──────────
//
// When a workspace's adapter owns scheduling natively, tick() must SKIP the
// fire but still advance next_run_at (so the row doesn't tight-loop on every
// poll) — observability (next_run_at) is preserved while the fire is dropped.
// Asserts the native-skip UPDATE landed on a real row and the proxy was NOT
// invoked. This is the native-skip UPDATE path #2149 calls out — sqlmock can
// only assert an UPDATE fired, not that next_run_at moved forward.
func TestIntegration_NativeSchedulerSkipAdvancesNextRunAt(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "native-ws", 0)
schedID := insertSchedule(t, conn, wsID, "native-job", "0 * * * *", "native run")
// Capture the pre-tick next_run_at (it is in the past by construction).
before := readScheduleState(t, conn, schedID)
if !before.nextRunAt.Valid || before.nextRunAt.Time.After(time.Now()) {
t.Fatalf("precondition: next_run_at should start in the past, got %v", before.nextRunAt)
}
proxy := &recordingProxy{status: 200, body: []byte(`{}`)}
s := New(proxy, nil)
// Every workspace reports native scheduling → fire must be skipped.
s.SetNativeSchedulerCheck(func(string) bool { return true })
s.tick(context.Background())
if proxy.fires != 0 {
t.Errorf("proxy fires = %d, want 0 (native-scheduler workspace must NOT fire)", proxy.fires)
}
after := readScheduleState(t, conn, schedID)
if !after.nextRunAt.Valid || !after.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want advanced into the future (native-skip UPDATE must still run)", after.nextRunAt)
}
// Skip path does NOT bump run_count or write last_run_at (no fire happened).
if after.runCount != 0 {
t.Errorf("run_count = %d, want 0 (skip must not count as a run)", after.runCount)
}
if after.lastRunAt.Valid {
t.Error("last_run_at set on native-skip, want NULL (no fire occurred)")
}
}