Some checks failed
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 9s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 8s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 8s
Harness Replays / detect-changes (pull_request) Successful in 9s
CI / Python Lint & Test (pull_request) Successful in 6s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 6s
CI / Canvas (Next.js) (pull_request) Successful in 8s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 10s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 12s
Harness Replays / Harness Replays (pull_request) Successful in 8s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 8s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Failing after 1m36s
cascade-list-drift-gate / check (pull_request) Successful in 5s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Failing after 1m30s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Failing after 1m39s
E2E API Smoke Test / E2E API Smoke Test (pull_request) Failing after 2m50s
Retarget main PRs to staging / Retarget to staging (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 5s
CI / Platform (Go) (pull_request) Successful in 4m29s
CI / Detect changes (pull_request) Successful in 6s
E2E API Smoke Test / detect-changes (pull_request) Successful in 8s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 7s
Adds a 'mock' runtime: virtual workspaces with no container, no EC2,
no LLM. Every A2A reply is synthesised from a small canned-variant
pool ('On it!', 'Got it, on it now.', etc.) deterministically seeded
by (workspace_id, request_id).
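A minimal sketch of that deterministic pick, assuming an FNV hash over
the (workspace_id, request_id) pair; the pool contents beyond the two
variants quoted above and the mockReplyPool/pickMockReply names are
illustrative, not the actual mock_runtime.go code.

    package handlers

    import "hash/fnv"

    // Canned-variant pool. Only the first two entries come from the
    // description above; the rest of the real pool is elided.
    var mockReplyPool = []string{
        "On it!",
        "Got it, on it now.",
    }

    // pickMockReply returns the same canned reply for the same
    // (workspaceID, requestID) pair, so retries and replays are stable.
    func pickMockReply(workspaceID, requestID string) string {
        h := fnv.New32a()
        h.Write([]byte(workspaceID + "|" + requestID))
        return mockReplyPool[int(h.Sum32()%uint32(len(mockReplyPool)))]
    }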
Built for the funding-demo "200-workspace mock org" — renders an
enterprise-scale org chart on the canvas (CEO/VPs/Managers/ICs)
without burning real LLM credits or provisioning 200 EC2 instances.
Surfaces:
- workspace-server/internal/handlers/mock_runtime.go: A2A proxy
short-circuit, canned-reply pool, deterministic variant pick.
- workspace-server/internal/handlers/a2a_proxy.go: gate the
short-circuit before resolveAgentURL (mock has no URL); see the
sketch after this list.
- workspace-server/internal/handlers/org_import.go: skip Docker
provisioning for mock workspaces, set status='online' directly,
drop the per-sibling 2s pacing for mock children (collapses
a 200-workspace import from ~7min → ~1s).
- workspace-server/internal/handlers/runtime_registry.go: register
'mock' in the runtime allowlist (manifest + fallback set).
- workspace-server/internal/registry/healthsweep.go +
orphan_sweeper.go: skip mock workspaces in container-health and
stale-token sweeps (no container by design).
- workspace-server/internal/handlers/workspace_restart.go: mirror
the 'external' Restart no-op for mock.
- manifest.json: register the new
Molecule-AI/molecule-ai-org-template-mock-bigorg repo.
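A rough sketch of that a2a_proxy.go gate follows; the handler shape,
helper parameters, and Workspace fields are assumptions. The only part
carried over from the change itself is the ordering: the mock check
runs before resolveAgentURL, because a mock workspace has no agent URL
to resolve.

    package handlers

    import (
        "net/http"
        "strings"
    )

    // Workspace stands in for the real model; only Runtime matters here.
    type Workspace struct {
        ID      string
        Runtime string
    }

    // proxyA2A gates the mock short-circuit ahead of URL resolution.
    // resolveAgentURL and forward stand in for the real proxy plumbing.
    func proxyA2A(w http.ResponseWriter, r *http.Request, ws Workspace,
        resolveAgentURL func(Workspace) (string, error),
        forward func(http.ResponseWriter, *http.Request, string)) {

        if strings.EqualFold(strings.TrimSpace(ws.Runtime), "mock") {
            // Mock workspaces never reach the URL resolver or an
            // upstream agent. The real handler also echoes the JSON-RPC
            // id from the request body; that part is elided here.
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"jsonrpc":"2.0","result":{"reply":"On it!"}}`))
            return
        }

        url, err := resolveAgentURL(ws)
        if err != nil {
            http.Error(w, "agent unreachable", http.StatusBadGateway)
            return
        }
        forward(w, r, url)
    }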
Tests: 5 new in mock_runtime_test.go covering happy-path, non-mock
regression guard, determinism, IsMockRuntime trim/case, JSON-RPC
id echo. All existing handler + registry tests still pass.
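For illustration only, the determinism case might look roughly like
this, reusing the hypothetical pickMockReply helper sketched earlier;
the real mock_runtime_test.go very likely structures it differently.

    package handlers

    import "testing"

    // The same (workspace_id, request_id) pair must always yield the
    // same canned reply, across calls and across process restarts.
    func TestMockReplyDeterministic(t *testing.T) {
        a := pickMockReply("ws-vp-eng", "req-123")
        b := pickMockReply("ws-vp-eng", "req-123")
        if a != b {
            t.Fatalf("same seed gave different replies: %q vs %q", a, b)
        }
    }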
Verified locally: imported the 200-workspace template against a fresh
postgres+redis, confirmed all 200 land in 'online' and stay there
through the 30s health-sweep window, exercised A2A on CEO + VPs +
Managers + ICs and saw the variant pool rotate.
Org template lives at
Molecule-AI/molecule-ai-org-template-mock-bigorg (created today)
and is imported via the existing /org/import flow on the canvas
Template Palette.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
workspace-server/internal/registry/healthsweep.go (190 lines, 6.6 KiB, Go)
package registry

import (
	"context"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
)

// ContainerChecker checks if a workspace container is running via Docker API.
type ContainerChecker interface {
	IsRunning(ctx context.Context, workspaceID string) (bool, error)
}

// DefaultRemoteStaleAfter is the default heartbeat-freshness window for
// `runtime='external'` workspaces before they're swept to awaiting_agent.
// Chosen slightly longer than the 60s Redis TTL so transient network
// hiccups don't cause a flapping online/offline ping-pong on the canvas.
// Override via `REMOTE_LIVENESS_STALE_AFTER` env var (integer seconds).
const DefaultRemoteStaleAfter = 90 * time.Second

// remoteStaleAfter reads the override from env, falling back to default.
// Called once per sweep tick — we don't cache because ops occasionally
// tune this live via a container restart, and the overhead of reading
// an env var on a 15s cadence is irrelevant.
func remoteStaleAfter() time.Duration {
	if v := os.Getenv("REMOTE_LIVENESS_STALE_AFTER"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return time.Duration(n) * time.Second
		}
	}
	return DefaultRemoteStaleAfter
}

// StartHealthSweep periodically checks all "online" workspaces. For
// container-backed runtimes (langgraph, claude-code, …) it calls the
// Docker API via `checker.IsRunning`. For `runtime='external'` (remote
// agents per Phase 30) it checks heartbeat freshness: a heartbeat older
// than `REMOTE_LIVENESS_STALE_AFTER` (default 90s) flips the workspace
// to awaiting_agent and calls `onOffline`.
//
// If `checker` is nil we still run the remote-liveness path — a
// deployment without Docker (e.g. a pure SaaS front-door) is a valid
// configuration and shouldn't lose liveness monitoring for its remote
// agents.
func StartHealthSweep(ctx context.Context, checker ContainerChecker, interval time.Duration, onOffline OfflineHandler) {
	if checker == nil {
		log.Println("Health sweep: no Docker container checker — running remote-liveness sweep only")
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	log.Printf("Health sweep: started (interval=%s, remote stale-after=%s)", interval, remoteStaleAfter())

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if checker != nil {
				sweepOnlineWorkspaces(ctx, checker, onOffline)
			}
			sweepStaleRemoteWorkspaces(ctx, onOffline)
		}
	}
}

func sweepOnlineWorkspaces(ctx context.Context, checker ContainerChecker, onOffline OfflineHandler) {
	// Skip external + mock workspaces — neither has a Docker container.
	// external: agent runs on operator's laptop, polled via heartbeat.
	// mock: virtual workspace, every reply is canned (see
	// workspace-server/internal/handlers/mock_runtime.go). Both would
	// false-positive as "container gone" on every sweep tick and
	// auto-restart would loop forever (provisioner has no template
	// for either runtime).
	rows, err := db.DB.QueryContext(ctx,
		`SELECT id FROM workspaces WHERE status IN ('online', 'degraded') AND COALESCE(runtime, 'langgraph') NOT IN ('external', 'mock')`)
	if err != nil {
		log.Printf("Health sweep: query error: %v", err)
		return
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err == nil {
			ids = append(ids, id)
		}
	}

	for _, id := range ids {
		running, err := checker.IsRunning(ctx, id)
		if err != nil {
			continue // Docker API error — skip, don't false-positive
		}
		if running {
			continue
		}

		log.Printf("Health sweep: container for %s is gone — marking offline", id)

		_, err = db.DB.ExecContext(ctx,
			`UPDATE workspaces SET status = $1, updated_at = now()
			 WHERE id = $2 AND status NOT IN ('removed', 'provisioning')`,
			models.StatusOffline, id)
		if err != nil {
			log.Printf("Health sweep: failed to mark %s offline: %v", id, err)
			continue
		}

		db.ClearWorkspaceKeys(ctx, id)

		if onOffline != nil {
			onOffline(ctx, id)
		}
	}
}

// sweepStaleRemoteWorkspaces marks `runtime='external'` workspaces
// 'awaiting_agent' when their last heartbeat is older than
// `remoteStaleAfter()`. This is the Phase 30.7 analogue of
// `sweepOnlineWorkspaces` — instead of asking Docker "is the container
// alive?" we ask the DB "did the agent check in recently?". Workspaces
// that never heartbeated (last_heartbeat_at IS NULL) are eligible for
// sweep only after they've been online longer than the staleness
// window, so a newly-registered agent gets a full grace period to send
// its first heartbeat.
func sweepStaleRemoteWorkspaces(ctx context.Context, onOffline OfflineHandler) {
	staleAfter := remoteStaleAfter()
	staleAfterSec := int(staleAfter / time.Second)

	// Use Postgres age arithmetic so the cutoff is computed server-side
	// (no clock skew between platform host and DB). `COALESCE` ensures
	// a NULL heartbeat is compared against updated_at (which is set
	// when the external workspace was created + marked online) — that
	// way an agent that registered but immediately crashed before its
	// first heartbeat still gets swept after the grace window.
	rows, err := db.DB.QueryContext(ctx, `
		SELECT id FROM workspaces
		WHERE status IN ('online', 'degraded')
		  AND COALESCE(runtime, 'langgraph') = 'external'
		  AND COALESCE(last_heartbeat_at, updated_at) < now() - ($1 || ' seconds')::interval
	`, staleAfterSec)
	if err != nil {
		log.Printf("Health sweep (remote): query error: %v", err)
		return
	}
	defer rows.Close()

	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err == nil {
			ids = append(ids, id)
		}
	}

	for _, id := range ids {
		// External workspaces flip to 'awaiting_agent' (re-registrable
		// via /registry/register) instead of 'offline' (which was the
		// terminal-feeling status used pre-2026-04-30). The CLI's
		// `molecule connect` command (RFC #10 in molecule-cli) re-
		// registers on each invocation, bringing the workspace back
		// online. 'offline' was confusing because it implied "agent
		// crashed and needs operator intervention" when often the
		// operator simply closed their laptop overnight.
		log.Printf("Health sweep (remote): %s heartbeat stale (>%s) — marking awaiting_agent", id, staleAfter)

		_, err = db.DB.ExecContext(ctx,
			`UPDATE workspaces SET status = $1, updated_at = now()
			 WHERE id = $2 AND status NOT IN ('removed', 'provisioning', 'paused')`,
			models.StatusAwaitingAgent, id)
		if err != nil {
			log.Printf("Health sweep (remote): failed to mark %s awaiting_agent: %v", id, err)
			continue
		}

		db.ClearWorkspaceKeys(ctx, id)

		if onOffline != nil {
			onOffline(ctx, id)
		}
	}
}
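For context, a hedged wiring sketch of how the sweep might be started:
the actual bootstrap call site is not part of this file, the 15s
interval comes from the comment on remoteStaleAfter above, and
OfflineHandler is assumed here to be func(context.Context, string).

    // Assumed wiring, not the actual bootstrap code.
    func startSweeps(ctx context.Context, docker ContainerChecker) {
        go StartHealthSweep(ctx, docker, 15*time.Second,
            func(ctx context.Context, workspaceID string) {
                // Fan out the status change (e.g. a canvas refresh) here.
                log.Printf("workspace %s swept to a not-online status", workspaceID)
            })
    }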