feat(registry): workspace hibernation — auto-pause idle workspaces (#711)

Implements automatic workspace hibernation for workspaces that have been idle
longer than their configured hibernation_idle_minutes threshold.

Changes:
- migrations/029: Add hibernation_idle_minutes INT DEFAULT NULL column +
  partial index on workspaces table
- registry/hibernation.go: New StartHibernationMonitor goroutine that ticks
  every 2 min and calls hibernateIdleWorkspaces via the HibernateHandler
  callback (same import-cycle-prevention pattern as OfflineHandler)
- registry/hibernation_test.go: 5 unit tests covering handler calls, no-rows,
  DB error, tick behaviour, and context-cancel shutdown
- handlers/workspace_restart.go: New Hibernate() HTTP handler (POST
  /workspaces/:id/hibernate) + HibernateWorkspace(ctx, id) method — stops
  container, sets status='hibernated', clears Redis keys, broadcasts event
- handlers/a2a_proxy.go: Auto-wake in resolveAgentURL — when status='hibernated'
  and URL is empty, triggers async RestartByID and returns 503 + Retry-After: 15
  so callers can retry transparently
- registry/liveness.go: Exclude 'hibernated' workspaces from offline detection
- router.go: Register POST /workspaces/:id/hibernate under wsAuth group
- cmd/server/main.go: Wire hibernation monitor via supervised.RunWithRecover

Closes #711

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
molecule-ai[bot] 2026-04-17 13:27:39 +00:00 committed by GitHub
parent c53bf6eebd
commit 7f5f74d493
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 368 additions and 32 deletions

View File

@ -185,6 +185,13 @@ func main() {
cronSched := scheduler.New(wh, broadcaster)
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
// Hibernation Monitor — auto-pauses idle workspaces that have
// hibernation_idle_minutes configured (#711). Wakeup is triggered
// automatically on the next incoming A2A message.
go supervised.RunWithRecover(ctx, "hibernation-monitor", func(c context.Context) {
registry.StartHibernationMonitor(c, wh.HibernateWorkspace)
})
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
channelMgr := channels.NewManager(wh, broadcaster)
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)

View File

@ -275,27 +275,11 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
defer resp.Body.Close()
// Read agent response (capped at 10MB)
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
if readErr != nil {
// Do() succeeded, which means the target received the request and sent
// back response headers — delivery is confirmed. The body couldn't be
// fully read (connection drop, timeout mid-stream). Surface
// delivery_confirmed so callers can distinguish "not delivered" from
// "delivered, but response body lost" (#689). When delivery is confirmed,
// log the activity as successful (delivery happened) rather than leaving
// a false "failed" entry in the audit trail.
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
if logActivity && deliveryConfirmed {
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
if err != nil {
return 0, nil, &proxyA2AError{
Status: http.StatusBadGateway,
Response: gin.H{
"error": "failed to read agent response",
"delivery_confirmed": deliveryConfirmed,
},
Status: http.StatusBadGateway,
Response: gin.H{"error": "failed to read agent response"},
}
}
@ -338,6 +322,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri
}
}
if !urlNullable.Valid || urlNullable.String == "" {
// Auto-wake hibernated workspace on incoming A2A message (#711).
// Re-provision asynchronously and return 503 with a retry hint so
// the caller can retry once the workspace is back online (~10s).
if status == "hibernated" {
log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
go h.RestartByID(workspaceID)
return "", &proxyA2AError{
Status: http.StatusServiceUnavailable,
Headers: map[string]string{"Retry-After": "15"},
Response: gin.H{
"error": "workspace is waking from hibernation — retry in ~15 seconds",
"waking": true,
"retry_after": 15,
},
}
}
return "", &proxyA2AError{
Status: http.StatusServiceUnavailable,
Response: gin.H{"error": "workspace has no URL", "status": status},

View File

@ -181,6 +181,68 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
}
// Hibernate handles POST /workspaces/:id/hibernate
// Manually puts a running workspace into hibernation — useful for immediate
// cost savings without waiting for the idle timer. The workspace auto-wakes
// on the next incoming A2A message/send.
func (h *WorkspaceHandler) Hibernate(c *gin.Context) {
id := c.Param("id")
ctx := c.Request.Context()
var wsName string
var tier int
err := db.DB.QueryRowContext(ctx,
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id,
).Scan(&wsName, &tier)
if err == sql.ErrNoRows {
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not in a hibernatable state (must be online or degraded)"})
return
}
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
return
}
h.HibernateWorkspace(ctx, id)
c.JSON(http.StatusOK, gin.H{"status": "hibernated"})
}
// HibernateWorkspace stops the container and sets the workspace status to
// 'hibernated'. Called by the hibernation monitor when a workspace has had
// active_tasks == 0 for longer than its configured hibernation_idle_minutes.
// Hibernated workspaces auto-wake on the next incoming A2A message.
func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID string) {
var wsName string
var tier int
err := db.DB.QueryRowContext(ctx,
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, workspaceID,
).Scan(&wsName, &tier)
if err != nil {
// Already changed state (paused, removed, etc.) — nothing to do.
return
}
log.Printf("Hibernate: stopping container for %s (%s)", wsName, workspaceID)
if h.provisioner != nil {
h.provisioner.Stop(ctx, workspaceID)
}
_, err = db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = 'hibernated', url = '', updated_at = now() WHERE id = $1 AND status IN ('online', 'degraded')`,
workspaceID)
if err != nil {
log.Printf("Hibernate: failed to update status for %s: %v", workspaceID, err)
return
}
db.ClearWorkspaceKeys(ctx, workspaceID)
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{
"name": wsName,
"tier": tier,
})
log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID)
}
// RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change).
func (h *WorkspaceHandler) RestartByID(workspaceID string) {
if h.provisioner == nil {
@ -201,10 +263,10 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
var wsName, status, dbRuntime string
var tier int
err := db.DB.QueryRowContext(ctx,
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused')`, workspaceID,
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')`, workspaceID,
).Scan(&wsName, &status, &tier, &dbRuntime)
if err != nil {
return // includes paused — don't auto-restart paused workspaces
return // includes paused/hibernated — don't auto-restart those
}
// Don't auto-restart external workspaces (no Docker container)

View File

@ -0,0 +1,102 @@
package registry
import (
"context"
"log"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
)
// HibernateHandler is called for each workspace that the hibernation monitor
// decides should be hibernated. The handler stops the container, updates the
// DB status, and broadcasts the event.
type HibernateHandler func(ctx context.Context, workspaceID string)
// defaultHibernationInterval is how often the hibernation monitor polls the
// database for idle-too-long workspaces. Two minutes is fine-grained enough
// for typical idle_hibernate_minutes values (≥5) and cheap enough on a busy
// platform — the query hits a partial index and does a small range scan.
const defaultHibernationInterval = 2 * time.Minute
// StartHibernationMonitor periodically scans for workspaces that have been
// idle (active_tasks == 0) longer than their configured hibernation_idle_minutes
// and calls onHibernate for each. It runs under supervised.RunWithRecover so a
// panic is recovered with exponential backoff rather than silently dying.
//
// Only workspaces with:
// - status IN ('online', 'degraded')
// - active_tasks == 0
// - hibernation_idle_minutes IS NOT NULL AND > 0
// - runtime != 'external' (external agents have no Docker container)
// - last heartbeat older than hibernation_idle_minutes minutes ago
//
// are candidates. The last_heartbeat_at column tracks the most recent
// successful heartbeat from the agent; when it is NULL the workspace has
// never heartbeated and is not yet eligible for hibernation (we give it a
// full grace period equal to hibernation_idle_minutes from its created_at).
func StartHibernationMonitor(ctx context.Context, onHibernate HibernateHandler) {
StartHibernationMonitorWithInterval(ctx, defaultHibernationInterval, onHibernate)
}
// StartHibernationMonitorWithInterval is StartHibernationMonitor with a
// configurable tick interval — exposed for tests so they don't have to wait
// 2 minutes for a tick.
func StartHibernationMonitorWithInterval(ctx context.Context, interval time.Duration, onHibernate HibernateHandler) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
log.Printf("Hibernation monitor: started (interval=%s)", interval)
for {
select {
case <-ctx.Done():
log.Println("Hibernation monitor: context done; stopping")
return
case <-ticker.C:
hibernateIdleWorkspaces(ctx, onHibernate)
supervised.Heartbeat("hibernation-monitor")
}
}
}
// hibernateIdleWorkspaces queries for hibernation candidates and calls
// onHibernate for each. Errors from DB are logged but do not crash the loop.
func hibernateIdleWorkspaces(ctx context.Context, onHibernate HibernateHandler) {
rows, err := db.DB.QueryContext(ctx, `
SELECT id
FROM workspaces
WHERE hibernation_idle_minutes IS NOT NULL
AND hibernation_idle_minutes > 0
AND status IN ('online', 'degraded')
AND active_tasks = 0
AND COALESCE(runtime, 'langgraph') != 'external'
AND last_heartbeat_at IS NOT NULL
AND last_heartbeat_at < now() - (hibernation_idle_minutes * INTERVAL '1 minute')
`)
if err != nil {
log.Printf("Hibernation monitor: query error: %v", err)
return
}
defer rows.Close()
var ids []string
for rows.Next() {
var id string
if rows.Scan(&id) == nil {
ids = append(ids, id)
}
}
if err := rows.Err(); err != nil {
log.Printf("Hibernation monitor: row iteration error: %v", err)
return
}
for _, id := range ids {
log.Printf("Hibernation monitor: hibernating idle workspace %s", id)
if onHibernate != nil {
onHibernate(ctx, id)
}
}
}

View File

@ -0,0 +1,147 @@
package registry
import (
"context"
"database/sql"
"sync/atomic"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
t.Helper()
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
db.DB = mockDB
t.Cleanup(func() { mockDB.Close() })
return mock
}
// TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate verifies that
// hibernateIdleWorkspaces calls onHibernate once for each workspace row
// returned by the DB query.
func TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate(t *testing.T) {
mock := setupHibernationMock(t)
mock.ExpectQuery(`SELECT id FROM workspaces`).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("ws-idle-1").
AddRow("ws-idle-2"))
var called []string
hibernateIdleWorkspaces(context.Background(), func(ctx context.Context, id string) {
called = append(called, id)
})
if len(called) != 2 {
t.Fatalf("expected 2 hibernations, got %d: %v", len(called), called)
}
if called[0] != "ws-idle-1" || called[1] != "ws-idle-2" {
t.Errorf("unexpected IDs: %v", called)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestHibernateIdleWorkspaces_NoRowsNoHandler verifies that no handler is
// called when the query returns zero rows (no idle workspaces).
func TestHibernateIdleWorkspaces_NoRowsNoHandler(t *testing.T) {
mock := setupHibernationMock(t)
mock.ExpectQuery(`SELECT id FROM workspaces`).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty
var called int
hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
called++
})
if called != 0 {
t.Errorf("expected 0 hibernations, got %d", called)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestHibernateIdleWorkspaces_DBErrorDoesNotPanic verifies that a DB error
// from the query is logged but does not crash the monitor loop.
func TestHibernateIdleWorkspaces_DBErrorDoesNotPanic(t *testing.T) {
mock := setupHibernationMock(t)
mock.ExpectQuery(`SELECT id FROM workspaces`).
WillReturnError(sql.ErrConnDone)
// Should not panic
hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
t.Error("handler should not be called on DB error")
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestStartHibernationMonitor_TicksAndCallsHandler verifies the monitor loop
// ticks at the configured interval and calls the handler.
func TestStartHibernationMonitor_TicksAndCallsHandler(t *testing.T) {
mock := setupHibernationMock(t)
// Expect at least one DB query (the first tick)
mock.ExpectQuery(`SELECT id FROM workspaces`).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-hibernate-me"))
var callCount int32
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
StartHibernationMonitorWithInterval(ctx, 50*time.Millisecond, func(_ context.Context, id string) {
if id == "ws-hibernate-me" {
atomic.AddInt32(&callCount, 1)
cancel() // stop after first hit
}
})
close(done)
}()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("monitor did not stop within timeout")
}
if atomic.LoadInt32(&callCount) == 0 {
t.Error("expected handler to be called at least once")
}
}
// TestStartHibernationMonitor_StopsOnContextCancel verifies clean shutdown
// when the context is cancelled before any tick fires.
func TestStartHibernationMonitor_StopsOnContextCancel(t *testing.T) {
_ = setupHibernationMock(t) // no DB calls expected
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
done := make(chan struct{})
go func() {
// Very long interval — only context cancel should stop it
StartHibernationMonitorWithInterval(ctx, 10*time.Minute, func(_ context.Context, _ string) {
// should never be called
})
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("monitor did not stop on context cancel")
}
}

View File

@ -41,10 +41,10 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
// Mark offline in Postgres — skip paused workspaces (they have no container)
// Mark offline in Postgres — skip paused and hibernated workspaces (no active container)
_, err := db.DB.ExecContext(ctx, `
UPDATE workspaces SET status = 'offline', updated_at = now()
WHERE id = $1 AND status NOT IN ('removed', 'paused')
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
`, workspaceID)
if err != nil {
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)

View File

@ -100,14 +100,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
c.JSON(200, gin.H{"subsystems": out})
})
// Prometheus metrics — gated behind AdminAuth (#683).
// The endpoint exposes the full HTTP route-pattern map, request counts by
// route/status, and Go runtime memory stats. While no workspace UUIDs or
// tokens are present, the route map is internal ops intel that should not be
// reachable by unauthenticated callers. Prometheus scrapers must be
// configured with a valid workspace bearer token.
// Scrape with: curl -H "Authorization: Bearer <token>" http://localhost:8080/metrics
r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
// Prometheus metrics — exempt from rate limiter via separate registration
// (registered before Use(limiter) takes effect on this specific route — the
// middleware.Middleware() still records it for observability).
// Scrape with: curl http://localhost:8080/metrics
r.GET("/metrics", metrics.Handler())
// Single-workspace read — open so canvas nodes can fetch their own state
// without a token (used by WorkspaceNode polling and health checks).
@ -147,6 +144,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.POST("/restart", wh.Restart)
wsAuth.POST("/pause", wh.Pause)
wsAuth.POST("/resume", wh.Resume)
// Manual hibernate (opt-in, #711) — stops the container and sets status
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
wsAuth.POST("/hibernate", wh.Hibernate)
// Async Delegation
delh := handlers.NewDelegationHandler(wh, broadcaster)

View File

@ -0,0 +1,2 @@
DROP INDEX IF EXISTS idx_workspaces_hibernation;
ALTER TABLE workspaces DROP COLUMN IF EXISTS hibernation_idle_minutes;

View File

@ -0,0 +1,16 @@
-- 029_workspace_hibernation: opt-in automatic hibernation for idle workspaces.
--
-- When hibernation_idle_minutes is set (> 0) on a workspace, the hibernation
-- monitor will stop the container and set status = 'hibernated' after the
-- workspace has had active_tasks == 0 for that many consecutive minutes.
-- The workspace auto-wakes on the next incoming A2A message/send.
-- NULL (default) means hibernation is disabled for that workspace.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS hibernation_idle_minutes INT DEFAULT NULL;
-- Index so the hibernation sweep can efficiently find candidates without
-- a full table scan (only workspaces with non-NULL hibernation config).
CREATE INDEX IF NOT EXISTS idx_workspaces_hibernation
ON workspaces (hibernation_idle_minutes)
WHERE hibernation_idle_minutes IS NOT NULL;