molecule-core/workspace-server/internal/scheduler/scheduler.go
molecule-ai[bot] 64ccf8e179
fix: CWE-78 rm scope, go vet failures, delegation idempotency
* refactor: split 4 oversized handler files into focused sub-files

- org.go (1099 lines) → org.go + org_import.go + org_helpers.go
- mcp.go (1001 lines) → mcp.go + mcp_tools.go
- workspace.go (934 lines) → workspace.go + workspace_crud.go
- a2a_proxy.go (825 lines) → a2a_proxy.go + a2a_proxy_helpers.go

No functional changes — same package, same exports, same tests.
All files stay under 635 lines.

Note: isSafeURL and isPrivateOrMetadataIP are duplicated between
mcp_tools.go and a2a_proxy_helpers.go — this is a pre-existing issue
from the original mcp.go and a2a_proxy.go, not introduced by this split.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(runtime+scheduler): increment/decrement active_tasks counter (refs #1386)

* docs(tutorials): add Self-Hosted AI Agents guide — Docker, Fly Machines, bare metal

* docs: add Remote Agents feature + Phase 30 blog links to docs index

* docs(marketing): update Phase 30 brief — Action 5 complete, docs/index.md update noted

* docs(api-ref): add workspace file copy API reference (#1281)

Documents TemplatesHandler.copyFilesToContainer (container_files.go):
- Endpoint overview: PUT /workspaces/:id/files/*path
- Parameter descriptions for all four function parameters
- CWE-22 path traversal protection (PRs #1267/1270/1271)
- Defense-in-depth: validateRelPath at handler + archive boundary
- Full error code table (400/404/500)
- curl example with success and path-traversal rejection cases

Also covers: writeViaEphemeral routing, findContainer fallback,
allowed roots allow-list, and related links to platform-api.md.

Co-authored-by: Molecule AI Technical Writer <technical-writer@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(security): CWE-78/CWE-22 — block shell injection in deleteViaEphemeral (#1310)

## Summary
Issue #1273: deleteViaEphemeral interpolated filePath directly into
rm command, enabling both shell injection (CWE-78) and path traversal
(CWE-22) attacks.

## Changes
1. Added validateRelPath(filePath) guard before constructing the rm command.
   validateRelPath blocks absolute paths and ".." traversal sequences.
2. Changed Cmd from "/configs/"+filePath (string interpolation) to
   []string{"rm", "-rf", "/configs", filePath} (exec form). This
   eliminates shell injection entirely — filePath is a plain argument,
   never interpreted as shell code.

## Security properties
- validateRelPath: blocks "../" and absolute paths before they reach Docker
- Exec form: filePath cannot inject shell metacharacters even if validation
  is somehow bypassed
- "/configs" as separate arg: rm has exactly two arguments, no room for
  injected args

Closes #1273.

Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>

* fix(security): backport SSRF defence (CWE-918) to main — isSafeURL in a2a_proxy.go (#1292) (#1302)

* fix(security): backport SSRF defence (CWE-918) to main — isSafeURL in mcp.go and a2a_proxy.go

Issue #1042: 3 CodeQL SSRF findings across mcp.go and a2a_proxy.go.
staging already ships the fix (PRs #1147, #1154 → merged); main did not include it.

- mcp.go: add isSafeURL() + isPrivateOrMetadataIP() helpers; validate
  agentURL before outbound calls in mcpCallTool (line ~529) and
  toolDelegateTaskAsync (line ~607)
- a2a_proxy.go: add identical isSafeURL() + isPrivateOrMetadataIP()
  helpers; call isSafeURL() before dispatchA2A in resolveAgentURL()
  (blocks finding #1 at line 462)
- mcp_test.go: 19 new tests covering all blocked URL patterns:
  file://, ftp://, 127.0.0.1, ::1, 169.254.169.254, 10.x.x.x,
  172.16.x.x, 192.168.x.x, empty hostname, invalid URL,
  isPrivateOrMetadataIP across all private/CGNAT/metadata ranges

1. URL scheme enforcement — http/https only
2. IP literal blocking — loopback, link-local, RFC-1918, CGNAT, doc/test ranges
3. DNS hostname resolution — blocks internal hostnames resolving to private IPs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(ci-blocker): remove duplicate isSafeURL/isPrivateOrMetadataIP from mcp.go

Issue #1292: PR #1274 duplicated isSafeURL + isPrivateOrMetadataIP in
mcp.go — both functions already exist on main at lines 829 and 876.
Kept the mcp.go definitions (the originals) and removed the 70-line
duplicate appended at end of file. a2a_proxy.go functions are
unchanged — they serve the same purpose via a separate code path.

* fix: remove orphaned commit-text lines from a2a_proxy.go

Three lines from the PR/commit title were accidentally baked into the
file during the rebase from #1274 to #1302, causing a Go syntax error
(a bare string literal at statement level followed by dangling braces).

Deletion restores:
  }
  return agentURL, nil
}

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Molecule AI Core-BE <core-be@agents.moleculesai.app>
Co-authored-by: Molecule AI SDK Lead <sdk-lead@agents.moleculesai.app>

* fix(canvas/test): patch test regressions from PR #1243 + proximity hitbox fix (#1313)

* fix(ci): revert cancel-in-progress to true — ubuntu-runner dispatch stalled

With cancel-in-progress: false, pending CI runs accumulate in the
ci-staging concurrency group. New pushes create queued runs, but
GitHub dispatches multiple runs for the same SHA instead of replacing
the pending one. All runs get stuck/cancelled before completing.

Reverting to cancel-in-progress: true restores CI operation — runs
that are superseded are cancelled, freeing the concurrency slot for
the new run to proceed.

Runner availability (ubuntu-latest dispatch stall) is a separate
infra issue tracked independently.

* fix(security): validate tar header names in copyFilesToContainer — CWE-22 path traversal (#1043)

Tar header names were built from raw map keys without validation. A malicious
server-side caller could embed "../" in a file name to escape the destPath
volume mount (/configs) and write files outside the intended directory.

Fix: validate each name with filepath.Clean + IsAbs + HasPrefix("..") checks
before using it in the tar header, then join with destPath for the archive
header. Also guard parent-directory creation against traversal.

Closes #1043.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas/test): patch regressed tests from PR #1243 orgs-page flakiness fix

Two regressions introduced by PR #1243 (fix issue #1207):

1. **ContextMenu.keyboard.test.tsx** — `setPendingDelete` now receives
   `{id, name, hasChildren}` (cascade-delete UX, PR #1252), but the test
   expected only `{id, name}`. Added `hasChildren: false` to the assertion.

2. **orgs-page.test.tsx** — 10 tests awaited `vi.advanceTimersByTimeAsync(50)`
   without `act()`. With fake timers, `setState` (synchronous) is flushed by
   `advanceTimersByTimeAsync`, but the React state update it triggers is a
   microtask — so the test saw stale render. Wrapping in `act(async () =>
   { await vi.advanceTimersByTimeAsync(50); })` ensures microtasks drain
   before assertions run.

All 813 vitest tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add 100px proximity threshold to drag-to-nest detection

Fixes #1052 — previously, getIntersectingNodes() returned any node whose
bounding box overlapped the dragged node, regardless of actual pixel
distance. On a sparse canvas this triggered the "Nest Workspace" dialog
even when the dragged node was nowhere near any target.

The fix adds an on-node-drag proximity filter: only nodes within 100px
(center-to-center) of the dragged node are eligible as nest targets.
Distance is computed as squared Euclidean to avoid the sqrt overhead in
the hot drag path.

Added two tests to Canvas.pan-to-node.test.tsx covering the mock wiring
and confirming the regression is addressed in Canvas.tsx.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Co-authored-by: Molecule AI Core-FE <core-fe@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add ?? 0 guard for optional budget_used in progressPct (#1324) (#1327)

* fix(ci): revert cancel-in-progress to true — ubuntu-runner dispatch stalled

With cancel-in-progress: false, pending CI runs accumulate in the
ci-staging concurrency group. New pushes create queued runs, but
GitHub dispatches multiple runs for the same SHA instead of replacing
the pending one. All runs get stuck/cancelled before completing.

Reverting to cancel-in-progress: true restores CI operation — runs
that are superseded are cancelled, freeing the concurrency slot for
the new run to proceed.

Runner availability (ubuntu-latest dispatch stall) is a separate
infra issue tracked independently.

* fix(security): validate tar header names in copyFilesToContainer — CWE-22 path traversal (#1043)

Tar header names were built from raw map keys without validation. A malicious
server-side caller could embed "../" in a file name to escape the destPath
volume mount (/configs) and write files outside the intended directory.

Fix: validate each name with filepath.Clean + IsAbs + HasPrefix("..") checks
before using it in the tar header, then join with destPath for the archive
header. Also guard parent-directory creation against traversal.

Closes #1043.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas/test): patch regressed tests from PR #1243 orgs-page flakiness fix

Two regressions introduced by PR #1243 (fix issue #1207):

1. **ContextMenu.keyboard.test.tsx** — `setPendingDelete` now receives
   `{id, name, hasChildren}` (cascade-delete UX, PR #1252), but the test
   expected only `{id, name}`. Added `hasChildren: false` to the assertion.

2. **orgs-page.test.tsx** — 10 tests awaited `vi.advanceTimersByTimeAsync(50)`
   without `act()`. With fake timers, `setState` (synchronous) is flushed by
   `advanceTimersByTimeAsync`, but the React state update it triggers is a
   microtask — so the test saw stale render. Wrapping in `act(async () =>
   { await vi.advanceTimersByTimeAsync(50); })` ensures microtasks drain
   before assertions run.

All 813 vitest tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add 100px proximity threshold to drag-to-nest detection

Fixes #1052 — previously, getIntersectingNodes() returned any node whose
bounding box overlapped the dragged node, regardless of actual pixel
distance. On a sparse canvas this triggered the "Nest Workspace" dialog
even when the dragged node was nowhere near any target.

The fix adds an on-node-drag proximity filter: only nodes within 100px
(center-to-center) of the dragged node are eligible as nest targets.
Distance is computed as squared Euclidean to avoid the sqrt overhead in
the hot drag path.

Added two tests to Canvas.pan-to-node.test.tsx covering the mock wiring
and confirming the regression is addressed in Canvas.tsx.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add ?? 0 guard for optional budget_used in progressPct

Fixes #1324 — TypeScript strict mode flags budget.budget_used as
possibly undefined in the progressPct ternary, even though the
outer condition checks budget_limit > 0.

Fix: use nullish coalescing (budget_used ?? 0) so progress shows 0%
when the backend returns a partial shape (provisioning-stuck
workspaces). Also adds a test covering the undefined-budget_used
case with the progress bar aria-valuenow and fill width both at 0%.

Closes #1324.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Co-authored-by: Molecule AI Core-FE <core-fe@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add ?? 0 guard for optional budget_used in progressPct (issue #1324) (#1329)

* fix(ci): revert cancel-in-progress to true — ubuntu-runner dispatch stalled

With cancel-in-progress: false, pending CI runs accumulate in the
ci-staging concurrency group. New pushes create queued runs, but
GitHub dispatches multiple runs for the same SHA instead of replacing
the pending one. All runs get stuck/cancelled before completing.

Reverting to cancel-in-progress: true restores CI operation — runs
that are superseded are cancelled, freeing the concurrency slot for
the new run to proceed.

Runner availability (ubuntu-latest dispatch stall) is a separate
infra issue tracked independently.

* fix(security): validate tar header names in copyFilesToContainer — CWE-22 path traversal (#1043)

Tar header names were built from raw map keys without validation. A malicious
server-side caller could embed "../" in a file name to escape the destPath
volume mount (/configs) and write files outside the intended directory.

Fix: validate each name with filepath.Clean + IsAbs + HasPrefix("..") checks
before using it in the tar header, then join with destPath for the archive
header. Also guard parent-directory creation against traversal.

Closes #1043.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas/test): patch regressed tests from PR #1243 orgs-page flakiness fix

Two regressions introduced by PR #1243 (fix issue #1207):

1. **ContextMenu.keyboard.test.tsx** — `setPendingDelete` now receives
   `{id, name, hasChildren}` (cascade-delete UX, PR #1252), but the test
   expected only `{id, name}`. Added `hasChildren: false` to the assertion.

2. **orgs-page.test.tsx** — 10 tests awaited `vi.advanceTimersByTimeAsync(50)`
   without `act()`. With fake timers, `setState` (synchronous) is flushed by
   `advanceTimersByTimeAsync`, but the React state update it triggers is a
   microtask — so the test saw stale render. Wrapping in `act(async () =>
   { await vi.advanceTimersByTimeAsync(50); })` ensures microtasks drain
   before assertions run.

All 813 vitest tests pass.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add 100px proximity threshold to drag-to-nest detection

Fixes #1052 — previously, getIntersectingNodes() returned any node whose
bounding box overlapped the dragged node, regardless of actual pixel
distance. On a sparse canvas this triggered the "Nest Workspace" dialog
even when the dragged node was nowhere near any target.

The fix adds an on-node-drag proximity filter: only nodes within 100px
(center-to-center) of the dragged node are eligible as nest targets.
Distance is computed as squared Euclidean to avoid the sqrt overhead in
the hot drag path.

Added two tests to Canvas.pan-to-node.test.tsx covering the mock wiring
and confirming the regression is addressed in Canvas.tsx.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas): add ?? 0 guard for optional budget_used in progressPct

Fixes #1324 — TypeScript strict mode flags budget.budget_used as
possibly undefined in the progressPct ternary, even though the
outer condition checks budget_limit > 0.

Fix: use nullish coalescing (budget_used ?? 0) so progress shows 0%
when the backend returns a partial shape (provisioning-stuck
workspaces). Also adds a test covering the undefined-budget_used
case with the progress bar aria-valuenow and fill width both at 0%.

Closes #1324.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Co-authored-by: Molecule AI Core-FE <core-fe@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(platform): unblock SaaS workspace registration end-to-end

Every workspace in the cross-EC2 SaaS provisioning shape was failing
registration, heartbeat, or A2A routing. Four distinct blockers sat
between "EC2 is up" and "agent responds"; three are platform-side and
fixed here (the fourth is in the CP user-data, separate PR).

1. SSRF validator blocked RFC-1918 (registry.go + mcp.go)
   validateAgentURL and isPrivateOrMetadataIP rejected 172.16.0.0/12,
   which contains the AWS default VPC range (172.31.x.x) that every
   sibling workspace EC2 registers from. Registration returned 400 and
   the 10-min provision sweep flipped status to failed. RFC-1918 +
   IPv6 ULA are now gated behind saasMode(); link-local (169.254/16),
   loopback, IPv6 metadata (fe80::/10, ::1), and TEST-NET stay blocked
   unconditionally in both modes.

   saasMode() resolution order:
     1. MOLECULE_DEPLOY_MODE=saas|self-hosted (explicit operator flag)
     2. MOLECULE_ORG_ID presence (legacy implicit signal, kept for
        back-compat so existing deployments don't need a config change)

   isPrivateOrMetadataIP now actually checks IPv6 — previously it
   returned false on any non-IPv4 input, which would let a registered
   [::1] or [fe80::...] URL bypass the SSRF check entirely.

2. Orphan auth-token minting (workspace_provision.go)
   issueAndInjectToken mints a token and stuffs it into
   cfg.ConfigFiles[".auth_token"]. The Docker provisioner writes that
   file into the /configs volume — the CP provisioner ignores it
   (only cfg.EnvVars crosses the wire). Result: live token in DB, no
   plaintext on disk, RegistryHandler.requireWorkspaceToken 401s every
   /registry/register attempt because the workspace is no longer in
   the "no live token → bootstrap-allowed" state. Now no-ops in SaaS
   mode; the register handler already mints on first successful
   register and returns the plaintext in the response body for the
   runtime to persist locally.

   Also removes the redundant wsauth.IssueToken call at the bottom of
   provisionWorkspaceCP, which created the same orphan-token pattern
   a second time.

3. Compaction artefacts (bundle/importer.go, handlers/org_tokens.go,
   scheduler.go, workspace_provision.go)
   Four pre-existing compile errors on main from an earlier session's
   code truncation: missing tuple destructuring on ExecContext /
   redactSecrets / orgTokenActor, missing close-brace in
   Scheduler.fireSchedule's panic recovery. All one-line mechanical
   fixes; without them the binary would not build.

Tests
-----
ssrf_test.go adds:
  * TestSaasMode — covers the env resolution ladder (explicit flag
    wins over legacy signal, case-insensitive, whitespace tolerant)
  * TestIsPrivateOrMetadataIP_SaaSMode — asserts RFC-1918 + IPv6 ULA
    flip to allowed, metadata/loopback/TEST-NET still blocked
  * TestIsPrivateOrMetadataIP_IPv6 — regression guard for the old
    "returns false for all IPv6" behaviour

Follow-up issue for CP-sourced workspace_id attestation will be filed
separately — closes the residual intra-VPC SSRF + token-race windows
the SaaS-mode relaxation introduces.

Verified end-to-end today on workspace 6565a2e0 (hermes runtime, OpenAI
provider) — agent returned "PONG" in 1.4s after register → heartbeat →
A2A proxy → runtime.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(runtime+scheduler): increment/decrement active_tasks + max_concurrent (#1408)

Runtime (shared_runtime.py):
- set_current_task now increments active_tasks on task start, decrements
  on completion (was binary 0/1)
- Counter never goes below 0 (max(0, n-1))
- Pushes heartbeat immediately on BOTH increment and decrement (#1372)

Scheduler (scheduler.go):
- Reads max_concurrent_tasks from DB (default 1, backward compatible)
- Skips cron only when active_tasks >= max_concurrent_tasks (was > 0)
- Leaders can be configured with max_concurrent_tasks > 1 to accept
  A2A delegations while a cron runs

Platform:
- Added max_concurrent_tasks column to workspaces (migration 037)
- Workspace model + list/get queries include the new field
- API exposes max_concurrent_tasks in workspace JSON

Config.yaml support (future): runtime_config.max_concurrent_tasks

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(review): address 3 critical issues from code review

1. BLOCKER: executor_helpers.py now uses increment/decrement too
   (was still binary 0/1, stomping the counter for CLI + SDK executors)

2. BUG: asymmetric getattr defaults fixed — both paths use default 0
   (was 0 on increment, 1 on decrement)

3. UX: current_task preserved when active_tasks > 0 on decrement
   (was clearing task description even when other tasks still running)

4. Scheduler polling loop re-reads max_concurrent_tasks on each poll
   (was using stale value from initial query)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Hongming Wang <hongmingwangrabbit@gmail.com>
Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Co-authored-by: Molecule AI Technical Writer <technical-writer@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
Co-authored-by: Molecule AI Core-BE <core-be@agents.moleculesai.app>
Co-authored-by: Molecule AI SDK Lead <sdk-lead@agents.moleculesai.app>
Co-authored-by: Molecule AI Core-FE <core-fe@agents.moleculesai.app>
Co-authored-by: Hongming Wang <hongmingwang.rabbit@users.noreply.github.com>

* docs: workspace files API reference, skill catalog, and links

* docs: fix secrets endpoint path across docs

The workspace secrets endpoint is `/workspaces/:id/secrets`, not
`/secrets/values`. This was wrong in quickstart.md (Path 2: Remote Agent)
and workspace-runtime.md (registration flow example and comparison table).
The external-agent-registration guide already had the correct path.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: fix broken blog cross-link in skills-vs-bundled-tools post

Link path had an extra `/docs/` segment: `/docs/blog/...` instead of
`/blog/...`. Nextra resolves blog posts directly under `/blog/<slug>`,
not under `/docs/blog/`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs: add skill-catalog.md guide

Linked from the skills-vs-bundled-tools blog post as a reference
for TTS/image-generation/web-search skills. The blog promises
"install directly via the CLI" with a skill catalog — this page
fills that promise by documenting available skill types, install
commands, version management, custom skill authoring, and removal.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(marketing): update Phase 30 brief — Action 5 complete, docs/index.md update noted

* docs(api-ref): add workspace file copy API reference

Documents TemplatesHandler.copyFilesToContainer (container_files.go):
- Endpoint overview: PUT /workspaces/:id/files/*path
- Parameter descriptions for all four function parameters
- CWE-22 path traversal protection (PRs #1267/1270/1271)
- Defense-in-depth: validateRelPath at handler + archive boundary
- Full error code table (400/404/500)
- curl example with success and path-traversal rejection cases

Also covers: writeViaEphemeral routing, findContainer fallback,
allowed roots allow-list, and related links to platform-api.md.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Molecule AI Technical Writer <technical-writer@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>

* fix(handlers): add saasMode() gating to isPrivateOrMetadataIP in a2a_proxy_helpers.go

Issue #1421 / #1401: PR #1363 (handler split) moved isPrivateOrMetadataIP
into a2a_proxy_helpers.go but kept the OLD pre-SaaS version — it
unconditionally blocks RFC-1918 addresses, regressing the fix in
commits 1125a02 / cf10733.

The A2A proxy path now has the same SaaS-gated logic as registry.go:
- Cloud metadata (169.254/16, fe80::/10, ::1) always blocked in both modes
- RFC-1918 (10/8, 172.16/12, 192.168/16) + IPv6 ULA (fc00::/7) blocked in
  self-hosted, allowed in SaaS cross-EC2 mode
- IPv6 addresses now properly checked (previous version returned false for all)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* docs(marketing): Discord adapter Day 2 Reddit + HN community copy

* fix(tests): supply *events.Broadcaster pointer to captureBroadcaster

Cannot use *captureBroadcaster as *events.Broadcaster when the struct
embeds events.Broadcaster as a value — must initialize as a named field.

Fixes go vet error in workspace_provision_test.go:
  cannot use broadcaster (*captureBroadcaster) as *events.Broadcaster value

* Merge pull request #1429 from fix/canvas-tooltip-clear-timer

Without this, a 400ms setTimeout from onFocus/onMouseEnter that fires
after onBlur will re-show a tooltip the user just dismissed. The
setShow(false) in onBlur closes the tooltip immediately but leaves the
timer pending — Tab-blur followed by timer-fire would re-show it.

Fix: add clearTimeout(timerRef.current) at the top of onBlur, mirroring
the pattern already used in onMouseLeave and onFocus.

Refs: PR #1367 (a11y keyboard support — this was a pre-existing gap)

Co-authored-by: Molecule AI App-FE <app-fe@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas/test): add missing children:[] to setPendingDelete expectation (#1426)

PR #1252 (cascade-delete UX) updated setPendingDelete to pass a
children array for cascade-warning rendering. The keyboard-a11y test
assertion was not updated to match.

Test: clicking 'Delete' hoists state to the store and closes the menu

Co-authored-by: Molecule AI Core-QA <core-qa@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(canvas/test): add children:[] to setPendingDelete + \&apos; entity fix (closes #1380) (#1427)

* ci: retry — trigger fresh runner allocation

* fix(canvas/test): add children:[] to setPendingDelete assertion

setPendingDelete now includes children:[] (PR #1383 extended the
pendingDelete type). The keyboard accessibility test at line 225 used
exact object matching which omitted the new field, causing a failure
after staging merged #1383.

Issue: #1380

* fix(canvas): replace &apos; HTML entity with straight apostrophe

JSX does not entity-decode &apos; — it renders the literal text
"&apos;" instead of "'".  Found at line 157 (payment confirmed) and
line 321 (empty org list).  Replaced with a straight apostrophe,
which JSX handles correctly.

Ref: issue #1375
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: DevOps Engineer <devops@molecule.ai>
Co-authored-by: Molecule AI Core-UIUX <core-uiux@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* Merge pull request #1430 from fix/1421-saas-ssrf-helpers

Issue #1421 / #1401: PR #1363 (handler split) moved isPrivateOrMetadataIP
into a2a_proxy_helpers.go but kept the OLD pre-SaaS version — it
unconditionally blocks RFC-1918 addresses, regressing the fix in
commits 1125a02 / cf10733.

The A2A proxy path now has the same SaaS-gated logic as registry.go:
- Cloud metadata (169.254/16, fe80::/10, ::1) always blocked in both modes
- RFC-1918 (10/8, 172.16/12, 192.168/16) + IPv6 ULA (fc00::/7) blocked in
  self-hosted, allowed in SaaS cross-EC2 mode
- IPv6 addresses now properly checked (previous version returned false for all)

Co-authored-by: Molecule AI Core-BE <core-be@agents.moleculesai.app>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(P0): CWE-22 path traversal in copyFilesToContainer + ContextMenu test

Issue #1434 — CWE-22 Path Traversal Regression:
PR #1280 (dc218212) correctly used cleaned path in tar header.
PR #1363 (e9615af) regressed to using uncleaned `name`.
Fix: use `clean` in filepath.Join AND add defence-in-depth escape check.

Issue #1422 — ContextMenu Test Regression:
PR #1340 expanded pendingDelete store type to include `children:[]`.
Test assertion missing the field — add `children:[]` to match.

Note: ssrf.go created (shared isSafeURL/isPrivateOrMetadataIP) to
prepare for the handler-split refactor fix — current branch has no
build error, but the shared file will prevent regression when PR #1363
is merged. isSafeURL/isPrivateOrMetadataIP retained in both files
for now to avoid breaking callers while the split is finalized.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve 3 go vet failures + add idempotency_key to delegate_task_async

- workspace_provision_test.go: add missing mock := setupTestDB(t) to
  TestSeedInitialMemories_Truncation — mock was referenced but never
  declared, causing "undefined: mock" vet error
- orgtoken/tokens_test.go: discard unused orgID return value with _ in
  Validate call — "declared and not used" vet error
- a2a_tools.py: delegate_task_async now sends idempotency_key (SHA-256
  of workspace_id + task) to POST /workspaces/:id/delegate, fixing
  duplicate task execution when an agent restarts mid-delegation (#1456)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: airenostars <airenostars@gmail.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: molecule-ai[bot] <276602405+molecule-ai[bot]@users.noreply.github.com>
Co-authored-by: Hongming Wang <hongmingwangrabbit@gmail.com>
Co-authored-by: Molecule AI Technical Writer <technical-writer@agents.moleculesai.app>
Co-authored-by: Molecule AI Infra-Runtime-BE <infra-runtime-be@agents.moleculesai.app>
Co-authored-by: Molecule AI Core-BE <core-be@agents.moleculesai.app>
Co-authored-by: Molecule AI SDK Lead <sdk-lead@agents.moleculesai.app>
Co-authored-by: Molecule AI Core-FE <core-fe@agents.moleculesai.app>
Co-authored-by: Hongming Wang <hongmingwang.rabbit@users.noreply.github.com>
Co-authored-by: Molecule AI Community Manager <community-manager@agents.moleculesai.app>
Co-authored-by: Molecule AI App-FE <app-fe@agents.moleculesai.app>
Co-authored-by: Molecule AI Core-QA <core-qa@agents.moleculesai.app>
Co-authored-by: DevOps Engineer <devops@molecule.ai>
Co-authored-by: Molecule AI Core-UIUX <core-uiux@agents.moleculesai.app>
Co-authored-by: Molecule AI Dev Lead <dev-lead@agents.moleculesai.app>
2026-04-21 18:22:30 +00:00

650 lines
24 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package scheduler
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/google/uuid"
cronlib "github.com/robfig/cron/v3"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)
const (
pollInterval = 30 * time.Second
maxConcurrent = 10
batchLimit = 50
fireTimeout = 5 * time.Minute
)
// A2AProxy is the interface the scheduler needs to send messages to workspaces.
// WorkspaceHandler.ProxyA2ARequest satisfies this.
type A2AProxy interface {
ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error)
}
// Broadcaster records events and pushes them to WebSocket clients.
type Broadcaster interface {
RecordAndBroadcast(ctx context.Context, eventType, workspaceID string, data interface{}) error
}
type scheduleRow struct {
ID string
WorkspaceID string
Name string
CronExpr string
Timezone string
Prompt string
}
// ChannelBroadcaster posts messages to and reads context from workspace channels.
type ChannelBroadcaster interface {
BroadcastToWorkspaceChannels(ctx context.Context, workspaceID, text string)
FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string
}
// Scheduler polls the workspace_schedules table and fires A2A messages
// when a schedule's next_run_at has passed. Follows the same goroutine
// pattern as registry.StartHealthSweep.
type Scheduler struct {
proxy A2AProxy
broadcaster Broadcaster
channels ChannelBroadcaster
// lastTickAt records the wall-clock time of the most recent tick
// (whether it fired schedules or not). Read by Healthy() and the
// /admin/scheduler/health endpoint to detect stuck-tick conditions.
// Atomic-ish via the mutex; tick rate is 30s so contention is trivial.
mu sync.RWMutex
lastTickAt time.Time
tickInterval time.Duration // defaults to pollInterval; overridable in tests
}
func New(proxy A2AProxy, broadcaster Broadcaster) *Scheduler {
return &Scheduler{
proxy: proxy,
broadcaster: broadcaster,
tickInterval: pollInterval,
}
}
// SetChannels wires the channel manager for auto-posting cron output.
// Called after both scheduler and channel manager are initialized.
func (s *Scheduler) SetChannels(ch ChannelBroadcaster) {
s.channels = ch
}
// LastTickAt returns the wall-clock time of the most recently completed tick.
// Returns a zero time.Time if the scheduler has never completed a tick.
func (s *Scheduler) LastTickAt() time.Time {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lastTickAt
}
// Healthy returns true if the scheduler has completed a tick within the last
// 2×pollInterval window. Returns false before the first tick or if the
// scheduler is stalled.
func (s *Scheduler) Healthy() bool {
s.mu.RLock()
t := s.lastTickAt
s.mu.RUnlock()
if t.IsZero() {
return false
}
return time.Since(t) < 2*pollInterval
}
// Start runs the scheduler poll loop. Blocks until ctx is cancelled.
//
// Defends against panics inside tick() so a single bad row / bad cron
// expression / DB blip can't permanently kill the scheduler. Without
// this recover the goroutine dies and the only signal to the operator
// is "no crons firing" — which we observed as a 12+ hour silent outage
// on 2026-04-14 (issue #85).
func (s *Scheduler) Start(ctx context.Context) {
ticker := time.NewTicker(s.tickInterval)
defer ticker.Stop()
log.Printf("Scheduler: started (poll interval=%s)", s.tickInterval)
tickWithRecover := func() {
defer func() {
if r := recover(); r != nil {
log.Printf("Scheduler: PANIC in tick — recovered: %v (next tick in %s)", r, pollInterval)
}
}()
s.tick(ctx)
s.mu.Lock()
s.lastTickAt = time.Now()
s.mu.Unlock()
}
// #722 — startup repair: find any enabled schedule whose next_run_at was
// NULL'd by the pre-fix bug and recompute it now. Without this pass those
// schedules would never fire again even after the binary is updated.
s.repairNullNextRunAt(ctx)
// Heartbeat + initial lastTickAt so /admin/liveness and Healthy() both
// pass during the first 30s interval after startup.
supervised.Heartbeat("scheduler")
s.mu.Lock()
s.lastTickAt = time.Now()
s.mu.Unlock()
// Independent heartbeat pulse (#140). Decoupled from tick completion so
// a single long fire (UIUX audits routinely take 60-120s; max fireTimeout
// is 5min) can't make /admin/liveness look stale for the whole fire window.
// tick() also calls Heartbeat at its top + each fire goroutine calls it
// entry/exit — those are kept as redundant signals but this pulse is the
// one that guarantees liveness freshness regardless of tick state.
go func() {
pulseTicker := time.NewTicker(10 * time.Second)
defer pulseTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-pulseTicker.C:
supervised.Heartbeat("scheduler")
}
}
}()
for {
select {
case <-ctx.Done():
log.Println("Scheduler: stopped")
return
case <-ticker.C:
tickWithRecover()
supervised.Heartbeat("scheduler")
}
}
}
// tick queries all due schedules and fires each in a goroutine.
// Waits for all goroutines to finish before returning so the next tick
// doesn't re-fire schedules whose next_run_at hasn't been updated yet.
//
// Heartbeat is called at three points to keep /admin/liveness fresh during
// long-running fires (some prompts take minutes — without these heartbeats
// the scheduler looks "stale" the whole time it's working):
// - immediately on entering tick (proves we're past the ticker.C wait)
// - inside each per-fire goroutine (every fire bumps the heartbeat)
// - implicitly via the post-tick heartbeat in Start()
func (s *Scheduler) tick(ctx context.Context) {
supervised.Heartbeat("scheduler")
rows, err := db.DB.QueryContext(ctx, `
SELECT id, workspace_id, name, cron_expr, timezone, prompt
FROM workspace_schedules
WHERE enabled = true AND next_run_at IS NOT NULL AND next_run_at <= now()
ORDER BY next_run_at ASC
LIMIT $1
`, batchLimit)
if err != nil {
log.Printf("Scheduler: tick query error: %v", err)
return
}
defer rows.Close()
var wg sync.WaitGroup
sem := make(chan struct{}, maxConcurrent)
for rows.Next() {
var sched scheduleRow
if err := rows.Scan(&sched.ID, &sched.WorkspaceID, &sched.Name, &sched.CronExpr, &sched.Timezone, &sched.Prompt); err != nil {
log.Printf("Scheduler: scan error: %v", err)
continue
}
wg.Add(1)
sem <- struct{}{}
go func(s2 scheduleRow) {
defer wg.Done()
defer func() { <-sem }()
defer func() {
if r := recover(); r != nil {
log.Printf("Scheduler: PANIC firing '%s' on workspace %s — recovered: %v",
s2.Name, s2.WorkspaceID, r)
// Always advance next_run_at even on panic so the schedule doesn't get
// stuck re-firing the same panicking schedule indefinitely (#1029).
if nextTime, err := ComputeNextRun(s2.CronExpr, s2.Timezone, time.Now()); err == nil {
// F1089: use context.Background() so the panic-recovery UPDATE is not
// silently skipped if the outer ctx was cancelled during the panic window.
if _, execErr := db.DB.ExecContext(context.Background(), `UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`, nextTime, s2.ID); execErr != nil {
log.Printf("Scheduler: panic-recovery next_run_at UPDATE failed for schedule %s: %v", s2.ID, execErr)
}
}
}
}()
supervised.Heartbeat("scheduler")
s.fireSchedule(ctx, s2)
supervised.Heartbeat("scheduler")
}(sched)
}
if err := rows.Err(); err != nil {
log.Printf("Scheduler: rows error: %v", err)
}
wg.Wait()
// Record tick completion time for health monitoring.
s.mu.Lock()
s.lastTickAt = time.Now()
s.mu.Unlock()
}
// fireSchedule sends the A2A message and updates the schedule row.
// A deferred recover guards against panics in the A2A proxy so that a single
// misbehaving workspace cannot crash the scheduler goroutine pool.
func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
defer func() {
if r := recover(); r != nil {
log.Printf("Scheduler: panic recovered in fireSchedule for '%s' (%s): %v",
sched.Name, sched.ID, r)
// Always advance next_run_at even on panic so the schedule doesn't get
// stuck re-firing the same panicking schedule indefinitely (#1029).
if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil {
// F1089: use context.Background() so the panic-recovery UPDATE is not
// silently skipped if the outer ctx was cancelled during the panic window.
if _, execErr := db.DB.ExecContext(context.Background(), `UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`, nextTime, sched.ID); execErr != nil {
log.Printf("Scheduler: panic-recovery next_run_at UPDATE failed for schedule %s: %v", sched.ID, execErr)
}
}
}
}()
// #969 concurrency-aware queue — when the target workspace is busy,
// defer the fire instead of skipping. Polls every 10s for up to 2 min
// waiting for the workspace to become idle. If still busy after 2 min,
// falls back to the original skip behavior.
//
// This replaces the #115 "skip when busy" pattern which caused crons
// to permanently miss when workspaces were perpetually busy from the
// Orchestrator pulse delegation chain (~30% message drop rate on Dev Lead).
// Check workspace capacity — fire when active_tasks < max_concurrent_tasks.
// Default max is 1 (backward compatible). Workspaces can override via config
// to allow concurrent task processing (e.g. leaders handling A2A while cron runs).
var activeTasks int
var maxConcurrent int
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks, &maxConcurrent); err == nil && activeTasks >= maxConcurrent {
log.Printf("Scheduler: '%s' workspace %s at capacity (active_tasks=%d, max=%d), deferring up to 2 min",
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
// Poll every 10s for up to 2 minutes
waited := false
for i := 0; i < 12; i++ {
time.Sleep(10 * time.Second)
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0), COALESCE(max_concurrent_tasks, 1) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks, &maxConcurrent); err != nil || activeTasks < maxConcurrent {
waited = true
break
}
}
if !waited && activeTasks >= maxConcurrent {
log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d, max=%d)",
sched.Name, short(sched.WorkspaceID, 12), activeTasks, maxConcurrent)
s.recordSkipped(ctx, sched, activeTasks)
return
}
log.Printf("Scheduler: '%s' workspace %s has capacity after deferral, firing",
sched.Name, short(sched.WorkspaceID, 12))
}
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
// Level 3: inject ambient Slack channel context into the cron prompt.
// The agent sees recent peer messages before acting, enabling cross-agent
// awareness without explicit A2A delegation. Best-effort — if the fetch
// fails or the workspace has no Slack channels, the prompt is unchanged.
prompt := sched.Prompt
if s.channels != nil {
if channelCtx := s.channels.FetchWorkspaceChannelContext(fireCtx, sched.WorkspaceID); channelCtx != "" {
prompt = channelCtx + "\n" + prompt
}
}
msgID := fmt.Sprintf("cron-%s-%s", short(sched.ID, 8), uuid.New().String()[:8])
a2aBody, _ := json.Marshal(map[string]interface{}{
"method": "message/send",
"params": map[string]interface{}{
"message": map[string]interface{}{
"role": "user",
"messageId": msgID,
"parts": []map[string]interface{}{{"kind": "text", "text": prompt}},
},
},
})
log.Printf("Scheduler: firing '%s' → workspace %s", sched.Name, short(sched.WorkspaceID, 12))
// Empty callerID = canvas-style request (bypasses access control, source_id=NULL in activity log).
// "system:scheduler" was invalid — source_id column is UUID and rejects non-UUID strings.
statusCode, respBody, proxyErr := s.proxy.ProxyA2ARequest(fireCtx, sched.WorkspaceID, a2aBody, "", true)
lastStatus := "ok"
lastError := ""
if proxyErr != nil {
lastStatus = "error"
lastError = fmt.Sprintf("%v", proxyErr)
log.Printf("Scheduler: '%s' error: %v", sched.Name, proxyErr)
} else if statusCode < 200 || statusCode >= 300 {
lastStatus = "error"
lastError = fmt.Sprintf("HTTP %d", statusCode)
log.Printf("Scheduler: '%s' non-2xx: %d", sched.Name, statusCode)
} else {
log.Printf("Scheduler: '%s' completed (HTTP %d)", sched.Name, statusCode)
}
// #795: detect phantom-producing schedules — cron fires successfully
// but the agent returns empty or "(no response generated)". Track
// consecutive empties and escalate to 'stale' after 3 in a row.
isEmpty := isEmptyResponse(respBody)
if lastStatus == "ok" && isEmpty {
// One query instead of UPDATE-then-SELECT: RETURNING hands back
// the post-increment value so the stale-threshold check doesn't
// cost a second roundtrip. This handler fires once per cron tick
// per schedule; at 100 tenants × dozens of schedules the saved
// query matters.
var consecEmpty int
if err := db.DB.QueryRowContext(ctx, `
UPDATE workspace_schedules
SET consecutive_empty_runs = consecutive_empty_runs + 1,
updated_at = now()
WHERE id = $1
RETURNING consecutive_empty_runs`, sched.ID).Scan(&consecEmpty); err != nil {
log.Printf("Scheduler: '%s' empty-run bump failed: %v", sched.Name, err)
}
if consecEmpty >= 3 {
lastStatus = "stale"
lastError = fmt.Sprintf("empty response %d consecutive times — agent may be phantom-producing (#795)", consecEmpty)
log.Printf("Scheduler: '%s' STALE — %d consecutive empty responses (workspace %s)",
sched.Name, consecEmpty, short(sched.WorkspaceID, 12))
}
} else if lastStatus == "ok" {
// Non-empty success — reset the counter
db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET consecutive_empty_runs = 0,
updated_at = now()
WHERE id = $1`, sched.ID)
}
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
var nextRunPtr *time.Time
if nextErr == nil {
nextRunPtr = &nextRun
} else {
// #722: if ComputeNextRun fails, keep the existing next_run_at so the
// schedule is not silently removed from the fire query (NULL next_run_at
// is excluded by the tick WHERE clause). COALESCE($2, next_run_at) does
// this: when $2 is NULL the DB column value is preserved as-is.
log.Printf("Scheduler: ComputeNextRun error for '%s' (%s) — preserving existing next_run_at: %v",
sched.Name, sched.ID, nextErr)
}
// F1089: use a dedicated context with its own 5s deadline for the
// post-fire UPDATE. The outer ctx (fireCtx) may be cancelled if the
// HTTP call timed out or the server is shutting down; using it here
// would silently skip the UPDATE and leave next_run_at stale, causing
// the schedule to be immediately re-fired on the next tick.
updateCtx, updateCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer updateCancel()
_, err := db.DB.ExecContext(updateCtx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
run_count = run_count + 1,
last_status = $3,
last_error = $4,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, lastStatus, lastError)
if err != nil {
log.Printf("Scheduler: post-fire update error for %s [%s]: %v", sched.ID, sched.Name, err)
}
// Log a dedicated cron_run activity entry with schedule metadata so the
// history endpoint can query by schedule_id.
cronMeta, _ := json.Marshal(map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"cron_expr": sched.CronExpr,
"prompt": truncate(sched.Prompt, 200),
})
// #152: persist lastError into error_detail on the activity_logs row
// so GET /workspaces/:id/schedules/:id/history can surface why a run
// failed (previously dropped — history returned status without any
// error context, making root-cause debugging impossible).
_, _ = db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus, lastError)
if s.broadcaster != nil {
s.broadcaster.RecordAndBroadcast(ctx, "CRON_EXECUTED", sched.WorkspaceID, map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"status": lastStatus,
})
}
// Level 1: auto-post cron output to workspace's Slack channels.
// Only post non-empty successful responses — errors and empties are
// noise that clutters the channel without adding value.
if s.channels != nil && lastStatus == "ok" && !isEmpty {
summary := s.extractResponseSummary(respBody)
if summary != "" {
go func(wsID, text string) {
postCtx, postCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer postCancel()
s.channels.BroadcastToWorkspaceChannels(postCtx, wsID, text)
}(sched.WorkspaceID, summary)
}
}
}
// recordSkipped advances next_run_at and logs a cron_run activity entry
// with status='skipped' when the target workspace was already busy.
// Issue #115 — replaces the previous "busy → fire → fail → retry next
// tick" loop with "busy → skip → advance → try next slot". Keeps the
// history surface honest (a skip is not an error) and stops filling
// last_error with noise.
func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
var nextRunPtr *time.Time
if nextErr == nil {
nextRunPtr = &nextRun
} else {
// #722: same guard as in fireSchedule — preserve existing next_run_at
// rather than writing NULL when the cron expression cannot be parsed.
log.Printf("Scheduler: ComputeNextRun error in recordSkipped for '%s' (%s) — preserving existing next_run_at: %v",
sched.Name, sched.ID, nextErr)
}
// Advance next_run_at + bump run_count so the liveness view reflects
// that we're still ticking. last_status='skipped', last_error carries
// the reason for operators debugging via the schedule history API.
_, _ = db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET last_run_at = now(),
next_run_at = COALESCE($2, next_run_at),
run_count = run_count + 1,
last_status = 'skipped',
last_error = $3,
updated_at = now()
WHERE id = $1
`, sched.ID, nextRunPtr, reason)
cronMeta, _ := json.Marshal(map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"cron_expr": sched.CronExpr,
"skipped": true,
"active_tasks": activeTasks,
})
_, _ = db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason)
if s.broadcaster != nil {
_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
"schedule_id": sched.ID,
"schedule_name": sched.Name,
"reason": reason,
})
}
}
// repairNullNextRunAt is called once during Start() to recompute next_run_at
// for any enabled schedule where it is NULL — a state left by the pre-#722 bug
// where a ComputeNextRun error caused an UPDATE that wrote NULL.
// Without this repair those schedules would never appear in the tick query
// (which requires next_run_at IS NOT NULL) even after the binary is patched.
func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
rows, err := db.DB.QueryContext(ctx, `
SELECT id, cron_expr, timezone
FROM workspace_schedules
WHERE enabled = true AND next_run_at IS NULL
`)
if err != nil {
log.Printf("Scheduler: startup repair query error: %v", err)
return
}
defer rows.Close()
type repairRow struct {
ID string
CronExpr string
Timezone string
}
var repaired, failed int
for rows.Next() {
var r repairRow
if err := rows.Scan(&r.ID, &r.CronExpr, &r.Timezone); err != nil {
log.Printf("Scheduler: startup repair scan error: %v", err)
continue
}
nextRun, err := ComputeNextRun(r.CronExpr, r.Timezone, time.Now())
if err != nil {
log.Printf("Scheduler: startup repair: cannot compute next_run_at for schedule %s (%s): %v — leaving NULL",
r.ID, r.CronExpr, err)
failed++
continue
}
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules SET next_run_at = $2, updated_at = now() WHERE id = $1
`, r.ID, nextRun); err != nil {
log.Printf("Scheduler: startup repair: update failed for schedule %s: %v", r.ID, err)
failed++
} else {
repaired++
}
}
if err := rows.Err(); err != nil {
log.Printf("Scheduler: startup repair rows error: %v", err)
}
if repaired > 0 || failed > 0 {
log.Printf("Scheduler: startup repair: %d schedule(s) repaired, %d skipped (bad cron/tz)", repaired, failed)
}
}
// isEmptyResponse checks if an A2A response body indicates the agent
// produced no meaningful output. Catches "(no response generated)" from
// the workspace runtime + genuinely empty/null responses. Used by the
// consecutive-empty tracker (#795) to detect phantom-producing crons.
// extractResponseSummary pulls the agent's text from the A2A response body.
// Returns empty string if parsing fails or the response has no text content.
func (s *Scheduler) extractResponseSummary(body []byte) string {
if len(body) == 0 {
return ""
}
var resp map[string]interface{}
if json.Unmarshal(body, &resp) != nil {
return ""
}
// A2A response: result.parts[].text
if result, ok := resp["result"].(map[string]interface{}); ok {
if parts, ok := result["parts"].([]interface{}); ok {
for _, p := range parts {
if part, ok := p.(map[string]interface{}); ok {
if text, ok := part["text"].(string); ok && text != "" {
return text
}
}
}
}
}
return ""
}
func isEmptyResponse(body []byte) bool {
if len(body) == 0 {
return true
}
s := string(body)
// The A2A response wraps the agent text in {"result":{"parts":[{"text":"..."}]}}
// Check for the sentinel the workspace runtime emits when the agent produces nothing.
for _, marker := range []string{
`(no response generated)`,
`"text": "(no response generated)"`,
`"text":""`,
`"text": ""`,
} {
if strings.Contains(s, marker) {
return true
}
}
return false
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen-3] + "..."
}
// short returns up to n leading characters of s without panicking when s is
// shorter than n. Used to safely display UUID prefixes in log lines where
// the full ID would be noisy but the full-length bounds check is repetitive.
func short(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n]
}
// ComputeNextRun parses a cron expression and returns the next fire time
// after the given time, in the specified timezone.
func ComputeNextRun(cronExpr, tz string, after time.Time) (time.Time, error) {
loc, err := time.LoadLocation(tz)
if err != nil {
return time.Time{}, fmt.Errorf("invalid timezone %q: %w", tz, err)
}
parser := cronlib.NewParser(cronlib.Minute | cronlib.Hour | cronlib.Dom | cronlib.Month | cronlib.Dow)
sched, err := parser.Parse(cronExpr)
if err != nil {
return time.Time{}, fmt.Errorf("invalid cron expression %q: %w", cronExpr, err)
}
return sched.Next(after.In(loc)).UTC(), nil
}