Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 20249dc735 | |||
| 402b899ed8 | |||
| 59a3366b5c | |||
| ca45964049 | |||
| 132409abf8 | |||
| 3d9a0b2b84 | |||
| 53914b5d5d | |||
| dec102d5f9 |
@@ -8,7 +8,8 @@ pair diverges.
|
||||
Sources:
|
||||
A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set)
|
||||
B. `status_check_contexts` in branch_protections (the merge gate)
|
||||
C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env)
|
||||
C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy)
|
||||
env in audit-force-merge.yml (the audit env)
|
||||
|
||||
Three failure classes:
|
||||
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
|
||||
@@ -250,13 +251,21 @@ def sentinel_needs(ci_doc: dict) -> set[str]:
|
||||
return set(needs)
|
||||
|
||||
|
||||
def required_checks_env(audit_doc: dict) -> set[str]:
|
||||
"""Pull the REQUIRED_CHECKS env value from audit-force-merge.yml.
|
||||
def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
|
||||
"""Pull the required-checks env value from audit-force-merge.yml.
|
||||
|
||||
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
|
||||
NOT grep for `REQUIRED_CHECKS:` — that breaks under reformatting,
|
||||
NOT grep for env keys — that breaks under reformatting,
|
||||
multi-job workflows, or a future move of the env to a different
|
||||
step. Instead, look inside every job's every step's `env:` map."""
|
||||
found: list[str] = []
|
||||
step. Instead, look inside every job's every step's `env:` map.
|
||||
|
||||
Supports two variants:
|
||||
- REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name.
|
||||
We extract the array for the target branch.
|
||||
- REQUIRED_CHECKS (legacy): newline-separated list of context names.
|
||||
"""
|
||||
found_json: list[str] = []
|
||||
found_legacy: list[str] = []
|
||||
jobs = audit_doc.get("jobs", {})
|
||||
if not isinstance(jobs, dict):
|
||||
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
|
||||
@@ -268,27 +277,67 @@ def required_checks_env(audit_doc: dict) -> set[str]:
|
||||
if not isinstance(step, dict):
|
||||
continue
|
||||
step_env = step.get("env") or {}
|
||||
if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env:
|
||||
v = step_env["REQUIRED_CHECKS"]
|
||||
if isinstance(v, str):
|
||||
found.append(v)
|
||||
if not found:
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS env not found in any step of "
|
||||
f"{AUDIT_WORKFLOW_PATH}\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
if len(found) > 1:
|
||||
# Defensive: refuse to guess which one is canonical.
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
raw = found[0]
|
||||
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
|
||||
# consistently with audit-force-merge.sh's parser so both sides
|
||||
# produce identical sets.
|
||||
return {line.strip() for line in raw.splitlines() if line.strip()}
|
||||
if isinstance(step_env, dict):
|
||||
if "REQUIRED_CHECKS_JSON" in step_env:
|
||||
v = step_env["REQUIRED_CHECKS_JSON"]
|
||||
if isinstance(v, str):
|
||||
found_json.append(v)
|
||||
if "REQUIRED_CHECKS" in step_env:
|
||||
v = step_env["REQUIRED_CHECKS"]
|
||||
if isinstance(v, str):
|
||||
found_legacy.append(v)
|
||||
|
||||
# JSON variant takes precedence.
|
||||
if found_json:
|
||||
if len(found_json) > 1:
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
try:
|
||||
parsed = json.loads(found_json[0])
|
||||
except json.JSONDecodeError as e:
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
if not isinstance(parsed, dict):
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
branch_checks = parsed.get(branch)
|
||||
if branch_checks is None:
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
if not isinstance(branch_checks, list):
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
return {str(item).strip() for item in branch_checks if str(item).strip()}
|
||||
|
||||
# Legacy variant fallback.
|
||||
if found_legacy:
|
||||
if len(found_legacy) > 1:
|
||||
# Defensive: refuse to guess which one is canonical.
|
||||
sys.stderr.write(
|
||||
f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
raw = found_legacy[0]
|
||||
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
|
||||
# consistently with audit-force-merge.sh's parser so both sides
|
||||
# produce identical sets.
|
||||
return {line.strip() for line in raw.splitlines() if line.strip()}
|
||||
|
||||
sys.stderr.write(
|
||||
f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of "
|
||||
f"{AUDIT_WORKFLOW_PATH}\n"
|
||||
)
|
||||
sys.exit(3)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
@@ -330,7 +379,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
|
||||
jobs = ci_job_names(ci_doc)
|
||||
jobs_all = ci_jobs_all(ci_doc)
|
||||
needs = sentinel_needs(ci_doc)
|
||||
env_set = required_checks_env(audit_doc)
|
||||
env_set = required_checks_env(audit_doc, branch)
|
||||
|
||||
# Protection
|
||||
# api() raises ApiError on non-2xx. Transient 5xx should fail loud.
|
||||
@@ -524,7 +573,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
|
||||
"- **F2**: rename the protection context to match an emitter, "
|
||||
"or remove it from `status_check_contexts` "
|
||||
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
|
||||
"- **F3a / F3b**: bring `REQUIRED_CHECKS` env in "
|
||||
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in "
|
||||
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
|
||||
"`status_check_contexts` (single PR, both files).",
|
||||
"",
|
||||
|
||||
@@ -26,6 +26,10 @@ PROFILES: dict[str, dict[str, str]] = {
|
||||
"handlers": (
|
||||
r"^workspace-server/internal/handlers/"
|
||||
r"|^workspace-server/internal/wsauth/"
|
||||
# #2149: the scheduler real-PG integration tests run in this same
|
||||
# workflow (they reuse its migrated Postgres), so changes to the
|
||||
# scheduler package must trigger the job too.
|
||||
r"|^workspace-server/internal/scheduler/"
|
||||
r"|^workspace-server/migrations/"
|
||||
r"|^\.gitea/workflows/handlers-postgres-integration\.yml$"
|
||||
),
|
||||
@@ -174,3 +178,4 @@ def main(argv: list[str]) -> int:
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main(sys.argv[1:]))
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import importlib.util
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
@@ -36,6 +37,76 @@ def _make_audit_doc(required_checks: list[str]) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def _make_audit_doc_json(required_checks_json: dict) -> dict:
    """Build a minimal parsed audit-workflow document using the JSON variant.

    The given mapping is serialized with ``json.dumps`` and stored under the
    ``REQUIRED_CHECKS_JSON`` key of the ``env`` map of the only step of a
    single ``audit`` job.
    """
    step_env = {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}
    audit_job = {"steps": [{"env": step_env}]}
    return {"jobs": {"audit": audit_job}}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# required_checks_env — dual-variant parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_required_checks_env_prefers_json_over_legacy():
    """When one step carries both env keys, the per-branch JSON entry is used."""
    per_branch = {"main": ["ctx-a"], "staging": ["ctx-b"]}
    step_env = {
        "REQUIRED_CHECKS_JSON": json.dumps(per_branch),
        "REQUIRED_CHECKS": "ctx-legacy\nctx-old",
    }
    doc = {"jobs": {"audit": {"steps": [{"env": step_env}]}}}

    assert drift.required_checks_env(doc, "main") == {"ctx-a"}
    assert drift.required_checks_env(doc, "staging") == {"ctx-b"}
|
||||
|
||||
|
||||
def test_required_checks_env_falls_back_to_legacy():
    """A document built by ``_make_audit_doc`` still yields its legacy contexts."""
    legacy_doc = _make_audit_doc(["legacy-ctx"])
    resolved = drift.required_checks_env(legacy_doc, "main")
    assert resolved == {"legacy-ctx"}
|
||||
|
||||
|
||||
def test_required_checks_env_json_missing_branch_fails():
    """Asking for a branch absent from the JSON map must exit with code 3."""
    doc = _make_audit_doc_json({"staging": ["ctx-b"]})
    exit_code = None
    try:
        drift.required_checks_env(doc, "main")
    except SystemExit as exc:
        exit_code = exc.code
    else:
        raise AssertionError("expected SystemExit(3)")
    assert exit_code == 3
|
||||
|
||||
|
||||
def test_required_checks_env_json_malformed_fails():
    """An env value that is not parseable JSON must exit with code 3."""
    bad_step = {"env": {"REQUIRED_CHECKS_JSON": "not-json"}}
    doc = {"jobs": {"audit": {"steps": [bad_step]}}}
    exit_code = None
    try:
        drift.required_checks_env(doc, "main")
    except SystemExit as exc:
        exit_code = exc.code
    else:
        raise AssertionError("expected SystemExit(3)")
    assert exit_code == 3
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# sentinel_needs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -243,7 +243,8 @@ jobs:
|
||||
# MUST exist for the integration tests to be meaningful. Hard-
|
||||
# fail if any didn't land — that would be a real regression we
|
||||
# want loud.
|
||||
for tbl in delegations workspaces activity_logs pending_uploads; do
|
||||
# workspace_schedules added for the #2149 scheduler integration tests.
|
||||
for tbl in delegations workspaces activity_logs pending_uploads workspace_schedules; do
|
||||
if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
|
||||
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
|
||||
| grep -q 1; then
|
||||
@@ -274,6 +275,16 @@ jobs:
|
||||
# workflow runs don't fight over a host-net 5432 port.
|
||||
go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"
|
||||
|
||||
- if: needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Run scheduler integration tests (#2149)
|
||||
run: |
|
||||
# #2149: real-PG regression coverage for the scheduler firing loop
|
||||
# (tick → A2A fire → write-back of last_run_at/next_run_at/run_count/
|
||||
# activity_logs jsonb incl. invalid-UTF-8 sanitization + sweepPhantomBusy).
|
||||
# Reuses the same migrated Postgres (workspace_schedules / activity_logs
|
||||
# / workspaces all landed by the migration replay step above).
|
||||
go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_"
|
||||
|
||||
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Diagnostic dump on failure
|
||||
env:
|
||||
|
||||
@@ -34,8 +34,10 @@ name: Sweep stale Cloudflare DNS records
|
||||
# scripts/ops/test_sweep_cf_decide.py (#2027) cover the rule
|
||||
# classifier.
|
||||
#
|
||||
# Secrets: CF_API_TOKEN, CF_ZONE_ID, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
|
||||
# are confirmed existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
|
||||
# Secrets: CF_API_TOKEN (preferred CI-scoped name) or CLOUDFLARE_API_TOKEN
|
||||
# (operator-host canonical name) are accepted — the workflow falls back
|
||||
# automatically. Same for CF_ZONE_ID / CLOUDFLARE_ZONE_ID. Confirmed
|
||||
# existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
|
||||
# CP_STAGING_ADMIN_API_TOKEN are unconfirmed — if missing, the verify step
|
||||
# (schedule → hard-fail, dispatch → soft-skip) surfaces it clearly.
|
||||
|
||||
@@ -79,8 +81,8 @@ jobs:
|
||||
# each individually capped at 10s by the script's curl -m flag.
|
||||
timeout-minutes: 3
|
||||
env:
|
||||
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN }}
|
||||
CF_ZONE_ID: ${{ secrets.CF_ZONE_ID }}
|
||||
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN || secrets.CLOUDFLARE_API_TOKEN }}
|
||||
CF_ZONE_ID: ${{ secrets.CF_ZONE_ID || secrets.CLOUDFLARE_ZONE_ID }}
|
||||
CP_ADMIN_API_TOKEN: ${{ secrets.CP_ADMIN_API_TOKEN }}
|
||||
CP_STAGING_ADMIN_API_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
|
||||
@@ -129,6 +131,7 @@ jobs:
|
||||
fi
|
||||
echo "::error::sweep cannot run — required secrets missing: ${missing[*]}"
|
||||
echo "::error::set them at Settings → Secrets and Variables → Actions, or disable this workflow."
|
||||
echo "::error::Cloudflare secrets accept either the CI-scoped name (CF_API_TOKEN / CF_ZONE_ID) or the operator-host canonical name (CLOUDFLARE_API_TOKEN / CLOUDFLARE_ZONE_ID)."
|
||||
echo "::error::a silent skip masked an active CF DNS leak (152/200 zone records) caught only by a manual audit on 2026-04-28; this gate exists to make the gap visible."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -29,10 +29,12 @@ name: Sweep stale Cloudflare Tunnels
|
||||
# the DNS sweep's 50% because tenant-shaped tunnels are mostly
|
||||
# orphans by design) refuses to nuke past the threshold.
|
||||
#
|
||||
# Secrets: CF_API_TOKEN, CF_ACCOUNT_ID are confirmed existing per
|
||||
# issue #425 §425 audit. CP_ADMIN_API_TOKEN and CP_STAGING_ADMIN_API_TOKEN
|
||||
# are unconfirmed — if missing, the verify step (schedule → hard-fail,
|
||||
# dispatch → soft-skip) surfaces it clearly.
|
||||
# Secrets: CF_API_TOKEN (preferred CI-scoped name) or CLOUDFLARE_API_TOKEN
|
||||
# (operator-host canonical name) are accepted — the workflow falls back
|
||||
# automatically. Same for CF_ACCOUNT_ID / CLOUDFLARE_ACCOUNT_ID. Confirmed
|
||||
# existing per issue #425 §425 audit. CP_ADMIN_API_TOKEN and
|
||||
# CP_STAGING_ADMIN_API_TOKEN are unconfirmed — if missing, the verify step
|
||||
# (schedule → hard-fail, dispatch → soft-skip) surfaces it clearly.
|
||||
|
||||
on:
|
||||
schedule:
|
||||
@@ -74,8 +76,8 @@ jobs:
|
||||
# the sweep-cf-orphans companion job).
|
||||
timeout-minutes: 30
|
||||
env:
|
||||
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN }}
|
||||
CF_ACCOUNT_ID: ${{ secrets.CF_ACCOUNT_ID }}
|
||||
CF_API_TOKEN: ${{ secrets.CF_API_TOKEN || secrets.CLOUDFLARE_API_TOKEN }}
|
||||
CF_ACCOUNT_ID: ${{ secrets.CF_ACCOUNT_ID || secrets.CLOUDFLARE_ACCOUNT_ID }}
|
||||
CP_ADMIN_API_TOKEN: ${{ secrets.CP_ADMIN_API_TOKEN }}
|
||||
CP_STAGING_ADMIN_API_TOKEN: ${{ secrets.CP_STAGING_ADMIN_API_TOKEN }}
|
||||
MAX_DELETE_PCT: ${{ github.event.inputs.max_delete_pct || '90' }}
|
||||
|
||||
@@ -19,7 +19,10 @@
|
||||
#
|
||||
# Env vars required:
|
||||
# CF_API_TOKEN — Cloudflare token with zone:dns:edit
|
||||
# (falls back to CLOUDFLARE_API_TOKEN if CF_API_TOKEN is unset;
|
||||
# the workflow YAML maps both secret names into CF_API_TOKEN)
|
||||
# CF_ZONE_ID — the zone (moleculesai.app)
|
||||
# (falls back to CLOUDFLARE_ZONE_ID if CF_ZONE_ID is unset)
|
||||
# CP_ADMIN_API_TOKEN — CP admin bearer for api.moleculesai.app
|
||||
# CP_STAGING_ADMIN_API_TOKEN — CP admin bearer for staging-api.moleculesai.app
|
||||
# AWS_* — standard AWS creds (default region us-east-2)
|
||||
@@ -56,6 +59,12 @@ need() {
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
# Fallback: operator-host canonical names → CI-scoped names.
|
||||
# The workflow YAML already maps both, but direct script invocation
|
||||
# (e.g. local ops) may only have the canonical names set.
|
||||
CF_API_TOKEN="${CF_API_TOKEN:-${CLOUDFLARE_API_TOKEN:-}}"
|
||||
CF_ZONE_ID="${CF_ZONE_ID:-${CLOUDFLARE_ZONE_ID:-}}"
|
||||
|
||||
need CF_API_TOKEN
|
||||
need CF_ZONE_ID
|
||||
need CP_ADMIN_API_TOKEN
|
||||
@@ -121,7 +130,7 @@ if not payload.get("success", False) or not isinstance(payload.get("result"), li
|
||||
print(f"ERROR: Cloudflare DNS list failed: {detail}", file=sys.stderr)
|
||||
raise SystemExit(1)
|
||||
'; then
|
||||
log "Cloudflare DNS list failed; verify CF_API_TOKEN has Zone:DNS:Edit and CF_ZONE_ID is the moleculesai.app zone."
|
||||
log "Cloudflare DNS list failed; verify CF_API_TOKEN (or CLOUDFLARE_API_TOKEN) has Zone:DNS:Edit and CF_ZONE_ID (or CLOUDFLARE_ZONE_ID) is the moleculesai.app zone."
|
||||
exit 1
|
||||
fi
|
||||
TOTAL_CF=$(echo "$CF_JSON" | python3 -c "import json,sys; print(len(json.load(sys.stdin)['result']))")
|
||||
|
||||
@@ -29,8 +29,11 @@
|
||||
# account:cloudflare_tunnel:edit scope.
|
||||
# (Same secret as sweep-cf-orphans, but the
|
||||
# token must include the tunnel scope.)
|
||||
# (falls back to CLOUDFLARE_API_TOKEN if CF_API_TOKEN is unset;
|
||||
# the workflow YAML maps both secret names into CF_API_TOKEN)
|
||||
# CF_ACCOUNT_ID — the account that owns the tunnels (visible
|
||||
# in dash.cloudflare.com URL path)
|
||||
# (falls back to CLOUDFLARE_ACCOUNT_ID if CF_ACCOUNT_ID is unset)
|
||||
# CP_ADMIN_API_TOKEN — CP admin bearer for api.moleculesai.app
|
||||
# CP_STAGING_ADMIN_API_TOKEN — CP admin bearer for staging-api.moleculesai.app
|
||||
#
|
||||
@@ -70,6 +73,12 @@ need() {
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
# Fallback: operator-host canonical names → CI-scoped names.
|
||||
# The workflow YAML already maps both, but direct script invocation
|
||||
# (e.g. local ops) may only have the canonical names set.
|
||||
CF_API_TOKEN="${CF_API_TOKEN:-${CLOUDFLARE_API_TOKEN:-}}"
|
||||
CF_ACCOUNT_ID="${CF_ACCOUNT_ID:-${CLOUDFLARE_ACCOUNT_ID:-}}"
|
||||
|
||||
need CF_API_TOKEN
|
||||
need CF_ACCOUNT_ID
|
||||
need CP_ADMIN_API_TOKEN
|
||||
|
||||
@@ -18,6 +18,7 @@ No network. No live Gitea calls.
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import json
|
||||
import os
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
@@ -117,6 +118,31 @@ def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
|
||||
return p
|
||||
|
||||
|
||||
def _write_audit_yaml_json(tmp_path: Path, required_checks_json: dict) -> Path:
    """Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS_JSON env.

    The mapping is pretty-printed as JSON and embedded as a YAML literal
    block scalar (``|``) so the env value is a string rather than a nested
    YAML mapping.

    Args:
        tmp_path: Directory in which ``audit-force-merge.yml`` is created.
        required_checks_json: Mapping of branch name to required contexts.

    Returns:
        Path of the written workflow file.
    """
    # Indent every JSON line two columns deeper than the `REQUIRED_CHECKS_JSON`
    # key (column 10) so the whole dump stays inside the block scalar. Building
    # the document from explicit lines avoids running `textwrap.dedent` over a
    # template that already contains an interpolated multi-line value.
    json_block = textwrap.indent(json.dumps(required_checks_json, indent=2), " " * 12)
    text = (
        "name: audit-force-merge\n"
        "on:\n"
        "  schedule:\n"
        "    - cron: '*/30 * * * *'\n"
        "jobs:\n"
        "  audit:\n"
        "    runs-on: ubuntu-latest\n"
        "    steps:\n"
        "      - name: Run audit\n"
        "        env:\n"
        "          REQUIRED_CHECKS_JSON: |\n"
        f"{json_block}\n"
        "        run: bash .gitea/scripts/audit-force-merge.sh\n"
    )
    p = tmp_path / "audit-force-merge.yml"
    p.write_text(text, encoding="utf-8")
    return p
|
||||
|
||||
|
||||
def _make_stub_api(responses: dict):
|
||||
"""Build a fake `api()` callable.
|
||||
|
||||
@@ -363,6 +389,107 @@ def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
|
||||
assert findings == [], findings
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# REQUIRED_CHECKS_JSON variant drift tests
|
||||
# --------------------------------------------------------------------------
|
||||
def test_f3a_env_wider_than_protection_json_variant(drift_module, tmp_path, monkeypatch):
    """F3a (JSON variant): the audit env lists a context that protection lacks."""
    build_ctx = "ci / build (pull_request)"
    ghost_ctx = "ci / ghost (pull_request)"

    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": {"runs-on": "ubuntu-latest"}},
        sentinel_needs=["build"],
    )
    audit = _write_audit_yaml_json(tmp_path, {"main": [build_ctx, ghost_ctx]})
    _patch_paths(drift_module, monkeypatch, ci, audit)

    # Protection only knows the build context, so `ghost` is env-only.
    route = ("GET", "/repos/owner/repo/branch_protections/main")
    protection = {"status_check_contexts": [build_ctx]}
    monkeypatch.setattr(drift_module, "api", _make_stub_api({route: (200, protection)}))

    findings, _ = drift_module.detect_drift("main")
    assert any("F3a" in f and "ghost" in f for f in findings), findings
|
||||
|
||||
|
||||
def test_f3b_protection_wider_than_env_json_variant(drift_module, tmp_path, monkeypatch):
    """F3b (JSON variant): protection requires a context the audit env omits."""
    build_ctx = "ci / build (pull_request)"
    test_ctx = "ci / test (pull_request)"
    runner = {"runs-on": "ubuntu-latest"}

    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": dict(runner), "test": dict(runner)},
        sentinel_needs=["build", "test"],
    )
    audit = _write_audit_yaml_json(tmp_path, {"main": [build_ctx]})
    _patch_paths(drift_module, monkeypatch, ci, audit)

    # Protection lists both contexts; the env only lists the build one.
    route = ("GET", "/repos/owner/repo/branch_protections/main")
    protection = {"status_check_contexts": [build_ctx, test_ctx]}
    monkeypatch.setattr(drift_module, "api", _make_stub_api({route: (200, protection)}))

    findings, _ = drift_module.detect_drift("main")
    assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings
|
||||
|
||||
|
||||
def test_happy_path_no_drift_json_variant(drift_module, tmp_path, monkeypatch):
    """Happy path (JSON variant): env and protection list the same contexts."""
    contexts = [
        "ci / build (pull_request)",
        "ci / test (pull_request)",
        "ci / all-required (pull_request)",
    ]
    runner = {"runs-on": "ubuntu-latest"}

    ci = _write_ci_yaml(
        tmp_path,
        jobs={"build": dict(runner), "test": dict(runner)},
        sentinel_needs=["build", "test"],
    )
    audit = _write_audit_yaml_json(tmp_path, {"main": list(contexts)})
    _patch_paths(drift_module, monkeypatch, ci, audit)

    route = ("GET", "/repos/owner/repo/branch_protections/main")
    protection = {"status_check_contexts": list(contexts)}
    monkeypatch.setattr(drift_module, "api", _make_stub_api({route: (200, protection)}))

    findings, _ = drift_module.detect_drift("main")
    assert findings == [], findings
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
@@ -0,0 +1,558 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// scheduler_integration_test.go — REAL Postgres integration tests for the
|
||||
// workspace-server cron scheduler firing loop. Regression coverage for
|
||||
// molecule-core issue #2149 (filed under SOP rule internal#765).
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// docker run --rm -d --name pg-integration \
|
||||
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
|
||||
// -p 55432:5432 postgres:15-alpine
|
||||
// sleep 4
|
||||
// # apply every migration up.sql / legacy .sql in lexicographic order
|
||||
// for f in $(ls workspace-server/migrations/*.sql | grep -v '\.down\.sql$' | sort); do \
|
||||
// psql "postgres://postgres:test@localhost:55432/molecule?sslmode=disable" -f "$f"; done
|
||||
// cd workspace-server
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/scheduler/ -run '^TestIntegration_'
|
||||
//
|
||||
// CI: .gitea/workflows/handlers-postgres-integration.yml runs these on every
|
||||
// PR/push that touches workspace-server/internal/scheduler/ (the
|
||||
// `handlers-postgres` detect-changes profile was extended to include the
|
||||
// scheduler package + this workflow file).
|
||||
//
|
||||
// Why these are NOT the existing sqlmock unit tests (scheduler_test.go)
|
||||
// --------------------------------------------------------------------
|
||||
// The strict-sqlmock unit tests pin which SQL statements fire — fast, no DB.
|
||||
// But sqlmock CANNOT validate:
|
||||
// - the activity_logs `$3::jsonb` cast (#2026 wedge) — sqlmock never parses
|
||||
// the payload, so an invalid-UTF-8 jsonb body that wedges a real INSERT
|
||||
// looks "green" under mock.ExpectExec(`INSERT INTO activity_logs`).
|
||||
// - the ROW STATE after tick()/fireSchedule run: that last_run_at,
|
||||
// next_run_at, run_count, last_status actually landed on the row.
|
||||
// - sweepPhantomBusy's NOT IN (SELECT … activity_logs) subquery semantics
|
||||
// against real rows — it has no unit test at all (#2149).
|
||||
//
|
||||
// A SQL regression here = a fleet-wide silent cron outage (#85 ran 12h before
|
||||
// detection). These tests boot a real Postgres, insert real rows, run the
|
||||
// production tick()/sweepPhantomBusy, and SELECT the rows back to assert the
|
||||
// observable end state — the gap sqlmock structurally cannot cover.
|
||||
//
|
||||
// Watch-fail intent: each test is written to FAIL on a regression of the
|
||||
// behavior under test (e.g. drop the activity_logs INSERT, drop the
|
||||
// write-back UPDATE, drop the UTF-8 sanitize, or break the phantom-busy
|
||||
// subquery) and to PASS against the current-correct scheduler.go.
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
// ── test doubles ──────────────────────────────────────────────────────────
|
||||
|
||||
// recordingProxy is an A2AProxy that records each fire and returns a
|
||||
// configurable response. Used to assert that tick()/fireSchedule actually
|
||||
// reached the A2A boundary for the due schedule.
|
||||
type recordingProxy struct {
|
||||
status int
|
||||
body []byte
|
||||
err error
|
||||
|
||||
fires int
|
||||
lastBody []byte
|
||||
lastCaller string
|
||||
lastLogFlag bool
|
||||
lastWSID string
|
||||
}
|
||||
|
||||
func (p *recordingProxy) ProxyA2ARequest(
|
||||
_ context.Context, workspaceID string, body []byte, callerID string, logActivity bool,
|
||||
) (int, []byte, error) {
|
||||
p.fires++
|
||||
p.lastWSID = workspaceID
|
||||
p.lastBody = body
|
||||
p.lastCaller = callerID
|
||||
p.lastLogFlag = logActivity
|
||||
if p.err != nil {
|
||||
return 0, nil, p.err
|
||||
}
|
||||
return p.status, p.body, nil
|
||||
}
|
||||
|
||||
// ── connection + fixture helpers ──────────────────────────────────────────
|
||||
|
||||
// integrationDB returns the configured integration-test connection or skips
|
||||
// the test if INTEGRATION_DB_URL is unset. Hot-swaps the package-level
|
||||
// mdb.DB so the production scheduler helpers (tick, fireSchedule,
|
||||
// sweepPhantomBusy) operate on this connection; restores it via t.Cleanup.
|
||||
//
|
||||
// NOT SAFE FOR t.Parallel(): the package-global swap races across tests.
|
||||
func integrationDB(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
url := os.Getenv("INTEGRATION_DB_URL")
|
||||
if url == "" {
|
||||
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
|
||||
}
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := conn.PingContext(ctx); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
// Clean slate. activity_logs + workspace_schedules cascade off workspaces,
|
||||
// but we DELETE explicitly (and in FK order) so a partial prior run can't
|
||||
// leave orphan rows that perturb the next test's assertions.
|
||||
cctx, ccancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer ccancel()
|
||||
for _, q := range []string{
|
||||
`DELETE FROM activity_logs`,
|
||||
`DELETE FROM workspace_schedules`,
|
||||
`DELETE FROM workspaces`,
|
||||
} {
|
||||
if _, err := conn.ExecContext(cctx, q); err != nil {
|
||||
t.Fatalf("cleanup %q: %v", q, err)
|
||||
}
|
||||
}
|
||||
prev := mdb.DB
|
||||
mdb.DB = conn
|
||||
t.Cleanup(func() {
|
||||
mdb.DB = prev
|
||||
conn.Close()
|
||||
})
|
||||
return conn
|
||||
}
|
||||
|
||||
// insertWorkspace inserts a workspace row and returns its UUID. active is the
// initial active_tasks value; status is always 'online' and
// max_concurrent_tasks is always 1. Any insert error fails the test.
func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string {
	t.Helper()

	// Bound the call like integrationDB does, so a wedged Postgres fails this
	// test quickly instead of hanging until the `go test` timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var id string
	err := conn.QueryRowContext(ctx, `
		INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
		VALUES ($1, 'online', $2, 1)
		RETURNING id
	`, name, active).Scan(&id)
	if err != nil {
		t.Fatalf("insertWorkspace(%s): %v", name, err)
	}
	return id
}
|
||||
|
||||
// insertSchedule inserts an enabled workspace_schedules row whose next_run_at
// is one minute in the past (so tick() picks it up immediately) and returns
// its UUID. The row uses timezone 'UTC' and source 'runtime'. Any insert
// error fails the test.
func insertSchedule(t *testing.T, conn *sql.DB, wsID, name, cronExpr, prompt string) string {
	t.Helper()

	// Bound the call like integrationDB does, so a wedged Postgres fails this
	// test quickly instead of hanging until the `go test` timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	var id string
	err := conn.QueryRowContext(ctx, `
		INSERT INTO workspace_schedules
			(workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
		VALUES ($1, $2, $3, 'UTC', $4, true, now() - interval '1 minute', 'runtime')
		RETURNING id
	`, wsID, name, cronExpr, prompt).Scan(&id)
	if err != nil {
		t.Fatalf("insertSchedule(%s): %v", name, err)
	}
	return id
}
|
||||
|
||||
// scheduleState is the subset of a workspace_schedules row that
// readScheduleState reads back for assertions.
type scheduleState struct {
	lastRunAt  sql.NullTime // last_run_at; Valid is false when the column is NULL
	nextRunAt  sql.NullTime // next_run_at; Valid is false when the column is NULL
	runCount   int          // run_count
	lastStatus string       // last_status, "" when NULL
	lastError  string       // last_error, "" when NULL
}
|
||||
|
||||
func readScheduleState(t *testing.T, conn *sql.DB, id string) scheduleState {
|
||||
t.Helper()
|
||||
var st scheduleState
|
||||
var status, errStr sql.NullString
|
||||
err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT last_run_at, next_run_at, run_count, last_status, last_error
|
||||
FROM workspace_schedules WHERE id = $1
|
||||
`, id).Scan(&st.lastRunAt, &st.nextRunAt, &st.runCount, &status, &errStr)
|
||||
if err != nil {
|
||||
t.Fatalf("readScheduleState(%s): %v", id, err)
|
||||
}
|
||||
st.lastStatus = status.String
|
||||
st.lastError = errStr.String
|
||||
return st
|
||||
}
|
||||
|
||||
// ── TestIntegration_TickFiresAndWritesBack (#2149 core) ───────────────────
|
||||
//
|
||||
// Insert one due schedule, run tick() once, and assert the full firing
|
||||
// loop landed against a REAL Postgres:
|
||||
// - the A2A proxy was invoked exactly once for the schedule's workspace
|
||||
// - the post-fire UPDATE wrote last_run_at (was NULL), advanced next_run_at
|
||||
// into the future, bumped run_count to 1, set last_status='ok'
|
||||
// - a cron_run activity_logs row was inserted with VALID jsonb request_body
|
||||
// (the `$3::jsonb` cast #2026 path) carrying the schedule metadata
|
||||
//
|
||||
// Regression watch-fail: if a refactor drops the write-back UPDATE, the
|
||||
// activity_logs INSERT, or breaks the jsonb cast, this test fails where every
|
||||
// sqlmock unit test stays green.
|
||||
func TestIntegration_TickFiresAndWritesBack(t *testing.T) {
|
||||
conn := integrationDB(t)
|
||||
|
||||
wsID := insertWorkspace(t, conn, "cron-fire-ws", 0)
|
||||
schedID := insertSchedule(t, conn, wsID, "hourly-audit", "0 * * * *", "run the hourly audit")
|
||||
|
||||
proxy := &recordingProxy{
|
||||
status: 200,
|
||||
body: []byte(`{"jsonrpc":"2.0","result":{"kind":"message","parts":[{"kind":"text","text":"done"}]},"id":"1"}`),
|
||||
}
|
||||
s := New(proxy, nil)
|
||||
s.tick(context.Background())
|
||||
|
||||
// 1. A2A boundary reached exactly once for the right workspace.
|
||||
if proxy.fires != 1 {
|
||||
t.Fatalf("proxy fires = %d, want 1 (tick must fire the one due schedule)", proxy.fires)
|
||||
}
|
||||
if proxy.lastWSID != wsID {
|
||||
t.Errorf("proxy fired for workspace %q, want %q", proxy.lastWSID, wsID)
|
||||
}
|
||||
// Empty callerID = canvas-style (bypasses access control); logActivity=true.
|
||||
if proxy.lastCaller != "" {
|
||||
t.Errorf("callerID = %q, want empty (canvas-style scheduler fire)", proxy.lastCaller)
|
||||
}
|
||||
if !proxy.lastLogFlag {
|
||||
t.Error("logActivity flag = false, want true")
|
||||
}
|
||||
|
||||
// 2. Row write-back.
|
||||
st := readScheduleState(t, conn, schedID)
|
||||
if !st.lastRunAt.Valid {
|
||||
t.Error("last_run_at is NULL after fire, want set (write-back UPDATE did not land)")
|
||||
}
|
||||
if !st.nextRunAt.Valid {
|
||||
t.Fatal("next_run_at is NULL after fire, want a future timestamp")
|
||||
}
|
||||
if !st.nextRunAt.Time.After(time.Now()) {
|
||||
t.Errorf("next_run_at = %v, want a time in the future (schedule would tight-loop otherwise)", st.nextRunAt.Time)
|
||||
}
|
||||
if st.runCount != 1 {
|
||||
t.Errorf("run_count = %d, want 1", st.runCount)
|
||||
}
|
||||
if st.lastStatus != "ok" {
|
||||
t.Errorf("last_status = %q, want \"ok\"", st.lastStatus)
|
||||
}
|
||||
|
||||
// 3. activity_logs cron_run row with valid jsonb request_body.
|
||||
var actCount int
|
||||
var summary, status string
|
||||
var reqBody []byte
|
||||
err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT count(*) OVER (), summary, status, request_body
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1 AND activity_type = 'cron_run'
|
||||
LIMIT 1
|
||||
`, wsID).Scan(&actCount, &summary, &status, &reqBody)
|
||||
if err == sql.ErrNoRows {
|
||||
t.Fatal("no cron_run activity_logs row inserted after fire (#152/#2026 path missing)")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("read activity_logs: %v", err)
|
||||
}
|
||||
if actCount != 1 {
|
||||
t.Errorf("cron_run activity_logs rows = %d, want 1", actCount)
|
||||
}
|
||||
if status != "ok" {
|
||||
t.Errorf("activity_logs.status = %q, want \"ok\"", status)
|
||||
}
|
||||
// request_body must be valid jsonb carrying the schedule_id — proves the
|
||||
// `$3::jsonb` cast accepted the payload (the #2026 wedge surface).
|
||||
var sid string
|
||||
if err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT request_body->>'schedule_id'
|
||||
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
|
||||
`, wsID).Scan(&sid); err != nil {
|
||||
t.Fatalf("request_body is not queryable jsonb: %v", err)
|
||||
}
|
||||
if sid != schedID {
|
||||
t.Errorf("activity_logs request_body->>'schedule_id' = %q, want %q", sid, schedID)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb (#2026 / #2149) ────
|
||||
//
|
||||
// The agent-editable prompt can carry raw invalid-UTF-8 bytes. Postgres jsonb
|
||||
// columns REJECT invalid UTF-8, which (pre-#2026) wedged the activity_logs
|
||||
// INSERT and held the transaction open — stalling the whole scheduler.
|
||||
// fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert.
|
||||
//
|
||||
// Postgres TEXT columns (workspace_schedules.prompt) also reject invalid UTF-8
|
||||
// in a UTF-8 database, so we cannot INSERT the bad bytes through the fixture.
|
||||
// Instead we insert a valid prompt, then call fireSchedule directly with a
|
||||
// scheduleRow whose Prompt field contains the invalid bytes — this simulates
|
||||
// the real regression path (e.g. truncation splitting a multi-byte rune, or
|
||||
// an agent-edited template arriving via a path that bypasses DB validation).
|
||||
//
|
||||
// Assertions:
|
||||
// - the fire still completed (write-back UPDATE landed)
|
||||
// - the cron_run activity_logs row was inserted (the jsonb cast accepted
|
||||
// the SANITIZED payload — the INSERT did not wedge)
|
||||
// - the stored request_body is queryable jsonb (valid UTF-8 on disk)
|
||||
//
|
||||
// Watch-fail: remove the sanitizeUTF8() wrapping around the jsonb payload and
|
||||
// this test fails on a real Postgres (INSERT errors / row absent), while the
|
||||
// sqlmock unit test that only checks "an INSERT fired" stays green.
|
||||
func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) {
|
||||
conn := integrationDB(t)
|
||||
|
||||
wsID := insertWorkspace(t, conn, "utf8-ws", 0)
|
||||
// Insert with valid UTF-8 — Postgres TEXT rejects 0x80/0xff.
|
||||
schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", "valid prompt")
|
||||
|
||||
// Prompt with invalid UTF-8: orphan continuation byte + bare 0xff.
|
||||
badPrompt := "audit \x80 report \xff end"
|
||||
row := scheduleRow{
|
||||
ID: schedID,
|
||||
WorkspaceID: wsID,
|
||||
Name: "utf8-job",
|
||||
CronExpr: "0 * * * *",
|
||||
Timezone: "UTC",
|
||||
Prompt: badPrompt,
|
||||
}
|
||||
|
||||
proxy := &recordingProxy{
|
||||
status: 200,
|
||||
body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`),
|
||||
}
|
||||
s := New(proxy, nil)
|
||||
s.fireSchedule(context.Background(), row)
|
||||
|
||||
if proxy.fires != 1 {
|
||||
t.Fatalf("proxy fires = %d, want 1", proxy.fires)
|
||||
}
|
||||
|
||||
// Write-back must have landed despite the bad prompt bytes.
|
||||
st := readScheduleState(t, conn, schedID)
|
||||
if st.runCount != 1 || st.lastStatus != "ok" {
|
||||
t.Errorf("post-fire state run_count=%d last_status=%q, want 1/\"ok\" "+
|
||||
"(invalid-UTF-8 prompt must not block the fire)", st.runCount, st.lastStatus)
|
||||
}
|
||||
|
||||
// The cron_run activity_logs row MUST exist — proving the `$3::jsonb`
|
||||
// INSERT accepted the sanitized payload (did not wedge on invalid UTF-8).
|
||||
var n int
|
||||
if err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT count(*) FROM activity_logs
|
||||
WHERE workspace_id = $1 AND activity_type = 'cron_run'
|
||||
`, wsID).Scan(&n); err != nil {
|
||||
t.Fatalf("count cron_run rows: %v", err)
|
||||
}
|
||||
if n != 1 {
|
||||
t.Fatalf("cron_run activity_logs rows = %d, want 1 — the jsonb INSERT wedged "+
|
||||
"on invalid UTF-8 (the #2026 regression)", n)
|
||||
}
|
||||
|
||||
// The stored prompt inside request_body must be queryable + valid UTF-8.
|
||||
var storedPrompt string
|
||||
if err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT request_body->>'prompt'
|
||||
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
|
||||
`, wsID).Scan(&storedPrompt); err != nil {
|
||||
t.Fatalf("request_body->>'prompt' not queryable jsonb: %v", err)
|
||||
}
|
||||
if storedPrompt == "" {
|
||||
t.Error("stored prompt is empty, want the sanitized prompt text")
|
||||
}
|
||||
// Round-trip through Postgres jsonb guarantees valid UTF-8; assert the
|
||||
// replacement character replaced the bad bytes rather than them surviving.
|
||||
for i := 0; i < len(storedPrompt); i++ {
|
||||
if storedPrompt[i] == 0x80 || storedPrompt[i] == 0xff {
|
||||
t.Fatalf("stored prompt still contains raw invalid byte 0x%x at %d", storedPrompt[i], i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestIntegration_TickErrorStatusWriteBack (#2149) ──────────────────────
|
||||
//
|
||||
// When the A2A proxy returns a transport error, fireSchedule must still write
|
||||
// back: last_status='error', last_error populated, next_run_at advanced (so
|
||||
// the schedule does not get stuck re-firing), run_count bumped. Verifies the
|
||||
// error path persists to a real row, not just that "an UPDATE fired".
|
||||
func TestIntegration_TickErrorStatusWriteBack(t *testing.T) {
|
||||
conn := integrationDB(t)
|
||||
|
||||
wsID := insertWorkspace(t, conn, "err-ws", 0)
|
||||
schedID := insertSchedule(t, conn, wsID, "err-job", "0 * * * *", "do work")
|
||||
|
||||
proxy := &recordingProxy{err: context.DeadlineExceeded}
|
||||
s := New(proxy, nil)
|
||||
s.tick(context.Background())
|
||||
|
||||
st := readScheduleState(t, conn, schedID)
|
||||
if st.lastStatus != "error" {
|
||||
t.Errorf("last_status = %q, want \"error\"", st.lastStatus)
|
||||
}
|
||||
if st.lastError == "" {
|
||||
t.Error("last_error is empty, want the proxy error text persisted (#152)")
|
||||
}
|
||||
if st.runCount != 1 {
|
||||
t.Errorf("run_count = %d, want 1 (run still counted on error)", st.runCount)
|
||||
}
|
||||
if !st.nextRunAt.Valid || !st.nextRunAt.Time.After(time.Now()) {
|
||||
t.Errorf("next_run_at not advanced to future on error path (= %v) — schedule would tight-loop", st.nextRunAt)
|
||||
}
|
||||
// The error activity_logs row must carry status='error' + error_detail.
|
||||
var status, errDetail string
|
||||
if err := conn.QueryRowContext(context.Background(), `
|
||||
SELECT status, COALESCE(error_detail,'') FROM activity_logs
|
||||
WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
|
||||
`, wsID).Scan(&status, &errDetail); err != nil {
|
||||
t.Fatalf("read error activity_logs: %v", err)
|
||||
}
|
||||
if status != "error" {
|
||||
t.Errorf("activity_logs.status = %q, want \"error\"", status)
|
||||
}
|
||||
if errDetail == "" {
|
||||
t.Error("activity_logs.error_detail empty on error fire, want the error message (#152)")
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestIntegration_SweepPhantomBusy (#2149 — no prior test) ──────────────
|
||||
//
|
||||
// sweepPhantomBusy resets active_tasks=0 for workspaces stuck busy with NO
|
||||
// activity_logs row in the last phantomStaleThreshold window, and must LEAVE
|
||||
// ALONE workspaces that have recent activity. The NOT IN (SELECT DISTINCT
|
||||
// workspace_id FROM activity_logs WHERE created_at > now() - interval) subquery
|
||||
// is exactly the kind of set-semantics that sqlmock cannot validate — there is
|
||||
// no unit test for this method at all (#2149).
|
||||
//
|
||||
// Fixture:
|
||||
// - phantomWS: active_tasks=3, NO recent activity_log → must reset to 0
|
||||
// - recentWS: active_tasks=2, activity_log 1 min ago → must stay at 2
|
||||
// - staleWS: active_tasks=1, activity_log 30 min ago → must reset to 0
|
||||
// - removedWS: active_tasks=4, status='removed', no activity → must stay (status guard)
|
||||
// - idleWS: active_tasks=0 → untouched (not >0)
|
||||
//
|
||||
// Watch-fail: break the subquery (e.g. drop the status!='removed' guard, or
|
||||
// invert the NOT IN), and the asserted end-state diverges on a real Postgres.
|
||||
func TestIntegration_SweepPhantomBusy(t *testing.T) {
|
||||
conn := integrationDB(t)
|
||||
|
||||
phantomWS := insertWorkspace(t, conn, "phantom-ws", 3)
|
||||
recentWS := insertWorkspace(t, conn, "recent-ws", 2)
|
||||
staleWS := insertWorkspace(t, conn, "stale-ws", 1)
|
||||
idleWS := insertWorkspace(t, conn, "idle-ws", 0)
|
||||
|
||||
// removedWS: busy but status='removed' — the sweep must skip it.
|
||||
var removedWS string
|
||||
if err := conn.QueryRowContext(context.Background(), `
|
||||
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
|
||||
VALUES ('removed-ws', 'removed', 4, 1) RETURNING id
|
||||
`).Scan(&removedWS); err != nil {
|
||||
t.Fatalf("insert removedWS: %v", err)
|
||||
}
|
||||
|
||||
// recentWS has a fresh activity_log (1 min ago → inside the 10-min window).
|
||||
if _, err := conn.ExecContext(context.Background(), `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
|
||||
VALUES ($1, 'a2a_receive', 'ok', now() - interval '1 minute')
|
||||
`, recentWS); err != nil {
|
||||
t.Fatalf("insert recent activity_log: %v", err)
|
||||
}
|
||||
// staleWS has only an OLD activity_log (30 min ago → outside the window).
|
||||
if _, err := conn.ExecContext(context.Background(), `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
|
||||
VALUES ($1, 'a2a_receive', 'ok', now() - interval '30 minutes')
|
||||
`, staleWS); err != nil {
|
||||
t.Fatalf("insert stale activity_log: %v", err)
|
||||
}
|
||||
|
||||
s := New(nil, nil)
|
||||
s.sweepPhantomBusy(context.Background())
|
||||
|
||||
active := func(id string) int {
|
||||
var n int
|
||||
if err := conn.QueryRowContext(context.Background(),
|
||||
`SELECT active_tasks FROM workspaces WHERE id = $1`, id).Scan(&n); err != nil {
|
||||
t.Fatalf("read active_tasks(%s): %v", id, err)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
if got := active(phantomWS); got != 0 {
|
||||
t.Errorf("phantomWS active_tasks = %d, want 0 (busy + no recent activity → must be swept)", got)
|
||||
}
|
||||
if got := active(staleWS); got != 0 {
|
||||
t.Errorf("staleWS active_tasks = %d, want 0 (only stale activity → must be swept)", got)
|
||||
}
|
||||
if got := active(recentWS); got != 2 {
|
||||
t.Errorf("recentWS active_tasks = %d, want 2 (recent activity → must NOT be swept)", got)
|
||||
}
|
||||
if got := active(removedWS); got != 4 {
|
||||
t.Errorf("removedWS active_tasks = %d, want 4 (status='removed' → sweep must skip it)", got)
|
||||
}
|
||||
if got := active(idleWS); got != 0 {
|
||||
t.Errorf("idleWS active_tasks = %d, want 0 (was never busy)", got)
|
||||
}
|
||||
|
||||
// The swept rows must also have current_task cleared.
|
||||
var ct string
|
||||
if err := conn.QueryRowContext(context.Background(),
|
||||
`SELECT COALESCE(current_task,'') FROM workspaces WHERE id = $1`, phantomWS).Scan(&ct); err != nil {
|
||||
t.Fatalf("read current_task: %v", err)
|
||||
}
|
||||
if ct != "" {
|
||||
t.Errorf("phantomWS current_task = %q, want empty after sweep", ct)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestIntegration_NativeSchedulerSkipAdvancesNextRunAt (#2149) ──────────
|
||||
//
|
||||
// When a workspace's adapter owns scheduling natively, tick() must SKIP the
|
||||
// fire but still advance next_run_at (so the row doesn't tight-loop on every
|
||||
// poll) — observability (next_run_at) is preserved while the fire is dropped.
|
||||
// Asserts the native-skip UPDATE landed on a real row and the proxy was NOT
|
||||
// invoked. This is the native-skip UPDATE path #2149 calls out — sqlmock can
|
||||
// only assert an UPDATE fired, not that next_run_at moved forward.
|
||||
func TestIntegration_NativeSchedulerSkipAdvancesNextRunAt(t *testing.T) {
|
||||
conn := integrationDB(t)
|
||||
|
||||
wsID := insertWorkspace(t, conn, "native-ws", 0)
|
||||
schedID := insertSchedule(t, conn, wsID, "native-job", "0 * * * *", "native run")
|
||||
|
||||
// Capture the pre-tick next_run_at (it is in the past by construction).
|
||||
before := readScheduleState(t, conn, schedID)
|
||||
if !before.nextRunAt.Valid || before.nextRunAt.Time.After(time.Now()) {
|
||||
t.Fatalf("precondition: next_run_at should start in the past, got %v", before.nextRunAt)
|
||||
}
|
||||
|
||||
proxy := &recordingProxy{status: 200, body: []byte(`{}`)}
|
||||
s := New(proxy, nil)
|
||||
// Every workspace reports native scheduling → fire must be skipped.
|
||||
s.SetNativeSchedulerCheck(func(string) bool { return true })
|
||||
s.tick(context.Background())
|
||||
|
||||
if proxy.fires != 0 {
|
||||
t.Errorf("proxy fires = %d, want 0 (native-scheduler workspace must NOT fire)", proxy.fires)
|
||||
}
|
||||
|
||||
after := readScheduleState(t, conn, schedID)
|
||||
if !after.nextRunAt.Valid || !after.nextRunAt.Time.After(time.Now()) {
|
||||
t.Errorf("next_run_at = %v, want advanced into the future (native-skip UPDATE must still run)", after.nextRunAt)
|
||||
}
|
||||
// Skip path does NOT bump run_count or write last_run_at (no fire happened).
|
||||
if after.runCount != 0 {
|
||||
t.Errorf("run_count = %d, want 0 (skip must not count as a run)", after.runCount)
|
||||
}
|
||||
if after.lastRunAt.Valid {
|
||||
t.Error("last_run_at set on native-skip, want NULL (no fire occurred)")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user