diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 1e3e284e..d0d5ae57 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -254,6 +254,12 @@ func main() { // Cron Scheduler — fires A2A messages to workspaces on user-defined schedules cronSched := scheduler.New(wh, broadcaster) + // Wire the native-scheduler skip — when an adapter's heartbeat + // declares provides_native_scheduler=true, the platform's polling + // loop drops that workspace's schedules to avoid double-fire (the + // SDK runs them itself). See project memory + // `project_runtime_native_pluggable.md` and capability primitive #3. + cronSched.SetNativeSchedulerCheck(handlers.ProvidesNativeScheduler) go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start) // Hibernation Monitor — auto-pauses idle workspaces that have diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go index 1cd58ffb..aee89cb4 100644 --- a/workspace-server/internal/handlers/a2a_proxy.go +++ b/workspace-server/internal/handlers/a2a_proxy.go @@ -588,7 +588,18 @@ func (h *WorkspaceHandler) dispatchA2A(ctx context.Context, workspaceID, agentUR if concrete, ok := h.broadcaster.(*events.Broadcaster); ok { b = concrete } - forwardCtx, idleCancel := applyIdleTimeout(forwardCtx, b, workspaceID, idleTimeoutDuration) + // Per-workspace idle-timeout override (capability primitive #2 — + // see workspace/adapter_base.py:idle_timeout_override). The + // adapter declares a longer/shorter window than the platform + // default in its heartbeat; the heartbeat handler stashes it in + // runtimeOverrides; we honor it here. Falls through to the global + // default (env A2A_IDLE_TIMEOUT_SECONDS, default 5min) when no + // override is registered for this workspace. 
+ idle := idleTimeoutDuration + if perWorkspace, ok := runtimeOverrides.IdleTimeout(workspaceID); ok { + idle = perWorkspace + } + forwardCtx, idleCancel := applyIdleTimeout(forwardCtx, b, workspaceID, idle) cancel := func() { idleCancel() if ceilingCancel != nil { diff --git a/workspace-server/internal/handlers/registry.go b/workspace-server/internal/handlers/registry.go index a3b63291..755c3f81 100644 --- a/workspace-server/internal/handlers/registry.go +++ b/workspace-server/internal/handlers/registry.go @@ -11,6 +11,7 @@ import ( "os" "strings" "sync" + "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" @@ -461,6 +462,28 @@ func (h *RegistryHandler) Heartbeat(c *gin.Context) { "uptime_seconds": payload.UptimeSeconds, }) + // Refresh per-workspace runtime overrides from the heartbeat's + // runtime_metadata block (introduced for the native+pluggable + // runtime principle — see project memory). Both idle_timeout_seconds + // and capability flags are stored. Each consumer (a2a_proxy.dispatchA2A + // for idle timeout, scheduler.tick for native scheduler, etc.) reads + // what it needs from the cache. nil RuntimeMetadata or absent field + // clears the corresponding override so the dispatch path uses the + // global default. 
+ if payload.RuntimeMetadata != nil && payload.RuntimeMetadata.IdleTimeoutSeconds != nil { + runtimeOverrides.SetIdleTimeout( + payload.WorkspaceID, + time.Duration(*payload.RuntimeMetadata.IdleTimeoutSeconds)*time.Second, + ) + } else { + runtimeOverrides.SetIdleTimeout(payload.WorkspaceID, 0) // clear + } + if payload.RuntimeMetadata != nil { + runtimeOverrides.SetCapabilities(payload.WorkspaceID, payload.RuntimeMetadata.Capabilities) + } else { + runtimeOverrides.SetCapabilities(payload.WorkspaceID, nil) // clear + } + c.JSON(http.StatusOK, gin.H{"status": "ok"}) } diff --git a/workspace-server/internal/handlers/runtime_overrides.go b/workspace-server/internal/handlers/runtime_overrides.go new file mode 100644 index 00000000..60ff83d3 --- /dev/null +++ b/workspace-server/internal/handlers/runtime_overrides.go @@ -0,0 +1,164 @@ +package handlers + +import ( + "sync" + "time" +) + +// runtimeOverrides is the in-memory cache of per-workspace, adapter- +// declared overrides for cross-cutting capabilities. Populated by the +// heartbeat handler from HeartbeatPayload.RuntimeMetadata; consumed by +// dispatch paths (a2a_proxy.dispatchA2A reads IdleTimeout) before +// applying their own platform-default behavior. +// +// Why an in-memory cache and not a DB column: +// - Heartbeats arrive every ~30s, so a fresh override propagates +// within a heartbeat cycle of any change in adapter declarations. +// - On platform restart the cache resets to empty until each +// workspace's next heartbeat repopulates it. Worst-case window = +// 30s of platform-default behavior. Acceptable; nothing about +// these overrides is correctness-critical (they tune timeouts + +// enable native ownership of fallback features, not state). +// - DB-roundtripping every dispatch would add latency to a hot +// path (a2a_proxy is on every agent → agent call). The cache is +// a sync.Map — atomic ptr load per dispatch, zero lock contention +// under steady load. 
+// +// Stale entries: a workspace that goes offline never sends another +// heartbeat, but the cache entry persists until the platform restarts. +// Acceptable because dispatchA2A only consults the cache when actually +// dispatching to that workspace — a stale entry for an offline +// workspace just means "use the override that was active when it was +// last alive" (correct behavior; the workspace will get the same +// timeouts when it comes back). +// +// See workspace/adapter_base.py:idle_timeout_override and project +// memory `project_runtime_native_pluggable.md`. +var runtimeOverrides runtimeOverrideCache + +type runtimeOverrideEntry struct { + idleTimeout time.Duration // 0 means "no override; use global default" + // capabilities maps wire-name keys from RuntimeCapabilities.to_dict() + // — "heartbeat", "scheduler", "session", "status_mgmt", "retry", + // "activity_decoration", "channel_dispatch" — to whether the adapter + // claims native ownership. Consumers (e.g. scheduler.tick) read this + // to decide whether to fire their platform-fallback behavior for this + // workspace. + // + // nil map means "no capability declarations received yet" → consumers + // fall back to the platform default (today's behavior). + capabilities map[string]bool +} + +type runtimeOverrideCache struct { + m sync.Map // key: workspaceID (string), value: runtimeOverrideEntry +} + +// loadEntry returns the entry for workspaceID (or a zero-value entry). +// Internal helper for the partial-update Set methods; sync.Map's +// Load doesn't support "read or default" in one shot. +func (c *runtimeOverrideCache) loadEntry(workspaceID string) runtimeOverrideEntry { + if v, ok := c.m.Load(workspaceID); ok { + if e, ok := v.(runtimeOverrideEntry); ok { + return e + } + } + return runtimeOverrideEntry{} +} + +// deleteIfEmpty drops the workspace's entry from the cache when both +// idleTimeout and capabilities are absent. 
Keeps the cache from +// retaining empty husks forever after a runtime stops sending overrides. +func (c *runtimeOverrideCache) deleteIfEmpty(workspaceID string, e runtimeOverrideEntry) { + if e.idleTimeout <= 0 && len(e.capabilities) == 0 { + c.m.Delete(workspaceID) + return + } + c.m.Store(workspaceID, e) +} + +// SetIdleTimeout records the per-workspace idle-timeout override sent +// in the most recent heartbeat. d == 0 clears the override (falling +// back to the global default), so a runtime that previously declared +// an override and then dropped it cleanly returns to platform behavior. +// Capability flags on the same workspace are preserved. +func (c *runtimeOverrideCache) SetIdleTimeout(workspaceID string, d time.Duration) { + if workspaceID == "" { + return + } + e := c.loadEntry(workspaceID) + if d <= 0 { + e.idleTimeout = 0 + } else { + e.idleTimeout = d + } + c.deleteIfEmpty(workspaceID, e) +} + +// IdleTimeout returns the per-workspace override and ok=true when one +// is in effect; ok=false means dispatchA2A should fall back to the +// global idleTimeoutDuration. +func (c *runtimeOverrideCache) IdleTimeout(workspaceID string) (time.Duration, bool) { + e := c.loadEntry(workspaceID) + if e.idleTimeout <= 0 { + return 0, false + } + return e.idleTimeout, true +} + +// SetCapabilities records the per-workspace capability declaration map +// (e.g. {"scheduler": true, "heartbeat": false, ...}) sent in the most +// recent heartbeat. Replaces any prior map; pass nil to clear. +// IdleTimeout on the same workspace is preserved. +// +// The wire-name keys (heartbeat, scheduler, session, status_mgmt, retry, +// activity_decoration, channel_dispatch) match RuntimeCapabilities.to_dict() +// in workspace/adapter_base.py — keep in sync there. 
+func (c *runtimeOverrideCache) SetCapabilities(workspaceID string, caps map[string]bool) { + if workspaceID == "" { + return + } + e := c.loadEntry(workspaceID) + if len(caps) == 0 { + e.capabilities = nil + } else { + // Defensive copy: caller may reuse / mutate the map after the + // call; the cache holds long-lived refs. + dup := make(map[string]bool, len(caps)) + for k, v := range caps { + dup[k] = v + } + e.capabilities = dup + } + c.deleteIfEmpty(workspaceID, e) +} + +// HasCapability returns true when the workspace's adapter has declared +// native ownership of the named capability. False when no entry exists, +// no capability map was ever sent, or the named capability is absent / +// false. Consumers (scheduler.tick, etc.) call this before firing their +// platform-fallback behavior. +func (c *runtimeOverrideCache) HasCapability(workspaceID, name string) bool { + if workspaceID == "" || name == "" { + return false + } + e := c.loadEntry(workspaceID) + return e.capabilities[name] +} + +// Reset clears the entire cache. Test-only; production code never +// needs this since heartbeats refresh entries naturally. +func (c *runtimeOverrideCache) Reset() { + c.m.Range(func(k, _ any) bool { + c.m.Delete(k) + return true + }) +} + +// ProvidesNativeScheduler is the public adapter exposed to the scheduler +// package — wraps HasCapability("scheduler") with the package-level +// runtimeOverrides instance. Installed via Scheduler.SetNativeSchedulerCheck +// in cmd/server/main.go, which keeps scheduler/scheduler.go free of a handlers/ import. 
+func ProvidesNativeScheduler(workspaceID string) bool { + return runtimeOverrides.HasCapability(workspaceID, "scheduler") +} diff --git a/workspace-server/internal/handlers/runtime_overrides_test.go b/workspace-server/internal/handlers/runtime_overrides_test.go new file mode 100644 index 00000000..b784bbfc --- /dev/null +++ b/workspace-server/internal/handlers/runtime_overrides_test.go @@ -0,0 +1,241 @@ +package handlers + +import ( + "sync" + "testing" + "time" +) + +func TestRuntimeOverrideCache_SetAndGet(t *testing.T) { + c := &runtimeOverrideCache{} + + if _, ok := c.IdleTimeout("ws-a"); ok { + t.Fatal("empty cache should not return any override") + } + + c.SetIdleTimeout("ws-a", 10*time.Minute) + got, ok := c.IdleTimeout("ws-a") + if !ok || got != 10*time.Minute { + t.Fatalf("expected 10m override; got=%v ok=%v", got, ok) + } + + // Sibling workspace unaffected — pin against the trap where a + // shared map without proper keying would leak overrides across + // workspaces (a hard-to-debug "claude-code's longer timeout + // somehow applied to langgraph too"). + if _, ok := c.IdleTimeout("ws-b"); ok { + t.Fatal("override for ws-a leaked to ws-b") + } +} + +func TestRuntimeOverrideCache_ZeroOrNegativeClears(t *testing.T) { + // Adapter dropping the override (returning None / 0 from + // idle_timeout_override) must restore platform-default behavior. + // If the cache held the previous value indefinitely, an adapter + // downgrade would silently keep the longer timeout active. 
+ c := &runtimeOverrideCache{} + c.SetIdleTimeout("ws-a", 10*time.Minute) + if _, ok := c.IdleTimeout("ws-a"); !ok { + t.Fatal("setup: override should be set") + } + + c.SetIdleTimeout("ws-a", 0) + if _, ok := c.IdleTimeout("ws-a"); ok { + t.Fatal("zero duration should clear override") + } + + c.SetIdleTimeout("ws-a", 5*time.Minute) + c.SetIdleTimeout("ws-a", -1*time.Second) + if _, ok := c.IdleTimeout("ws-a"); ok { + t.Fatal("negative duration should clear override") + } +} + +func TestRuntimeOverrideCache_EmptyWorkspaceIDIgnored(t *testing.T) { + // Defensive: a misrouted heartbeat with empty workspace_id + // should NOT pollute the cache with a "" key. workspaceID == "" + // is also the value dispatchA2A passes when the workspace is + // indeterminate, and that path must not surface a stored value. + c := &runtimeOverrideCache{} + c.SetIdleTimeout("", 10*time.Minute) + if _, ok := c.IdleTimeout(""); ok { + t.Fatal("empty workspace_id must not store overrides") + } +} + +func TestRuntimeOverrideCache_SetReplaces(t *testing.T) { + // A heartbeat with a new override value replaces, doesn't append. 
+ c := &runtimeOverrideCache{} + c.SetIdleTimeout("ws-a", 10*time.Minute) + c.SetIdleTimeout("ws-a", 20*time.Minute) + got, _ := c.IdleTimeout("ws-a") + if got != 20*time.Minute { + t.Fatalf("expected 20m after replacement; got %v", got) + } +} + +func TestRuntimeOverrideCache_Reset(t *testing.T) { + c := &runtimeOverrideCache{} + c.SetIdleTimeout("ws-a", 10*time.Minute) + c.SetIdleTimeout("ws-b", 20*time.Minute) + c.Reset() + if _, ok := c.IdleTimeout("ws-a"); ok { + t.Fatal("reset should clear ws-a") + } + if _, ok := c.IdleTimeout("ws-b"); ok { + t.Fatal("reset should clear ws-b") + } +} + +func TestRuntimeOverrideCache_SetCapabilitiesAndHas(t *testing.T) { + c := &runtimeOverrideCache{} + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("empty cache must not return any capability") + } + + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": false}) + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("scheduler capability not stored") + } + if c.HasCapability("ws-a", "session") { + t.Fatal("session=false should report as absent (False)") + } + if c.HasCapability("ws-a", "heartbeat") { + t.Fatal("missing key must report as absent") + } +} + +func TestRuntimeOverrideCache_CapabilitiesIsolatedPerWorkspace(t *testing.T) { + // Critical: ws-a declaring native scheduler must NOT make ws-b + // also skip its schedules. The cache's per-key isolation is the + // only thing standing between "claude-code adapter declares this" + // and "every workspace silently inherits the declaration." + c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if c.HasCapability("ws-b", "scheduler") { + t.Fatal("ws-a's scheduler capability leaked to ws-b") + } +} + +func TestRuntimeOverrideCache_NilOrEmptyCapabilitiesClears(t *testing.T) { + // An adapter that previously declared native scheduler then + // dropped the flag (e.g. SDK update) must restore platform + // fallback. nil + empty-map both mean "clear". 
+ c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("setup: scheduler should be set") + } + + c.SetCapabilities("ws-a", nil) + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("nil should clear capabilities") + } + + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + c.SetCapabilities("ws-a", map[string]bool{}) + if c.HasCapability("ws-a", "scheduler") { + t.Fatal("empty map should clear capabilities") + } +} + +func TestRuntimeOverrideCache_SetCapabilitiesIsDefensiveCopy(t *testing.T) { + // The caller's map MUST NOT alias the cached one. A future careless + // caller mutating the original map after the call should not + // retroactively change cached capability declarations. + c := &runtimeOverrideCache{} + original := map[string]bool{"scheduler": true} + c.SetCapabilities("ws-a", original) + original["scheduler"] = false + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("cache aliased the caller's map; capability flipped via outside mutation") + } +} + +func TestRuntimeOverrideCache_SetIdleTimeoutPreservesCapabilities(t *testing.T) { + // The two heartbeat fields are independent — updating one must + // not stomp the other. Pre-fix, each Set replaced the entire + // entry, which meant the second-arriving Set in the heartbeat + // handler effectively erased the first. + c := &runtimeOverrideCache{} + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + c.SetIdleTimeout("ws-a", 600*time.Second) + + if !c.HasCapability("ws-a", "scheduler") { + t.Fatal("SetIdleTimeout erased prior capabilities") + } + got, ok := c.IdleTimeout("ws-a") + if !ok || got != 600*time.Second { + t.Fatalf("idle timeout lost; got=%v ok=%v", got, ok) + } + + // And the inverse: SetCapabilities must not erase IdleTimeout. 
+ c.SetCapabilities("ws-a", map[string]bool{"scheduler": true, "session": true}) + if got, ok := c.IdleTimeout("ws-a"); !ok || got != 600*time.Second { + t.Fatal("SetCapabilities erased prior idle timeout") + } +} + +func TestRuntimeOverrideCache_EmptyEntryDeleted(t *testing.T) { + // When both fields are cleared, the entry should drop out of the + // cache entirely so a stale workspace doesn't accumulate empty + // husks indefinitely. + c := &runtimeOverrideCache{} + c.SetIdleTimeout("ws-a", 60*time.Second) + c.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + + c.SetIdleTimeout("ws-a", 0) + c.SetCapabilities("ws-a", nil) + + if _, ok := c.m.Load("ws-a"); ok { + t.Fatal("entry should be deleted when both fields cleared") + } +} + +func TestProvidesNativeScheduler_PackageLevel(t *testing.T) { + // The package-level function the scheduler imports — pin that it + // reads the same singleton the heartbeat handler writes to. + runtimeOverrides.Reset() + defer runtimeOverrides.Reset() + + if ProvidesNativeScheduler("ws-a") { + t.Fatal("empty cache should not declare native scheduler") + } + runtimeOverrides.SetCapabilities("ws-a", map[string]bool{"scheduler": true}) + if !ProvidesNativeScheduler("ws-a") { + t.Fatal("ProvidesNativeScheduler did not see the declaration") + } + if ProvidesNativeScheduler("") { + t.Fatal("empty workspace ID should never declare native scheduler") + } +} + +func TestRuntimeOverrideCache_ConcurrentSafe(t *testing.T) { + // dispatchA2A reads the cache on every request; heartbeat handlers + // write on every 30s. Different workspaces will be hot in different + // goroutines. The sync.Map underlying the cache promises this; the + // test pins it so a future "let me just use a regular map with a + // mutex" change can't silently regress under load. 
+ c := &runtimeOverrideCache{} + var wg sync.WaitGroup + const N = 100 + + for i := 0; i < N; i++ { + wg.Add(2) + go func(i int) { + defer wg.Done() + c.SetIdleTimeout("ws", time.Duration(i+1)*time.Second) + }(i) + go func() { + defer wg.Done() + _, _ = c.IdleTimeout("ws") + }() + } + wg.Wait() + // Final value must be SOME positive duration written by one of the + // goroutines — not corrupted, not zero. + got, ok := c.IdleTimeout("ws") + if !ok || got <= 0 || got > time.Duration(N)*time.Second { + t.Fatalf("expected a valid override after concurrent writes; got %v ok=%v", got, ok) + } +} diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index b8732c9e..e8850425 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -70,6 +70,40 @@ type HeartbeatPayload struct { // non-empty value is "wedged"; future values can extend this without // migration. RuntimeState string `json:"runtime_state"` + + // RuntimeMetadata is the adapter-declared capability map + per- + // capability override values. The Python runtime builds this from + // BaseAdapter.capabilities() + per-hook methods (e.g. + // idle_timeout_override()) — see workspace/heartbeat.py: + // _runtime_metadata_payload. Optional: missing means "use platform + // defaults for everything", matching pre-2026-04 behavior. + // + // Pointer (not value) so a missing JSON field is nil rather than a + // zero-value RuntimeMetadata{} that would falsely claim "all caps = + // false declared explicitly". Lets the platform distinguish "adapter + // said no native ownership" from "old runtime version, didn't say". + RuntimeMetadata *RuntimeMetadata `json:"runtime_metadata,omitempty"` +} + +// RuntimeMetadata is the adapter-declared capability + override block +// the Python runtime sends in the heartbeat payload. New fields can be +// added with `omitempty` without breaking older runtime versions. 
+// +// See project memory `project_runtime_native_pluggable.md` for the +// principle and workspace/adapter_base.py:RuntimeCapabilities for the +// Python source of truth. +type RuntimeMetadata struct { + // Capabilities maps capability name → "adapter owns it natively". + // Keys (heartbeat, scheduler, session, status_mgmt, retry, + // activity_decoration, channel_dispatch) match + // RuntimeCapabilities.to_dict() in adapter_base.py — keep in sync. + Capabilities map[string]bool `json:"capabilities,omitempty"` + + // IdleTimeoutSeconds, when set, overrides the per-dispatch silence + // window in a2a_proxy.go for this workspace's A2A traffic. Pointer + // so nil means "no override; use the global default". Zero / negative + // behaves like nil: the override cache clears the entry, so a2a_proxy.go sees no override. + IdleTimeoutSeconds *int `json:"idle_timeout_seconds,omitempty"` } type UpdateCardPayload struct { diff --git a/workspace-server/internal/scheduler/native_scheduler_test.go b/workspace-server/internal/scheduler/native_scheduler_test.go new file mode 100644 index 00000000..2a9e3732 --- /dev/null +++ b/workspace-server/internal/scheduler/native_scheduler_test.go @@ -0,0 +1,52 @@ +package scheduler + +import ( + "testing" +) + +// TestSetNativeSchedulerCheck pins the wiring contract: New() leaves +// providesNativeScheduler nil (= today's behavior, never skip); +// SetNativeSchedulerCheck installs the override. The actual skip +// behavior in tick() needs a DB and is exercised by the integration +// tests in tests/e2e/. 
+func TestSetNativeSchedulerCheck(t *testing.T) { + s := New(nil, nil) + if s.providesNativeScheduler != nil { + t.Fatal("New() must leave providesNativeScheduler nil so untouched callers preserve today's behavior") + } + + called := false + checker := NativeSchedulerCheck(func(workspaceID string) bool { + called = true + return workspaceID == "ws-native" + }) + s.SetNativeSchedulerCheck(checker) + if s.providesNativeScheduler == nil { + t.Fatal("SetNativeSchedulerCheck did not install the function") + } + if !s.providesNativeScheduler("ws-native") { + t.Fatal("installed checker not invoked / wrong return") + } + if !called { + t.Fatal("installed checker not called") + } + if s.providesNativeScheduler("ws-other") { + t.Fatal("checker should return false for non-native workspace") + } +} + +// TestNativeSchedulerCheck_NilSafeInTick documents the contract used +// by tick(): a nil providesNativeScheduler must mean "always fire" so +// existing callers (test fixtures, prior to capability primitives) +// preserve today's behavior unchanged. The conditional in tick reads +// `s.providesNativeScheduler != nil && s.providesNativeScheduler(id)` +// — neither branch can panic on a nil-checker scheduler. +func TestNativeSchedulerCheck_NilSafeInTick(t *testing.T) { + s := New(nil, nil) + // We don't actually call tick() — that requires a live DB. We just + // pin that the field is nil after New, which is the load-bearing + // invariant tick() relies on. 
+ if s.providesNativeScheduler != nil { + t.Fatal("nil-safety contract violated: providesNativeScheduler must be nil from New()") + } +} diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go index 9c97ef45..0c6eb84f 100644 --- a/workspace-server/internal/scheduler/scheduler.go +++ b/workspace-server/internal/scheduler/scheduler.go @@ -70,6 +70,21 @@ type ChannelBroadcaster interface { FetchWorkspaceChannelContext(ctx context.Context, workspaceID string) string } +// NativeSchedulerCheck returns true when the workspace's adapter has +// declared `provides_native_scheduler=True` in its capabilities. The +// scheduler skips firing for these workspaces (it still advances +// next_run_at) — the SDK runs the schedule itself (Temporal, Durable +// Functions, etc.) and firing from the platform too would double-fire it. +// +// Installed after construction via SetNativeSchedulerCheck (from +// cmd/server/main.go in production, or by tests). nil is allowed and +// treated as "no override" for every workspace, preserving today's +// behavior — same default-false posture as BaseAdapter.capabilities() in workspace/adapter_base.py. +// +// See project memory `project_runtime_native_pluggable.md` and +// handlers.ProvidesNativeScheduler for the production wiring. +type NativeSchedulerCheck func(workspaceID string) bool + // Scheduler polls the workspace_schedules table and fires A2A messages // when a schedule's next_run_at has passed. Follows the same goroutine // pattern as registry.StartHealthSweep. @@ -78,6 +93,11 @@ type Scheduler struct { broadcaster Broadcaster channels ChannelBroadcaster + // providesNativeScheduler, when non-nil and returning true, causes + // tick() to skip firing for this workspace. nil = always-fire (the + // pre-capability-primitive behavior). See NativeSchedulerCheck above. + providesNativeScheduler NativeSchedulerCheck + // lastTickAt records the wall-clock time of the most recent tick // (whether it fired schedules or not). 
Read by Healthy() and the // /admin/scheduler/health endpoint to detect stuck-tick conditions. @@ -102,6 +122,15 @@ func (s *Scheduler) SetChannels(ch ChannelBroadcaster) { s.channels = ch } +// SetNativeSchedulerCheck wires the per-workspace native-scheduler +// override lookup. Called from cmd/server/main.go after the scheduler is +// constructed (handlers package owns the cache). Pass nil to disable +// the skip — every schedule fires regardless of adapter declaration, +// matching pre-capability-primitive behavior. +func (s *Scheduler) SetNativeSchedulerCheck(f NativeSchedulerCheck) { + s.providesNativeScheduler = f +} + // LastTickAt returns the wall-clock time of the most recently completed tick. // Returns a zero time.Time if the scheduler has never completed a tick. func (s *Scheduler) LastTickAt() time.Time { @@ -231,6 +260,27 @@ func (s *Scheduler) tick(ctx context.Context) { log.Printf("Scheduler: scan error: %v", err) continue } + // Skip workspaces whose adapter owns scheduling natively (e.g. + // SDKs with built-in cron / Temporal-style workflows). Without + // this skip, the platform's polling would fire the same + // schedule twice — once natively in the SDK, once via this + // loop. The skip drops only the FIRE; the schedule row stays + // in the DB and next_run_at keeps advancing, so the schedule + // stays observable (last_run_at is not written on this path). + // Before this change every due schedule fired unconditionally; the + // nil check preserves that for callers that didn't wire the override. + if s.providesNativeScheduler != nil && s.providesNativeScheduler(sched.WorkspaceID) { + // Advance next_run_at so we don't tight-loop on the same + // row every tick. A non-firing schedule is still scheduled. 
+ if nextTime, err := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); err == nil { + if _, execErr := db.DB.ExecContext(ctx, + `UPDATE workspace_schedules SET next_run_at=$1, updated_at=now() WHERE id=$2`, + nextTime, sched.ID); execErr != nil { + log.Printf("Scheduler: native-skip next_run_at UPDATE failed for schedule %s: %v", sched.ID, execErr) + } + } + continue + } wg.Add(1) sem <- struct{}{} go func(s2 scheduleRow) { diff --git a/workspace/adapter_base.py b/workspace/adapter_base.py index 98cccca2..70a46e38 100644 --- a/workspace/adapter_base.py +++ b/workspace/adapter_base.py @@ -164,6 +164,29 @@ class BaseAdapter(ABC): project memory `project_runtime_native_pluggable.md`.""" return RuntimeCapabilities() + def idle_timeout_override(self) -> int | None: + """Per-A2A-dispatch silence window override, in SECONDS. + + Return None to use the platform default (env var + A2A_IDLE_TIMEOUT_SECONDS, falling back to 5 minutes — see + a2a_proxy.go:defaultIdleTimeoutDuration). Override when this + runtime's SDK can legitimately go silent longer than the + default before the dispatch should be considered wedged. + + Why this is per-adapter, not just env: the env value is a + cluster-wide knob set by ops. Different SDKs have different + latency profiles — claude-code synthesis on Opus + tool use + legitimately runs 8-10 min between broadcasts; hermes synth + with custom providers can be even slower. Hardcoding 5min for + everyone either cancels real work (claude-code synth) or + leaves wedged runtimes (langgraph) hanging too long. + + Platform reads this from the heartbeat payload and stashes + it per-workspace; dispatchA2A consults it before applying the + idle timer. 
None / unset / zero falls through to the global + default — same behavior as before this hook landed.""" + return None + # ------------------------------------------------------------------ # Plugin install hooks # ------------------------------------------------------------------ diff --git a/workspace/heartbeat.py b/workspace/heartbeat.py index c0fc2f1d..6230c93a 100644 --- a/workspace/heartbeat.py +++ b/workspace/heartbeat.py @@ -43,6 +43,43 @@ def _runtime_state_payload() -> dict: "sample_error": wedge_reason(), } + +def _runtime_metadata_payload() -> dict: + """Build the {runtime_metadata} portion of the heartbeat body — + adapter-declared capabilities + per-capability override values + (idle timeout, etc.). The platform reads this to route capabilities + to the right owner: native (adapter) vs fallback (platform). + + Returns an empty dict if the adapter can't be loaded or introspected. + Heartbeat must NEVER fail because of capability discovery — observability + is more important than capability accuracy. The platform falls through + to its own defaults when fields are missing. + + See project memory `project_runtime_native_pluggable.md` and + workspace/adapter_base.py:RuntimeCapabilities. + """ + try: + from adapters import get_adapter + # ADAPTER_MODULE wins over the runtime arg in get_adapter — pass + # an empty string to force the env-var path. + adapter_cls = get_adapter("") + adapter = adapter_cls() + caps = adapter.capabilities() + meta: dict = {"capabilities": caps.to_dict()} + idle = adapter.idle_timeout_override() + # Only include the override when it's a positive integer. None / + # zero / negative falls through to the platform's global default + # (env A2A_IDLE_TIMEOUT_SECONDS, default 5min) — that "absent + # field = use default" contract is what keeps the wire small. 
+ if isinstance(idle, int) and idle > 0: + meta["idle_timeout_seconds"] = idle + return {"runtime_metadata": meta} + except Exception as e: + # debug-level: missing ADAPTER_MODULE in dev / test envs is normal + logger.debug("runtime_metadata: failed to read adapter caps: %s", e) + return {} + + logger = logging.getLogger(__name__) HEARTBEAT_INTERVAL = 30 # seconds @@ -123,6 +160,7 @@ class HeartbeatLoop: # sample_error field. The platform reads # runtime_state to flip status → degraded. body.update(_runtime_state_payload()) + body.update(_runtime_metadata_payload()) await client.post( f"{self.platform_url}/registry/heartbeat", json=body, diff --git a/workspace/tests/test_heartbeat_runtime_metadata.py b/workspace/tests/test_heartbeat_runtime_metadata.py new file mode 100644 index 00000000..3fae87eb --- /dev/null +++ b/workspace/tests/test_heartbeat_runtime_metadata.py @@ -0,0 +1,146 @@ +"""Tests for heartbeat._runtime_metadata_payload — the heartbeat-side +producer that sends adapter capability declarations + the +idle_timeout_override value to the platform every 30s. Capability +primitive #2 (task #117) wires this into the platform's a2a_proxy. + +Tests use sys.modules monkey-patching to stub the `adapters` module +because workspace/heartbeat.py lazy-imports it inside the helper — +keeping heartbeat resilient to a missing/broken adapter discovery +path.""" +import sys +from types import SimpleNamespace + +import pytest + +from adapter_base import BaseAdapter, RuntimeCapabilities +from heartbeat import _runtime_metadata_payload + + +class _FakeAdapter(BaseAdapter): + """Default adapter — every capability False, no idle override. 
+ Matches today's behavior for any runtime that doesn't opt in.""" + + @staticmethod + def name() -> str: + return "fake" + + @staticmethod + def display_name() -> str: + return "Fake" + + @staticmethod + def description() -> str: + return "Fake adapter for heartbeat metadata tests" + + async def setup(self, config) -> None: + return None + + async def create_executor(self, config): # pragma: no cover + raise NotImplementedError + + +class _NativeAdapter(_FakeAdapter): + """Adapter that declares native heartbeat + 600s idle override — + matches what claude-code's adapter will declare once #87 lands.""" + + def capabilities(self) -> RuntimeCapabilities: + return RuntimeCapabilities(provides_native_heartbeat=True) + + def idle_timeout_override(self) -> int: + return 600 + + +@pytest.fixture +def stub_adapters_module(request): + """Install a fake `adapters` module that returns the requested + adapter class from get_adapter(). Cleans up after the test.""" + adapter_cls = getattr(request, "param", _FakeAdapter) + fake_mod = SimpleNamespace(get_adapter=lambda runtime: adapter_cls) + saved = sys.modules.get("adapters") + sys.modules["adapters"] = fake_mod # type: ignore[assignment] + try: + yield adapter_cls + finally: + if saved is None: + sys.modules.pop("adapters", None) + else: + sys.modules["adapters"] = saved + + +@pytest.mark.parametrize("stub_adapters_module", [_FakeAdapter], indirect=True) +def test_default_adapter_emits_all_false_capabilities_no_idle_override(stub_adapters_module): + """Default-adapter heartbeat MUST carry the runtime_metadata block + with all-False caps and no idle_timeout_seconds. 
The block being + present (even with zero info) is the wire signal that this runtime + speaks the new protocol — older runtimes omit the field entirely.""" + payload = _runtime_metadata_payload() + assert "runtime_metadata" in payload + meta = payload["runtime_metadata"] + assert meta["capabilities"] == { + "heartbeat": False, + "scheduler": False, + "session": False, + "status_mgmt": False, + "retry": False, + "activity_decoration": False, + "channel_dispatch": False, + } + # No override key at all — pin the "absent field = use platform + # default" wire contract Go side relies on. + assert "idle_timeout_seconds" not in meta + + +@pytest.mark.parametrize("stub_adapters_module", [_NativeAdapter], indirect=True) +def test_native_adapter_emits_capability_flag_and_idle_override(stub_adapters_module): + payload = _runtime_metadata_payload() + meta = payload["runtime_metadata"] + assert meta["capabilities"]["heartbeat"] is True + # Sibling caps untouched — declaring one capability doesn't + # accidentally claim ownership of the others. + assert meta["capabilities"]["scheduler"] is False + assert meta["idle_timeout_seconds"] == 600 + + +def test_returns_empty_dict_when_adapter_module_missing(monkeypatch): + """get_adapter() raises KeyError when ADAPTER_MODULE is unset. + Heartbeat must NEVER fail — the metadata is optional, the + heartbeat itself (alive signal) is load-bearing. Pin that the + helper swallows the error and returns {}.""" + # Remove any stub from prior tests. + monkeypatch.delitem(sys.modules, "adapters", raising=False) + # Force get_adapter to raise by ensuring ADAPTER_MODULE is unset. 
+ monkeypatch.delenv("ADAPTER_MODULE", raising=False) + payload = _runtime_metadata_payload() + assert payload == {} + + +@pytest.mark.parametrize("stub_adapters_module", [_FakeAdapter], indirect=True) +def test_idle_timeout_override_zero_or_negative_omitted(stub_adapters_module, monkeypatch): + """An adapter that returns 0 or negative from idle_timeout_override + means 'use the platform default' — same as None. Don't ship a + bogus value to the wire that the Go side would have to filter.""" + class _BadOverrideAdapter(_FakeAdapter): + def idle_timeout_override(self) -> int: + return 0 + + fake_mod = SimpleNamespace(get_adapter=lambda runtime: _BadOverrideAdapter) + monkeypatch.setitem(sys.modules, "adapters", fake_mod) + + payload = _runtime_metadata_payload() + assert "idle_timeout_seconds" not in payload["runtime_metadata"] + + +@pytest.mark.parametrize("stub_adapters_module", [_FakeAdapter], indirect=True) +def test_swallows_unexpected_exception_inside_adapter(stub_adapters_module, monkeypatch): + """Adapter capabilities() / idle_timeout_override() throwing must + NOT crash heartbeat. 
Returns {} so no field is sent and the + platform falls through to defaults.""" + class _BrokenAdapter(_FakeAdapter): + def capabilities(self): + raise RuntimeError("simulated broken adapter init") + + fake_mod = SimpleNamespace(get_adapter=lambda runtime: _BrokenAdapter) + monkeypatch.setitem(sys.modules, "adapters", fake_mod) + + payload = _runtime_metadata_payload() + assert payload == {} diff --git a/workspace/tests/test_runtime_capabilities.py b/workspace/tests/test_runtime_capabilities.py index 9e48795f..d685c57f 100644 --- a/workspace/tests/test_runtime_capabilities.py +++ b/workspace/tests/test_runtime_capabilities.py @@ -152,3 +152,35 @@ class TestBaseAdapterCapabilitiesDefault: native = _NativeHeartbeatAdapter().capabilities() assert minimal.provides_native_heartbeat is False assert native.provides_native_heartbeat is True + + +class TestIdleTimeoutOverride: + """The idle_timeout_override() hook — the first capability primitive + with an actual platform consumer (workspace-server's a2a_proxy.go + consults this per-workspace before applying its idle timer). + + Default behavior MUST be no-op (return None → platform uses global + default). Subclasses override to declare longer/shorter window.""" + + def test_default_returns_none(self): + # If this default ever flips to a positive number, every adapter + # silently gets that idle timeout. The platform's global default + # (env A2A_IDLE_TIMEOUT_SECONDS, default 5min) would stop being + # the floor — instead this hook would be — and ops would lose + # the central knob. 
+ assert _MinimalAdapter().idle_timeout_override() is None + + def test_subclass_can_override_to_positive_seconds(self): + class _SlowAdapter(_MinimalAdapter): + def idle_timeout_override(self) -> int: + return 600 # 10 min — typical for a slow synth runtime + assert _SlowAdapter().idle_timeout_override() == 600 + + def test_subclass_can_explicitly_keep_default_via_none(self): + # An adapter that overrode this in an old version then dropped + # the override (back to None) should cleanly fall back to the + # platform default. Pinning here makes the round-trip explicit. + class _DroppedOverrideAdapter(_MinimalAdapter): + def idle_timeout_override(self): + return None + assert _DroppedOverrideAdapter().idle_timeout_override() is None