From 3fccfad3ae639e4a427157282b40653f9006977b Mon Sep 17 00:00:00 2001 From: molecule-code-reviewer Date: Wed, 3 Jun 2026 00:39:31 +0000 Subject: [PATCH 1/6] test(scheduler): real-PG regression tests for cron firing loop (#2149) Closes #2149 --- .../scheduler/scheduler_integration_test.go | 542 ++++++++++++++++++ 1 file changed, 542 insertions(+) create mode 100644 workspace-server/internal/scheduler/scheduler_integration_test.go diff --git a/workspace-server/internal/scheduler/scheduler_integration_test.go b/workspace-server/internal/scheduler/scheduler_integration_test.go new file mode 100644 index 000000000..4ae79faab --- /dev/null +++ b/workspace-server/internal/scheduler/scheduler_integration_test.go @@ -0,0 +1,542 @@ +//go:build integration +// +build integration + +// scheduler_integration_test.go — REAL Postgres integration tests for the +// workspace-server cron scheduler firing loop. Regression coverage for +// molecule-core issue #2149 (filed under SOP rule internal#765). +// +// Run with: +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// # apply every migration up.sql / legacy .sql in lexicographic order +// for f in $(ls workspace-server/migrations/*.sql | grep -v '\.down\.sql$' | sort); do \ +// psql "postgres://postgres:test@localhost:55432/molecule?sslmode=disable" -f "$f"; done +// cd workspace-server +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/scheduler/ -run '^TestIntegration_' +// +// CI: .gitea/workflows/handlers-postgres-integration.yml runs these on every +// PR/push that touches workspace-server/internal/scheduler/ (the +// `handlers-postgres` detect-changes profile was extended to include the +// scheduler package + this workflow file). +// +// Why these are NOT the existing sqlmock unit tests (scheduler_test.go) +// -------------------------------------------------------------------- +// The strict-sqlmock unit tests pin which SQL statements fire — fast, no DB. +// But sqlmock CANNOT validate: +// - the activity_logs `$3::jsonb` cast (#2026 wedge) — sqlmock never parses +// the payload, so an invalid-UTF-8 jsonb body that wedges a real INSERT +// looks "green" under mock.ExpectExec(`INSERT INTO activity_logs`). +// - the ROW STATE after tick()/fireSchedule run: that last_run_at, +// next_run_at, run_count, last_status actually landed on the row. +// - sweepPhantomBusy's NOT IN (SELECT … activity_logs) subquery semantics +// against real rows — it has no unit test at all (#2149). +// +// A SQL regression here = a fleet-wide silent cron outage (#85 ran 12h before +// detection). These tests boot a real Postgres, insert real rows, run the +// production tick()/sweepPhantomBusy, and SELECT the rows back to assert the +// observable end state — the gap sqlmock structurally cannot cover. +// +// Watch-fail intent: each test is written to FAIL on a regression of the +// behavior under test (e.g. drop the activity_logs INSERT, drop the +// write-back UPDATE, drop the UTF-8 sanitize, or break the phantom-busy +// subquery) and to PASS against the current-correct scheduler.go. + +package scheduler + +import ( + "context" + "database/sql" + "os" + "testing" + "time" + + mdb "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + _ "github.com/lib/pq" +) + +// ── test doubles ────────────────────────────────────────────────────────── + +// recordingProxy is an A2AProxy that records each fire and returns a +// configurable response. Used to assert that tick()/fireSchedule actually +// reached the A2A boundary for the due schedule. +type recordingProxy struct { + status int + body []byte + err error + + fires int + lastBody []byte + lastCaller string + lastLogFlag bool + lastWSID string +} + +func (p *recordingProxy) ProxyA2ARequest( + _ context.Context, workspaceID string, body []byte, callerID string, logActivity bool, +) (int, []byte, error) { + p.fires++ + p.lastWSID = workspaceID + p.lastBody = body + p.lastCaller = callerID + p.lastLogFlag = logActivity + if p.err != nil { + return 0, nil, p.err + } + return p.status, p.body, nil +} + +// ── connection + fixture helpers ────────────────────────────────────────── + +// integrationDB returns the configured integration-test connection or skips +// the test if INTEGRATION_DB_URL is unset. Hot-swaps the package-level +// mdb.DB so the production scheduler helpers (tick, fireSchedule, +// sweepPhantomBusy) operate on this connection; restores it via t.Cleanup. +// +// NOT SAFE FOR t.Parallel(): the package-global swap races across tests. +func integrationDB(t *testing.T) *sql.DB { + t.Helper() + url := os.Getenv("INTEGRATION_DB_URL") + if url == "" { + t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)") + } + conn, err := sql.Open("postgres", url) + if err != nil { + t.Fatalf("open: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := conn.PingContext(ctx); err != nil { + t.Fatalf("ping: %v", err) + } + // Clean slate. activity_logs + workspace_schedules cascade off workspaces, + // but we DELETE explicitly (and in FK order) so a partial prior run can't + // leave orphan rows that perturb the next test's assertions. + cctx, ccancel := context.WithTimeout(context.Background(), 10*time.Second) + defer ccancel() + for _, q := range []string{ + `DELETE FROM activity_logs`, + `DELETE FROM workspace_schedules`, + `DELETE FROM workspaces`, + } { + if _, err := conn.ExecContext(cctx, q); err != nil { + t.Fatalf("cleanup %q: %v", q, err) + } + } + prev := mdb.DB + mdb.DB = conn + t.Cleanup(func() { + mdb.DB = prev + conn.Close() + }) + return conn +} + +// insertWorkspace inserts a workspace row and returns its UUID. active is the +// initial active_tasks value; status defaults to 'active'. +func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string { + t.Helper() + var id string + err := conn.QueryRowContext(context.Background(), ` + INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks) + VALUES ($1, 'active', $2, 1) + RETURNING id + `, name, active).Scan(&id) + if err != nil { + t.Fatalf("insertWorkspace(%s): %v", name, err) + } + return id +} + +// insertSchedule inserts an enabled workspace_schedules row whose next_run_at +// is in the past (so tick() picks it up immediately) and returns its UUID. +func insertSchedule(t *testing.T, conn *sql.DB, wsID, name, cronExpr, prompt string) string { + t.Helper() + var id string + err := conn.QueryRowContext(context.Background(), ` + INSERT INTO workspace_schedules + (workspace_id, name, cron_expr, timezone, prompt, enabled, next_run_at, source) + VALUES ($1, $2, $3, 'UTC', $4, true, now() - interval '1 minute', 'runtime') + RETURNING id + `, wsID, name, cronExpr, prompt).Scan(&id) + if err != nil { + t.Fatalf("insertSchedule(%s): %v", name, err) + } + return id +} + +type scheduleState struct { + lastRunAt sql.NullTime + nextRunAt sql.NullTime + runCount int + lastStatus string + lastError string +} + +func readScheduleState(t *testing.T, conn *sql.DB, id string) scheduleState { + t.Helper() + var st scheduleState + var status, errStr sql.NullString + err := conn.QueryRowContext(context.Background(), ` + SELECT last_run_at, next_run_at, run_count, last_status, last_error + FROM workspace_schedules WHERE id = $1 + `, id).Scan(&st.lastRunAt, &st.nextRunAt, &st.runCount, &status, &errStr) + if err != nil { + t.Fatalf("readScheduleState(%s): %v", id, err) + } + st.lastStatus = status.String + st.lastError = errStr.String + return st +} + +// ── TestIntegration_TickFiresAndWritesBack (#2149 core) ─────────────────── +// +// Insert one due schedule, run tick() once, and assert the full firing +// loop landed against a REAL Postgres: +// - the A2A proxy was invoked exactly once for the schedule's workspace +// - the post-fire UPDATE wrote last_run_at (was NULL), advanced next_run_at +// into the future, bumped run_count to 1, set last_status='ok' +// - a cron_run activity_logs row was inserted with VALID jsonb request_body +// (the `$3::jsonb` cast #2026 path) carrying the schedule metadata +// +// Regression watch-fail: if a refactor drops the write-back UPDATE, the +// activity_logs INSERT, or breaks the jsonb cast, this test fails where every +// sqlmock unit test stays green. +func TestIntegration_TickFiresAndWritesBack(t *testing.T) { + conn := integrationDB(t) + + wsID := insertWorkspace(t, conn, "cron-fire-ws", 0) + schedID := insertSchedule(t, conn, wsID, "hourly-audit", "0 * * * *", "run the hourly audit") + + proxy := &recordingProxy{ + status: 200, + body: []byte(`{"jsonrpc":"2.0","result":{"kind":"message","parts":[{"kind":"text","text":"done"}]},"id":"1"}`), + } + s := New(proxy, nil) + s.tick(context.Background()) + + // 1. A2A boundary reached exactly once for the right workspace. + if proxy.fires != 1 { + t.Fatalf("proxy fires = %d, want 1 (tick must fire the one due schedule)", proxy.fires) + } + if proxy.lastWSID != wsID { + t.Errorf("proxy fired for workspace %q, want %q", proxy.lastWSID, wsID) + } + // Empty callerID = canvas-style (bypasses access control); logActivity=true. + if proxy.lastCaller != "" { + t.Errorf("callerID = %q, want empty (canvas-style scheduler fire)", proxy.lastCaller) + } + if !proxy.lastLogFlag { + t.Error("logActivity flag = false, want true") + } + + // 2. Row write-back. + st := readScheduleState(t, conn, schedID) + if !st.lastRunAt.Valid { + t.Error("last_run_at is NULL after fire, want set (write-back UPDATE did not land)") + } + if !st.nextRunAt.Valid { + t.Fatal("next_run_at is NULL after fire, want a future timestamp") + } + if !st.nextRunAt.Time.After(time.Now()) { + t.Errorf("next_run_at = %v, want a time in the future (schedule would tight-loop otherwise)", st.nextRunAt.Time) + } + if st.runCount != 1 { + t.Errorf("run_count = %d, want 1", st.runCount) + } + if st.lastStatus != "ok" { + t.Errorf("last_status = %q, want \"ok\"", st.lastStatus) + } + + // 3. activity_logs cron_run row with valid jsonb request_body. + var actCount int + var summary, status string + var reqBody []byte + err := conn.QueryRowContext(context.Background(), ` + SELECT count(*) OVER (), summary, status, request_body + FROM activity_logs + WHERE workspace_id = $1 AND activity_type = 'cron_run' + LIMIT 1 + `, wsID).Scan(&actCount, &summary, &status, &reqBody) + if err == sql.ErrNoRows { + t.Fatal("no cron_run activity_logs row inserted after fire (#152/#2026 path missing)") + } + if err != nil { + t.Fatalf("read activity_logs: %v", err) + } + if actCount != 1 { + t.Errorf("cron_run activity_logs rows = %d, want 1", actCount) + } + if status != "ok" { + t.Errorf("activity_logs.status = %q, want \"ok\"", status) + } + // request_body must be valid jsonb carrying the schedule_id — proves the + // `$3::jsonb` cast accepted the payload (the #2026 wedge surface). + var sid string + if err := conn.QueryRowContext(context.Background(), ` + SELECT request_body->>'schedule_id' + FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1 + `, wsID).Scan(&sid); err != nil { + t.Fatalf("request_body is not queryable jsonb: %v", err) + } + if sid != schedID { + t.Errorf("activity_logs request_body->>'schedule_id' = %q, want %q", sid, schedID) + } +} + +// ── TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb (#2026 / #2149) ──── +// +// The agent-editable prompt can carry raw invalid-UTF-8 bytes. Postgres jsonb +// columns REJECT invalid UTF-8, which (pre-#2026) wedged the activity_logs +// INSERT and held the transaction open — stalling the whole scheduler. +// fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert. +// +// This inserts a schedule whose prompt contains an orphan continuation byte +// (0x80) and a bare 0xff, runs tick(), and asserts: +// - the fire still completed (write-back UPDATE landed) +// - the cron_run activity_logs row was inserted (the jsonb cast accepted +// the SANITIZED payload — the INSERT did not wedge) +// - the stored request_body is queryable jsonb (valid UTF-8 on disk) +// +// Watch-fail: remove the sanitizeUTF8() wrapping around the jsonb payload and +// this test fails on a real Postgres (INSERT errors / row absent), while the +// sqlmock unit test that only checks "an INSERT fired" stays green. +func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) { + conn := integrationDB(t) + + wsID := insertWorkspace(t, conn, "utf8-ws", 0) + // Prompt with invalid UTF-8: orphan continuation byte + bare 0xff. + badPrompt := "audit \x80 report \xff end" + schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", badPrompt) + + proxy := &recordingProxy{ + status: 200, + body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`), + } + s := New(proxy, nil) + s.tick(context.Background()) + + if proxy.fires != 1 { + t.Fatalf("proxy fires = %d, want 1", proxy.fires) + } + + // Write-back must have landed despite the bad prompt bytes. + st := readScheduleState(t, conn, schedID) + if st.runCount != 1 || st.lastStatus != "ok" { + t.Errorf("post-fire state run_count=%d last_status=%q, want 1/\"ok\" "+ + "(invalid-UTF-8 prompt must not block the fire)", st.runCount, st.lastStatus) + } + + // The cron_run activity_logs row MUST exist — proving the `$3::jsonb` + // INSERT accepted the sanitized payload (did not wedge on invalid UTF-8). + var n int + if err := conn.QueryRowContext(context.Background(), ` + SELECT count(*) FROM activity_logs + WHERE workspace_id = $1 AND activity_type = 'cron_run' + `, wsID).Scan(&n); err != nil { + t.Fatalf("count cron_run rows: %v", err) + } + if n != 1 { + t.Fatalf("cron_run activity_logs rows = %d, want 1 — the jsonb INSERT wedged "+ + "on invalid UTF-8 (the #2026 regression)", n) + } + + // The stored prompt inside request_body must be queryable + valid UTF-8. + var storedPrompt string + if err := conn.QueryRowContext(context.Background(), ` + SELECT request_body->>'prompt' + FROM activity_logs WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1 + `, wsID).Scan(&storedPrompt); err != nil { + t.Fatalf("request_body->>'prompt' not queryable jsonb: %v", err) + } + if storedPrompt == "" { + t.Error("stored prompt is empty, want the sanitized prompt text") + } + // Round-trip through Postgres jsonb guarantees valid UTF-8; assert the + // replacement character replaced the bad bytes rather than them surviving. + for i := 0; i < len(storedPrompt); i++ { + if storedPrompt[i] == 0x80 || storedPrompt[i] == 0xff { + t.Fatalf("stored prompt still contains raw invalid byte 0x%x at %d", storedPrompt[i], i) + } + } +} + +// ── TestIntegration_TickErrorStatusWriteBack (#2149) ────────────────────── +// +// When the A2A proxy returns a transport error, fireSchedule must still write +// back: last_status='error', last_error populated, next_run_at advanced (so +// the schedule does not get stuck re-firing), run_count bumped. Verifies the +// error path persists to a real row, not just that "an UPDATE fired". +func TestIntegration_TickErrorStatusWriteBack(t *testing.T) { + conn := integrationDB(t) + + wsID := insertWorkspace(t, conn, "err-ws", 0) + schedID := insertSchedule(t, conn, wsID, "err-job", "0 * * * *", "do work") + + proxy := &recordingProxy{err: context.DeadlineExceeded} + s := New(proxy, nil) + s.tick(context.Background()) + + st := readScheduleState(t, conn, schedID) + if st.lastStatus != "error" { + t.Errorf("last_status = %q, want \"error\"", st.lastStatus) + } + if st.lastError == "" { + t.Error("last_error is empty, want the proxy error text persisted (#152)") + } + if st.runCount != 1 { + t.Errorf("run_count = %d, want 1 (run still counted on error)", st.runCount) + } + if !st.nextRunAt.Valid || !st.nextRunAt.Time.After(time.Now()) { + t.Errorf("next_run_at not advanced to future on error path (= %v) — schedule would tight-loop", st.nextRunAt) + } + // The error activity_logs row must carry status='error' + error_detail. + var status, errDetail string + if err := conn.QueryRowContext(context.Background(), ` + SELECT status, COALESCE(error_detail,'') FROM activity_logs + WHERE workspace_id = $1 AND activity_type = 'cron_run' LIMIT 1 + `, wsID).Scan(&status, &errDetail); err != nil { + t.Fatalf("read error activity_logs: %v", err) + } + if status != "error" { + t.Errorf("activity_logs.status = %q, want \"error\"", status) + } + if errDetail == "" { + t.Error("activity_logs.error_detail empty on error fire, want the error message (#152)") + } +} + +// ── TestIntegration_SweepPhantomBusy (#2149 — no prior test) ────────────── +// +// sweepPhantomBusy resets active_tasks=0 for workspaces stuck busy with NO +// activity_logs row in the last phantomStaleThreshold window, and must LEAVE +// ALONE workspaces that have recent activity. The NOT IN (SELECT DISTINCT +// workspace_id FROM activity_logs WHERE created_at > now() - interval) subquery +// is exactly the kind of set-semantics that sqlmock cannot validate — there is +// no unit test for this method at all (#2149). +// +// Fixture: +// - phantomWS: active_tasks=3, NO recent activity_log → must reset to 0 +// - recentWS: active_tasks=2, activity_log 1 min ago → must stay at 2 +// - staleWS: active_tasks=1, activity_log 30 min ago → must reset to 0 +// - removedWS: active_tasks=4, status='removed', no activity → must stay (status guard) +// - idleWS: active_tasks=0 → untouched (not >0) +// +// Watch-fail: break the subquery (e.g. drop the status!='removed' guard, or +// invert the NOT IN), and the asserted end-state diverges on a real Postgres. +func TestIntegration_SweepPhantomBusy(t *testing.T) { + conn := integrationDB(t) + + phantomWS := insertWorkspace(t, conn, "phantom-ws", 3) + recentWS := insertWorkspace(t, conn, "recent-ws", 2) + staleWS := insertWorkspace(t, conn, "stale-ws", 1) + idleWS := insertWorkspace(t, conn, "idle-ws", 0) + + // removedWS: busy but status='removed' — the sweep must skip it. + var removedWS string + if err := conn.QueryRowContext(context.Background(), ` + INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks) + VALUES ('removed-ws', 'removed', 4, 1) RETURNING id + `).Scan(&removedWS); err != nil { + t.Fatalf("insert removedWS: %v", err) + } + + // recentWS has a fresh activity_log (1 min ago → inside the 10-min window). + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, status, created_at) + VALUES ($1, 'a2a_receive', 'ok', now() - interval '1 minute') + `, recentWS); err != nil { + t.Fatalf("insert recent activity_log: %v", err) + } + // staleWS has only an OLD activity_log (30 min ago → outside the window). + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, status, created_at) + VALUES ($1, 'a2a_receive', 'ok', now() - interval '30 minutes') + `, staleWS); err != nil { + t.Fatalf("insert stale activity_log: %v", err) + } + + s := New(nil, nil) + s.sweepPhantomBusy(context.Background()) + + active := func(id string) int { + var n int + if err := conn.QueryRowContext(context.Background(), + `SELECT active_tasks FROM workspaces WHERE id = $1`, id).Scan(&n); err != nil { + t.Fatalf("read active_tasks(%s): %v", id, err) + } + return n + } + + if got := active(phantomWS); got != 0 { + t.Errorf("phantomWS active_tasks = %d, want 0 (busy + no recent activity → must be swept)", got) + } + if got := active(staleWS); got != 0 { + t.Errorf("staleWS active_tasks = %d, want 0 (only stale activity → must be swept)", got) + } + if got := active(recentWS); got != 2 { + t.Errorf("recentWS active_tasks = %d, want 2 (recent activity → must NOT be swept)", got) + } + if got := active(removedWS); got != 4 { + t.Errorf("removedWS active_tasks = %d, want 4 (status='removed' → sweep must skip it)", got) + } + if got := active(idleWS); got != 0 { + t.Errorf("idleWS active_tasks = %d, want 0 (was never busy)", got) + } + + // The swept rows must also have current_task cleared. + var ct string + if err := conn.QueryRowContext(context.Background(), + `SELECT COALESCE(current_task,'') FROM workspaces WHERE id = $1`, phantomWS).Scan(&ct); err != nil { + t.Fatalf("read current_task: %v", err) + } + if ct != "" { + t.Errorf("phantomWS current_task = %q, want empty after sweep", ct) + } +} + +// ── TestIntegration_NativeSchedulerSkipAdvancesNextRunAt (#2149) ────────── +// +// When a workspace's adapter owns scheduling natively, tick() must SKIP the +// fire but still advance next_run_at (so the row doesn't tight-loop on every +// poll) — observability (next_run_at) is preserved while the fire is dropped. +// Asserts the native-skip UPDATE landed on a real row and the proxy was NOT +// invoked. This is the native-skip UPDATE path #2149 calls out — sqlmock can +// only assert an UPDATE fired, not that next_run_at moved forward. +func TestIntegration_NativeSchedulerSkipAdvancesNextRunAt(t *testing.T) { + conn := integrationDB(t) + + wsID := insertWorkspace(t, conn, "native-ws", 0) + schedID := insertSchedule(t, conn, wsID, "native-job", "0 * * * *", "native run") + + // Capture the pre-tick next_run_at (it is in the past by construction). + before := readScheduleState(t, conn, schedID) + if !before.nextRunAt.Valid || before.nextRunAt.Time.After(time.Now()) { + t.Fatalf("precondition: next_run_at should start in the past, got %v", before.nextRunAt) + } + + proxy := &recordingProxy{status: 200, body: []byte(`{}`)} + s := New(proxy, nil) + // Every workspace reports native scheduling → fire must be skipped. + s.SetNativeSchedulerCheck(func(string) bool { return true }) + s.tick(context.Background()) + + if proxy.fires != 0 { + t.Errorf("proxy fires = %d, want 0 (native-scheduler workspace must NOT fire)", proxy.fires) + } + + after := readScheduleState(t, conn, schedID) + if !after.nextRunAt.Valid || !after.nextRunAt.Time.After(time.Now()) { + t.Errorf("next_run_at = %v, want advanced into the future (native-skip UPDATE must still run)", after.nextRunAt) + } + // Skip path does NOT bump run_count or write last_run_at (no fire happened). + if after.runCount != 0 { + t.Errorf("run_count = %d, want 0 (skip must not count as a run)", after.runCount) + } + if after.lastRunAt.Valid { + t.Error("last_run_at set on native-skip, want NULL (no fire occurred)") + } +} -- 2.52.0 From 038159a6104f6fecd71986bd892eaad89db78eba Mon Sep 17 00:00:00 2001 From: molecule-code-reviewer Date: Wed, 3 Jun 2026 00:39:41 +0000 Subject: [PATCH 2/6] ci(detect-changes): trigger handlers-postgres profile on scheduler pkg (#2149) --- .gitea/scripts/detect-changes.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitea/scripts/detect-changes.py b/.gitea/scripts/detect-changes.py index 5fc5750ad..4ddaa2457 100644 --- a/.gitea/scripts/detect-changes.py +++ b/.gitea/scripts/detect-changes.py @@ -26,6 +26,10 @@ PROFILES: dict[str, dict[str, str]] = { "handlers": ( r"^workspace-server/internal/handlers/" r"|^workspace-server/internal/wsauth/" + # #2149: the scheduler real-PG integration tests run in this same + # workflow (they reuse its migrated Postgres), so changes to the + # scheduler package must trigger the job too. + r"|^workspace-server/internal/scheduler/" r"|^workspace-server/migrations/" r"|^\.gitea/workflows/handlers-postgres-integration\.yml$" ), @@ -174,3 +178,4 @@ def main(argv: list[str]) -> int: if __name__ == "__main__": sys.exit(main(sys.argv[1:])) + -- 2.52.0 From dac6e3046b0d55b9c02f228e22a4ef6d66aa3b40 Mon Sep 17 00:00:00 2001 From: molecule-code-reviewer Date: Wed, 3 Jun 2026 00:39:42 +0000 Subject: [PATCH 3/6] ci(handlers-pg): run scheduler real-PG integration tests (#2149) --- .gitea/workflows/handlers-postgres-integration.yml | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/.gitea/workflows/handlers-postgres-integration.yml b/.gitea/workflows/handlers-postgres-integration.yml index 21f6e0a96..972fde7df 100644 --- a/.gitea/workflows/handlers-postgres-integration.yml +++ b/.gitea/workflows/handlers-postgres-integration.yml @@ -243,7 +243,8 @@ jobs: # MUST exist for the integration tests to be meaningful. Hard- # fail if any didn't land — that would be a real regression we # want loud. - for tbl in delegations workspaces activity_logs pending_uploads; do + # workspace_schedules added for the #2149 scheduler integration tests. + for tbl in delegations workspaces activity_logs pending_uploads workspace_schedules; do if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \ -c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \ | grep -q 1; then @@ -261,6 +262,16 @@ jobs: # workflow runs don't fight over a host-net 5432 port. go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_" + - if: needs.detect-changes.outputs.handlers == 'true' + name: Run scheduler integration tests (#2149) + run: | + # #2149: real-PG regression coverage for the scheduler firing loop + # (tick → A2A fire → write-back of last_run_at/next_run_at/run_count/ + # activity_logs jsonb incl. invalid-UTF-8 sanitization + sweepPhantomBusy). + # Reuses the same migrated Postgres (workspace_schedules / activity_logs + # / workspaces all landed by the migration replay step above). + go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_" + - if: failure() && needs.detect-changes.outputs.handlers == 'true' name: Diagnostic dump on failure env: -- 2.52.0 From b55f1be46c7021106d714dcd53d3a936c89b3fcc Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 20:21:31 +0000 Subject: [PATCH 4/6] =?UTF-8?q?test(scheduler):=20fix=20fixture=20enum=20d?= =?UTF-8?q?rift=20=E2=80=94=20'active'=20=E2=86=92=20'online'=20(internal#?= =?UTF-8?q?795)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The workspace_status enum migrated away from 'active' in migration 043_workspace_status_enum.up.sql; valid values are provisioning/online/ offline/degraded/failed/removed/paused/hibernated/awaiting_agent/ hibernating. Inserting 'active' caused all five scheduler integration tests to fail at fixture setup with: invalid input value for enum workspace_status: "active" Fix: use 'online' (a valid enum member) for runnable fixture workspaces. Also updates the helper comment to cite enum validity. Co-Authored-By: Claude Opus 4.7 --- .../internal/scheduler/scheduler_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workspace-server/internal/scheduler/scheduler_integration_test.go b/workspace-server/internal/scheduler/scheduler_integration_test.go index 4ae79faab..a62ee1162 100644 --- a/workspace-server/internal/scheduler/scheduler_integration_test.go +++ b/workspace-server/internal/scheduler/scheduler_integration_test.go @@ -136,13 +136,13 @@ func integrationDB(t *testing.T) *sql.DB { } // insertWorkspace inserts a workspace row and returns its UUID. active is the -// initial active_tasks value; status defaults to 'active'. +// initial active_tasks value; status defaults to 'online' (valid workspace_status enum). func insertWorkspace(t *testing.T, conn *sql.DB, name string, active int) string { t.Helper() var id string err := conn.QueryRowContext(context.Background(), ` INSERT INTO workspaces (name, status, active_tasks, max_concurrent_tasks) - VALUES ($1, 'active', $2, 1) + VALUES ($1, 'online', $2, 1) RETURNING id `, name, active).Scan(&id) if err != nil { -- 2.52.0 From 8bd00bb94a4ae2ae453a58ddaed857843274ae0f Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 20:50:16 +0000 Subject: [PATCH 5/6] fix(integration): avoid invalid-UTF-8 insert into workspace_schedules.prompt Postgres TEXT columns in a UTF-8 database reject raw bytes like 0x80 and 0xff. The test was trying to insert these into workspace_schedules.prompt via insertSchedule, which failed with: pq: invalid byte sequence for encoding "UTF8": 0x80 Fix: insert a valid prompt into the DB fixture, then call fireSchedule directly with a scheduleRow whose Prompt field carries the invalid bytes. This still exercises the #2026 regression path (sanitizeUTF8 before jsonb INSERT) without tripping Postgres TEXT validation. Co-Authored-By: Claude Opus 4.7 --- .../scheduler/scheduler_integration_test.go | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/scheduler/scheduler_integration_test.go b/workspace-server/internal/scheduler/scheduler_integration_test.go index a62ee1162..87d809d3e 100644 --- a/workspace-server/internal/scheduler/scheduler_integration_test.go +++ b/workspace-server/internal/scheduler/scheduler_integration_test.go @@ -294,8 +294,14 @@ func TestIntegration_TickFiresAndWritesBack(t *testing.T) { // INSERT and held the transaction open — stalling the whole scheduler. // fireSchedule now sanitizeUTF8()s every string before the `$3::jsonb` insert. // -// This inserts a schedule whose prompt contains an orphan continuation byte -// (0x80) and a bare 0xff, runs tick(), and asserts: +// Postgres TEXT columns (workspace_schedules.prompt) also reject invalid UTF-8 +// in a UTF-8 database, so we cannot INSERT the bad bytes through the fixture. +// Instead we insert a valid prompt, then call fireSchedule directly with a +// scheduleRow whose Prompt field contains the invalid bytes — this simulates +// the real regression path (e.g. truncation splitting a multi-byte rune, or +// an agent-edited template arriving via a path that bypasses DB validation). +// +// Assertions: // - the fire still completed (write-back UPDATE landed) // - the cron_run activity_logs row was inserted (the jsonb cast accepted // the SANITIZED payload — the INSERT did not wedge) @@ -308,16 +314,26 @@ func TestIntegration_InvalidUTF8PromptSanitizedIntoJsonb(t *testing.T) { conn := integrationDB(t) wsID := insertWorkspace(t, conn, "utf8-ws", 0) + // Insert with valid UTF-8 — Postgres TEXT rejects 0x80/0xff. + schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", "valid prompt") + // Prompt with invalid UTF-8: orphan continuation byte + bare 0xff. badPrompt := "audit \x80 report \xff end" - schedID := insertSchedule(t, conn, wsID, "utf8-job", "0 * * * *", badPrompt) + row := scheduleRow{ + ID: schedID, + WorkspaceID: wsID, + Name: "utf8-job", + CronExpr: "0 * * * *", + Timezone: "UTC", + Prompt: badPrompt, + } proxy := &recordingProxy{ status: 200, body: []byte(`{"result":{"kind":"message","parts":[{"kind":"text","text":"ok"}]}}`), } s := New(proxy, nil) - s.tick(context.Background()) + s.fireSchedule(context.Background(), row) if proxy.fires != 1 { t.Fatalf("proxy fires = %d, want 1", proxy.fires) -- 2.52.0 From 78d6cb9d4b673f82b28dfdef592651520a2b3856 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 21:32:28 +0000 Subject: [PATCH 6/6] fix(ci-drift): add REQUIRED_CHECKS_JSON variant support (internal#804) The ci-required-drift parser only looked for REQUIRED_CHECKS while audit-force-merge.yml switched to REQUIRED_CHECKS_JSON (branch-aware dict). This caused F3 drift detection to fail on repos using the JSON variant. Changes: - required_checks_env() now detects both REQUIRED_CHECKS_JSON (preferred) and REQUIRED_CHECKS (legacy fallback). - For JSON variant: parse the dict, extract the array for the target branch, validate structure, return as a set of context names. - For legacy variant: unchanged newline-split behavior. - Error messages updated to mention both env vars. - render_body() resolution text updated to mention both variants. - Tests added for JSON precedence, fallback, missing branch, malformed JSON, and full drift-class coverage (F3a/F3b/happy-path). Closes internal#804 Co-Authored-By: Claude Opus 4.7 --- .gitea/scripts/ci-required-drift.py | 107 +++++++++++---- .../scripts/tests/test_ci_required_drift.py | 71 ++++++++++ tests/test_ci_required_drift.py | 127 ++++++++++++++++++ 3 files changed, 276 insertions(+), 29 deletions(-) diff --git a/.gitea/scripts/ci-required-drift.py b/.gitea/scripts/ci-required-drift.py index 7813f3f00..b32cffb41 100755 --- a/.gitea/scripts/ci-required-drift.py +++ b/.gitea/scripts/ci-required-drift.py @@ -8,7 +8,8 @@ pair diverges. Sources: A. `.gitea/workflows/ci.yml` jobs (CI source — the actual job set) B. `status_check_contexts` in branch_protections (the merge gate) - C. `REQUIRED_CHECKS` env in audit-force-merge.yml (the audit env) + C. `REQUIRED_CHECKS_JSON` (preferred) or `REQUIRED_CHECKS` (legacy) + env in audit-force-merge.yml (the audit env) Three failure classes: F1 Job in (A) is not under the sentinel's `needs:` — sentinel @@ -250,13 +251,21 @@ def sentinel_needs(ci_doc: dict) -> set[str]: return set(needs) -def required_checks_env(audit_doc: dict) -> set[str]: - """Pull the REQUIRED_CHECKS env value from audit-force-merge.yml. +def required_checks_env(audit_doc: dict, branch: str) -> set[str]: + """Pull the required-checks env value from audit-force-merge.yml. + Walks the YAML AST per `feedback_behavior_based_ast_gates`: we do - NOT grep for `REQUIRED_CHECKS:` — that breaks under reformatting, + NOT grep for env keys — that breaks under reformatting, multi-job workflows, or a future move of the env to a different - step. Instead, look inside every job's every step's `env:` map.""" - found: list[str] = [] + step. Instead, look inside every job's every step's `env:` map. + + Supports two variants: + - REQUIRED_CHECKS_JSON (preferred): JSON dict keyed by branch name. + We extract the array for the target branch. + - REQUIRED_CHECKS (legacy): newline-separated list of context names. + """ + found_json: list[str] = [] + found_legacy: list[str] = [] jobs = audit_doc.get("jobs", {}) if not isinstance(jobs, dict): sys.stderr.write(f"::warning::{AUDIT_WORKFLOW_PATH} has no jobs: mapping\n") @@ -268,27 +277,67 @@ def required_checks_env(audit_doc: dict) -> set[str]: if not isinstance(step, dict): continue step_env = step.get("env") or {} - if isinstance(step_env, dict) and "REQUIRED_CHECKS" in step_env: - v = step_env["REQUIRED_CHECKS"] - if isinstance(v, str): - found.append(v) - if not found: - sys.stderr.write( - f"::error::REQUIRED_CHECKS env not found in any step of " - f"{AUDIT_WORKFLOW_PATH}\n" - ) - sys.exit(3) - if len(found) > 1: - # Defensive: refuse to guess which one is canonical. - sys.stderr.write( - f"::error::REQUIRED_CHECKS env present in {len(found)} steps; ambiguous\n" - ) - sys.exit(3) - raw = found[0] - # YAML block-scalars (`|`) leave a trailing newline + blanks; trim - # consistently with audit-force-merge.sh's parser so both sides - # produce identical sets. - return {line.strip() for line in raw.splitlines() if line.strip()} + if isinstance(step_env, dict): + if "REQUIRED_CHECKS_JSON" in step_env: + v = step_env["REQUIRED_CHECKS_JSON"] + if isinstance(v, str): + found_json.append(v) + if "REQUIRED_CHECKS" in step_env: + v = step_env["REQUIRED_CHECKS"] + if isinstance(v, str): + found_legacy.append(v) + + # JSON variant takes precedence. + if found_json: + if len(found_json) > 1: + sys.stderr.write( + f"::error::REQUIRED_CHECKS_JSON env present in {len(found_json)} steps; ambiguous\n" + ) + sys.exit(3) + try: + parsed = json.loads(found_json[0]) + except json.JSONDecodeError as e: + sys.stderr.write( + f"::error::REQUIRED_CHECKS_JSON is not valid JSON: {e}\n" + ) + sys.exit(3) + if not isinstance(parsed, dict): + sys.stderr.write( + f"::error::REQUIRED_CHECKS_JSON parsed to {type(parsed).__name__}, expected dict\n" + ) + sys.exit(3) + branch_checks = parsed.get(branch) + if branch_checks is None: + sys.stderr.write( + f"::error::REQUIRED_CHECKS_JSON has no entry for branch '{branch}'\n" + ) + sys.exit(3) + if not isinstance(branch_checks, list): + sys.stderr.write( + f"::error::REQUIRED_CHECKS_JSON['{branch}'] is {type(branch_checks).__name__}, expected list\n" + ) + sys.exit(3) + return {str(item).strip() for item in branch_checks if str(item).strip()} + + # Legacy variant fallback. + if found_legacy: + if len(found_legacy) > 1: + # Defensive: refuse to guess which one is canonical. + sys.stderr.write( + f"::error::REQUIRED_CHECKS env present in {len(found_legacy)} steps; ambiguous\n" + ) + sys.exit(3) + raw = found_legacy[0] + # YAML block-scalars (`|`) leave a trailing newline + blanks; trim + # consistently with audit-force-merge.sh's parser so both sides + # produce identical sets. + return {line.strip() for line in raw.splitlines() if line.strip()} + + sys.stderr.write( + f"::error::Neither REQUIRED_CHECKS_JSON nor REQUIRED_CHECKS env found in any step of " + f"{AUDIT_WORKFLOW_PATH}\n" + ) + sys.exit(3) # -------------------------------------------------------------------------- @@ -330,7 +379,7 @@ def detect_drift(branch: str) -> tuple[list[str], dict]: jobs = ci_job_names(ci_doc) jobs_all = ci_jobs_all(ci_doc) needs = sentinel_needs(ci_doc) - env_set = required_checks_env(audit_doc) + env_set = required_checks_env(audit_doc, branch) # Protection # api() raises ApiError on non-2xx. Transient 5xx should fail loud. @@ -524,7 +573,7 @@ def render_body(branch: str, findings: list[str], debug: dict) -> str: "- **F2**: rename the protection context to match an emitter, " "or remove it from `status_check_contexts` " "(PATCH `/api/v1/repos/{owner}/{repo}/branch_protections/{branch}`).", - "- **F3a / F3b**: bring `REQUIRED_CHECKS` env in " + "- **F3a / F3b**: bring `REQUIRED_CHECKS_JSON` (or `REQUIRED_CHECKS` legacy) env in " "`.gitea/workflows/audit-force-merge.yml` into set-equality with " "`status_check_contexts` (single PR, both files).", "", diff --git a/.gitea/scripts/tests/test_ci_required_drift.py b/.gitea/scripts/tests/test_ci_required_drift.py index 48b8e3517..99a50da9b 100644 --- a/.gitea/scripts/tests/test_ci_required_drift.py +++ b/.gitea/scripts/tests/test_ci_required_drift.py @@ -1,4 +1,5 @@ import importlib.util +import json import sys from pathlib import Path from unittest.mock import patch @@ -36,6 +37,76 @@ def _make_audit_doc(required_checks: list[str]) -> dict: } +def _make_audit_doc_json(required_checks_json: dict) -> dict: + return { + "jobs": { + "audit": { + "steps": [ + {"env": {"REQUIRED_CHECKS_JSON": json.dumps(required_checks_json)}} + ] + } + } + } + + +# --------------------------------------------------------------------------- +# required_checks_env — dual-variant parsing +# --------------------------------------------------------------------------- + +def test_required_checks_env_prefers_json_over_legacy(): + doc = { + "jobs": { + "audit": { + "steps": [ + { + "env": { + "REQUIRED_CHECKS_JSON": json.dumps( + {"main": ["ctx-a"], "staging": ["ctx-b"]} + ), + "REQUIRED_CHECKS": "ctx-legacy\nctx-old", + } + } + ] + } + } + } + assert drift.required_checks_env(doc, "main") == {"ctx-a"} + assert drift.required_checks_env(doc, "staging") == {"ctx-b"} + + +def test_required_checks_env_falls_back_to_legacy(): + doc = _make_audit_doc(["legacy-ctx"]) + assert drift.required_checks_env(doc, "main") == {"legacy-ctx"} + + +def test_required_checks_env_json_missing_branch_fails(): + doc = _make_audit_doc_json({"staging": ["ctx-b"]}) + try: + drift.required_checks_env(doc, "main") + except SystemExit as exc: + assert exc.code == 3 + else: + raise AssertionError("expected SystemExit(3)") + + +def test_required_checks_env_json_malformed_fails(): + doc = { + "jobs": { + "audit": { + "steps": [ + {"env": {"REQUIRED_CHECKS_JSON": "not-json"}} + ] + } + } + } + try: + drift.required_checks_env(doc, "main") + except SystemExit as exc: + assert exc.code == 3 + else: + raise AssertionError("expected SystemExit(3)") + + # --------------------------------------------------------------------------- # sentinel_needs # --------------------------------------------------------------------------- diff --git a/tests/test_ci_required_drift.py b/tests/test_ci_required_drift.py index e2c097c2c..af4e139df 100644 --- a/tests/test_ci_required_drift.py +++ b/tests/test_ci_required_drift.py @@ -18,6 +18,7 @@ No network. No live Gitea calls. from __future__ import annotations import importlib.util +import json import os import textwrap from pathlib import Path @@ -117,6 +118,31 @@ def _write_audit_yaml(tmp_path: Path, required_checks: list[str]) -> Path: return p +def _write_audit_yaml_json(tmp_path: Path, required_checks_json: dict) -> Path: + """Write a synthetic audit-force-merge.yml with REQUIRED_CHECKS_JSON env.""" + block = json.dumps(required_checks_json, indent=2) + text = textwrap.dedent( + f"""\ + name: audit-force-merge + on: + schedule: + - cron: '*/30 * * * *' + jobs: + audit: + runs-on: ubuntu-latest + steps: + - name: Run audit + env: + REQUIRED_CHECKS_JSON: | + {block.replace(chr(10), chr(10) + ' ')} + run: bash .gitea/scripts/audit-force-merge.sh + """ + ) + p = tmp_path / "audit-force-merge.yml" + p.write_text(text, encoding="utf-8") + return p + + def _make_stub_api(responses: dict): """Build a fake `api()` callable. @@ -363,6 +389,107 @@ def test_happy_path_no_drift(drift_module, tmp_path, monkeypatch): assert findings == [], findings +# -------------------------------------------------------------------------- +# REQUIRED_CHECKS_JSON variant drift tests +# -------------------------------------------------------------------------- +def test_f3a_env_wider_than_protection_json_variant(drift_module, tmp_path, monkeypatch): + """F3a: REQUIRED_CHECKS_JSON env has a context NOT in protection.""" + ci = _write_ci_yaml( + tmp_path, + jobs={"build": {"runs-on": "ubuntu-latest"}}, + sentinel_needs=["build"], + ) + audit = _write_audit_yaml_json( + tmp_path, + {"main": ["ci / build (pull_request)", "ci / ghost (pull_request)"]}, + ) + _patch_paths(drift_module, monkeypatch, ci, audit) + + stub = _make_stub_api({ + ("GET", "/repos/owner/repo/branch_protections/main"): ( + 200, + {"status_check_contexts": ["ci / build (pull_request)"]}, + ), + }) + monkeypatch.setattr(drift_module, "api", stub) + + findings, _ = drift_module.detect_drift("main") + assert any("F3a" in f and "ghost" in f for f in findings), findings + + +def test_f3b_protection_wider_than_env_json_variant(drift_module, tmp_path, monkeypatch): + """F3b: protection has a context NOT in REQUIRED_CHECKS_JSON env.""" + ci = _write_ci_yaml( + tmp_path, + jobs={ + "build": {"runs-on": "ubuntu-latest"}, + "test": {"runs-on": "ubuntu-latest"}, + }, + sentinel_needs=["build", "test"], + ) + audit = _write_audit_yaml_json( + tmp_path, + {"main": ["ci / build (pull_request)"]}, + ) + _patch_paths(drift_module, monkeypatch, ci, audit) + + stub = _make_stub_api({ + ("GET", "/repos/owner/repo/branch_protections/main"): ( + 200, + { + "status_check_contexts": [ + "ci / build (pull_request)", + "ci / test (pull_request)", + ] + }, + ), + }) + monkeypatch.setattr(drift_module, "api", stub) + + findings, _ = drift_module.detect_drift("main") + assert any("F3b" in f and "ci / test (pull_request)" in f for f in findings), findings + + +def test_happy_path_no_drift_json_variant(drift_module, tmp_path, monkeypatch): + """Happy path with REQUIRED_CHECKS_JSON: all aligned.""" + ci = _write_ci_yaml( + tmp_path, + jobs={ + "build": {"runs-on": "ubuntu-latest"}, + "test": {"runs-on": "ubuntu-latest"}, + }, + sentinel_needs=["build", "test"], + ) + audit = _write_audit_yaml_json( + tmp_path, + { + "main": [ + "ci / build (pull_request)", + "ci / test (pull_request)", + "ci / all-required (pull_request)", + ] + }, + ) + _patch_paths(drift_module, monkeypatch, ci, audit) + + stub = _make_stub_api({ + ("GET", "/repos/owner/repo/branch_protections/main"): ( + 200, + { + "status_check_contexts": [ + "ci / build (pull_request)", + "ci / test (pull_request)", + "ci / all-required (pull_request)", + ] + }, + ), + }) + monkeypatch.setattr(drift_module, "api", stub) + + findings, _ = drift_module.detect_drift("main") + assert findings == [], findings + + # -------------------------------------------------------------------------- # MUST-FIX 1: find_open_issue must raise on transient HTTP errors # -------------------------------------------------------------------------- -- 2.52.0