diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 1e3e284e..d0d5ae57 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -254,6 +254,12 @@ func main() { // Cron Scheduler — fires A2A messages to workspaces on user-defined schedules cronSched := scheduler.New(wh, broadcaster) + // Wire the native-scheduler skip — when an adapter's heartbeat + // declares provides_native_scheduler=true, the platform's polling + // loop drops that workspace's schedules to avoid double-fire (the + // SDK runs them itself). See project memory + // `project_runtime_native_pluggable.md` and capability primitive #3. + cronSched.SetNativeSchedulerCheck(handlers.ProvidesNativeScheduler) go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start) // Hibernation Monitor — auto-pauses idle workspaces that have diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index 82386b82..755c3f81 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -464,11 +464,12 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { // Refresh per-workspace runtime overrides from the heartbeat's // runtime_metadata block (introduced for the native+pluggable - // runtime principle — see project memory). Only idle_timeout_seconds - // is consumed today; capability flags are stored for future - // consumers (heartbeat-skip, scheduler-skip, etc.) by subsequent - // PRs in task #117. A nil RuntimeMetadata or absent field clears - // the override so the dispatch path uses the global default. + // runtime principle — see project memory). Both idle_timeout_seconds + // and capability flags are stored. Each consumer (a2a_proxy.dispatchA2A + // for idle timeout, scheduler.tick for native scheduler, etc.) reads + // what it needs from the cache. 
nil RuntimeMetadata or absent field + // clears the corresponding override so the dispatch path uses the + // global default. if payload.RuntimeMetadata != nil && payload.RuntimeMetadata.IdleTimeoutSeconds != nil { runtimeOverrides.SetIdleTimeout( payload.WorkspaceID, @@ -477,6 +478,11 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { } else { runtimeOverrides.SetIdleTimeout(payload.WorkspaceID, 0) // clear } + if payload.RuntimeMetadata != nil { + runtimeOverrides.SetCapabilities(payload.WorkspaceID, payload.RuntimeMetadata.Capabilities) + } else { + runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear + } c.JSON(http.StatusOK, gin.H{"status": "ok"}) } diff --git a/workspace-server/internal/handlers/runtime_overrides.go b/workspace-server/internal/handlers/runtime_overrides.go index 2e188dd2..60ff83d3 100644 --- a/workspace-server/internal/handlers/runtime_overrides.go +++ b/workspace-server/internal/handlers/runtime_overrides.go @@ -38,42 +38,114 @@ var runtimeOverrides runtimeOverrideCache type runtimeOverrideEntry struct { idleTimeout time.Duration // 0 means "no override; use global default" + // capabilities maps wire-name keys from RuntimeCapabilities.to_dict() + // — "heartbeat", "scheduler", "session", "status_mgmt", "retry", + // "activity_decoration", "channel_dispatch" — to whether the adapter + // claims native ownership. Consumers (e.g. scheduler.tick) read this + // to decide whether to fire their platform-fallback behavior for this + // workspace. + // + // nil map means "no capability declarations received yet" → consumers + // fall back to the platform default (today's behavior). + capabilities map[string]bool } type runtimeOverrideCache struct { m sync.Map // key: workspaceID (string), value: runtimeOverrideEntry } +// loadEntry returns the entry for workspaceID (or a zero-value entry). +// Internal helper for the partial-update Set methods; sync.Map's +// Load doesn't support "read or default" in one shot. 
+func (c *runtimeOverrideCache) loadEntry(workspaceID string) runtimeOverrideEntry { + if v, ok := c.m.Load(workspaceID); ok { + if e, ok := v.(runtimeOverrideEntry); ok { + return e + } + } + return runtimeOverrideEntry{} +} + +// deleteIfEmpty stores e as the workspace's entry, or deletes the entry +// when both idleTimeout and capabilities are absent. Keeps the cache from +// retaining empty husks forever after a runtime stops sending overrides. +func (c *runtimeOverrideCache) deleteIfEmpty(workspaceID string, e runtimeOverrideEntry) { + if e.idleTimeout <= 0 && len(e.capabilities) == 0 { + c.m.Delete(workspaceID) + return + } + c.m.Store(workspaceID, e) +} + // SetIdleTimeout records the per-workspace idle-timeout override sent // in the most recent heartbeat. d == 0 clears the override (falling // back to the global default), so a runtime that previously declared // an override and then dropped it cleanly returns to platform behavior. +// Capability flags on the same workspace are preserved. func (c *runtimeOverrideCache) SetIdleTimeout(workspaceID string, d time.Duration) { if workspaceID == "" { return } + e := c.loadEntry(workspaceID) if d <= 0 { - c.m.Delete(workspaceID) - return + e.idleTimeout = 0 + } else { + e.idleTimeout = d } - c.m.Store(workspaceID, runtimeOverrideEntry{idleTimeout: d}) + c.deleteIfEmpty(workspaceID, e) } // IdleTimeout returns the per-workspace override and ok=true when one // is in effect; ok=false means dispatchA2A should fall back to the // global idleTimeoutDuration. func (c *runtimeOverrideCache) IdleTimeout(workspaceID string) (time.Duration, bool) { - v, ok := c.m.Load(workspaceID) - if !ok { - return 0, false - } - e, ok := v.(runtimeOverrideEntry) - if !ok || e.idleTimeout <= 0 { + e := c.loadEntry(workspaceID) + if e.idleTimeout <= 0 { return 0, false } return e.idleTimeout, true } +// SetCapabilities records the per-workspace capability declaration map +// (e.g. 
{"scheduler": true, "heartbeat": false, ...}) sent in the most +// recent heartbeat. Replaces any prior map; nil or an empty map clears it. +// IdleTimeout on the same workspace is preserved. +// +// The wire-name keys (heartbeat, scheduler, session, status_mgmt, retry, +// activity_decoration, channel_dispatch) match RuntimeCapabilities.to_dict() +// in workspace/adapter_base.py — keep in sync there. +func (c *runtimeOverrideCache) SetCapabilities(workspaceID string, caps map[string]bool) { + if workspaceID == "" { + return + } + e := c.loadEntry(workspaceID) + if len(caps) == 0 { + e.capabilities = nil + } else { + // Defensive copy: caller may reuse / mutate the map after the + // call; the cache holds long-lived refs. + dup := make(map[string]bool, len(caps)) + for k, v := range caps { + dup[k] = v + } + e.capabilities = dup + } + c.deleteIfEmpty(workspaceID, e) +} + +// HasCapability returns true when the workspace's adapter has declared +// native ownership of the named capability. False when no entry exists, +// no capability map was ever sent, or the named capability is absent / +// false. Consumers (scheduler.tick, etc.) call this before firing their +// platform-fallback behavior. +func (c *runtimeOverrideCache) HasCapability(workspaceID, name string) bool { + if workspaceID == "" || name == "" { + return false + } + e := c.loadEntry(workspaceID) + return e.capabilities[name] +} + // Reset clears the entire cache. Test-only; production code never // needs this since heartbeats refresh entries naturally. func (c *runtimeOverrideCache) Reset() { @@ -82,3 +154,11 @@ func (c *runtimeOverrideCache) Reset() { return true }) } + +// ProvidesNativeScheduler is the public adapter exposed to the scheduler +// package — wraps HasCapability("scheduler") with the package-level +// runtimeOverrides instance. Installed via Scheduler.SetNativeSchedulerCheck in +// cmd/server/main.go to keep scheduler/scheduler.go free of a handlers/ import. 
+func ProvidesNativeScheduler(workspaceID string) bool { + return runtimeOverrides.HasCapability(workspaceID, "scheduler") +} diff --git a/workspace-server/internal/handlers/runtime_overrides_test.go b/workspace-server/internal/handlers/runtime_overrides_test.go index 63ce1653..b784bbfc 100644 --- a/workspace-server/internal/handlers/runtime_overrides_test.go +++ b/workspace-server/internal/handlers/runtime_overrides_test.go @@ -87,6 +87,129 @@ func TestRuntimeOverrideCache_Reset(t *testing.T) { } } +func TestRuntimeOverrideCache_SetCapabilitiesAndHas(t *testing.T) { + c := &runtimeOverrideCache{} + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("empty cache must not return any capability") + } + + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": false}) + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("scheduler capability not stored") + } + if c.HasCapability("ws-a", "session") { + t.Fatal("session=false should report as absent (False)") + } + if c.HasCapability("ws-a", "heartbeat") { + t.Fatal("missing key must report as absent") + } +} + +func TestRuntimeOverrideCache_CapabilitiesIsolatedPerWorkspace(t *testing.T) { + // Critical: ws-a declaring native scheduler must NOT make ws-b + // also skip its schedules. The cache's per-key isolation is the + // only thing standing between "claude-code adapter declares this" + // and "every workspace silently inherits the declaration." + c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if c.HasCapability("ws-b", "scheduler") { + t.Fatal("ws-a's scheduler capability leaked to ws-b") + } +} + +func TestRuntimeOverrideCache_NilOrEmptyCapabilitiesClears(t *testing.T) { + // An adapter that previously declared native scheduler then + // dropped the flag (e.g. SDK update) must restore platform + // fallback. nil + empty-map both mean "clear". 
+ c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("setup: scheduler should be set") + } + + c.SetCapabilities("ws-a", nil) + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("nil should clear capabilities") + } + + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + c.SetCapabilities("ws-a", map[string]bool{}) + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("empty map should clear capabilities") + } +} + +func TestRuntimeOverrideCache_SetCapabilitiesIsDefensiveCopy(t *testing.T) { + // The caller's map MUST NOT alias the cached one. A future careless + // caller mutating the original map after the call should not + // retroactively change cached capability declarations. + c := &runtimeOverrideCache{} + original := map[string]bool{"scheduler": true} + c.SetCapabilities("ws-a", original) + original["scheduler"] = false + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("cache aliased the caller's map; capability flipped via outside mutation") + } +} + +func TestRuntimeOverrideCache_SetIdleTimeoutPreservesCapabilities(t *testing.T) { + // The two heartbeat fields are independent — updating one must + // not stomp the other. Pre-fix, each Set replaced the entire + // entry, which meant the second-arriving Set in the heartbeat + // handler effectively erased the first. + c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + c.SetIdleTimeout("ws-a", 600*time.Second) + + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("SetIdleTimeout erased prior capabilities") + } + got, ok := c.IdleTimeout("ws-a") + if !ok || got != 600*time.Second { + t.Fatalf("idle timeout lost; got=%v ok=%v", got, ok) + } + + // And the inverse: SetCapabilities must not erase IdleTimeout. 
+ c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": true}) + if got, ok := c.IdleTimeout("ws-a"); !ok || got != 600*time.Second { + t.Fatal("SetCapabilities erased prior idle timeout") + } +} + +func TestRuntimeOverrideCache_EmptyEntryDeleted(t *testing.T) { + // When both fields are cleared, the entry should drop out of the + // cache entirely so a stale workspace doesn't accumulate empty + // husks indefinitely. + c := &runtimeOverrideCache{} + c.SetIdleTimeout("ws-a", 60*time.Second) + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + + c.SetIdleTimeout("ws-a", 0) + c.SetCapabilities("ws-a", nil) + + if _, ok := c.m.Load("ws-a"); ok { + t.Fatal("entry should be deleted when both fields cleared") + } +} + +func TestProvidesNativeScheduler_PackageLevel(t *testing.T) { + // The package-level function the scheduler imports — pin that it + // reads the same singleton the heartbeat handler writes to. + runtimeOverrides.Reset() + defer runtimeOverrides.Reset() + + if ProvidesNativeScheduler("ws-a") { + t.Fatal("empty cache should not declare native scheduler") + } + runtimeOverrides.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if !ProvidesNativeScheduler("ws-a") { + t.Fatal("ProvidesNativeScheduler did not see the declaration") + } + if ProvidesNativeScheduler("") { + t.Fatal("empty workspace ID should never declare native scheduler") + } +} + func TestRuntimeOverrideCache_ConcurrentSafe(t *testing.T) { // dispatchA2A reads the cache on every request; heartbeat handlers // write on every 30s. 
Different workspaces will be hot in different diff --git a/workspace-server/internal/scheduler/native_scheduler_test.go b/workspace-server/internal/scheduler/native_scheduler_test.go new file mode 100644 index 00000000..2a9e3732 --- /dev/null +++ b/workspace-server/internal/scheduler/native_scheduler_test.go @@ -0,0 +1,52 @@ +package scheduler + +import ( + "testing" +) + +// TestSetNativeSchedulerCheck pins the wiring contract: New() leaves +// providesNativeScheduler nil (= today's behavior, never skip); +// SetNativeSchedulerCheck installs the override. The actual skip +// behavior in tick() needs a DB and is exercised by the integration +// tests in tests/e2e/. +func TestSetNativeSchedulerCheck(t *testing.T) { + s := New(nil, nil) + if s.providesNativeScheduler != nil { + t.Fatal("New() must leave providesNativeScheduler nil so untouched callers preserve today's behavior") + } + + called := false + checker := NativeSchedulerCheck(func(workspaceID string) bool { + called = true + return workspaceID == "ws-native" + }) + s.SetNativeSchedulerCheck(checker) + if s.providesNativeScheduler == nil { + t.Fatal("SetNativeSchedulerCheck did not install the function") + } + if !s.providesNativeScheduler("ws-native") { + t.Fatal("installed checker not invoked / wrong return") + } + if !called { + t.Fatal("installed checker not called") + } + if s.providesNativeScheduler("ws-other") { + t.Fatal("checker should return false for non-native workspace") + } +} + +// TestNativeSchedulerCheck_NilSafeInTick documents the contract used +// by tick(): a nil providesNativeScheduler must mean "always fire" so +// existing callers (test fixtures, prior to capability primitives) +// preserve today's behavior unchanged. The conditional in tick reads +// `s.providesNativeScheduler != nil && s.providesNativeScheduler(id)` +// — neither branch can panic on a nil-checker scheduler. 
+func TestNativeSchedulerCheck_NilSafeInTick(t *testing.T) { + s := New(nil, nil) + // We don't actually call tick() — that requires a live DB. We just + // pin that the field is nil after New, which is the load-bearing + // invariant tick() relies on. + if s.providesNativeScheduler != nil { + t.Fatal("nil-safety contract violated: providesNativeScheduler must be nil from New()") + } +} diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 9c97ef45..0c6eb84f 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -70,6 +70,21 @@ type ChannelBroadcaster interface { FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string } +// NativeSchedulerCheck returns true when the workspace's adapter has +// declared `provides_native_scheduler=True` in its capabilities. The +// scheduler skips firing (but still advances next_run_at) for these workspaces — the SDK +// runs the schedule itself (Temporal, Durable Functions, etc.) and the +// platform's polling would cause double-fire on every restart. +// +// Installed after construction via Scheduler.SetNativeSchedulerCheck +// (production wiring or tests). nil is allowed and treated as "no +// override" for every workspace, preserving today's behavior — same +// default-false posture as BaseAdapter.capabilities() in workspace/adapter_base.py. +// +// See project memory `project_runtime_native_pluggable.md` and +// handlers.ProvidesNativeScheduler for the production wiring. +type NativeSchedulerCheck func(workspaceID string) bool + // Scheduler polls the workspace_schedules table and fires A2A messages // when a schedule's next_run_at has passed. Follows the same goroutine // pattern as registry.StartHealthSweep. @@ -78,6 +93,11 @@ type Scheduler struct { broadcaster Broadcaster channels ChannelBroadcaster + // providesNativeScheduler, when non-nil and returning true, causes + // tick() to skip firing for this workspace. 
nil = always-fire (the + // pre-capability-primitive behavior). See NativeSchedulerCheck above. + providesNativeScheduler NativeSchedulerCheck + // lastTickAt records the wall-clock time of the most recent tick // (whether it fired schedules or not). Read by Healthy() and the // /admin/scheduler/health endpoint to detect stuck-tick conditions. @@ -102,6 +122,15 @@ func (s *Scheduler) SetChannels(ch ChannelBroadcaster) { s.channels = ch } +// SetNativeSchedulerCheck wires the per-workspace native-scheduler +// override lookup. Wired by the router after the scheduler is +// constructed (handlers package owns the cache). Pass nil to disable +// the skip — every schedule fires regardless of adapter declaration, +// matching pre-capability-primitive behavior. +func (s *Scheduler) SetNativeSchedulerCheck(f NativeSchedulerCheck) { + s.providesNativeScheduler = f +} + // LastTickAt returns the wall-clock time of the most recently completed tick. // Returns a zero time.Time if the scheduler has never completed a tick. func (s *Scheduler) LastTickAt() time.Time { @@ -231,6 +260,27 @@ func (s *Scheduler) tick(ctx context.Context) { log.Printf("Scheduler: scan error: %v", err) continue } + // Skip workspaces whose adapter owns scheduling natively (e.g. + // SDKs with built-in cron / Temporal-style workflows). Without + // this skip, the platform's polling would fire the same + // schedule twice — once natively in the SDK, once via this + // loop. The skip drops only the FIRE; the schedule row stays + // in the DB and next_run_at keeps advancing (this branch does not + // touch last_run_at), so the schedule stays observable per the principle. + // Before this change every due schedule fired unconditionally; the nil + // check preserves that behavior for callers that didn't wire the override. + if s.providesNativeScheduler != nil && s.providesNativeScheduler(sched.WorkspaceID) { + // Advance next_run_at so we don't tight-loop on the same + // row every tick. A non-firing schedule is still scheduled. 
+ if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil { + if _, execErr := db.DB.ExecContext(ctx, + `UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`, + nextTime, sched.ID); execErr != nil { + log.Printf("Scheduler: native-skip next_run_at UPDATE failed for schedule %s: %v", sched.ID, execErr) + } + } + continue + } wg.Add(1) sem <- struct{}{} go func(s2 scheduleRow) {