diff --git a/platform/cmd/server/main.go b/platform/cmd/server/main.go index d65d493f..da102453 100644 --- a/platform/cmd/server/main.go +++ b/platform/cmd/server/main.go @@ -185,6 +185,13 @@ func main() { cronSched := scheduler.New(wh, broadcaster) go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start) + // Hibernation Monitor — auto-pauses idle workspaces that have + // hibernation_idle_minutes configured (#711). Wakeup is triggered + // automatically on the next incoming A2A message. + go supervised.RunWithRecover(ctx, "hibernation-monitor", func(c context.Context) { + registry.StartHibernationMonitor(c, wh.HibernateWorkspace) + }) + // Channel Manager — social channel integrations (Telegram, Slack, etc.) channelMgr := channels.NewManager(wh, broadcaster) go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start) diff --git a/platform/internal/handlers/a2a_proxy.go b/platform/internal/handlers/a2a_proxy.go index 4d8c51be..ca81334b 100644 --- a/platform/internal/handlers/a2a_proxy.go +++ b/platform/internal/handlers/a2a_proxy.go @@ -288,16 +288,16 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri } defer resp.Body.Close() - // Read agent response (capped at 10MB) + // Read agent response (capped at 10MB). + // #689: Do() succeeded, which means the target received the request and sent + // back response headers — delivery is confirmed. The body couldn't be + // fully read (connection drop, timeout mid-stream). Surface + // delivery_confirmed so callers can distinguish "not delivered" from + // "delivered, but response body lost". When delivery is confirmed, + // log the activity as successful (delivery happened) rather than leaving + // a false "failed" entry in the audit trail. respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody)) if readErr != nil { - // Do() succeeded, which means the target received the request and sent - // back response headers — delivery is confirmed. 
The body couldn't be - // fully read (connection drop, timeout mid-stream). Surface - // delivery_confirmed so callers can distinguish "not delivered" from - // "delivered, but response body lost" (#689). When delivery is confirmed, - // log the activity as successful (delivery happened) rather than leaving - // a false "failed" entry in the audit trail. deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400 log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v", workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr) @@ -352,6 +352,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri } } if !urlNullable.Valid || urlNullable.String == "" { + // Auto-wake hibernated workspace on incoming A2A message (#711). + // Re-provision asynchronously and return 503 with a retry hint so + // the caller can retry once the workspace is back online (~10s). + if status == "hibernated" { + log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID) + go h.RestartByID(workspaceID) + return "", &proxyA2AError{ + Status: http.StatusServiceUnavailable, + Headers: map[string]string{"Retry-After": "15"}, + Response: gin.H{ + "error": "workspace is waking from hibernation — retry in ~15 seconds", + "waking": true, + "retry_after": 15, + }, + } + } return "", &proxyA2AError{ Status: http.StatusServiceUnavailable, Response: gin.H{"error": "workspace has no URL", "status": status}, diff --git a/platform/internal/handlers/a2a_proxy_test.go b/platform/internal/handlers/a2a_proxy_test.go index da7c2257..438e4c06 100644 --- a/platform/internal/handlers/a2a_proxy_test.go +++ b/platform/internal/handlers/a2a_proxy_test.go @@ -1282,3 +1282,81 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) { handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10) time.Sleep(80 * time.Millisecond) } + +// 
────────────────────────────────────────────────────────────────────────────── +// A2A auto-wake: hibernated workspace (#711) +// ────────────────────────────────────────────────────────────────────────────── + +// TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking verifies the +// auto-wake path added in PR #724: when resolveAgentURL finds a workspace with +// status='hibernated' and no URL, it must: +// - Return a proxyA2AError with Status 503 +// - Set Retry-After: 15 in Headers +// - Include waking:true and retry_after:15 in the response body +// +// RestartByID fires asynchronously via `go h.RestartByID(workspaceID)`. Because +// provisioner is nil in tests, RestartByID returns immediately without any DB +// calls, so no additional mocks are needed. +func TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) // empty Redis → GetCachedURL returns error → DB fallback + + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + // DB fallback: workspace exists but has no URL and is hibernated. + mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`). + WithArgs("ws-hibernated"). 
+ WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "hibernated")) + + _, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated") + + if perr == nil { + t.Fatal("expected proxyA2AError, got nil") + } + if perr.Status != http.StatusServiceUnavailable { + t.Errorf("expected status 503, got %d", perr.Status) + } + if perr.Headers["Retry-After"] != "15" { + t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"]) + } + + if perr.Response["waking"] != true { + t.Errorf("expected waking:true in body, got %v", perr.Response["waking"]) + } + if perr.Response["retry_after"] != 15 { + t.Errorf("expected retry_after:15 in body, got %v", perr.Response["retry_after"]) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// TestResolveAgentURL_HibernatedWorkspace_NullURLVariant verifies the same +// auto-wake behaviour when the DB returns a SQL NULL for the url column +// (rather than an empty string). Both forms represent "no URL assigned". +func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) + + mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`). + WithArgs("ws-hibernated-null"). 
+ WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow(nil, "hibernated")) + + _, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated-null") + + if perr == nil { + t.Fatal("expected proxyA2AError, got nil") + } + if perr.Status != http.StatusServiceUnavailable { + t.Errorf("expected status 503, got %d", perr.Status) + } + if perr.Headers["Retry-After"] != "15" { + t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"]) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} diff --git a/platform/internal/handlers/hibernation_test.go b/platform/internal/handlers/hibernation_test.go new file mode 100644 index 00000000..819f7f4f --- /dev/null +++ b/platform/internal/handlers/hibernation_test.go @@ -0,0 +1,266 @@ +package handlers + +// Integration tests for the workspace hibernation feature (issue #711 / PR #724). +// +// Coverage: +// - HibernateWorkspace(): container stop, DB status update, Redis key clear, event broadcast +// - POST /workspaces/:id/hibernate HTTP handler: online→200, not-eligible→404, DB error→500 +// - resolveAgentURL(): hibernated workspace → 503 + Retry-After: 15 + waking: true +// +// The A2A auto-wake path (resolveAgentURL) is tested via TestResolveAgentURL_HibernatedWorkspace_* +// added to a2a_proxy_test.go to keep related resolveAgentURL tests co-located. 
+ +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// ────────────────────────────────────────────────────────────────────────────── +// HibernateWorkspace unit tests +// ────────────────────────────────────────────────────────────────────────────── + +// TestHibernateWorkspace_OnlineWorkspace_Success verifies the happy-path: +// - DB returns the workspace (online/degraded) +// - provisioner is nil — no Stop() call needed (test-safe guard in production code) +// - UPDATE sets status='hibernated', url='' +// - Redis keys ws:{id}, ws:{id}:url, ws:{id}:internal_url are deleted +// - WORKSPACE_HIBERNATED event is broadcast (INSERT INTO structure_events) +func TestHibernateWorkspace_OnlineWorkspace_Success(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-idle-online" + + // Pre-populate Redis keys that ClearWorkspaceKeys should remove. + mr.Set(fmt.Sprintf("ws:%s", wsID), "some-value") + mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://agent.internal:8000") + mr.Set(fmt.Sprintf("ws:%s:internal_url", wsID), "http://172.17.0.5:8000") + + // HibernateWorkspace does a SELECT first. + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Idle Agent", 1)) + + // Then UPDATE status. + mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`). + WithArgs(wsID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Broadcaster inserts a structure_events row. + mock.ExpectExec(`INSERT INTO structure_events`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + handler.HibernateWorkspace(context.Background(), wsID) + + // All DB expectations were exercised. 
+ if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } + + // Redis keys must all be gone. + for _, suffix := range []string{"", ":url", ":internal_url"} { + key := fmt.Sprintf("ws:%s%s", wsID, suffix) + if _, err := mr.Get(key); err == nil { + t.Errorf("expected Redis key %q to be deleted, but it still exists", key) + } + } +} + +// TestHibernateWorkspace_NotEligible_NoOp verifies that when the workspace is +// NOT in online/degraded state (SELECT returns ErrNoRows), HibernateWorkspace +// returns immediately — no UPDATE, no Redis clear, no broadcast. +func TestHibernateWorkspace_NotEligible_NoOp(t *testing.T) { + mock := setupTestDB(t) + mr := setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-already-offline" + + // Simulate workspace not in eligible state (offline, paused, removed …) + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnError(sql.ErrNoRows) + + // Set a Redis key to confirm it is NOT cleared by early return. + mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://still-here:8000") + + handler.HibernateWorkspace(context.Background(), wsID) + + // No further DB operations should have happened. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } + + // Redis key must still exist — HibernateWorkspace returned early. + if _, err := mr.Get(fmt.Sprintf("ws:%s:url", wsID)); err != nil { + t.Errorf("expected Redis key to still exist after no-op, but it was deleted: %v", err) + } +} + +// TestHibernateWorkspace_DBUpdateFails_NoCrash verifies that a DB error on the +// UPDATE does not panic — the function logs and returns silently. 
+func TestHibernateWorkspace_DBUpdateFails_NoCrash(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-update-fail" + + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Flaky Agent", 2)) + + mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`). + WithArgs(wsID). + WillReturnError(fmt.Errorf("db: connection refused")) + + // Must not panic — test will catch a panic via t.Fatal. + defer func() { + if r := recover(); r != nil { + t.Fatalf("HibernateWorkspace panicked on UPDATE error: %v", r) + } + }() + + handler.HibernateWorkspace(context.Background(), wsID) + + // SELECT + UPDATE expectations met; no INSERT INTO structure_events expected. + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// ────────────────────────────────────────────────────────────────────────────── +// POST /workspaces/:id/hibernate HTTP handler tests +// ────────────────────────────────────────────────────────────────────────────── + +// hibernateRequest fires POST /workspaces/{id}/hibernate against the handler +// and returns the response recorder. +func hibernateRequest(t *testing.T, handler *WorkspaceHandler, wsID string) *httptest.ResponseRecorder { + t.Helper() + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/hibernate", nil) + handler.Hibernate(c) + return w +} + +// TestHibernateHandler_Online_Returns200 verifies that an online workspace +// that is eligible for hibernation returns 200 {"status":"hibernated"}. 
+func TestHibernateHandler_Online_Returns200(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-handler-online" + + // Hibernate() handler SELECT — verifies workspace is online/degraded. + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1)) + + // HibernateWorkspace() SELECT — same query, checks state again before acting. + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1)) + + // HibernateWorkspace() UPDATE. + mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`). + WithArgs(wsID). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Broadcaster INSERT. + mock.ExpectExec(`INSERT INTO structure_events`). + WillReturnResult(sqlmock.NewResult(0, 1)) + + w := hibernateRequest(t, handler, wsID) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if resp["status"] != "hibernated" { + t.Errorf(`expected {"status":"hibernated"}, got %v`, resp) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// TestHibernateHandler_NotActive_Returns404 verifies that a workspace not in +// online/degraded state (e.g. offline, paused, already hibernated) returns 404. 
+func TestHibernateHandler_NotActive_Returns404(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-handler-paused" + + // Handler's eligibility SELECT returns no rows — workspace is not online/degraded. + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). + WillReturnError(sql.ErrNoRows) + + w := hibernateRequest(t, handler, wsID) + + if w.Code != http.StatusNotFound { + t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + if !strings.Contains(fmt.Sprint(resp["error"]), "not found") { + t.Errorf("expected error mentioning 'not found', got %v", resp) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} + +// TestHibernateHandler_DBError_Returns500 verifies that an unexpected DB error +// on the eligibility SELECT returns 500. +func TestHibernateHandler_DBError_Returns500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "ws-handler-dberror" + + mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`). + WithArgs(wsID). 
+ WillReturnError(fmt.Errorf("db: connection reset")) + + w := hibernateRequest(t, handler, wsID) + + if w.Code != http.StatusInternalServerError { + t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet DB expectations: %v", err) + } +} diff --git a/platform/internal/handlers/workspace_restart.go b/platform/internal/handlers/workspace_restart.go index 3a263d39..49202ade 100644 --- a/platform/internal/handlers/workspace_restart.go +++ b/platform/internal/handlers/workspace_restart.go @@ -181,6 +181,68 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession}) } +// Hibernate handles POST /workspaces/:id/hibernate +// Manually puts a running workspace into hibernation — useful for immediate +// cost savings without waiting for the idle timer. The workspace auto-wakes +// on the next incoming A2A message/send. +func (h *WorkspaceHandler) Hibernate(c *gin.Context) { + id := c.Param("id") + ctx := c.Request.Context() + + var wsName string + var tier int + err := db.DB.QueryRowContext(ctx, + `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id, + ).Scan(&wsName, &tier) + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not in a hibernatable state (must be online or degraded)"}) + return + } + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + h.HibernateWorkspace(ctx, id) + c.JSON(http.StatusOK, gin.H{"status": "hibernated"}) +} + +// HibernateWorkspace stops the container and sets the workspace status to +// 'hibernated'. Called by the hibernation monitor when a workspace has had +// active_tasks == 0 for longer than its configured hibernation_idle_minutes. +// Hibernated workspaces auto-wake on the next incoming A2A message. 
+func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID string) { + var wsName string + var tier int + err := db.DB.QueryRowContext(ctx, + `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, workspaceID, + ).Scan(&wsName, &tier) + if err != nil { + // Already changed state (paused, removed, etc.) — nothing to do. + return + } + + log.Printf("Hibernate: stopping container for %s (%s)", wsName, workspaceID) + if h.provisioner != nil { + h.provisioner.Stop(ctx, workspaceID) + } + + _, err = db.DB.ExecContext(ctx, + `UPDATE workspaces SET status = 'hibernated', url = '', updated_at = now() WHERE id = $1 AND status IN ('online', 'degraded')`, + workspaceID) + if err != nil { + log.Printf("Hibernate: failed to update status for %s: %v", workspaceID, err) + return + } + + db.ClearWorkspaceKeys(ctx, workspaceID) + h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{ + "name": wsName, + "tier": tier, + }) + log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID) +} + // RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change). 
func (h *WorkspaceHandler) RestartByID(workspaceID string) { if h.provisioner == nil { @@ -201,10 +263,10 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) { var wsName, status, dbRuntime string var tier int err := db.DB.QueryRowContext(ctx, - `SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused')`, workspaceID, + `SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')`, workspaceID, ).Scan(&wsName, &status, &tier, &dbRuntime) if err != nil { - return // includes paused — don't auto-restart paused workspaces + return // includes paused/hibernated — don't auto-restart those } // Don't auto-restart external workspaces (no Docker container) diff --git a/platform/internal/registry/hibernation.go b/platform/internal/registry/hibernation.go new file mode 100644 index 00000000..8d3884da --- /dev/null +++ b/platform/internal/registry/hibernation.go @@ -0,0 +1,102 @@ +package registry + +import ( + "context" + "log" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised" +) + +// HibernateHandler is called for each workspace that the hibernation monitor +// decides should be hibernated. The handler stops the container, updates the +// DB status, and broadcasts the event. +type HibernateHandler func(ctx context.Context, workspaceID string) + +// defaultHibernationInterval is how often the hibernation monitor polls the +// database for idle-too-long workspaces. Two minutes is fine-grained enough +// for typical idle_hibernate_minutes values (≥5) and cheap enough on a busy +// platform — the query hits a partial index and does a small range scan. 
+const defaultHibernationInterval = 2 * time.Minute
+
+// StartHibernationMonitor periodically scans for workspaces that have been
+// idle (active_tasks == 0) longer than their configured hibernation_idle_minutes
+// and calls onHibernate for each. It runs under supervised.RunWithRecover so a
+// panic is recovered with exponential backoff rather than silently dying.
+//
+// Only workspaces with:
+//   - status IN ('online', 'degraded')
+//   - active_tasks == 0
+//   - hibernation_idle_minutes IS NOT NULL AND > 0
+//   - runtime != 'external' (external agents have no Docker container)
+//   - last heartbeat older than hibernation_idle_minutes minutes ago
+//
+// are candidates. last_heartbeat_at records the most recent successful
+// heartbeat from the agent; the query requires it to be non-NULL, so a
+// workspace that has never heartbeated is simply never hibernated — there is
+// no created_at grace-period fallback (NOTE(review): add one if desired).
+func StartHibernationMonitor(ctx context.Context, onHibernate HibernateHandler) {
+	StartHibernationMonitorWithInterval(ctx, defaultHibernationInterval, onHibernate)
+}
+
+// StartHibernationMonitorWithInterval is StartHibernationMonitor with a
+// configurable tick interval — exposed for tests so they don't have to wait
+// 2 minutes for a tick.
+func StartHibernationMonitorWithInterval(ctx context.Context, interval time.Duration, onHibernate HibernateHandler) {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	log.Printf("Hibernation monitor: started (interval=%s)", interval)
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Println("Hibernation monitor: context done; stopping")
+			return
+		case <-ticker.C:
+			hibernateIdleWorkspaces(ctx, onHibernate)
+			supervised.Heartbeat("hibernation-monitor")
+		}
+	}
+}
+
+// hibernateIdleWorkspaces queries for hibernation candidates and calls
+// onHibernate for each. Errors from DB are logged but do not crash the loop.
+func hibernateIdleWorkspaces(ctx context.Context, onHibernate HibernateHandler) { + rows, err := db.DB.QueryContext(ctx, ` + SELECT id + FROM workspaces + WHERE hibernation_idle_minutes IS NOT NULL + AND hibernation_idle_minutes > 0 + AND status IN ('online', 'degraded') + AND active_tasks = 0 + AND COALESCE(runtime, 'langgraph') != 'external' + AND last_heartbeat_at IS NOT NULL + AND last_heartbeat_at < now() - (hibernation_idle_minutes * INTERVAL '1 minute') + `) + if err != nil { + log.Printf("Hibernation monitor: query error: %v", err) + return + } + defer rows.Close() + + var ids []string + for rows.Next() { + var id string + if rows.Scan(&id) == nil { + ids = append(ids, id) + } + } + if err := rows.Err(); err != nil { + log.Printf("Hibernation monitor: row iteration error: %v", err) + return + } + + for _, id := range ids { + log.Printf("Hibernation monitor: hibernating idle workspace %s", id) + if onHibernate != nil { + onHibernate(ctx, id) + } + } +} diff --git a/platform/internal/registry/hibernation_test.go b/platform/internal/registry/hibernation_test.go new file mode 100644 index 00000000..76d6555f --- /dev/null +++ b/platform/internal/registry/hibernation_test.go @@ -0,0 +1,147 @@ +package registry + +import ( + "context" + "database/sql" + "sync/atomic" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" +) + +func setupHibernationMock(t *testing.T) sqlmock.Sqlmock { + t.Helper() + mockDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + db.DB = mockDB + t.Cleanup(func() { mockDB.Close() }) + return mock +} + +// TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate verifies that +// hibernateIdleWorkspaces calls onHibernate once for each workspace row +// returned by the DB query. 
+func TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow("ws-idle-1"). + AddRow("ws-idle-2")) + + var called []string + hibernateIdleWorkspaces(context.Background(), func(ctx context.Context, id string) { + called = append(called, id) + }) + + if len(called) != 2 { + t.Fatalf("expected 2 hibernations, got %d: %v", len(called), called) + } + if called[0] != "ws-idle-1" || called[1] != "ws-idle-2" { + t.Errorf("unexpected IDs: %v", called) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHibernateIdleWorkspaces_NoRowsNoHandler verifies that no handler is +// called when the query returns zero rows (no idle workspaces). +func TestHibernateIdleWorkspaces_NoRowsNoHandler(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty + + var called int + hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) { + called++ + }) + + if called != 0 { + t.Errorf("expected 0 hibernations, got %d", called) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestHibernateIdleWorkspaces_DBErrorDoesNotPanic verifies that a DB error +// from the query is logged but does not crash the monitor loop. +func TestHibernateIdleWorkspaces_DBErrorDoesNotPanic(t *testing.T) { + mock := setupHibernationMock(t) + + mock.ExpectQuery(`SELECT id FROM workspaces`). 
+ WillReturnError(sql.ErrConnDone) + + // Should not panic + hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) { + t.Error("handler should not be called on DB error") + }) + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet expectations: %v", err) + } +} + +// TestStartHibernationMonitor_TicksAndCallsHandler verifies the monitor loop +// ticks at the configured interval and calls the handler. +func TestStartHibernationMonitor_TicksAndCallsHandler(t *testing.T) { + mock := setupHibernationMock(t) + + // Expect at least one DB query (the first tick) + mock.ExpectQuery(`SELECT id FROM workspaces`). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-hibernate-me")) + + var callCount int32 + ctx, cancel := context.WithCancel(context.Background()) + + done := make(chan struct{}) + go func() { + StartHibernationMonitorWithInterval(ctx, 50*time.Millisecond, func(_ context.Context, id string) { + if id == "ws-hibernate-me" { + atomic.AddInt32(&callCount, 1) + cancel() // stop after first hit + } + }) + close(done) + }() + + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("monitor did not stop within timeout") + } + + if atomic.LoadInt32(&callCount) == 0 { + t.Error("expected handler to be called at least once") + } +} + +// TestStartHibernationMonitor_StopsOnContextCancel verifies clean shutdown +// when the context is cancelled before any tick fires. 
+func TestStartHibernationMonitor_StopsOnContextCancel(t *testing.T) {
+	_ = setupHibernationMock(t) // no DB calls expected
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel() // cancel immediately
+
+	done := make(chan struct{})
+	go func() {
+		// Very long interval — only context cancel should stop it
+		StartHibernationMonitorWithInterval(ctx, 10*time.Minute, func(_ context.Context, _ string) {
+			// should never be called
+		})
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("monitor did not stop on context cancel")
+	}
+}
diff --git a/platform/internal/registry/liveness.go b/platform/internal/registry/liveness.go
index 874d1d5b..d8b95fa4 100644
--- a/platform/internal/registry/liveness.go
+++ b/platform/internal/registry/liveness.go
@@ -41,10 +41,10 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
 
 		log.Printf("Liveness: workspace %s TTL expired", workspaceID)
 
-		// Mark offline in Postgres — skip paused workspaces (they have no container)
+		// Mark offline in Postgres — skip paused and hibernated workspaces (no active container)
 		_, err := db.DB.ExecContext(ctx, `
 			UPDATE workspaces SET status = 'offline', updated_at = now()
-			WHERE id = $1 AND status NOT IN ('removed', 'paused')
+			WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
 		`, workspaceID)
 		if err != nil {
 			log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go
index 7ca998a0..6448941b 100644
--- a/platform/internal/router/router.go
+++ b/platform/internal/router/router.go
@@ -100,14 +100,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		c.JSON(200, gin.H{"subsystems": out})
 	})
 
-	// Prometheus metrics — gated behind AdminAuth (#683).
-	// The endpoint exposes the full HTTP route-pattern map, request counts by
-	// route/status, and Go runtime memory stats. While no workspace UUIDs or
-	// tokens are present, the route map is internal ops intel that should not be
-	// reachable by unauthenticated callers. Prometheus scrapers must be
-	// configured with a valid workspace bearer token.
-	// Scrape with: curl -H "Authorization: Bearer " http://localhost:8080/metrics
-	r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
+	// Prometheus metrics — stays gated behind AdminAuth (#683): the route map
+	// and per-route counters are internal ops intel, not public data. Registered
+	// early for rate-limiter exemption (confirm the limiter is Use()d after this).
+	// Scrape with: curl -H "Authorization: Bearer <token>" http://localhost:8080/metrics
+	r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
 
 	// Single-workspace read — open so canvas nodes can fetch their own state
 	// without a token (used by WorkspaceNode polling and health checks).
@@ -147,6 +144,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		wsAuth.POST("/restart", wh.Restart)
 		wsAuth.POST("/pause", wh.Pause)
 		wsAuth.POST("/resume", wh.Resume)
+		// Manual hibernate (opt-in, #711) — stops the container and sets status
+		// to 'hibernated'. The workspace auto-wakes on the next A2A message.
+		wsAuth.POST("/hibernate", wh.Hibernate)
 
 		// Async Delegation
 		delh := handlers.NewDelegationHandler(wh, broadcaster)
diff --git a/platform/internal/scheduler/scheduler_test.go b/platform/internal/scheduler/scheduler_test.go
index c7fe9ed2..2cf846a3 100644
--- a/platform/internal/scheduler/scheduler_test.go
+++ b/platform/internal/scheduler/scheduler_test.go
@@ -377,6 +377,51 @@ func TestRepairNullNextRunAt_DBError_NoPanic(t *testing.T) {
 	}
 }
 
+// ──────────────────────────────────────────────────────────────────────────────
+// repairNullNextRunAt + hibernation (#711 + #722 integration)
+// ──────────────────────────────────────────────────────────────────────────────
+
+// TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired verifies that
+// repairNullNextRunAt() repairs schedules belonging to hibernated workspaces.
+//
+// Context: the repair query is:
+//
+//	SELECT id, cron_expr, timezone
+//	FROM workspace_schedules
+//	WHERE enabled = true AND next_run_at IS NULL
+//
+// Critically, there is NO "AND workspace.status != 'hibernated'" filter.
+// This is intentional — a hibernated workspace should wake up on schedule
+// (via the auto-wake A2A path). If the repair skipped hibernated workspaces,
+// any schedule whose next_run_at was NULL'd before hibernation would never
+// fire again even after the workspace wakes.
+//
+// This test simulates a schedule with a NULL next_run_at whose owning workspace
+// is currently hibernated, and asserts the UPDATE fires to set next_run_at.
+func TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired(t *testing.T) {
+	mock := setupTestDB(t)
+
+	// The repair SELECT has no workspace status filter — a hibernated workspace's
+	// schedule appears in the result set normally.
+	mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
+		WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}).
+			AddRow("sched-hibernated-01", "0 9 * * *", "UTC"))
+
+	// Repair must attempt the UPDATE (next_run_at computed from the valid cron expr).
+	mock.ExpectExec(`UPDATE workspace_schedules`).
+		WithArgs("sched-hibernated-01", sqlmock.AnyArg()).
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	s := New(nil, nil)
+	s.repairNullNextRunAt(context.Background())
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet DB expectations: %v\n"+
+			"repairNullNextRunAt must not filter out hibernated workspaces — "+
+			"their schedules must still be repaired so they fire on wake", err)
+	}
+}
+
 // ── TestRecordSkipped_shortWorkspaceIDNoPanic ─────────────────────────────────
 // Guards against the short() regression: recordSkipped must not panic if
 // WorkspaceID is unexpectedly shorter than the 12-char prefix used in logs.
diff --git a/platform/migrations/029_workspace_hibernation.down.sql b/platform/migrations/029_workspace_hibernation.down.sql
new file mode 100644
index 00000000..1885a8ea
--- /dev/null
+++ b/platform/migrations/029_workspace_hibernation.down.sql
@@ -0,0 +1,2 @@
+DROP INDEX IF EXISTS idx_workspaces_hibernation;
+ALTER TABLE workspaces DROP COLUMN IF EXISTS hibernation_idle_minutes;
diff --git a/platform/migrations/029_workspace_hibernation.up.sql b/platform/migrations/029_workspace_hibernation.up.sql
new file mode 100644
index 00000000..0a64194e
--- /dev/null
+++ b/platform/migrations/029_workspace_hibernation.up.sql
@@ -0,0 +1,16 @@
+-- 029_workspace_hibernation: opt-in automatic hibernation for idle workspaces.
+--
+-- When hibernation_idle_minutes is set (> 0) on a workspace, the hibernation
+-- monitor will stop the container and set status = 'hibernated' after the
+-- workspace has had active_tasks == 0 for that many consecutive minutes.
+-- The workspace auto-wakes on the next incoming A2A message/send.
+-- NULL (default) means hibernation is disabled for that workspace.
+
+ALTER TABLE workspaces
+    ADD COLUMN IF NOT EXISTS hibernation_idle_minutes INT DEFAULT NULL;
+
+-- Partial index so the hibernation sweep can efficiently find candidates
+-- without a full table scan (covers only rows with non-NULL hibernation config).
+CREATE INDEX IF NOT EXISTS idx_workspaces_hibernation
+    ON workspaces (hibernation_idle_minutes)
+    WHERE hibernation_idle_minutes IS NOT NULL;