feat(runtime): native_scheduler skip — primitive #3 of 6

When an adapter declares provides_native_scheduler=True (because its
SDK has built-in cron / Temporal-style workflows), the platform's
polling loop must skip firing schedules for that workspace — otherwise
the schedule fires twice (once natively, once via platform). The
native skip preserves observability (next_run_at still advances, the
schedule row stays in the DB, last_run_at would still update) while
moving the FIRE responsibility to the SDK.

Stacked on PR #2139 (idle_timeout_override end-to-end). The
RuntimeMetadata heartbeat block already carries the capability map;
this PR teaches the platform how to read and act on the scheduler bit.

Components:

  - handlers/runtime_overrides.go: extended the cache to store
    capability flags alongside idle timeout. Two heartbeat fields are
    independent — SetIdleTimeout / SetCapabilities each update one
    without stomping the other. Defensive copy on SetCapabilities so
    a caller mutating its map after the call doesn't retroactively
    change cached declarations. Empty entries dropped to avoid stale
    husks.

  - handlers/runtime_overrides.go: new HasCapability(workspaceID, name)
    + ProvidesNativeScheduler(workspaceID) — the latter is the
    package-level adapter the scheduler imports (avoids a
    handlers/scheduler import cycle).

  - handlers/registry.go: heartbeat handler now calls SetCapabilities
    in addition to SetIdleTimeout.

  - scheduler/scheduler.go: NativeSchedulerCheck function-pointer DI
    (mirrors the existing QueueDrainFunc pattern). New() leaves the
    field nil so existing callers preserve today's "always fire"
    behavior. SetNativeSchedulerCheck wires production. tick() drops
    workspaces declaring native ownership before goroutine fan-out;
    advances next_run_at so we don't tight-loop on the same row.

  - cmd/server/main.go: wires handlers.ProvidesNativeScheduler into
    the cron scheduler at server boot.

Tests:
  Go (7 new):
    - SetCapabilitiesAndHas (round-trip)
    - per-workspace isolation (ws-a's declaration doesn't leak to ws-b)
    - nil/empty map clears (adapter dropping the flag restores fallback)
    - SetCapabilities is a defensive copy (caller mutation can't
      retroactively flip cached value)
    - SetIdleTimeout preserves capabilities and vice-versa (two-field
      independence)
    - empty entry deleted (no stale husks)
    - ProvidesNativeScheduler reads the same singleton heartbeat writes
    - SetNativeSchedulerCheck wires the function (scheduler-side)
    - nil-check safety contract for tick

  Python: no change needed — the heartbeat already serializes the
  full capability map via _runtime_metadata_payload (PR #2139). An
  adapter setting RuntimeCapabilities(provides_native_scheduler=True)
  automatically flows through.

Verification:
  - 1308 / 1308 Python pytest pass (unchanged)
  - All Go handlers + scheduler tests pass
  - go build + go vet clean

See project memory `project_runtime_native_pluggable.md`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-26 22:47:00 -07:00
parent 0d3058585b
commit c0a5d842b4
6 changed files with 331 additions and 14 deletions

View File

@ -254,6 +254,12 @@ func main() {
// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
cronSched := scheduler.New(wh, broadcaster)
// Wire the native-scheduler skip — when an adapter's heartbeat
// declares provides_native_scheduler=true, the platform's polling
// loop drops that workspace's schedules to avoid double-fire (the
// SDK runs them itself). See project memory
// `project_runtime_native_pluggable.md` and capability primitive #3.
cronSched.SetNativeSchedulerCheck(handlers.ProvidesNativeScheduler)
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
// Hibernation Monitor — auto-pauses idle workspaces that have

View File

@ -464,11 +464,12 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
// Refresh per-workspace runtime overrides from the heartbeat's
// runtime_metadata block (introduced for the native+pluggable
// runtime principle — see project memory). Only idle_timeout_seconds
// is consumed today; capability flags are stored for future
// consumers (heartbeat-skip, scheduler-skip, etc.) by subsequent
// PRs in task #117. A nil RuntimeMetadata or absent field clears
// the override so the dispatch path uses the global default.
// runtime principle — see project memory). Both idle_timeout_seconds
// and capability flags are stored. Each consumer (a2a_proxy.dispatchA2A
// for idle timeout, scheduler.tick for native scheduler, etc.) reads
// what it needs from the cache. nil RuntimeMetadata or absent field
// clears the corresponding override so the dispatch path uses the
// global default.
if payload.RuntimeMetadata != nil && payload.RuntimeMetadata.IdleTimeoutSeconds != nil {
runtimeOverrides.SetIdleTimeout(
payload.WorkspaceID,
@ -477,6 +478,11 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
} else {
runtimeOverrides.SetIdleTimeout(payload.WorkspaceID, 0) // clear
}
if payload.RuntimeMetadata != nil {
runtimeOverrides.SetCapabilities(payload.WorkspaceID, payload.RuntimeMetadata.Capabilities)
} else {
runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear
}
c.JSON(http.StatusOK, gin.H{"status": "ok"})
}

View File

@ -38,42 +38,114 @@ var runtimeOverrides runtimeOverrideCache
type runtimeOverrideEntry struct {
idleTimeout time.Duration // 0 means "no override; use global default"
// capabilities maps wire-name keys from RuntimeCapabilities.to_dict()
// — "heartbeat", "scheduler", "session", "status_mgmt", "retry",
// "activity_decoration", "channel_dispatch" — to whether the adapter
// claims native ownership. Consumers (e.g. scheduler.tick) read this
// to decide whether to fire their platform-fallback behavior for this
// workspace.
//
// nil map means "no capability declarations received yet" → consumers
// fall back to the platform default (today's behavior).
capabilities map[string]bool
}
type runtimeOverrideCache struct {
m sync.Map // key: workspaceID (string), value: runtimeOverrideEntry
}
// loadEntry returns the entry for workspaceID (or a zero-value entry).
// Internal helper for the partial-update Set methods; sync.Map's
// Load doesn't support "read or default" in one shot.
func (c *runtimeOverrideCache) loadEntry(workspaceID string) runtimeOverrideEntry {
if v, ok := c.m.Load(workspaceID); ok {
if e, ok := v.(runtimeOverrideEntry); ok {
return e
}
}
return runtimeOverrideEntry{}
}
// deleteIfEmpty drops the workspace's entry from the cache when both
// idleTimeout and capabilities are absent. Keeps the cache from
// retaining empty husks forever after a runtime stops sending overrides.
func (c *runtimeOverrideCache) deleteIfEmpty(workspaceID string, e runtimeOverrideEntry) {
if e.idleTimeout <= 0 && len(e.capabilities) == 0 {
c.m.Delete(workspaceID)
return
}
c.m.Store(workspaceID, e)
}
// SetIdleTimeout records the per-workspace idle-timeout override sent
// in the most recent heartbeat. d == 0 clears the override (falling
// back to the global default), so a runtime that previously declared
// an override and then dropped it cleanly returns to platform behavior.
// Capability flags on the same workspace are preserved.
func (c *runtimeOverrideCache) SetIdleTimeout(workspaceID string, d time.Duration) {
if workspaceID == "" {
return
}
e := c.loadEntry(workspaceID)
if d <= 0 {
c.m.Delete(workspaceID)
return
e.idleTimeout = 0
} else {
e.idleTimeout = d
}
c.m.Store(workspaceID, runtimeOverrideEntry{idleTimeout: d})
c.deleteIfEmpty(workspaceID, e)
}
// IdleTimeout returns the per-workspace override and ok=true when one
// is in effect; ok=false means dispatchA2A should fall back to the
// global idleTimeoutDuration.
func (c *runtimeOverrideCache) IdleTimeout(workspaceID string) (time.Duration, bool) {
v, ok := c.m.Load(workspaceID)
if !ok {
return 0, false
}
e, ok := v.(runtimeOverrideEntry)
if !ok || e.idleTimeout <= 0 {
e := c.loadEntry(workspaceID)
if e.idleTimeout <= 0 {
return 0, false
}
return e.idleTimeout, true
}
// SetCapabilities records the per-workspace capability declaration map
// (e.g. {"scheduler": true, "heartbeat": false, ...}) sent in the most
// recent heartbeat. Replaces any prior map; pass nil to clear.
// IdleTimeout on the same workspace is preserved.
//
// The wire-name keys (heartbeat, scheduler, session, status_mgmt, retry,
// activity_decoration, channel_dispatch) match RuntimeCapabilities.to_dict()
// in workspace/adapter_base.py — keep in sync there.
func (c *runtimeOverrideCache) SetCapabilities(workspaceID string, caps map[string]bool) {
if workspaceID == "" {
return
}
e := c.loadEntry(workspaceID)
if len(caps) == 0 {
e.capabilities = nil
} else {
// Defensive copy: caller may reuse / mutate the map after the
// call; the cache holds long-lived refs.
dup := make(map[string]bool, len(caps))
for k, v := range caps {
dup[k] = v
}
e.capabilities = dup
}
c.deleteIfEmpty(workspaceID, e)
}
// HasCapability returns true when the workspace's adapter has declared
// native ownership of the named capability. False when no entry exists,
// no capability map was ever sent, or the named capability is absent /
// false. Consumers (scheduler.tick, etc.) call this before firing their
// platform-fallback behavior.
func (c *runtimeOverrideCache) HasCapability(workspaceID, name string) bool {
if workspaceID == "" || name == "" {
return false
}
e := c.loadEntry(workspaceID)
return e.capabilities[name]
}
// Reset clears the entire cache. Test-only; production code never
// needs this since heartbeats refresh entries naturally.
func (c *runtimeOverrideCache) Reset() {
@ -82,3 +154,11 @@ func (c *runtimeOverrideCache) Reset() {
return true
})
}
// ProvidesNativeScheduler is the public adapter exposed to the scheduler
// package — wraps HasCapability("scheduler") with the package-level
// runtimeOverrides instance. Wired into Scheduler.New() at router setup
// to keep scheduler/scheduler.go free of a handlers/ import.
func ProvidesNativeScheduler(workspaceID string) bool {
return runtimeOverrides.HasCapability(workspaceID, "scheduler")
}

View File

@ -87,6 +87,129 @@ func TestRuntimeOverrideCache_Reset(t *testing.T) {
}
}
func TestRuntimeOverrideCache_SetCapabilitiesAndHas(t *testing.T) {
c := &runtimeOverrideCache{}
if c.HasCapability("ws-a", "scheduler") {
t.Fatal("empty cache must not return any capability")
}
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": false})
if !c.HasCapability("ws-a", "scheduler") {
t.Fatal("scheduler capability not stored")
}
if c.HasCapability("ws-a", "session") {
t.Fatal("session=false should report as absent (False)")
}
if c.HasCapability("ws-a", "heartbeat") {
t.Fatal("missing key must report as absent")
}
}
func TestRuntimeOverrideCache_CapabilitiesIsolatedPerWorkspace(t *testing.T) {
// Critical: ws-a declaring native scheduler must NOT make ws-b
// also skip its schedules. The cache's per-key isolation is the
// only thing standing between "claude-code adapter declares this"
// and "every workspace silently inherits the declaration."
c := &runtimeOverrideCache{}
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
if c.HasCapability("ws-b", "scheduler") {
t.Fatal("ws-a's scheduler capability leaked to ws-b")
}
}
func TestRuntimeOverrideCache_NilOrEmptyCapabilitiesClears(t *testing.T) {
// An adapter that previously declared native scheduler then
// dropped the flag (e.g. SDK update) must restore platform
// fallback. nil + empty-map both mean "clear".
c := &runtimeOverrideCache{}
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
if !c.HasCapability("ws-a", "scheduler") {
t.Fatal("setup: scheduler should be set")
}
c.SetCapabilities("ws-a", nil)
if c.HasCapability("ws-a", "scheduler") {
t.Fatal("nil should clear capabilities")
}
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
c.SetCapabilities("ws-a", map[string]bool{})
if c.HasCapability("ws-a", "scheduler") {
t.Fatal("empty map should clear capabilities")
}
}
func TestRuntimeOverrideCache_SetCapabilitiesIsDefensiveCopy(t *testing.T) {
// The caller's map MUST NOT alias the cached one. A future careless
// caller mutating the original map after the call should not
// retroactively change cached capability declarations.
c := &runtimeOverrideCache{}
original := map[string]bool{"scheduler": true}
c.SetCapabilities("ws-a", original)
original["scheduler"] = false
if !c.HasCapability("ws-a", "scheduler") {
t.Fatal("cache aliased the caller's map; capability flipped via outside mutation")
}
}
func TestRuntimeOverrideCache_SetIdleTimeoutPreservesCapabilities(t *testing.T) {
// The two heartbeat fields are independent — updating one must
// not stomp the other. Pre-fix, each Set replaced the entire
// entry, which meant the second-arriving Set in the heartbeat
// handler effectively erased the first.
c := &runtimeOverrideCache{}
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
c.SetIdleTimeout("ws-a", 600*time.Second)
if !c.HasCapability("ws-a", "scheduler") {
t.Fatal("SetIdleTimeout erased prior capabilities")
}
got, ok := c.IdleTimeout("ws-a")
if !ok || got != 600*time.Second {
t.Fatalf("idle timeout lost; got=%v ok=%v", got, ok)
}
// And the inverse: SetCapabilities must not erase IdleTimeout.
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": true})
if got, ok := c.IdleTimeout("ws-a"); !ok || got != 600*time.Second {
t.Fatal("SetCapabilities erased prior idle timeout")
}
}
func TestRuntimeOverrideCache_EmptyEntryDeleted(t *testing.T) {
// When both fields are cleared, the entry should drop out of the
// cache entirely so a stale workspace doesn't accumulate empty
// husks indefinitely.
c := &runtimeOverrideCache{}
c.SetIdleTimeout("ws-a", 60*time.Second)
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
c.SetIdleTimeout("ws-a", 0)
c.SetCapabilities("ws-a", nil)
if _, ok := c.m.Load("ws-a"); ok {
t.Fatal("entry should be deleted when both fields cleared")
}
}
func TestProvidesNativeScheduler_PackageLevel(t *testing.T) {
// The package-level function the scheduler imports — pin that it
// reads the same singleton the heartbeat handler writes to.
runtimeOverrides.Reset()
defer runtimeOverrides.Reset()
if ProvidesNativeScheduler("ws-a") {
t.Fatal("empty cache should not declare native scheduler")
}
runtimeOverrides.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
if !ProvidesNativeScheduler("ws-a") {
t.Fatal("ProvidesNativeScheduler did not see the declaration")
}
if ProvidesNativeScheduler("") {
t.Fatal("empty workspace ID should never declare native scheduler")
}
}
func TestRuntimeOverrideCache_ConcurrentSafe(t *testing.T) {
// dispatchA2A reads the cache on every request; heartbeat handlers
// write on every 30s. Different workspaces will be hot in different

View File

@ -0,0 +1,52 @@
package scheduler
import (
"testing"
)
// TestSetNativeSchedulerCheck pins the wiring contract: New() leaves
// providesNativeScheduler nil (= today's behavior, never skip);
// SetNativeSchedulerCheck installs the override. The actual skip
// behavior in tick() needs a DB and is exercised by the integration
// tests in tests/e2e/.
func TestSetNativeSchedulerCheck(t *testing.T) {
s := New(nil, nil)
if s.providesNativeScheduler != nil {
t.Fatal("New() must leave providesNativeScheduler nil so untouched callers preserve today's behavior")
}
called := false
checker := NativeSchedulerCheck(func(workspaceID string) bool {
called = true
return workspaceID == "ws-native"
})
s.SetNativeSchedulerCheck(checker)
if s.providesNativeScheduler == nil {
t.Fatal("SetNativeSchedulerCheck did not install the function")
}
if !s.providesNativeScheduler("ws-native") {
t.Fatal("installed checker not invoked / wrong return")
}
if !called {
t.Fatal("installed checker not called")
}
if s.providesNativeScheduler("ws-other") {
t.Fatal("checker should return false for non-native workspace")
}
}
// TestNativeSchedulerCheck_NilSafeInTick documents the contract used
// by tick(): a nil providesNativeScheduler must mean "always fire" so
// existing callers (test fixtures, prior to capability primitives)
// preserve today's behavior unchanged. The conditional in tick reads
// `s.providesNativeScheduler != nil && s.providesNativeScheduler(id)`
// — neither branch can panic on a nil-checker scheduler.
func TestNativeSchedulerCheck_NilSafeInTick(t *testing.T) {
s := New(nil, nil)
// We don't actually call tick() — that requires a live DB. We just
// pin that the field is nil after New, which is the load-bearing
// invariant tick() relies on.
if s.providesNativeScheduler != nil {
t.Fatal("nil-safety contract violated: providesNativeScheduler must be nil from New()")
}
}

View File

@ -70,6 +70,21 @@ type ChannelBroadcaster interface {
FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string
}
// NativeSchedulerCheck returns true when the workspace's adapter has
// declared `provides_native_scheduler=True` in its capabilities. The
// scheduler skips polling-and-firing for these workspaces — the SDK
// runs the schedule itself (Temporal, Durable Functions, etc.) and the
// platform's polling would cause double-fire on every restart.
//
// Wired at construction by the router (production) or tests. nil is
// allowed and treated as "no override" for every workspace, preserving
// today's behavior — same default-false posture as
// BaseAdapter.capabilities() in workspace/adapter_base.py.
//
// See project memory `project_runtime_native_pluggable.md` and
// handlers.ProvidesNativeScheduler for the production wiring.
type NativeSchedulerCheck func(workspaceID string) bool
// Scheduler polls the workspace_schedules table and fires A2A messages
// when a schedule's next_run_at has passed. Follows the same goroutine
// pattern as registry.StartHealthSweep.
@ -78,6 +93,11 @@ type Scheduler struct {
broadcaster Broadcaster
channels ChannelBroadcaster
// providesNativeScheduler, when non-nil and returning true, causes
// tick() to skip firing for this workspace. nil = always-fire (the
// pre-capability-primitive behavior). Constructor docs above.
providesNativeScheduler NativeSchedulerCheck
// lastTickAt records the wall-clock time of the most recent tick
// (whether it fired schedules or not). Read by Healthy() and the
// /admin/scheduler/health endpoint to detect stuck-tick conditions.
@ -102,6 +122,15 @@ func (s *Scheduler) SetChannels(ch ChannelBroadcaster) {
s.channels = ch
}
// SetNativeSchedulerCheck wires the per-workspace native-scheduler
// override lookup. Wired by the router after the scheduler is
// constructed (handlers package owns the cache). Pass nil to disable
// the skip — every schedule fires regardless of adapter declaration,
// matching pre-capability-primitive behavior.
func (s *Scheduler) SetNativeSchedulerCheck(f NativeSchedulerCheck) {
s.providesNativeScheduler = f
}
// LastTickAt returns the wall-clock time of the most recently completed tick.
// Returns a zero time.Time if the scheduler has never completed a tick.
func (s *Scheduler) LastTickAt() time.Time {
@ -231,6 +260,27 @@ func (s *Scheduler) tick(ctx context.Context) {
log.Printf("Scheduler: scan error: %v", err)
continue
}
// Skip workspaces whose adapter owns scheduling natively (e.g.
// SDKs with built-in cron / Temporal-style workflows). Without
// this skip, the platform's polling would fire the same
// schedule twice — once natively in the SDK, once via this
// loop. The skip drops only the FIRE; the schedule row stays
// in the DB and the platform still records it, so observability
// (next_run_at, last_run_at) is preserved per the principle.
// Pre-fix this branch was unconditional; nil check preserves
// behavior for callers that didn't wire the override.
if s.providesNativeScheduler != nil && s.providesNativeScheduler(sched.WorkspaceID) {
// Advance next_run_at so we don't tight-loop on the same
// row every tick. A non-firing schedule is still scheduled.
if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil {
if _, execErr := db.DB.ExecContext(ctx,
`UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`,
nextTime, sched.ID); execErr != nil {
log.Printf("Scheduler: native-skip next_run_at UPDATE failed for schedule %s: %v", sched.ID, execErr)
}
}
continue
}
wg.Add(1)
sem <- struct{}{}
go func(s2 scheduleRow) {