forked from molecule-ai/molecule-core
Merge pull request #2140 from Molecule-AI/feat/native-scheduler-skip
feat(runtime): native_scheduler skip — primitive #3 of 6
This commit is contained in:
commit
ddfe249584
@ -254,6 +254,12 @@ func main() {
|
||||
|
||||
// Cron Scheduler — fires A2A messages to workspaces on user-defined schedules
|
||||
cronSched := scheduler.New(wh, broadcaster)
|
||||
// Wire the native-scheduler skip — when an adapter's heartbeat
|
||||
// declares provides_native_scheduler=true, the platform's polling
|
||||
// loop drops that workspace's schedules to avoid double-fire (the
|
||||
// SDK runs them itself). See project memory
|
||||
// `project_runtime_native_pluggable.md` and capability primitive #3.
|
||||
cronSched.SetNativeSchedulerCheck(handlers.ProvidesNativeScheduler)
|
||||
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
|
||||
|
||||
// Hibernation Monitor — auto-pauses idle workspaces that have
|
||||
|
||||
@ -464,11 +464,12 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
|
||||
|
||||
// Refresh per-workspace runtime overrides from the heartbeat's
|
||||
// runtime_metadata block (introduced for the native+pluggable
|
||||
// runtime principle — see project memory). Only idle_timeout_seconds
|
||||
// is consumed today; capability flags are stored for future
|
||||
// consumers (heartbeat-skip, scheduler-skip, etc.) by subsequent
|
||||
// PRs in task #117. A nil RuntimeMetadata or absent field clears
|
||||
// the override so the dispatch path uses the global default.
|
||||
// runtime principle — see project memory). Both idle_timeout_seconds
|
||||
// and capability flags are stored. Each consumer (a2a_proxy.dispatchA2A
|
||||
// for idle timeout, scheduler.tick for native scheduler, etc.) reads
|
||||
// what it needs from the cache. nil RuntimeMetadata or absent field
|
||||
// clears the corresponding override so the dispatch path uses the
|
||||
// global default.
|
||||
if payload.RuntimeMetadata != nil && payload.RuntimeMetadata.IdleTimeoutSeconds != nil {
|
||||
runtimeOverrides.SetIdleTimeout(
|
||||
payload.WorkspaceID,
|
||||
@ -477,6 +478,11 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) {
|
||||
} else {
|
||||
runtimeOverrides.SetIdleTimeout(payload.WorkspaceID, 0) // clear
|
||||
}
|
||||
if payload.RuntimeMetadata != nil {
|
||||
runtimeOverrides.SetCapabilities(payload.WorkspaceID, payload.RuntimeMetadata.Capabilities)
|
||||
} else {
|
||||
runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": "ok"})
|
||||
}
|
||||
|
||||
@ -38,42 +38,114 @@ var runtimeOverrides runtimeOverrideCache
|
||||
|
||||
type runtimeOverrideEntry struct {
|
||||
idleTimeout time.Duration // 0 means "no override; use global default"
|
||||
// capabilities maps wire-name keys from RuntimeCapabilities.to_dict()
|
||||
// — "heartbeat", "scheduler", "session", "status_mgmt", "retry",
|
||||
// "activity_decoration", "channel_dispatch" — to whether the adapter
|
||||
// claims native ownership. Consumers (e.g. scheduler.tick) read this
|
||||
// to decide whether to fire their platform-fallback behavior for this
|
||||
// workspace.
|
||||
//
|
||||
// nil map means "no capability declarations received yet" → consumers
|
||||
// fall back to the platform default (today's behavior).
|
||||
capabilities map[string]bool
|
||||
}
|
||||
|
||||
type runtimeOverrideCache struct {
|
||||
m sync.Map // key: workspaceID (string), value: runtimeOverrideEntry
|
||||
}
|
||||
|
||||
// loadEntry returns the entry for workspaceID (or a zero-value entry).
|
||||
// Internal helper for the partial-update Set methods; sync.Map's
|
||||
// Load doesn't support "read or default" in one shot.
|
||||
func (c *runtimeOverrideCache) loadEntry(workspaceID string) runtimeOverrideEntry {
|
||||
if v, ok := c.m.Load(workspaceID); ok {
|
||||
if e, ok := v.(runtimeOverrideEntry); ok {
|
||||
return e
|
||||
}
|
||||
}
|
||||
return runtimeOverrideEntry{}
|
||||
}
|
||||
|
||||
// deleteIfEmpty drops the workspace's entry from the cache when both
|
||||
// idleTimeout and capabilities are absent. Keeps the cache from
|
||||
// retaining empty husks forever after a runtime stops sending overrides.
|
||||
func (c *runtimeOverrideCache) deleteIfEmpty(workspaceID string, e runtimeOverrideEntry) {
|
||||
if e.idleTimeout <= 0 && len(e.capabilities) == 0 {
|
||||
c.m.Delete(workspaceID)
|
||||
return
|
||||
}
|
||||
c.m.Store(workspaceID, e)
|
||||
}
|
||||
|
||||
// SetIdleTimeout records the per-workspace idle-timeout override sent
|
||||
// in the most recent heartbeat. d == 0 clears the override (falling
|
||||
// back to the global default), so a runtime that previously declared
|
||||
// an override and then dropped it cleanly returns to platform behavior.
|
||||
// Capability flags on the same workspace are preserved.
|
||||
func (c *runtimeOverrideCache) SetIdleTimeout(workspaceID string, d time.Duration) {
|
||||
if workspaceID == "" {
|
||||
return
|
||||
}
|
||||
e := c.loadEntry(workspaceID)
|
||||
if d <= 0 {
|
||||
c.m.Delete(workspaceID)
|
||||
return
|
||||
e.idleTimeout = 0
|
||||
} else {
|
||||
e.idleTimeout = d
|
||||
}
|
||||
c.m.Store(workspaceID, runtimeOverrideEntry{idleTimeout: d})
|
||||
c.deleteIfEmpty(workspaceID, e)
|
||||
}
|
||||
|
||||
// IdleTimeout returns the per-workspace override and ok=true when one
|
||||
// is in effect; ok=false means dispatchA2A should fall back to the
|
||||
// global idleTimeoutDuration.
|
||||
func (c *runtimeOverrideCache) IdleTimeout(workspaceID string) (time.Duration, bool) {
|
||||
v, ok := c.m.Load(workspaceID)
|
||||
if !ok {
|
||||
return 0, false
|
||||
}
|
||||
e, ok := v.(runtimeOverrideEntry)
|
||||
if !ok || e.idleTimeout <= 0 {
|
||||
e := c.loadEntry(workspaceID)
|
||||
if e.idleTimeout <= 0 {
|
||||
return 0, false
|
||||
}
|
||||
return e.idleTimeout, true
|
||||
}
|
||||
|
||||
// SetCapabilities records the per-workspace capability declaration map
|
||||
// (e.g. {"scheduler": true, "heartbeat": false, ...}) sent in the most
|
||||
// recent heartbeat. Replaces any prior map; pass nil to clear.
|
||||
// IdleTimeout on the same workspace is preserved.
|
||||
//
|
||||
// The wire-name keys (heartbeat, scheduler, session, status_mgmt, retry,
|
||||
// activity_decoration, channel_dispatch) match RuntimeCapabilities.to_dict()
|
||||
// in workspace/adapter_base.py — keep in sync there.
|
||||
func (c *runtimeOverrideCache) SetCapabilities(workspaceID string, caps map[string]bool) {
|
||||
if workspaceID == "" {
|
||||
return
|
||||
}
|
||||
e := c.loadEntry(workspaceID)
|
||||
if len(caps) == 0 {
|
||||
e.capabilities = nil
|
||||
} else {
|
||||
// Defensive copy: caller may reuse / mutate the map after the
|
||||
// call; the cache holds long-lived refs.
|
||||
dup := make(map[string]bool, len(caps))
|
||||
for k, v := range caps {
|
||||
dup[k] = v
|
||||
}
|
||||
e.capabilities = dup
|
||||
}
|
||||
c.deleteIfEmpty(workspaceID, e)
|
||||
}
|
||||
|
||||
// HasCapability returns true when the workspace's adapter has declared
|
||||
// native ownership of the named capability. False when no entry exists,
|
||||
// no capability map was ever sent, or the named capability is absent /
|
||||
// false. Consumers (scheduler.tick, etc.) call this before firing their
|
||||
// platform-fallback behavior.
|
||||
func (c *runtimeOverrideCache) HasCapability(workspaceID, name string) bool {
|
||||
if workspaceID == "" || name == "" {
|
||||
return false
|
||||
}
|
||||
e := c.loadEntry(workspaceID)
|
||||
return e.capabilities[name]
|
||||
}
|
||||
|
||||
// Reset clears the entire cache. Test-only; production code never
|
||||
// needs this since heartbeats refresh entries naturally.
|
||||
func (c *runtimeOverrideCache) Reset() {
|
||||
@ -82,3 +154,11 @@ func (c *runtimeOverrideCache) Reset() {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// ProvidesNativeScheduler is the public adapter exposed to the scheduler
|
||||
// package — wraps HasCapability("scheduler") with the package-level
|
||||
// runtimeOverrides instance. Wired into Scheduler.New() at router setup
|
||||
// to keep scheduler/scheduler.go free of a handlers/ import.
|
||||
func ProvidesNativeScheduler(workspaceID string) bool {
|
||||
return runtimeOverrides.HasCapability(workspaceID, "scheduler")
|
||||
}
|
||||
|
||||
@ -87,6 +87,129 @@ func TestRuntimeOverrideCache_Reset(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_SetCapabilitiesAndHas(t *testing.T) {
|
||||
c := &runtimeOverrideCache{}
|
||||
if c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("empty cache must not return any capability")
|
||||
}
|
||||
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": false})
|
||||
if !c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("scheduler capability not stored")
|
||||
}
|
||||
if c.HasCapability("ws-a", "session") {
|
||||
t.Fatal("session=false should report as absent (False)")
|
||||
}
|
||||
if c.HasCapability("ws-a", "heartbeat") {
|
||||
t.Fatal("missing key must report as absent")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_CapabilitiesIsolatedPerWorkspace(t *testing.T) {
|
||||
// Critical: ws-a declaring native scheduler must NOT make ws-b
|
||||
// also skip its schedules. The cache's per-key isolation is the
|
||||
// only thing standing between "claude-code adapter declares this"
|
||||
// and "every workspace silently inherits the declaration."
|
||||
c := &runtimeOverrideCache{}
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
if c.HasCapability("ws-b", "scheduler") {
|
||||
t.Fatal("ws-a's scheduler capability leaked to ws-b")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_NilOrEmptyCapabilitiesClears(t *testing.T) {
|
||||
// An adapter that previously declared native scheduler then
|
||||
// dropped the flag (e.g. SDK update) must restore platform
|
||||
// fallback. nil + empty-map both mean "clear".
|
||||
c := &runtimeOverrideCache{}
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
if !c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("setup: scheduler should be set")
|
||||
}
|
||||
|
||||
c.SetCapabilities("ws-a", nil)
|
||||
if c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("nil should clear capabilities")
|
||||
}
|
||||
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
c.SetCapabilities("ws-a", map[string]bool{})
|
||||
if c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("empty map should clear capabilities")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_SetCapabilitiesIsDefensiveCopy(t *testing.T) {
|
||||
// The caller's map MUST NOT alias the cached one. A future careless
|
||||
// caller mutating the original map after the call should not
|
||||
// retroactively change cached capability declarations.
|
||||
c := &runtimeOverrideCache{}
|
||||
original := map[string]bool{"scheduler": true}
|
||||
c.SetCapabilities("ws-a", original)
|
||||
original["scheduler"] = false
|
||||
if !c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("cache aliased the caller's map; capability flipped via outside mutation")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_SetIdleTimeoutPreservesCapabilities(t *testing.T) {
|
||||
// The two heartbeat fields are independent — updating one must
|
||||
// not stomp the other. Pre-fix, each Set replaced the entire
|
||||
// entry, which meant the second-arriving Set in the heartbeat
|
||||
// handler effectively erased the first.
|
||||
c := &runtimeOverrideCache{}
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
c.SetIdleTimeout("ws-a", 600*time.Second)
|
||||
|
||||
if !c.HasCapability("ws-a", "scheduler") {
|
||||
t.Fatal("SetIdleTimeout erased prior capabilities")
|
||||
}
|
||||
got, ok := c.IdleTimeout("ws-a")
|
||||
if !ok || got != 600*time.Second {
|
||||
t.Fatalf("idle timeout lost; got=%v ok=%v", got, ok)
|
||||
}
|
||||
|
||||
// And the inverse: SetCapabilities must not erase IdleTimeout.
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": true})
|
||||
if got, ok := c.IdleTimeout("ws-a"); !ok || got != 600*time.Second {
|
||||
t.Fatal("SetCapabilities erased prior idle timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_EmptyEntryDeleted(t *testing.T) {
|
||||
// When both fields are cleared, the entry should drop out of the
|
||||
// cache entirely so a stale workspace doesn't accumulate empty
|
||||
// husks indefinitely.
|
||||
c := &runtimeOverrideCache{}
|
||||
c.SetIdleTimeout("ws-a", 60*time.Second)
|
||||
c.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
|
||||
c.SetIdleTimeout("ws-a", 0)
|
||||
c.SetCapabilities("ws-a", nil)
|
||||
|
||||
if _, ok := c.m.Load("ws-a"); ok {
|
||||
t.Fatal("entry should be deleted when both fields cleared")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvidesNativeScheduler_PackageLevel(t *testing.T) {
|
||||
// The package-level function the scheduler imports — pin that it
|
||||
// reads the same singleton the heartbeat handler writes to.
|
||||
runtimeOverrides.Reset()
|
||||
defer runtimeOverrides.Reset()
|
||||
|
||||
if ProvidesNativeScheduler("ws-a") {
|
||||
t.Fatal("empty cache should not declare native scheduler")
|
||||
}
|
||||
runtimeOverrides.SetCapabilities("ws-a", map[string]bool{"scheduler": true})
|
||||
if !ProvidesNativeScheduler("ws-a") {
|
||||
t.Fatal("ProvidesNativeScheduler did not see the declaration")
|
||||
}
|
||||
if ProvidesNativeScheduler("") {
|
||||
t.Fatal("empty workspace ID should never declare native scheduler")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRuntimeOverrideCache_ConcurrentSafe(t *testing.T) {
|
||||
// dispatchA2A reads the cache on every request; heartbeat handlers
|
||||
// write on every 30s. Different workspaces will be hot in different
|
||||
|
||||
52
workspace-server/internal/scheduler/native_scheduler_test.go
Normal file
52
workspace-server/internal/scheduler/native_scheduler_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestSetNativeSchedulerCheck pins the wiring contract: New() leaves
|
||||
// providesNativeScheduler nil (= today's behavior, never skip);
|
||||
// SetNativeSchedulerCheck installs the override. The actual skip
|
||||
// behavior in tick() needs a DB and is exercised by the integration
|
||||
// tests in tests/e2e/.
|
||||
func TestSetNativeSchedulerCheck(t *testing.T) {
|
||||
s := New(nil, nil)
|
||||
if s.providesNativeScheduler != nil {
|
||||
t.Fatal("New() must leave providesNativeScheduler nil so untouched callers preserve today's behavior")
|
||||
}
|
||||
|
||||
called := false
|
||||
checker := NativeSchedulerCheck(func(workspaceID string) bool {
|
||||
called = true
|
||||
return workspaceID == "ws-native"
|
||||
})
|
||||
s.SetNativeSchedulerCheck(checker)
|
||||
if s.providesNativeScheduler == nil {
|
||||
t.Fatal("SetNativeSchedulerCheck did not install the function")
|
||||
}
|
||||
if !s.providesNativeScheduler("ws-native") {
|
||||
t.Fatal("installed checker not invoked / wrong return")
|
||||
}
|
||||
if !called {
|
||||
t.Fatal("installed checker not called")
|
||||
}
|
||||
if s.providesNativeScheduler("ws-other") {
|
||||
t.Fatal("checker should return false for non-native workspace")
|
||||
}
|
||||
}
|
||||
|
||||
// TestNativeSchedulerCheck_NilSafeInTick documents the contract used
|
||||
// by tick(): a nil providesNativeScheduler must mean "always fire" so
|
||||
// existing callers (test fixtures, prior to capability primitives)
|
||||
// preserve today's behavior unchanged. The conditional in tick reads
|
||||
// `s.providesNativeScheduler != nil && s.providesNativeScheduler(id)`
|
||||
// — neither branch can panic on a nil-checker scheduler.
|
||||
func TestNativeSchedulerCheck_NilSafeInTick(t *testing.T) {
|
||||
s := New(nil, nil)
|
||||
// We don't actually call tick() — that requires a live DB. We just
|
||||
// pin that the field is nil after New, which is the load-bearing
|
||||
// invariant tick() relies on.
|
||||
if s.providesNativeScheduler != nil {
|
||||
t.Fatal("nil-safety contract violated: providesNativeScheduler must be nil from New()")
|
||||
}
|
||||
}
|
||||
@ -70,6 +70,21 @@ type ChannelBroadcaster interface {
|
||||
FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string
|
||||
}
|
||||
|
||||
// NativeSchedulerCheck returns true when the workspace's adapter has
|
||||
// declared `provides_native_scheduler=True` in its capabilities. The
|
||||
// scheduler skips polling-and-firing for these workspaces — the SDK
|
||||
// runs the schedule itself (Temporal, Durable Functions, etc.) and the
|
||||
// platform's polling would cause double-fire on every restart.
|
||||
//
|
||||
// Wired at construction by the router (production) or tests. nil is
|
||||
// allowed and treated as "no override" for every workspace, preserving
|
||||
// today's behavior — same default-false posture as
|
||||
// BaseAdapter.capabilities() in workspace/adapter_base.py.
|
||||
//
|
||||
// See project memory `project_runtime_native_pluggable.md` and
|
||||
// handlers.ProvidesNativeScheduler for the production wiring.
|
||||
type NativeSchedulerCheck func(workspaceID string) bool
|
||||
|
||||
// Scheduler polls the workspace_schedules table and fires A2A messages
|
||||
// when a schedule's next_run_at has passed. Follows the same goroutine
|
||||
// pattern as registry.StartHealthSweep.
|
||||
@ -78,6 +93,11 @@ type Scheduler struct {
|
||||
broadcaster Broadcaster
|
||||
channels ChannelBroadcaster
|
||||
|
||||
// providesNativeScheduler, when non-nil and returning true, causes
|
||||
// tick() to skip firing for this workspace. nil = always-fire (the
|
||||
// pre-capability-primitive behavior). Constructor docs above.
|
||||
providesNativeScheduler NativeSchedulerCheck
|
||||
|
||||
// lastTickAt records the wall-clock time of the most recent tick
|
||||
// (whether it fired schedules or not). Read by Healthy() and the
|
||||
// /admin/scheduler/health endpoint to detect stuck-tick conditions.
|
||||
@ -102,6 +122,15 @@ func (s *Scheduler) SetChannels(ch ChannelBroadcaster) {
|
||||
s.channels = ch
|
||||
}
|
||||
|
||||
// SetNativeSchedulerCheck wires the per-workspace native-scheduler
|
||||
// override lookup. Wired by the router after the scheduler is
|
||||
// constructed (handlers package owns the cache). Pass nil to disable
|
||||
// the skip — every schedule fires regardless of adapter declaration,
|
||||
// matching pre-capability-primitive behavior.
|
||||
func (s *Scheduler) SetNativeSchedulerCheck(f NativeSchedulerCheck) {
|
||||
s.providesNativeScheduler = f
|
||||
}
|
||||
|
||||
// LastTickAt returns the wall-clock time of the most recently completed tick.
|
||||
// Returns a zero time.Time if the scheduler has never completed a tick.
|
||||
func (s *Scheduler) LastTickAt() time.Time {
|
||||
@ -231,6 +260,27 @@ func (s *Scheduler) tick(ctx context.Context) {
|
||||
log.Printf("Scheduler: scan error: %v", err)
|
||||
continue
|
||||
}
|
||||
// Skip workspaces whose adapter owns scheduling natively (e.g.
|
||||
// SDKs with built-in cron / Temporal-style workflows). Without
|
||||
// this skip, the platform's polling would fire the same
|
||||
// schedule twice — once natively in the SDK, once via this
|
||||
// loop. The skip drops only the FIRE; the schedule row stays
|
||||
// in the DB and the platform still records it, so observability
|
||||
// (next_run_at, last_run_at) is preserved per the principle.
|
||||
// Pre-fix this branch was unconditional; nil check preserves
|
||||
// behavior for callers that didn't wire the override.
|
||||
if s.providesNativeScheduler != nil && s.providesNativeScheduler(sched.WorkspaceID) {
|
||||
// Advance next_run_at so we don't tight-loop on the same
|
||||
// row every tick. A non-firing schedule is still scheduled.
|
||||
if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil {
|
||||
if _, execErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`,
|
||||
nextTime, sched.ID); execErr != nil {
|
||||
log.Printf("Scheduler: native-skip next_run_at UPDATE failed for schedule %s: %v", sched.ID, execErr)
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
go func(s2 scheduleRow) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user