fix(ci-drift): add REQUIRED_CHECKS_JSON variant support (internal#804) #2177

Merged
claude-ceo-assistant merged 7 commits from fix/internal-804-parser-json-variant into main 2026-06-04 11:16:54 +00:00
6 changed files with 851 additions and 30 deletions
+78 -29
View File
@@ -8,7 +8,8 @@ pair diverges.
Sources:
A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set)
B. `status_check_contexts` in branch_protections (the merge gate)
C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env)
C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy)
env in audit-force-merge.yml (the audit env)
Three failure classes:
F1 Job in (A) is not under the sentinel's `needs:` — sentinel
@@ -250,13 +251,21 @@ def sentinel_needs(ci_doc: dict) -> set[str]:
return set(needs)
def required_checks_env(audit_doc: dict) -> set[str]:
"""Pull the REQUIRED_CHECKS env value from audit-force-merge.yml.
def required_checks_env(audit_doc: dict, branch: str) -> set[str]:
"""Pull the required-checks env value from audit-force-merge.yml.
Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do
NOT grep for `REQUIRED_CHECKS:` — that breaks under reformatting,
NOT grep for env keys — that breaks under reformatting,
multi-job workflows, or a future move of the env to a different
step. Instead, look inside every job's every step's `env:` map."""
found: list[str] = []
step. Instead, look inside every job's every step's `env:` map.
Supports two variants:
- REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name.
We extract the array for the target branch.
- REQUIRED_CHECKS (legacy): newline-separated list of context names.
"""
found_json: list[str] = []
found_legacy: list[str] = []
jobs = audit_doc.get("jobs", {})
if not isinstance(jobs, dict):
sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n")
@@ -268,27 +277,67 @@ def required_checks_env(audit_doc: dict) -> set[str]:
if not isinstance(step, dict):
continue
step_env = step.get("env") or {}
if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found.append(v)
if not found:
sys.stderr.write(
f"::error::REQUIRED_CHECKS env not found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
if len(found) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n"
)
sys.exit(3)
raw = found[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
if isinstance(step_env, dict):
if "REQUIRED_CHECKS_JSON" in step_env:
v = step_env["REQUIRED_CHECKS_JSON"]
if isinstance(v, str):
found_json.append(v)
if "REQUIRED_CHECKS" in step_env:
v = step_env["REQUIRED_CHECKS"]
if isinstance(v, str):
found_legacy.append(v)
# JSON variant takes precedence.
if found_json:
if len(found_json) > 1:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n"
)
sys.exit(3)
try:
parsed = json.loads(found_json[0])
except json.JSONDecodeError as e:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n"
)
sys.exit(3)
if not isinstance(parsed, dict):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n"
)
sys.exit(3)
branch_checks = parsed.get(branch)
if branch_checks is None:
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n"
)
sys.exit(3)
if not isinstance(branch_checks, list):
sys.stderr.write(
f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n"
)
sys.exit(3)
return {str(item).strip() for item in branch_checks if str(item).strip()}
# Legacy variant fallback.
if found_legacy:
if len(found_legacy) > 1:
# Defensive: refuse to guess which one is canonical.
sys.stderr.write(
f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n"
)
sys.exit(3)
raw = found_legacy[0]
# YAML block-scalars (`|`) leave a trailing newline + blanks; trim
# consistently with audit-force-merge.sh's parser so both sides
# produce identical sets.
return {line.strip() for line in raw.splitlines() if line.strip()}
sys.stderr.write(
f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of "
f"{AUDIT_WORKFLOW_PATH}\n"
)
sys.exit(3)
# --------------------------------------------------------------------------
@@ -330,7 +379,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]:
jobs = ci_job_names(ci_doc)
jobs_all = ci_jobs_all(ci_doc)
needs = sentinel_needs(ci_doc)
env_set = required_checks_env(audit_doc)
env_set = required_checks_env(audit_doc, branch)
# Protection
# api() raises ApiError on non-2xx. Transient 5xx should fail loud.
@@ -524,7 +573,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str:
"- **F2**: rename the protection context to match an emitter, "
"or remove it from `status_check_contexts` "
"(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).",
"- **F3a / F3b**: bring `REQUIRED_CHECKS` env in "
"- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in "
"`.gitea/workflows/audit-force-merge.yml` into set-equality with "
"`status_check_contexts` (single PR, both files).",
"",
+5
View File
@@ -26,6 +26,10 @@ PROFILES: dict[str, dict[str, str]] = {
"handlers": (
r"^workspace-server/internal/handlers/"
r"|^workspace-server/internal/wsauth/"
# #2149: the scheduler real-PG integration tests run in this same
# workflow (they reuse its migrated Postgres), so changes to the
# scheduler package must trigger the job too.
r"|^workspace-server/internal/scheduler/"
r"|^workspace-server/migrations/"
r"|^\.gitea/workflows/handlers-postgres-integration\.yml$"
),
@@ -174,3 +178,4 @@ def main(argv: list[str]) -> int:
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
@@ -1,4 +1,5 @@
import importlib.util
import json
import sys
from pathlib import Path
from unittest.mock import patch
@@ -36,6 +37,76 @@ def _make_audit_doc(required_checks: list[str]) -> dict:
}
def _make_audit_doc_json(required_checks_json: dict) -> dict:
return {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}}
]
}
}
}
# ---------------------------------------------------------------------------
# required_checks_env — dual-variant parsing
# ---------------------------------------------------------------------------
def test_required_checks_env_prefers_json_over_legacy():
doc = {
"jobs": {
"audit": {
"steps": [
{
"env": {
"REQUIRED_CHECKS_JSON": json.dumps(
{"main": ["ctx-a"], "staging": ["ctx-b"]}
),
"REQUIRED_CHECKS": "ctx-legacy\nctx-old",
}
}
]
}
}
}
assert drift.required_checks_env(doc, "main") == {"ctx-a"}
assert drift.required_checks_env(doc, "staging") == {"ctx-b"}
def test_required_checks_env_falls_back_to_legacy():
doc = _make_audit_doc(["legacy-ctx"])
assert drift.required_checks_env(doc, "main") == {"legacy-ctx"}
def test_required_checks_env_json_missing_branch_fails():
doc = _make_audit_doc_json({"staging": ["ctx-b"]})
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
def test_required_checks_env_json_malformed_fails():
doc = {
"jobs": {
"audit": {
"steps": [
{"env": {"REQUIRED_CHECKS_JSON": "not-json"}}
]
}
}
}
try:
drift.required_checks_env(doc, "main")
except SystemExit as exc:
assert exc.code == 3
else:
raise AssertionError("expected SystemExit(3)")
# ---------------------------------------------------------------------------
# sentinel_needs
# ---------------------------------------------------------------------------
@@ -243,7 +243,8 @@ jobs:
# MUST exist for the integration tests to be meaningful. Hard-
# fail if any didn't land — that would be a real regression we
# want loud.
for tbl in delegations workspaces activity_logs pending_uploads; do
# workspace_schedules added for the #2149 scheduler integration tests.
for tbl in delegations workspaces activity_logs pending_uploads workspace_schedules; do
if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
| grep -q 1; then
@@ -274,6 +275,16 @@ jobs:
# workflow runs don't fight over a host-net 5432 port.
go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"
- if: needs.detect-changes.outputs.handlers == 'true'
name: Run scheduler integration tests (#2149)
run: |
# #2149: real-PG regression coverage for the scheduler firing loop
# (tick → A2A fire → write-back of last_run_at/next_run_at/run_count/
# activity_logs jsonb incl. invalid-UTF-8 sanitization + sweepPhantomBusy).
# Reuses the same migrated Postgres (workspace_schedules / activity_logs
# / workspaces all landed by the migration replay step above).
go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_"
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
name: Diagnostic dump on failure
env:
+127
View File
@@ -18,6 +18,7 @@ No network. No live Gitea calls.
from __future__ import annotations
import importlib.util
import json
import os
import textwrap
from pathlib import Path
@@ -117,6 +118,31 @@ def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path:
return p
def _write_audit_yaml_json(tmp_path: Path, required_checks_json: dict) -> Path:
"""Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS_JSON env."""
block = json.dumps(required_checks_json, indent=2)
text = textwrap.dedent(
f"""\
name: audit-force-merge
on:
schedule:
- cron: '*/30 * * * *'
jobs:
audit:
runs-on: ubuntu-latest
steps:
- name: Run audit
env:
REQUIRED_CHECKS_JSON: |
{block.replace(chr(10), chr(10) + ' ')}
run: bash .gitea/scripts/audit-force-merge.sh
"""
)
p = tmp_path / "audit-force-merge.yml"
p.write_text(text, encoding="utf-8")
return p
def _make_stub_api(responses: dict):
"""Build a fake `api()` callable.
@@ -363,6 +389,107 @@ def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch):
assert findings == [], findings
# --------------------------------------------------------------------------
# REQUIRED_CHECKS_JSON variant drift tests
# --------------------------------------------------------------------------
def test_f3a_env_wider_than_protection_json_variant(drift_module, tmp_path, monkeypatch):
"""F3a: REQUIRED_CHECKS_JSON env has a context NOT in protection."""
ci = _write_ci_yaml(
tmp_path,
jobs={"build": {"runs-on": "ubuntu-latest"}},
sentinel_needs=["build"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)", "ci / ghost (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{"status_check_contexts": ["ci / build (pull_request)"]},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3a" in f and "ghost" in f for f in findings), findings
def test_f3b_protection_wider_than_env_json_variant(drift_module, tmp_path, monkeypatch):
"""F3b: protection has a context NOT in REQUIRED_CHECKS_JSON env."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{"main": ["ci / build (pull_request)"]},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings
def test_happy_path_no_drift_json_variant(drift_module, tmp_path, monkeypatch):
"""Happy path with REQUIRED_CHECKS_JSON: all aligned."""
ci = _write_ci_yaml(
tmp_path,
jobs={
"build": {"runs-on": "ubuntu-latest"},
"test": {"runs-on": "ubuntu-latest"},
},
sentinel_needs=["build", "test"],
)
audit = _write_audit_yaml_json(
tmp_path,
{
"main": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
)
_patch_paths(drift_module, monkeypatch, ci, audit)
stub = _make_stub_api({
("GET", "/repos/owner/repo/branch_protections/main"): (
200,
{
"status_check_contexts": [
"ci / build (pull_request)",
"ci / test (pull_request)",
"ci / all-required (pull_request)",
]
},
),
})
monkeypatch.setattr(drift_module, "api", stub)
findings, _ = drift_module.detect_drift("main")
assert findings == [], findings
# --------------------------------------------------------------------------
# MUST-FIX 1: find_open_issue must raise on transient HTTP errors
# --------------------------------------------------------------------------
@@ -0,0 +1,558 @@
//go:build integration
// +build integration
// scheduler_integration_test.go — REAL Postgres integration tests for the
// workspace-server cron scheduler firing loop. Regression coverage for
// molecule-core issue #2149 (filed under SOP rule internal#765).
//
// Run with:
//
// docker run --rm -d --name pg-integration \
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
// -p 55432:5432 postgres:15-alpine
// sleep 4
// # apply every migration up.sql / legacy .sql in lexicographic order
// for f in $(ls workspace-server/migrations/*.sql | grep -v '\.down\.sql$' | sort); do \
// psql "postgres://postgres:test@localhost:55432/molecule?sslmode=disable" -f "$f"; done
// cd workspace-server
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
// go test -tags=integration ./internal/scheduler/ -run '^TestIntegration_'
//
// CI: .gitea/workflows/handlers-postgres-integration.yml runs these on every
// PR/push that touches workspace-server/internal/scheduler/ (the
// `handlers-postgres` detect-changes profile was extended to include the
// scheduler package + this workflow file).
//
// Why these are NOT the existing sqlmock unit tests (scheduler_test.go)
// --------------------------------------------------------------------
// The strict-sqlmock unit tests pin which SQL statements fire — fast, no DB.
// But sqlmock CANNOT validate:
// - the activity_logs `$3::jsonb` cast (#2026 wedge) — sqlmock never parses
// the payload, so an invalid-UTF-8 jsonb body that wedges a real INSERT
// looks "green" under mock.ExpectExec(`INSERT INTO activity_logs`).
// - the ROW STATE after tick()/fireSchedule run: that last_run_at,
// next_run_at, run_count, last_status actually landed on the row.
// - sweepPhantomBusy's NOT IN (SELECT … activity_logs) subquery semantics
// against real rows — it has no unit test at all (#2149).
//
// A SQL regression here = a fleet-wide silent cron outage (#85 ran 12h before
// detection). These tests boot a real Postgres, insert real rows, run the
// production tick()/sweepPhantomBusy, and SELECT the rows back to assert the
// observable end state — the gap sqlmock structurally cannot cover.
//
// Watch-fail intent: each test is written to FAIL on a regression of the
// behavior under test (e.g. drop the activity_logs INSERT, drop the
// write-back UPDATE, drop the UTF-8 sanitize, or break the phantom-busy
// subquery) and to PASS against the current-correct scheduler.go.
package scheduler
import (
"context"
"database/sql"
"os"
"testing"
"time"
mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
_ "github.com/lib/pq"
)
// ── test doubles ──────────────────────────────────────────────────────────
// recordingProxy is an A2AProxy that records each fire and returns a
// configurable response. Used to assert that tick()/fireSchedule actually
// reached the A2A boundary for the due schedule.
type recordingProxy struct {
status int
body []byte
err error
fires int
lastBody []byte
lastCaller string
lastLogFlag bool
lastWSID string
}
func (p *recordingProxy) ProxyA2ARequest(
_ context.Context, workspaceID string, body []byte, callerID string, logActivity bool,
) (int, []byte, error) {
p.fires++
p.lastWSID = workspaceID
p.lastBody = body
p.lastCaller = callerID
p.lastLogFlag = logActivity
if p.err != nil {
return 0, nil, p.err
}
return p.status, p.body, nil
}
// ── connection + fixture helpers ──────────────────────────────────────────
// integrationDB returns the configured integration-test connection or skips
// the test if INTEGRATION_DB_URL is unset. Hot-swaps the package-level
// mdb.DB so the production scheduler helpers (tick, fireSchedule,
// sweepPhantomBusy) operate on this connection; restores it via t.Cleanup.
//
// NOT SAFE FOR t.Parallel(): the package-global swap races across tests.
func integrationDB(t *testing.T) *sql.DB {
t.Helper()
url := os.Getenv("INTEGRATION_DB_URL")
if url == "" {
t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)")
}
conn, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("open: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := conn.PingContext(ctx); err != nil {
t.Fatalf("ping: %v", err)
}
// Clean slate. activity_logs + workspace_schedules cascade off workspaces,
// but we DELETE explicitly (and in FK order) so a partial prior run can't
// leave orphan rows that perturb the next test's assertions.
cctx, ccancel := context.WithTimeout(context.Background(), 10*time.Second)
defer ccancel()
for _, q := range []string{
`DELETE FROM activity_logs`,
`DELETE FROM workspace_schedules`,
`DELETE FROM workspaces`,
} {
if _, err := conn.ExecContext(cctx, q); err != nil {
t.Fatalf("cleanup %q: %v", q, err)
}
}
prev := mdb.DB
mdb.DB = conn
t.Cleanup(func() {
mdb.DB = prev
conn.Close()
})
return conn
}
// insertWorkspace inserts a workspace row and returns its UUID. active is the
// initial active_tasks value; status defaults to 'online' (valid workspace_status enum).
func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ($1, 'online', $2, 1)
RETURNING id
`, name, active).Scan(&id)
if err != nil {
t.Fatalf("insertWorkspace(%s): %v", name, err)
}
return id
}
// insertSchedule inserts an enabled workspace_schedules row whose next_run_at
// is in the past (so tick() picks it up immediately) and returns its UUID.
func insertSchedule(t *testing.T, conn *sql.DB, wsID, name, cronExpr, prompt string) string {
t.Helper()
var id string
err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspace_schedules
(workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source)
VALUES ($1, $2, $3, 'UTC', $4, true, now() - interval '1 minute', 'runtime')
RETURNING id
`, wsID, name, cronExpr, prompt).Scan(&id)
if err != nil {
t.Fatalf("insertSchedule(%s): %v", name, err)
}
return id
}
type scheduleState struct {
lastRunAt sql.NullTime
nextRunAt sql.NullTime
runCount int
lastStatus string
lastError string
}
func readScheduleState(t *testing.T, conn *sql.DB, id string) scheduleState {
t.Helper()
var st scheduleState
var status, errStr sql.NullString
err := conn.QueryRowContext(context.Background(), `
SELECT last_run_at, next_run_at, run_count, last_status, last_error
FROM workspace_schedules WHERE id = $1
`, id).Scan(&st.lastRunAt, &st.nextRunAt, &st.runCount, &status, &errStr)
if err != nil {
t.Fatalf("readScheduleState(%s): %v", id, err)
}
st.lastStatus = status.String
st.lastError = errStr.String
return st
}
// ── TestIntegration_TickFiresAndWritesBack (#2149 core) ───────────────────
//
// Insert one due schedule, run tick() once, and assert the full firing
// loop landed against a REAL Postgres:
// - the A2A proxy was invoked exactly once for the schedule's workspace
// - the post-fire UPDATE wrote last_run_at (was NULL), advanced next_run_at
// into the future, bumped run_count to 1, set last_status='ok'
// - a cron_run activity_logs row was inserted with VALID jsonb request_body
// (the `$3::jsonb` cast #2026 path) carrying the schedule metadata
//
// Regression watch-fail: if a refactor drops the write-back UPDATE, the
// activity_logs INSERT, or breaks the jsonb cast, this test fails where every
// sqlmock unit test stays green.
func TestIntegration_TickFiresAndWritesBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "cron-fire-ws", 0)
schedID := insertSchedule(t, conn, wsID, "hourly-audit", "0 * * * *", "run the hourly audit")
proxy := &recordingProxy{
status: 200,
body: []byte(`{"jsonrpc":"2.0","result":{"kind":"message","parts":[{"kind":"text","text":"done"}]},"id":"1"}`),
}
s := New(proxy, nil)
s.tick(context.Background())
// 1. A2A boundary reached exactly once for the right workspace.
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1 (tick must fire the one due schedule)", proxy.fires)
}
if proxy.lastWSID != wsID {
t.Errorf("proxy fired for workspace %q, want %q", proxy.lastWSID, wsID)
}
// Empty callerID = canvas-style (bypasses access control); logActivity=true.
if proxy.lastCaller != "" {
t.Errorf("callerID = %q, want empty (canvas-style scheduler fire)", proxy.lastCaller)
}
if !proxy.lastLogFlag {
t.Error("logActivity flag = false, want true")
}
// 2. Row write-back.
st := readScheduleState(t, conn, schedID)
if !st.lastRunAt.Valid {
t.Error("last_run_at is NULL after fire, want set (write-back UPDATE did not land)")
}
if !st.nextRunAt.Valid {
t.Fatal("next_run_at is NULL after fire, want a future timestamp")
}
if !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want a time in the future (schedule would tight-loop otherwise)", st.nextRunAt.Time)
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1", st.runCount)
}
if st.lastStatus != "ok" {
t.Errorf("last_status = %q, want \"ok\"", st.lastStatus)
}
// 3. activity_logs cron_run row with valid jsonb request_body.
var actCount int
var summary, status string
var reqBody []byte
err := conn.QueryRowContext(context.Background(), `
SELECT count(*) OVER (), summary, status, request_body
FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
LIMIT 1
`, wsID).Scan(&actCount, &summary, &status, &reqBody)
if err == sql.ErrNoRows {
t.Fatal("no cron_run activity_logs row inserted after fire (#152/#2026 path missing)")
}
if err != nil {
t.Fatalf("read activity_logs: %v", err)
}
if actCount != 1 {
t.Errorf("cron_run activity_logs rows = %d, want 1", actCount)
}
if status != "ok" {
t.Errorf("activity_logs.status = %q, want \"ok\"", status)
}
// request_body must be valid jsonb carrying the schedule_id — proves the
// `$3::jsonb` cast accepted the payload (the #2026 wedge surface).
var sid string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'schedule_id'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&sid); err != nil {
t.Fatalf("request_body is not queryable jsonb: %v", err)
}
if sid != schedID {
t.Errorf("activity_logs request_body->>'schedule_id' = %q, want %q", sid, schedID)
}
}
// ── TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb (#2026 / #2149) ────
//
// The agent-editable prompt can carry raw invalid-UTF-8 bytes. Postgres jsonb
// columns REJECT invalid UTF-8, which (pre-#2026) wedged the activity_logs
// INSERT and held the transaction open — stalling the whole scheduler.
// fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert.
//
// Postgres TEXT columns (workspace_schedules.prompt) also reject invalid UTF-8
// in a UTF-8 database, so we cannot INSERT the bad bytes through the fixture.
// Instead we insert a valid prompt, then call fireSchedule directly with a
// scheduleRow whose Prompt field contains the invalid bytes — this simulates
// the real regression path (e.g. truncation splitting a multi-byte rune, or
// an agent-edited template arriving via a path that bypasses DB validation).
//
// Assertions:
// - the fire still completed (write-back UPDATE landed)
// - the cron_run activity_logs row was inserted (the jsonb cast accepted
// the SANITIZED payload — the INSERT did not wedge)
// - the stored request_body is queryable jsonb (valid UTF-8 on disk)
//
// Watch-fail: remove the sanitizeUTF8() wrapping around the jsonb payload and
// this test fails on a real Postgres (INSERT errors / row absent), while the
// sqlmock unit test that only checks "an INSERT fired" stays green.
func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "utf8-ws", 0)
// Insert with valid UTF-8 — Postgres TEXT rejects 0x80/0xff.
schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", "valid prompt")
// Prompt with invalid UTF-8: orphan continuation byte + bare 0xff.
badPrompt := "audit \x80 report \xff end"
row := scheduleRow{
ID: schedID,
WorkspaceID: wsID,
Name: "utf8-job",
CronExpr: "0 * * * *",
Timezone: "UTC",
Prompt: badPrompt,
}
proxy := &recordingProxy{
status: 200,
body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`),
}
s := New(proxy, nil)
s.fireSchedule(context.Background(), row)
if proxy.fires != 1 {
t.Fatalf("proxy fires = %d, want 1", proxy.fires)
}
// Write-back must have landed despite the bad prompt bytes.
st := readScheduleState(t, conn, schedID)
if st.runCount != 1 || st.lastStatus != "ok" {
t.Errorf("post-fire state run_count=%d last_status=%q, want 1/\"ok\" "+
"(invalid-UTF-8 prompt must not block the fire)", st.runCount, st.lastStatus)
}
// The cron_run activity_logs row MUST exist — proving the `$3::jsonb`
// INSERT accepted the sanitized payload (did not wedge on invalid UTF-8).
var n int
if err := conn.QueryRowContext(context.Background(), `
SELECT count(*) FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run'
`, wsID).Scan(&n); err != nil {
t.Fatalf("count cron_run rows: %v", err)
}
if n != 1 {
t.Fatalf("cron_run activity_logs rows = %d, want 1 — the jsonb INSERT wedged "+
"on invalid UTF-8 (the #2026 regression)", n)
}
// The stored prompt inside request_body must be queryable + valid UTF-8.
var storedPrompt string
if err := conn.QueryRowContext(context.Background(), `
SELECT request_body->>'prompt'
FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&storedPrompt); err != nil {
t.Fatalf("request_body->>'prompt' not queryable jsonb: %v", err)
}
if storedPrompt == "" {
t.Error("stored prompt is empty, want the sanitized prompt text")
}
// Round-trip through Postgres jsonb guarantees valid UTF-8; assert the
// replacement character replaced the bad bytes rather than them surviving.
for i := 0; i < len(storedPrompt); i++ {
if storedPrompt[i] == 0x80 || storedPrompt[i] == 0xff {
t.Fatalf("stored prompt still contains raw invalid byte 0x%x at %d", storedPrompt[i], i)
}
}
}
// ── TestIntegration_TickErrorStatusWriteBack (#2149) ──────────────────────
//
// When the A2A proxy returns a transport error, fireSchedule must still write
// back: last_status='error', last_error populated, next_run_at advanced (so
// the schedule does not get stuck re-firing), run_count bumped. Verifies the
// error path persists to a real row, not just that "an UPDATE fired".
func TestIntegration_TickErrorStatusWriteBack(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "err-ws", 0)
schedID := insertSchedule(t, conn, wsID, "err-job", "0 * * * *", "do work")
proxy := &recordingProxy{err: context.DeadlineExceeded}
s := New(proxy, nil)
s.tick(context.Background())
st := readScheduleState(t, conn, schedID)
if st.lastStatus != "error" {
t.Errorf("last_status = %q, want \"error\"", st.lastStatus)
}
if st.lastError == "" {
t.Error("last_error is empty, want the proxy error text persisted (#152)")
}
if st.runCount != 1 {
t.Errorf("run_count = %d, want 1 (run still counted on error)", st.runCount)
}
if !st.nextRunAt.Valid || !st.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at not advanced to future on error path (= %v) — schedule would tight-loop", st.nextRunAt)
}
// The error activity_logs row must carry status='error' + error_detail.
var status, errDetail string
if err := conn.QueryRowContext(context.Background(), `
SELECT status, COALESCE(error_detail,'') FROM activity_logs
WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1
`, wsID).Scan(&status, &errDetail); err != nil {
t.Fatalf("read error activity_logs: %v", err)
}
if status != "error" {
t.Errorf("activity_logs.status = %q, want \"error\"", status)
}
if errDetail == "" {
t.Error("activity_logs.error_detail empty on error fire, want the error message (#152)")
}
}
// ── TestIntegration_SweepPhantomBusy (#2149 — no prior test) ──────────────
//
// sweepPhantomBusy resets active_tasks=0 for workspaces stuck busy with NO
// activity_logs row in the last phantomStaleThreshold window, and must LEAVE
// ALONE workspaces that have recent activity. The NOT IN (SELECT DISTINCT
// workspace_id FROM activity_logs WHERE created_at > now() - interval) subquery
// is exactly the kind of set-semantics that sqlmock cannot validate — there is
// no unit test for this method at all (#2149).
//
// Fixture:
// - phantomWS: active_tasks=3, NO recent activity_log → must reset to 0
// - recentWS: active_tasks=2, activity_log 1 min ago → must stay at 2
// - staleWS: active_tasks=1, activity_log 30 min ago → must reset to 0
// - removedWS: active_tasks=4, status='removed', no activity → must stay (status guard)
// - idleWS: active_tasks=0 → untouched (not >0)
//
// Watch-fail: break the subquery (e.g. drop the status!='removed' guard, or
// invert the NOT IN), and the asserted end-state diverges on a real Postgres.
func TestIntegration_SweepPhantomBusy(t *testing.T) {
conn := integrationDB(t)
phantomWS := insertWorkspace(t, conn, "phantom-ws", 3)
recentWS := insertWorkspace(t, conn, "recent-ws", 2)
staleWS := insertWorkspace(t, conn, "stale-ws", 1)
idleWS := insertWorkspace(t, conn, "idle-ws", 0)
// removedWS: busy but status='removed' — the sweep must skip it.
var removedWS string
if err := conn.QueryRowContext(context.Background(), `
INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks)
VALUES ('removed-ws', 'removed', 4, 1) RETURNING id
`).Scan(&removedWS); err != nil {
t.Fatalf("insert removedWS: %v", err)
}
// recentWS has a fresh activity_log (1 min ago → inside the 10-min window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '1 minute')
`, recentWS); err != nil {
t.Fatalf("insert recent activity_log: %v", err)
}
// staleWS has only an OLD activity_log (30 min ago → outside the window).
if _, err := conn.ExecContext(context.Background(), `
INSERT INTO activity_logs (workspace_id, activity_type, status, created_at)
VALUES ($1, 'a2a_receive', 'ok', now() - interval '30 minutes')
`, staleWS); err != nil {
t.Fatalf("insert stale activity_log: %v", err)
}
s := New(nil, nil)
s.sweepPhantomBusy(context.Background())
active := func(id string) int {
var n int
if err := conn.QueryRowContext(context.Background(),
`SELECT active_tasks FROM workspaces WHERE id = $1`, id).Scan(&n); err != nil {
t.Fatalf("read active_tasks(%s): %v", id, err)
}
return n
}
if got := active(phantomWS); got != 0 {
t.Errorf("phantomWS active_tasks = %d, want 0 (busy + no recent activity → must be swept)", got)
}
if got := active(staleWS); got != 0 {
t.Errorf("staleWS active_tasks = %d, want 0 (only stale activity → must be swept)", got)
}
if got := active(recentWS); got != 2 {
t.Errorf("recentWS active_tasks = %d, want 2 (recent activity → must NOT be swept)", got)
}
if got := active(removedWS); got != 4 {
t.Errorf("removedWS active_tasks = %d, want 4 (status='removed' → sweep must skip it)", got)
}
if got := active(idleWS); got != 0 {
t.Errorf("idleWS active_tasks = %d, want 0 (was never busy)", got)
}
// The swept rows must also have current_task cleared.
var ct string
if err := conn.QueryRowContext(context.Background(),
`SELECT COALESCE(current_task,'') FROM workspaces WHERE id = $1`, phantomWS).Scan(&ct); err != nil {
t.Fatalf("read current_task: %v", err)
}
if ct != "" {
t.Errorf("phantomWS current_task = %q, want empty after sweep", ct)
}
}
// ── TestIntegration_NativeSchedulerSkipAdvancesNextRunAt (#2149) ──────────
//
// When a workspace's adapter owns scheduling natively, tick() must SKIP the
// fire but still advance next_run_at (so the row doesn't tight-loop on every
// poll) — observability (next_run_at) is preserved while the fire is dropped.
// Asserts the native-skip UPDATE landed on a real row and the proxy was NOT
// invoked. This is the native-skip UPDATE path #2149 calls out — sqlmock can
// only assert an UPDATE fired, not that next_run_at moved forward.
func TestIntegration_NativeSchedulerSkipAdvancesNextRunAt(t *testing.T) {
conn := integrationDB(t)
wsID := insertWorkspace(t, conn, "native-ws", 0)
schedID := insertSchedule(t, conn, wsID, "native-job", "0 * * * *", "native run")
// Capture the pre-tick next_run_at (it is in the past by construction).
before := readScheduleState(t, conn, schedID)
if !before.nextRunAt.Valid || before.nextRunAt.Time.After(time.Now()) {
t.Fatalf("precondition: next_run_at should start in the past, got %v", before.nextRunAt)
}
proxy := &recordingProxy{status: 200, body: []byte(`{}`)}
s := New(proxy, nil)
// Every workspace reports native scheduling → fire must be skipped.
s.SetNativeSchedulerCheck(func(string) bool { return true })
s.tick(context.Background())
if proxy.fires != 0 {
t.Errorf("proxy fires = %d, want 0 (native-scheduler workspace must NOT fire)", proxy.fires)
}
after := readScheduleState(t, conn, schedID)
if !after.nextRunAt.Valid || !after.nextRunAt.Time.After(time.Now()) {
t.Errorf("next_run_at = %v, want advanced into the future (native-skip UPDATE must still run)", after.nextRunAt)
}
// Skip path does NOT bump run_count or write last_run_at (no fire happened).
if after.runCount != 0 {
t.Errorf("run_count = %d, want 0 (skip must not count as a run)", after.runCount)
}
if after.lastRunAt.Valid {
t.Error("last_run_at set on native-skip, want NULL (no fire occurred)")
}
}