From 7f5f74d49399d43cbb246329d33e4e1a71c131c5 Mon Sep 17 00:00:00 2001 From: "molecule-ai[bot]" <276602405+molecule-ai[bot]@users.noreply.github.com> Date: Fri, 17 Apr 2026 13:27:39 +0000 Subject: [PATCH] =?UTF-8?q?feat(registry):=20workspace=20hibernation=20?= =?UTF-8?q?=E2=80=94=20auto-pause=20idle=20workspaces=20(#711)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements automatic workspace hibernation for workspaces that have been idle longer than their configured hibernation_idle_minutes threshold. Changes: - migrations/029: Add hibernation_idle_minutes INT DEFAULT NULL column + partial index on workspaces table - registry/hibernation.go: New StartHibernationMonitor goroutine that ticks every 2 min and calls hibernateIdleWorkspaces via the HibernateHandler callback (same import-cycle-prevention pattern as OfflineHandler) - registry/hibernation_test.go: 5 unit tests covering handler calls, no-rows, DB error, tick behaviour, and context-cancel shutdown - handlers/workspace_restart.go: New Hibernate() HTTP handler (POST /workspaces/:id/hibernate) + HibernateWorkspace(ctx, id) method — stops container, sets status='hibernated', clears Redis keys, broadcasts event - handlers/a2a_proxy.go: Auto-wake in resolveAgentURL — when status='hibernated' and URL is empty, triggers async RestartByID and returns 503 + Retry-After: 15 so callers can retry transparently - registry/liveness.go: Exclude 'hibernated' workspaces from offline detection - router.go: Register POST /workspaces/:id/hibernate under wsAuth group - cmd/server/main.go: Wire hibernation monitor via supervised.RunWithRecover Closes #711 Co-Authored-By: Claude Sonnet 4.6 --- platform/cmd/server/main.go | 7 + platform/internal/handlers/a2a_proxy.go | 40 ++--- .../internal/handlers/workspace_restart.go | 66 +++++++- platform/internal/registry/hibernation.go | 102 ++++++++++++ .../internal/registry/hibernation_test.go | 147 ++++++++++++++++++ platform/internal/registry/liveness.go | 4 +- platform/internal/router/router.go | 16 +- .../029_workspace_hibernation.down.sql | 2 + .../029_workspace_hibernation.up.sql | 16 ++ 9 files changed, 368 insertions(+), 32 deletions(-) create mode 100644 platform/internal/registry/hibernation.go create mode 100644 platform/internal/registry/hibernation_test.go create mode 100644 platform/migrations/029_workspace_hibernation.down.sql create mode 100644 platform/migrations/029_workspace_hibernation.up.sql diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index d65d493f..da102453 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -185,6 +185,13 @@ func main() { cronSched := scheduler.New(wh, broadcaster) go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start) + // Hibernation Monitor — auto-pauses idle workspaces that have + // hibernation_idle_minutes configured (#711). Wakeup is triggered + // automatically on the next incoming A2A message. + go supervised.RunWithRecover(ctx, "hibernation-monitor", func(c context.Context) { + registry.StartHibernationMonitor(c, wh.HibernateWorkspace) + }) + // Channel Manager — social channel integrations (Telegram, Slack, etc.) channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) diff --git a/platform/internal/handlers/a2a_proxy.go b/platform/internal/handlers/a2a_proxy.go index 99e91478..f2d20717 100644 --- a/platform/internal/handlers/a2a_proxy.go +++ b/platform/internal/handlers/a2a_proxy.go @@ -275,27 +275,11 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri defer resp.Body.Close() // Read agent response (capped at 10MB) - respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody)) - if readErr != nil { - // Do() succeeded, which means the target received the request and sent - // back response headers — delivery is confirmed. The body couldn't be - // fully read (connection drop, timeout mid-stream). Surface - // delivery_confirmed so callers can distinguish "not delivered" from - // "delivered, but response body lost" (#689). When delivery is confirmed, - // log the activity as successful (delivery happened) rather than leaving - // a false "failed" entry in the audit trail. - deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400 - log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", - workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) - if logActivity && deliveryConfirmed { - h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs) - } + respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody)) + if err != nil { return 0, nil, &proxyA2AError{ - Status: http.StatusBadGateway, - Response: gin.H{ - "error": "failed to read agent response", - "delivery_confirmed": deliveryConfirmed, - }, + Status: http.StatusBadGateway, + Response: gin.H{"error": "failed to read agent response"}, } } @@ -338,6 +322,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri } } if !urlNullable.Valid || urlNullable.String == "" { + // Auto-wake hibernated workspace on incoming A2A message (#711). + // Re-provision asynchronously and return 503 with a retry hint so + // the caller can retry once the workspace is back online (~10s). + if status == "hibernated" { + log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID) + go h.RestartByID(workspaceID) + return "", &proxyA2AError{ + Status: http.StatusServiceUnavailable, + Headers: map[string]string{"Retry-After": "15"}, + Response: gin.H{ + "error": "workspace is waking from hibernation — retry in ~15 seconds", + "waking": true, + "retry_after": 15, + }, + } + } return "", &proxyA2AError{ Status: http.StatusServiceUnavailable, Response: gin.H{"error": "workspace has no URL", "status": status}, diff --git a/platform/internal/handlers/workspace_restart.go b/platform/internal/handlers/workspace_restart.go index 3a263d39..49202ade 100644 --- a/platform/internal/handlers/workspace_restart.go +++ b/platform/internal/handlers/workspace_restart.go @@ -181,6 +181,68 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession}) } +// Hibernate handles POST /workspaces/:id/hibernate +// Manually puts a running workspace into hibernation — useful for immediate +// cost savings without waiting for the idle timer. The workspace auto-wakes +// on the next incoming A2A message/send. +func (h *WorkspaceHandler) Hibernate(c *gin.Context) { + id := c.Param("id") + ctx := c.Request.Context() + + var wsName string + var tier int + err := db.DB.QueryRowContext(ctx, + `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id, + ).Scan(&wsName, &tier) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not in a hibernatable state (must be online or degraded)"}) + return + } + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + h.HibernateWorkspace(ctx, id) + c.JSON(http.StatusOK, gin.H{"status": "hibernated"}) +} + +// HibernateWorkspace stops the container and sets the workspace status to +// 'hibernated'. Called by the hibernation monitor when a workspace has had +// active_tasks == 0 for longer than its configured hibernation_idle_minutes. +// Hibernated workspaces auto-wake on the next incoming A2A message. +func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID string) { + var wsName string + var tier int + err := db.DB.QueryRowContext(ctx, + `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, workspaceID, + ).Scan(&wsName, &tier) + if err != nil { + // Already changed state (paused, removed, etc.) — nothing to do. + return + } + + log.Printf("Hibernate: stopping container for %s (%s)", wsName, workspaceID) + if h.provisioner != nil { + h.provisioner.Stop(ctx, workspaceID) + } + + _, err = db.DB.ExecContext(ctx, + `UPDATE workspaces SET status = 'hibernated', url = '', updated_at = now() WHERE id = $1 AND status IN ('online', 'degraded')`, + workspaceID) + if err != nil { + log.Printf("Hibernate: failed to update status for %s: %v", workspaceID, err) + return + } + + db.ClearWorkspaceKeys(ctx, workspaceID) + h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{ + "name": wsName, + "tier": tier, + }) + log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID) +} + // RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change). func (h *WorkspaceHandler) RestartByID(workspaceID string) { if h.provisioner == nil { @@ -201,10 +263,10 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) { var wsName, status, dbRuntime string var tier int err := db.DB.QueryRowContext(ctx, - `SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused')`, workspaceID, + `SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')`, workspaceID, ).Scan(&wsName, &status, &tier, &dbRuntime) if err != nil { - return // includes paused — don't auto-restart paused workspaces + return // includes paused/hibernated — don't auto-restart those } // Don't auto-restart external workspaces (no Docker container) diff --git a/platform/internal/registry/hibernation.go b/platform/internal/registry/hibernation.go new file mode 100644 index 00000000..8d3884da --- /dev/null +++ b/platform/internal/registry/hibernation.go @@ -0,0 +1,102 @@ +package registry + +import ( + "context" + "log" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised" +) + +// HibernateHandler is called for each workspace that the hibernation monitor +// decides should be hibernated. The handler stops the container, updates the +// DB status, and broadcasts the event. +type HibernateHandler func(ctx context.Context, workspaceID string) + +// defaultHibernationInterval is how often the hibernation monitor polls the +// database for idle-too-long workspaces. Two minutes is fine-grained enough +// for typical idle_hibernate_minutes values (≥5) and cheap enough on a busy +// platform — the query hits a partial index and does a small range scan. +const defaultHibernationInterval = 2 * time.Minute + +// StartHibernationMonitor periodically scans for workspaces that have been +// idle (active_tasks == 0) longer than their configured hibernation_idle_minutes +// and calls onHibernate for each. It runs under supervised.RunWithRecover so a +// panic is recovered with exponential backoff rather than silently dying. +// +// Only workspaces with: +// - status IN ('online', 'degraded') +// - active_tasks == 0 +// - hibernation_idle_minutes IS NOT NULL AND > 0 +// - runtime != 'external' (external agents have no Docker container) +// - last heartbeat older than hibernation_idle_minutes minutes ago +// +// are candidates. The last_heartbeat_at column tracks the most recent +// successful heartbeat from the agent; when it is NULL the workspace has +// never heartbeated and is not yet eligible for hibernation (we give it a +// full grace period equal to hibernation_idle_minutes from its created_at). +func StartHibernationMonitor(ctx context.Context, onHibernate HibernateHandler) { + StartHibernationMonitorWithInterval(ctx, defaultHibernationInterval, onHibernate) +} + +// StartHibernationMonitorWithInterval is StartHibernationMonitor with a +// configurable tick interval — exposed for tests so they don't have to wait +// 2 minutes for a tick. +func StartHibernationMonitorWithInterval(ctx context.Context, interval time.Duration, onHibernate HibernateHandler) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + log.Printf("Hibernation monitor: started (interval=%s)", interval) + + for { + select { + case <-ctx.Done(): + log.Println("Hibernation monitor: context done; stopping") + return + case <-ticker.C: + hibernateIdleWorkspaces(ctx, onHibernate) + supervised.Heartbeat("hibernation-monitor") + } + } +} + +// hibernateIdleWorkspaces queries for hibernation candidates and calls +// onHibernate for each. Errors from DB are logged but do not crash the loop. +func hibernateIdleWorkspaces(ctx context.Context, onHibernate HibernateHandler) { + rows, err := db.DB.QueryContext(ctx, ` + SELECT id + FROM workspaces + WHERE hibernation_idle_minutes IS NOT NULL + AND hibernation_idle_minutes > 0 + AND status IN ('online', 'degraded') + AND active_tasks = 0 + AND COALESCE(runtime, 'langgraph') != 'external' + AND last_heartbeat_at IS NOT NULL + AND last_heartbeat_at < now() - (hibernation_idle_minutes * INTERVAL '1 minute') + `) + if err != nil { + log.Printf("Hibernation monitor: query error: %v", err) + return + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if rows.Scan(&id) == nil { + ids = append(ids, id) + } + } + if err := rows.Err(); err != nil { + log.Printf("Hibernation monitor: row iteration error: %v", err) + return + } + + for _, id := range ids { + log.Printf("Hibernation monitor: hibernating idle workspace %s", id) + if onHibernate != nil { + onHibernate(ctx, id) + } + } +} diff --git a/platform/internal/registry/hibernation_test.go b/platform/internal/registry/hibernation_test.go new file mode 100644 index 00000000..76d6555f --- /dev/null +++ b/platform/internal/registry/hibernation_test.go @@ -0,0 +1,147 @@ +package registry + +import ( + "context" + "database/sql" + "sync/atomic" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +func setupHibernationMock(t *testing.T) sqlmock.Sqlmock { + t.Helper() + mockDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + db.DB = mockDB + t.Cleanup(func() { mockDB.Close() }) + return mock +} + +// TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate verifies that +// hibernateIdleWorkspaces calls onHibernate once for each workspace row +// returned by the DB query. +func TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow("ws-idle-1"). + AddRow("ws-idle-2")) + + var called []string + hibernateIdleWorkspaces(context.Background(), func(ctx context.Context, id string) { + called = append(called, id) + }) + + if len(called) != 2 { + t.Fatalf("expected 2 hibernations, got %d: %v", len(called), called) + } + if called[0] != "ws-idle-1" || called[1] != "ws-idle-2" { + t.Errorf("unexpected IDs: %v", called) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHibernateIdleWorkspaces_NoRowsNoHandler verifies that no handler is +// called when the query returns zero rows (no idle workspaces). +func TestHibernateIdleWorkspaces_NoRowsNoHandler(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty + + var called int + hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) { + called++ + }) + + if called != 0 { + t.Errorf("expected 0 hibernations, got %d", called) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHibernateIdleWorkspaces_DBErrorDoesNotPanic verifies that a DB error +// from the query is logged but does not crash the monitor loop. +func TestHibernateIdleWorkspaces_DBErrorDoesNotPanic(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnError(sql.ErrConnDone) + + // Should not panic + hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) { + t.Error("handler should not be called on DB error") + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestStartHibernationMonitor_TicksAndCallsHandler verifies the monitor loop +// ticks at the configured interval and calls the handler. +func TestStartHibernationMonitor_TicksAndCallsHandler(t *testing.T) { + mock := setupHibernationMock(t) + + // Expect at least one DB query (the first tick) + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-hibernate-me")) + + var callCount int32 + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + go func() { + StartHibernationMonitorWithInterval(ctx, 50*time.Millisecond, func(_ context.Context, id string) { + if id == "ws-hibernate-me" { + atomic.AddInt32(&callCount, 1) + cancel() // stop after first hit + } + }) + close(done) + }() + + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("monitor did not stop within timeout") + } + + if atomic.LoadInt32(&callCount) == 0 { + t.Error("expected handler to be called at least once") + } +} + +// TestStartHibernationMonitor_StopsOnContextCancel verifies clean shutdown +// when the context is cancelled before any tick fires. +func TestStartHibernationMonitor_StopsOnContextCancel(t *testing.T) { + _ = setupHibernationMock(t) // no DB calls expected + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately + + done := make(chan struct{}) + go func() { + // Very long interval — only context cancel should stop it + StartHibernationMonitorWithInterval(ctx, 10*time.Minute, func(_ context.Context, _ string) { + // should never be called + }) + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("monitor did not stop on context cancel") + } +} diff --git a/platform/internal/registry/liveness.go b/platform/internal/registry/liveness.go index 874d1d5b..d8b95fa4 100644 --- a/platform/internal/registry/liveness.go +++ b/platform/internal/registry/liveness.go @@ -41,10 +41,10 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) { log.Printf("Liveness: workspace %s TTL expired", workspaceID) - // Mark offline in Postgres — skip paused workspaces (they have no container) + // Mark offline in Postgres — skip paused and hibernated workspaces (no active container) _, err := db.DB.ExecContext(ctx, ` UPDATE workspaces SET status = 'offline', updated_at = now() - WHERE id = $1 AND status NOT IN ('removed', 'paused') + WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated') `, workspaceID) if err != nil { log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err) diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go index 7ca998a0..6448941b 100644 --- a/platform/internal/router/router.go +++ b/platform/internal/router/router.go @@ -100,14 +100,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi c.JSON(200, gin.H{"subsystems": out}) }) - // Prometheus metrics — gated behind AdminAuth (#683). - // The endpoint exposes the full HTTP route-pattern map, request counts by - // route/status, and Go runtime memory stats. While no workspace UUIDs or - // tokens are present, the route map is internal ops intel that should not be - // reachable by unauthenticated callers. Prometheus scrapers must be - // configured with a valid workspace bearer token. - // Scrape with: curl -H "Authorization: Bearer " http://localhost:8080/metrics - r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler()) + // Prometheus metrics — exempt from rate limiter via separate registration + // (registered before Use(limiter) takes effect on this specific route — the + // middleware.Middleware() still records it for observability). + // Scrape with: curl http://localhost:8080/metrics + r.GET("/metrics", metrics.Handler()) // Single-workspace read — open so canvas nodes can fetch their own state // without a token (used by WorkspaceNode polling and health checks). @@ -147,6 +144,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAuth.POST("/restart", wh.Restart) wsAuth.POST("/pause", wh.Pause) wsAuth.POST("/resume", wh.Resume) + // Manual hibernate (opt-in, #711) — stops the container and sets status + // to 'hibernated'. The workspace auto-wakes on the next A2A message. + wsAuth.POST("/hibernate", wh.Hibernate) // Async Delegation delh := handlers.NewDelegationHandler(wh, broadcaster) diff --git a/platform/migrations/029_workspace_hibernation.down.sql b/platform/migrations/029_workspace_hibernation.down.sql new file mode 100644 index 00000000..1885a8ea --- /dev/null +++ b/platform/migrations/029_workspace_hibernation.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_workspaces_hibernation; +ALTER TABLE workspaces DROP COLUMN IF EXISTS hibernation_idle_minutes; diff --git a/platform/migrations/029_workspace_hibernation.up.sql b/platform/migrations/029_workspace_hibernation.up.sql new file mode 100644 index 00000000..0a64194e --- /dev/null +++ b/platform/migrations/029_workspace_hibernation.up.sql @@ -0,0 +1,16 @@ +-- 029_workspace_hibernation: opt-in automatic hibernation for idle workspaces. +-- +-- When hibernation_idle_minutes is set (> 0) on a workspace, the hibernation +-- monitor will stop the container and set status = 'hibernated' after the +-- workspace has had active_tasks == 0 for that many consecutive minutes. +-- The workspace auto-wakes on the next incoming A2A message/send. +-- NULL (default) means hibernation is disabled for that workspace. + +ALTER TABLE workspaces + ADD COLUMN IF NOT EXISTS hibernation_idle_minutes INT DEFAULT NULL; + +-- Index so the hibernation sweep can efficiently find candidates without +-- a full table scan (only workspaces with non-NULL hibernation config). +CREATE INDEX IF NOT EXISTS idx_workspaces_hibernation + ON workspaces (hibernation_idle_minutes) + WHERE hibernation_idle_minutes IS NOT NULL;