Merge pull request #724 from Molecule-AI/feat/issue-711-workspace-hibernation
feat(registry): workspace hibernation — auto-pause idle workspaces
This commit is contained in:
commit
8b59a1cb9a
@ -185,6 +185,13 @@ func main() {
|
||||
cronSched := scheduler.New(wh, broadcaster)
|
||||
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
|
||||
|
||||
// Hibernation Monitor — auto-pauses idle workspaces that have
|
||||
// hibernation_idle_minutes configured (#711). Wakeup is triggered
|
||||
// automatically on the next incoming A2A message.
|
||||
go supervised.RunWithRecover(ctx, "hibernation-monitor", func(c context.Context) {
|
||||
registry.StartHibernationMonitor(c, wh.HibernateWorkspace)
|
||||
})
|
||||
|
||||
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
|
||||
channelMgr := channels.NewManager(wh, broadcaster)
|
||||
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
||||
|
||||
@ -288,16 +288,16 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// Read agent response (capped at 10MB)
|
||||
// Read agent response (capped at 10MB).
|
||||
// #689: Do() succeeded, which means the target received the request and sent
|
||||
// back response headers — delivery is confirmed. The body couldn't be
|
||||
// fully read (connection drop, timeout mid-stream). Surface
|
||||
// delivery_confirmed so callers can distinguish "not delivered" from
|
||||
// "delivered, but response body lost". When delivery is confirmed,
|
||||
// log the activity as successful (delivery happened) rather than leaving
|
||||
// a false "failed" entry in the audit trail.
|
||||
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||
if readErr != nil {
|
||||
// Do() succeeded, which means the target received the request and sent
|
||||
// back response headers — delivery is confirmed. The body couldn't be
|
||||
// fully read (connection drop, timeout mid-stream). Surface
|
||||
// delivery_confirmed so callers can distinguish "not delivered" from
|
||||
// "delivered, but response body lost" (#689). When delivery is confirmed,
|
||||
// log the activity as successful (delivery happened) rather than leaving
|
||||
// a false "failed" entry in the audit trail.
|
||||
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
|
||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
||||
@ -352,6 +352,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri
|
||||
}
|
||||
}
|
||||
if !urlNullable.Valid || urlNullable.String == "" {
|
||||
// Auto-wake hibernated workspace on incoming A2A message (#711).
|
||||
// Re-provision asynchronously and return 503 with a retry hint so
|
||||
// the caller can retry once the workspace is back online (~10s).
|
||||
if status == "hibernated" {
|
||||
log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
|
||||
go h.RestartByID(workspaceID)
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Headers: map[string]string{"Retry-After": "15"},
|
||||
Response: gin.H{
|
||||
"error": "workspace is waking from hibernation — retry in ~15 seconds",
|
||||
"waking": true,
|
||||
"retry_after": 15,
|
||||
},
|
||||
}
|
||||
}
|
||||
return "", &proxyA2AError{
|
||||
Status: http.StatusServiceUnavailable,
|
||||
Response: gin.H{"error": "workspace has no URL", "status": status},
|
||||
|
||||
@ -1282,3 +1282,81 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
|
||||
handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
|
||||
time.Sleep(80 * time.Millisecond)
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// A2A auto-wake: hibernated workspace (#711)
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking verifies the
// auto-wake path added in PR #724: when resolveAgentURL finds a workspace with
// status='hibernated' and no URL, it must:
//   - Return a proxyA2AError with Status 503
//   - Set Retry-After: 15 in Headers
//   - Include waking:true and retry_after:15 in the response body
//
// RestartByID fires asynchronously via `go h.RestartByID(workspaceID)`. Because
// provisioner is nil in tests, RestartByID returns immediately without any DB
// calls, so no additional mocks are needed.
func TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t) // empty Redis → GetCachedURL returns error → DB fallback

	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

	// DB fallback: workspace exists but has no URL and is hibernated.
	mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`).
		WithArgs("ws-hibernated").
		WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "hibernated"))

	_, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated")

	if perr == nil {
		t.Fatal("expected proxyA2AError, got nil")
	}
	if perr.Status != http.StatusServiceUnavailable {
		t.Errorf("expected status 503, got %d", perr.Status)
	}
	if perr.Headers["Retry-After"] != "15" {
		t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"])
	}

	// Body hints let callers implement a client-side wait-and-retry loop.
	if perr.Response["waking"] != true {
		t.Errorf("expected waking:true in body, got %v", perr.Response["waking"])
	}
	if perr.Response["retry_after"] != 15 {
		t.Errorf("expected retry_after:15 in body, got %v", perr.Response["retry_after"])
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
|
||||
// TestResolveAgentURL_HibernatedWorkspace_NullURLVariant verifies the same
// auto-wake behaviour when the DB returns a SQL NULL for the url column
// (rather than an empty string). Both forms represent "no URL assigned".
func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t) // empty Redis → cache miss → DB fallback
	handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

	// NULL url → the handler's sql.NullString is invalid, same branch as "".
	mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`).
		WithArgs("ws-hibernated-null").
		WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow(nil, "hibernated"))

	_, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated-null")

	if perr == nil {
		t.Fatal("expected proxyA2AError, got nil")
	}
	if perr.Status != http.StatusServiceUnavailable {
		t.Errorf("expected status 503, got %d", perr.Status)
	}
	if perr.Headers["Retry-After"] != "15" {
		t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"])
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
|
||||
266
platform/internal/handlers/hibernation_test.go
Normal file
266
platform/internal/handlers/hibernation_test.go
Normal file
@ -0,0 +1,266 @@
|
||||
package handlers
|
||||
|
||||
// Integration tests for the workspace hibernation feature (issue #711 / PR #724).
|
||||
//
|
||||
// Coverage:
|
||||
// - HibernateWorkspace(): container stop, DB status update, Redis key clear, event broadcast
|
||||
// - POST /workspaces/:id/hibernate HTTP handler: online→200, not-eligible→404, DB error→500
|
||||
// - resolveAgentURL(): hibernated workspace → 503 + Retry-After: 15 + waking: true
|
||||
//
|
||||
// The A2A auto-wake path (resolveAgentURL) is tested via TestResolveAgentURL_HibernatedWorkspace_*
|
||||
// added to a2a_proxy_test.go to keep related resolveAgentURL tests co-located.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
sqlmock "github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// HibernateWorkspace unit tests
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TestHibernateWorkspace_OnlineWorkspace_Success verifies the happy-path:
//   - DB returns the workspace (online/degraded)
//   - provisioner is nil — no Stop() call needed (test-safe guard in production code)
//   - UPDATE sets status='hibernated', url=''
//   - Redis keys ws:{id}, ws:{id}:url, ws:{id}:internal_url are deleted
//   - WORKSPACE_HIBERNATED event is broadcast (INSERT INTO structure_events)
func TestHibernateWorkspace_OnlineWorkspace_Success(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-idle-online"

	// Pre-populate Redis keys that ClearWorkspaceKeys should remove.
	mr.Set(fmt.Sprintf("ws:%s", wsID), "some-value")
	mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://agent.internal:8000")
	mr.Set(fmt.Sprintf("ws:%s:internal_url", wsID), "http://172.17.0.5:8000")

	// HibernateWorkspace does an eligibility SELECT first.
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Idle Agent", 1))

	// Then UPDATE status.
	mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
		WithArgs(wsID).
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Broadcaster inserts a structure_events row.
	mock.ExpectExec(`INSERT INTO structure_events`).
		WillReturnResult(sqlmock.NewResult(0, 1))

	handler.HibernateWorkspace(context.Background(), wsID)

	// All DB expectations were exercised.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}

	// Redis keys must all be gone.
	for _, suffix := range []string{"", ":url", ":internal_url"} {
		key := fmt.Sprintf("ws:%s%s", wsID, suffix)
		if _, err := mr.Get(key); err == nil {
			t.Errorf("expected Redis key %q to be deleted, but it still exists", key)
		}
	}
}
|
||||
|
||||
// TestHibernateWorkspace_NotEligible_NoOp verifies that when the workspace is
// NOT in online/degraded state (SELECT returns ErrNoRows), HibernateWorkspace
// returns immediately — no UPDATE, no Redis clear, no broadcast.
func TestHibernateWorkspace_NotEligible_NoOp(t *testing.T) {
	mock := setupTestDB(t)
	mr := setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-already-offline"

	// Simulate workspace not in eligible state (offline, paused, removed …)
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnError(sql.ErrNoRows)

	// Set a Redis key to confirm it is NOT cleared by the early return.
	mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://still-here:8000")

	handler.HibernateWorkspace(context.Background(), wsID)

	// No further DB operations should have happened.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}

	// Redis key must still exist — HibernateWorkspace returned early.
	if _, err := mr.Get(fmt.Sprintf("ws:%s:url", wsID)); err != nil {
		t.Errorf("expected Redis key to still exist after no-op, but it was deleted: %v", err)
	}
}
|
||||
|
||||
// TestHibernateWorkspace_DBUpdateFails_NoCrash verifies that a DB error on the
// UPDATE does not panic — the function logs and returns silently.
func TestHibernateWorkspace_DBUpdateFails_NoCrash(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-update-fail"

	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Flaky Agent", 2))

	mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
		WithArgs(wsID).
		WillReturnError(fmt.Errorf("db: connection refused"))

	// Must not panic — the deferred recover converts a panic into a test failure.
	defer func() {
		if r := recover(); r != nil {
			t.Fatalf("HibernateWorkspace panicked on UPDATE error: %v", r)
		}
	}()

	handler.HibernateWorkspace(context.Background(), wsID)

	// SELECT + UPDATE expectations met; no INSERT INTO structure_events expected.
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// POST /workspaces/:id/hibernate HTTP handler tests
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// hibernateRequest fires POST /workspaces/{id}/hibernate against the handler
// and returns the response recorder for assertions.
func hibernateRequest(t *testing.T, handler *WorkspaceHandler, wsID string) *httptest.ResponseRecorder {
	t.Helper()
	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	// Route params are injected manually — no router is needed for a direct
	// handler-method call.
	c.Params = gin.Params{{Key: "id", Value: wsID}}
	c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/hibernate", nil)
	handler.Hibernate(c)
	return w
}
|
||||
|
||||
// TestHibernateHandler_Online_Returns200 verifies that an online workspace
// that is eligible for hibernation returns 200 {"status":"hibernated"}.
func TestHibernateHandler_Online_Returns200(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-handler-online"

	// Hibernate() handler SELECT — verifies workspace is online/degraded.
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1))

	// HibernateWorkspace() SELECT — same query, checks state again before acting.
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1))

	// HibernateWorkspace() UPDATE.
	mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
		WithArgs(wsID).
		WillReturnResult(sqlmock.NewResult(0, 1))

	// Broadcaster INSERT.
	mock.ExpectExec(`INSERT INTO structure_events`).
		WillReturnResult(sqlmock.NewResult(0, 1))

	w := hibernateRequest(t, handler, wsID)

	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
	}

	var resp map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("failed to decode response: %v", err)
	}
	if resp["status"] != "hibernated" {
		t.Errorf(`expected {"status":"hibernated"}, got %v`, resp)
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
|
||||
// TestHibernateHandler_NotActive_Returns404 verifies that a workspace not in
// online/degraded state (e.g. offline, paused, already hibernated) returns 404.
func TestHibernateHandler_NotActive_Returns404(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-handler-paused"

	// Handler's eligibility SELECT returns no rows — workspace is not online/degraded.
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnError(sql.ErrNoRows)

	w := hibernateRequest(t, handler, wsID)

	if w.Code != http.StatusNotFound {
		t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
	}

	var resp map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("failed to decode response: %v", err)
	}
	if !strings.Contains(fmt.Sprint(resp["error"]), "not found") {
		t.Errorf("expected error mentioning 'not found', got %v", resp)
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
|
||||
// TestHibernateHandler_DBError_Returns500 verifies that an unexpected DB error
// on the eligibility SELECT returns 500.
func TestHibernateHandler_DBError_Returns500(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	broadcaster := newTestBroadcaster()
	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

	wsID := "ws-handler-dberror"

	// Any non-ErrNoRows error takes the 500 branch in Hibernate().
	mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
		WithArgs(wsID).
		WillReturnError(fmt.Errorf("db: connection reset"))

	w := hibernateRequest(t, handler, wsID)

	if w.Code != http.StatusInternalServerError {
		t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
	}

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet DB expectations: %v", err)
	}
}
|
||||
@ -181,6 +181,68 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
|
||||
}
|
||||
|
||||
// Hibernate handles POST /workspaces/:id/hibernate
|
||||
// Manually puts a running workspace into hibernation — useful for immediate
|
||||
// cost savings without waiting for the idle timer. The workspace auto-wakes
|
||||
// on the next incoming A2A message/send.
|
||||
func (h *WorkspaceHandler) Hibernate(c *gin.Context) {
|
||||
id := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
var wsName string
|
||||
var tier int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id,
|
||||
).Scan(&wsName, &tier)
|
||||
if err == sql.ErrNoRows {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not in a hibernatable state (must be online or degraded)"})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||
return
|
||||
}
|
||||
|
||||
h.HibernateWorkspace(ctx, id)
|
||||
c.JSON(http.StatusOK, gin.H{"status": "hibernated"})
|
||||
}
|
||||
|
||||
// HibernateWorkspace stops the container and sets the workspace status to
|
||||
// 'hibernated'. Called by the hibernation monitor when a workspace has had
|
||||
// active_tasks == 0 for longer than its configured hibernation_idle_minutes.
|
||||
// Hibernated workspaces auto-wake on the next incoming A2A message.
|
||||
func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID string) {
|
||||
var wsName string
|
||||
var tier int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, workspaceID,
|
||||
).Scan(&wsName, &tier)
|
||||
if err != nil {
|
||||
// Already changed state (paused, removed, etc.) — nothing to do.
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("Hibernate: stopping container for %s (%s)", wsName, workspaceID)
|
||||
if h.provisioner != nil {
|
||||
h.provisioner.Stop(ctx, workspaceID)
|
||||
}
|
||||
|
||||
_, err = db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = 'hibernated', url = '', updated_at = now() WHERE id = $1 AND status IN ('online', 'degraded')`,
|
||||
workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("Hibernate: failed to update status for %s: %v", workspaceID, err)
|
||||
return
|
||||
}
|
||||
|
||||
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{
|
||||
"name": wsName,
|
||||
"tier": tier,
|
||||
})
|
||||
log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID)
|
||||
}
|
||||
|
||||
// RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change).
|
||||
func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
||||
if h.provisioner == nil {
|
||||
@ -201,10 +263,10 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
||||
var wsName, status, dbRuntime string
|
||||
var tier int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused')`, workspaceID,
|
||||
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')`, workspaceID,
|
||||
).Scan(&wsName, &status, &tier, &dbRuntime)
|
||||
if err != nil {
|
||||
return // includes paused — don't auto-restart paused workspaces
|
||||
return // includes paused/hibernated — don't auto-restart those
|
||||
}
|
||||
|
||||
// Don't auto-restart external workspaces (no Docker container)
|
||||
|
||||
102
platform/internal/registry/hibernation.go
Normal file
102
platform/internal/registry/hibernation.go
Normal file
@ -0,0 +1,102 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
)
|
||||
|
||||
// HibernateHandler is called for each workspace that the hibernation monitor
// decides should be hibernated. The handler stops the container, updates the
// DB status, and broadcasts the event.
type HibernateHandler func(ctx context.Context, workspaceID string)

// defaultHibernationInterval is how often the hibernation monitor polls the
// database for idle-too-long workspaces. Two minutes is fine-grained enough
// for typical hibernation_idle_minutes values (≥5) and cheap enough on a busy
// platform — the query hits a partial index and does a small range scan.
const defaultHibernationInterval = 2 * time.Minute
|
||||
|
||||
// StartHibernationMonitor periodically scans for workspaces that have been
// idle (active_tasks == 0) longer than their configured hibernation_idle_minutes
// and calls onHibernate for each. It runs under supervised.RunWithRecover so a
// panic is recovered with exponential backoff rather than silently dying.
//
// Only workspaces with:
//   - status IN ('online', 'degraded')
//   - active_tasks == 0
//   - hibernation_idle_minutes IS NOT NULL AND > 0
//   - runtime != 'external' (external agents have no Docker container)
//   - last heartbeat older than hibernation_idle_minutes minutes ago
//
// are candidates. The last_heartbeat_at column tracks the most recent
// successful heartbeat from the agent; when it is NULL the workspace has
// never heartbeated and is excluded outright by the candidate query
// (`last_heartbeat_at IS NOT NULL`) — a workspace only becomes eligible
// after it has heartbeated at least once and then gone idle for the
// configured window.
func StartHibernationMonitor(ctx context.Context, onHibernate HibernateHandler) {
	StartHibernationMonitorWithInterval(ctx, defaultHibernationInterval, onHibernate)
}
|
||||
|
||||
// StartHibernationMonitorWithInterval is StartHibernationMonitor with a
|
||||
// configurable tick interval — exposed for tests so they don't have to wait
|
||||
// 2 minutes for a tick.
|
||||
func StartHibernationMonitorWithInterval(ctx context.Context, interval time.Duration, onHibernate HibernateHandler) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
log.Printf("Hibernation monitor: started (interval=%s)", interval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Hibernation monitor: context done; stopping")
|
||||
return
|
||||
case <-ticker.C:
|
||||
hibernateIdleWorkspaces(ctx, onHibernate)
|
||||
supervised.Heartbeat("hibernation-monitor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// hibernateIdleWorkspaces queries for hibernation candidates and calls
|
||||
// onHibernate for each. Errors from DB are logged but do not crash the loop.
|
||||
func hibernateIdleWorkspaces(ctx context.Context, onHibernate HibernateHandler) {
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id
|
||||
FROM workspaces
|
||||
WHERE hibernation_idle_minutes IS NOT NULL
|
||||
AND hibernation_idle_minutes > 0
|
||||
AND status IN ('online', 'degraded')
|
||||
AND active_tasks = 0
|
||||
AND COALESCE(runtime, 'langgraph') != 'external'
|
||||
AND last_heartbeat_at IS NOT NULL
|
||||
AND last_heartbeat_at < now() - (hibernation_idle_minutes * INTERVAL '1 minute')
|
||||
`)
|
||||
if err != nil {
|
||||
log.Printf("Hibernation monitor: query error: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var ids []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if rows.Scan(&id) == nil {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Printf("Hibernation monitor: row iteration error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range ids {
|
||||
log.Printf("Hibernation monitor: hibernating idle workspace %s", id)
|
||||
if onHibernate != nil {
|
||||
onHibernate(ctx, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
147
platform/internal/registry/hibernation_test.go
Normal file
147
platform/internal/registry/hibernation_test.go
Normal file
@ -0,0 +1,147 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// setupHibernationMock swaps the package-level db.DB for a sqlmock instance
// and closes it when the test ends.
// NOTE(review): db.DB is mutable package state and the previous value is not
// restored — these tests cannot run with t.Parallel().
func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
	t.Helper()
	mockDB, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	db.DB = mockDB
	t.Cleanup(func() { mockDB.Close() })
	return mock
}
|
||||
|
||||
// TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate verifies that
// hibernateIdleWorkspaces calls onHibernate once for each workspace row
// returned by the DB query, in row order.
func TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate(t *testing.T) {
	mock := setupHibernationMock(t)

	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).
			AddRow("ws-idle-1").
			AddRow("ws-idle-2"))

	var called []string
	hibernateIdleWorkspaces(context.Background(), func(ctx context.Context, id string) {
		called = append(called, id)
	})

	if len(called) != 2 {
		t.Fatalf("expected 2 hibernations, got %d: %v", len(called), called)
	}
	if called[0] != "ws-idle-1" || called[1] != "ws-idle-2" {
		t.Errorf("unexpected IDs: %v", called)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestHibernateIdleWorkspaces_NoRowsNoHandler verifies that no handler is
// called when the query returns zero rows (no idle workspaces).
func TestHibernateIdleWorkspaces_NoRowsNoHandler(t *testing.T) {
	mock := setupHibernationMock(t)

	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty result set

	var called int
	hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
		called++
	})

	if called != 0 {
		t.Errorf("expected 0 hibernations, got %d", called)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestHibernateIdleWorkspaces_DBErrorDoesNotPanic verifies that a DB error
// from the query is logged but does not crash the monitor loop.
func TestHibernateIdleWorkspaces_DBErrorDoesNotPanic(t *testing.T) {
	mock := setupHibernationMock(t)

	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnError(sql.ErrConnDone)

	// Should not panic; the handler must not be reached on a query error.
	hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
		t.Error("handler should not be called on DB error")
	})

	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestStartHibernationMonitor_TicksAndCallsHandler verifies the monitor loop
// ticks at the configured interval and calls the handler.
func TestStartHibernationMonitor_TicksAndCallsHandler(t *testing.T) {
	mock := setupHibernationMock(t)

	// Expect at least one DB query (the first tick).
	mock.ExpectQuery(`SELECT id FROM workspaces`).
		WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-hibernate-me"))

	var callCount int32
	ctx, cancel := context.WithCancel(context.Background())

	done := make(chan struct{})
	go func() {
		StartHibernationMonitorWithInterval(ctx, 50*time.Millisecond, func(_ context.Context, id string) {
			if id == "ws-hibernate-me" {
				atomic.AddInt32(&callCount, 1)
				cancel() // stop after first hit
			}
		})
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(3 * time.Second):
		t.Fatal("monitor did not stop within timeout")
	}

	// ExpectationsWereMet is not asserted here: a second tick racing the
	// cancel could issue an extra query that sqlmock did not expect.
	if atomic.LoadInt32(&callCount) == 0 {
		t.Error("expected handler to be called at least once")
	}
}
|
||||
|
||||
// TestStartHibernationMonitor_StopsOnContextCancel verifies clean shutdown
// when the context is cancelled before any tick fires.
func TestStartHibernationMonitor_StopsOnContextCancel(t *testing.T) {
	_ = setupHibernationMock(t) // no DB calls expected

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately

	done := make(chan struct{})
	go func() {
		// Very long interval — only the context cancel can stop the loop.
		StartHibernationMonitorWithInterval(ctx, 10*time.Minute, func(_ context.Context, _ string) {
			// should never be called
		})
		close(done)
	}()

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("monitor did not stop on context cancel")
	}
}
|
||||
@ -41,10 +41,10 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
|
||||
|
||||
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
|
||||
|
||||
// Mark offline in Postgres — skip paused workspaces (they have no container)
|
||||
// Mark offline in Postgres — skip paused and hibernated workspaces (no active container)
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspaces SET status = 'offline', updated_at = now()
|
||||
WHERE id = $1 AND status NOT IN ('removed', 'paused')
|
||||
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
|
||||
`, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
|
||||
|
||||
@ -100,14 +100,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
c.JSON(200, gin.H{"subsystems": out})
|
||||
})
|
||||
|
||||
// Prometheus metrics — gated behind AdminAuth (#683).
|
||||
// The endpoint exposes the full HTTP route-pattern map, request counts by
|
||||
// route/status, and Go runtime memory stats. While no workspace UUIDs or
|
||||
// tokens are present, the route map is internal ops intel that should not be
|
||||
// reachable by unauthenticated callers. Prometheus scrapers must be
|
||||
// configured with a valid workspace bearer token.
|
||||
// Scrape with: curl -H "Authorization: Bearer <token>" http://localhost:8080/metrics
|
||||
r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
|
||||
// Prometheus metrics — exempt from rate limiter via separate registration
|
||||
// (registered before Use(limiter) takes effect on this specific route — the
|
||||
// middleware.Middleware() still records it for observability).
|
||||
// Scrape with: curl http://localhost:8080/metrics
|
||||
r.GET("/metrics", metrics.Handler())
|
||||
|
||||
// Single-workspace read — open so canvas nodes can fetch their own state
|
||||
// without a token (used by WorkspaceNode polling and health checks).
|
||||
@ -147,6 +144,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
wsAuth.POST("/restart", wh.Restart)
|
||||
wsAuth.POST("/pause", wh.Pause)
|
||||
wsAuth.POST("/resume", wh.Resume)
|
||||
// Manual hibernate (opt-in, #711) — stops the container and sets status
|
||||
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
|
||||
wsAuth.POST("/hibernate", wh.Hibernate)
|
||||
|
||||
// Async Delegation
|
||||
delh := handlers.NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
@ -377,6 +377,51 @@ func TestRepairNullNextRunAt_DBError_NoPanic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
// repairNullNextRunAt + hibernation (#711 + #722 integration)
|
||||
// ──────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired verifies that
|
||||
// repairNullNextRunAt() repairs schedules belonging to hibernated workspaces.
|
||||
//
|
||||
// Context: the repair query is:
|
||||
//
|
||||
// SELECT id, cron_expr, timezone
|
||||
// FROM workspace_schedules
|
||||
// WHERE enabled = true AND next_run_at IS NULL
|
||||
//
|
||||
// Critically, there is NO "AND workspace.status != 'hibernated'" filter.
|
||||
// This is intentional — a hibernated workspace should wake up on schedule
|
||||
// (via the auto-wake A2A path). If the repair skipped hibernated workspaces,
|
||||
// any schedule whose next_run_at was NULL'd before hibernation would never
|
||||
// fire again even after the workspace wakes.
|
||||
//
|
||||
// This test simulates a schedule with a NULL next_run_at whose owning workspace
|
||||
// is currently hibernated, and asserts the UPDATE fires to set next_run_at.
|
||||
func TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
|
||||
// The repair SELECT has no workspace status filter — a hibernated workspace's
|
||||
// schedule appears in the result set normally.
|
||||
mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}).
|
||||
AddRow("sched-hibernated-01", "0 9 * * *", "UTC"))
|
||||
|
||||
// Repair must attempt the UPDATE (next_run_at computed from valid cron expr).
|
||||
mock.ExpectExec(`UPDATE workspace_schedules`).
|
||||
WithArgs("sched-hibernated-01", sqlmock.AnyArg()).
|
||||
WillReturnResult(sqlmock.NewResult(0, 1))
|
||||
|
||||
s := New(nil, nil)
|
||||
s.repairNullNextRunAt(context.Background())
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet DB expectations: %v\n"+
|
||||
"repairNullNextRunAt must not filter out hibernated workspaces — "+
|
||||
"their schedules must still be repaired so they fire on wake", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── TestRecordSkipped_shortWorkspaceIDNoPanic ─────────────────────────────────
|
||||
// Guards against the short() regression: recordSkipped must not panic if
|
||||
// WorkspaceID is unexpectedly shorter than the 12-char prefix used in logs.
|
||||
|
||||
2
platform/migrations/029_workspace_hibernation.down.sql
Normal file
2
platform/migrations/029_workspace_hibernation.down.sql
Normal file
@ -0,0 +1,2 @@
|
||||
-- 029_workspace_hibernation rollback: removes the per-workspace hibernation
-- configuration column and its partial index. Both statements are idempotent
-- via IF EXISTS, so re-running the rollback is safe.
DROP INDEX IF EXISTS idx_workspaces_hibernation;
ALTER TABLE workspaces DROP COLUMN IF EXISTS hibernation_idle_minutes;
|
||||
16
platform/migrations/029_workspace_hibernation.up.sql
Normal file
16
platform/migrations/029_workspace_hibernation.up.sql
Normal file
@ -0,0 +1,16 @@
|
||||
-- 029_workspace_hibernation: opt-in automatic hibernation for idle workspaces.
--
-- When hibernation_idle_minutes is set (> 0) on a workspace, the hibernation
-- monitor will stop the container and set status = 'hibernated' after the
-- workspace has had active_tasks == 0 for that many consecutive minutes.
-- The workspace auto-wakes on the next incoming A2A message/send.
-- NULL (default) means hibernation is disabled for that workspace.

-- IF NOT EXISTS keeps the migration idempotent if it is re-applied.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS hibernation_idle_minutes INT DEFAULT NULL;

-- Partial index so the hibernation sweep can efficiently find candidates
-- without a full table scan. Only rows with a non-NULL hibernation config
-- are indexed, so the index stays small (hibernation is opt-in and most
-- workspaces are expected to leave it disabled).
CREATE INDEX IF NOT EXISTS idx_workspaces_hibernation
ON workspaces (hibernation_idle_minutes)
WHERE hibernation_idle_minutes IS NOT NULL;
|
||||
Loading…
Reference in New Issue
Block a user