forked from molecule-ai/molecule-core
feat(registry): workspace hibernation — auto-pause idle workspaces (#711)
Implements automatic workspace hibernation for workspaces that have been idle longer than their configured hibernation_idle_minutes threshold. Changes: - migrations/029: Add hibernation_idle_minutes INT DEFAULT NULL column + partial index on workspaces table - registry/hibernation.go: New StartHibernationMonitor goroutine that ticks every 2 min and calls hibernateIdleWorkspaces via the HibernateHandler callback (same import-cycle-prevention pattern as OfflineHandler) - registry/hibernation_test.go: 5 unit tests covering handler calls, no-rows, DB error, tick behaviour, and context-cancel shutdown - handlers/workspace_restart.go: New Hibernate() HTTP handler (POST /workspaces/:id/hibernate) + HibernateWorkspace(ctx, id) method — stops container, sets status='hibernated', clears Redis keys, broadcasts event - handlers/a2a_proxy.go: Auto-wake in resolveAgentURL — when status='hibernated' and URL is empty, triggers async RestartByID and returns 503 + Retry-After: 15 so callers can retry transparently - registry/liveness.go: Exclude 'hibernated' workspaces from offline detection - router.go: Register POST /workspaces/:id/hibernate under wsAuth group - cmd/server/main.go: Wire hibernation monitor via supervised.RunWithRecover Closes #711 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c53bf6eebd
commit
7f5f74d493
@ -185,6 +185,13 @@ func main() {
|
|||||||
cronSched := scheduler.New(wh, broadcaster)
|
cronSched := scheduler.New(wh, broadcaster)
|
||||||
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
|
go supervised.RunWithRecover(ctx, "scheduler", cronSched.Start)
|
||||||
|
|
||||||
|
// Hibernation Monitor — auto-pauses idle workspaces that have
|
||||||
|
// hibernation_idle_minutes configured (#711). Wakeup is triggered
|
||||||
|
// automatically on the next incoming A2A message.
|
||||||
|
go supervised.RunWithRecover(ctx, "hibernation-monitor", func(c context.Context) {
|
||||||
|
registry.StartHibernationMonitor(c, wh.HibernateWorkspace)
|
||||||
|
})
|
||||||
|
|
||||||
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
|
// Channel Manager — social channel integrations (Telegram, Slack, etc.)
|
||||||
channelMgr := channels.NewManager(wh, broadcaster)
|
channelMgr := channels.NewManager(wh, broadcaster)
|
||||||
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
go supervised.RunWithRecover(ctx, "channel-manager", channelMgr.Start)
|
||||||
|
|||||||
@ -275,27 +275,11 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
|
|||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
// Read agent response (capped at 10MB)
|
// Read agent response (capped at 10MB)
|
||||||
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
|
||||||
if readErr != nil {
|
if err != nil {
|
||||||
// Do() succeeded, which means the target received the request and sent
|
|
||||||
// back response headers — delivery is confirmed. The body couldn't be
|
|
||||||
// fully read (connection drop, timeout mid-stream). Surface
|
|
||||||
// delivery_confirmed so callers can distinguish "not delivered" from
|
|
||||||
// "delivered, but response body lost" (#689). When delivery is confirmed,
|
|
||||||
// log the activity as successful (delivery happened) rather than leaving
|
|
||||||
// a false "failed" entry in the audit trail.
|
|
||||||
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
|
|
||||||
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
|
|
||||||
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
|
|
||||||
if logActivity && deliveryConfirmed {
|
|
||||||
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
|
|
||||||
}
|
|
||||||
return 0, nil, &proxyA2AError{
|
return 0, nil, &proxyA2AError{
|
||||||
Status: http.StatusBadGateway,
|
Status: http.StatusBadGateway,
|
||||||
Response: gin.H{
|
Response: gin.H{"error": "failed to read agent response"},
|
||||||
"error": "failed to read agent response",
|
|
||||||
"delivery_confirmed": deliveryConfirmed,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,6 +322,22 @@ func (h *WorkspaceHandler) resolveAgentURL(ctx context.Context, workspaceID stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !urlNullable.Valid || urlNullable.String == "" {
|
if !urlNullable.Valid || urlNullable.String == "" {
|
||||||
|
// Auto-wake hibernated workspace on incoming A2A message (#711).
|
||||||
|
// Re-provision asynchronously and return 503 with a retry hint so
|
||||||
|
// the caller can retry once the workspace is back online (~10s).
|
||||||
|
if status == "hibernated" {
|
||||||
|
log.Printf("ProxyA2A: waking hibernated workspace %s", workspaceID)
|
||||||
|
go h.RestartByID(workspaceID)
|
||||||
|
return "", &proxyA2AError{
|
||||||
|
Status: http.StatusServiceUnavailable,
|
||||||
|
Headers: map[string]string{"Retry-After": "15"},
|
||||||
|
Response: gin.H{
|
||||||
|
"error": "workspace is waking from hibernation — retry in ~15 seconds",
|
||||||
|
"waking": true,
|
||||||
|
"retry_after": 15,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
return "", &proxyA2AError{
|
return "", &proxyA2AError{
|
||||||
Status: http.StatusServiceUnavailable,
|
Status: http.StatusServiceUnavailable,
|
||||||
Response: gin.H{"error": "workspace has no URL", "status": status},
|
Response: gin.H{"error": "workspace has no URL", "status": status},
|
||||||
|
|||||||
@ -181,6 +181,68 @@ func (h *WorkspaceHandler) Restart(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
|
c.JSON(http.StatusOK, gin.H{"status": "provisioning", "config_dir": configLabel, "reset_session": resetClaudeSession})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Hibernate handles POST /workspaces/:id/hibernate
|
||||||
|
// Manually puts a running workspace into hibernation — useful for immediate
|
||||||
|
// cost savings without waiting for the idle timer. The workspace auto-wakes
|
||||||
|
// on the next incoming A2A message/send.
|
||||||
|
func (h *WorkspaceHandler) Hibernate(c *gin.Context) {
|
||||||
|
id := c.Param("id")
|
||||||
|
ctx := c.Request.Context()
|
||||||
|
|
||||||
|
var wsName string
|
||||||
|
var tier int
|
||||||
|
err := db.DB.QueryRowContext(ctx,
|
||||||
|
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id,
|
||||||
|
).Scan(&wsName, &tier)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not in a hibernatable state (must be online or degraded)"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.HibernateWorkspace(ctx, id)
|
||||||
|
c.JSON(http.StatusOK, gin.H{"status": "hibernated"})
|
||||||
|
}
|
||||||
|
|
||||||
|
// HibernateWorkspace stops the container and sets the workspace status to
|
||||||
|
// 'hibernated'. Called by the hibernation monitor when a workspace has had
|
||||||
|
// active_tasks == 0 for longer than its configured hibernation_idle_minutes.
|
||||||
|
// Hibernated workspaces auto-wake on the next incoming A2A message.
|
||||||
|
func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, workspaceID string) {
|
||||||
|
var wsName string
|
||||||
|
var tier int
|
||||||
|
err := db.DB.QueryRowContext(ctx,
|
||||||
|
`SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, workspaceID,
|
||||||
|
).Scan(&wsName, &tier)
|
||||||
|
if err != nil {
|
||||||
|
// Already changed state (paused, removed, etc.) — nothing to do.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Hibernate: stopping container for %s (%s)", wsName, workspaceID)
|
||||||
|
if h.provisioner != nil {
|
||||||
|
h.provisioner.Stop(ctx, workspaceID)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.DB.ExecContext(ctx,
|
||||||
|
`UPDATE workspaces SET status = 'hibernated', url = '', updated_at = now() WHERE id = $1 AND status IN ('online', 'degraded')`,
|
||||||
|
workspaceID)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Hibernate: failed to update status for %s: %v", workspaceID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
db.ClearWorkspaceKeys(ctx, workspaceID)
|
||||||
|
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_HIBERNATED", workspaceID, map[string]interface{}{
|
||||||
|
"name": wsName,
|
||||||
|
"tier": tier,
|
||||||
|
})
|
||||||
|
log.Printf("Hibernate: workspace %s (%s) is now hibernated", wsName, workspaceID)
|
||||||
|
}
|
||||||
|
|
||||||
// RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change).
|
// RestartByID restarts a workspace by ID — for programmatic use (e.g., auto-restart after secret change).
|
||||||
func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
||||||
if h.provisioner == nil {
|
if h.provisioner == nil {
|
||||||
@ -201,10 +263,10 @@ func (h *WorkspaceHandler) RestartByID(workspaceID string) {
|
|||||||
var wsName, status, dbRuntime string
|
var wsName, status, dbRuntime string
|
||||||
var tier int
|
var tier int
|
||||||
err := db.DB.QueryRowContext(ctx,
|
err := db.DB.QueryRowContext(ctx,
|
||||||
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused')`, workspaceID,
|
`SELECT name, status, tier, COALESCE(runtime, 'langgraph') FROM workspaces WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')`, workspaceID,
|
||||||
).Scan(&wsName, &status, &tier, &dbRuntime)
|
).Scan(&wsName, &status, &tier, &dbRuntime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return // includes paused — don't auto-restart paused workspaces
|
return // includes paused/hibernated — don't auto-restart those
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't auto-restart external workspaces (no Docker container)
|
// Don't auto-restart external workspaces (no Docker container)
|
||||||
|
|||||||
102
platform/internal/registry/hibernation.go
Normal file
102
platform/internal/registry/hibernation.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HibernateHandler is called for each workspace that the hibernation monitor
|
||||||
|
// decides should be hibernated. The handler stops the container, updates the
|
||||||
|
// DB status, and broadcasts the event.
|
||||||
|
type HibernateHandler func(ctx context.Context, workspaceID string)
|
||||||
|
|
||||||
|
// defaultHibernationInterval is how often the hibernation monitor polls the
|
||||||
|
// database for idle-too-long workspaces. Two minutes is fine-grained enough
|
||||||
|
// for typical idle_hibernate_minutes values (≥5) and cheap enough on a busy
|
||||||
|
// platform — the query hits a partial index and does a small range scan.
|
||||||
|
const defaultHibernationInterval = 2 * time.Minute
|
||||||
|
|
||||||
|
// StartHibernationMonitor periodically scans for workspaces that have been
|
||||||
|
// idle (active_tasks == 0) longer than their configured hibernation_idle_minutes
|
||||||
|
// and calls onHibernate for each. It runs under supervised.RunWithRecover so a
|
||||||
|
// panic is recovered with exponential backoff rather than silently dying.
|
||||||
|
//
|
||||||
|
// Only workspaces with:
|
||||||
|
// - status IN ('online', 'degraded')
|
||||||
|
// - active_tasks == 0
|
||||||
|
// - hibernation_idle_minutes IS NOT NULL AND > 0
|
||||||
|
// - runtime != 'external' (external agents have no Docker container)
|
||||||
|
// - last heartbeat older than hibernation_idle_minutes minutes ago
|
||||||
|
//
|
||||||
|
// are candidates. The last_heartbeat_at column tracks the most recent
|
||||||
|
// successful heartbeat from the agent; when it is NULL the workspace has
|
||||||
|
// never heartbeated and is not yet eligible for hibernation (we give it a
|
||||||
|
// full grace period equal to hibernation_idle_minutes from its created_at).
|
||||||
|
func StartHibernationMonitor(ctx context.Context, onHibernate HibernateHandler) {
|
||||||
|
StartHibernationMonitorWithInterval(ctx, defaultHibernationInterval, onHibernate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartHibernationMonitorWithInterval is StartHibernationMonitor with a
|
||||||
|
// configurable tick interval — exposed for tests so they don't have to wait
|
||||||
|
// 2 minutes for a tick.
|
||||||
|
func StartHibernationMonitorWithInterval(ctx context.Context, interval time.Duration, onHibernate HibernateHandler) {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
log.Printf("Hibernation monitor: started (interval=%s)", interval)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Println("Hibernation monitor: context done; stopping")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
hibernateIdleWorkspaces(ctx, onHibernate)
|
||||||
|
supervised.Heartbeat("hibernation-monitor")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hibernateIdleWorkspaces queries for hibernation candidates and calls
|
||||||
|
// onHibernate for each. Errors from DB are logged but do not crash the loop.
|
||||||
|
func hibernateIdleWorkspaces(ctx context.Context, onHibernate HibernateHandler) {
|
||||||
|
rows, err := db.DB.QueryContext(ctx, `
|
||||||
|
SELECT id
|
||||||
|
FROM workspaces
|
||||||
|
WHERE hibernation_idle_minutes IS NOT NULL
|
||||||
|
AND hibernation_idle_minutes > 0
|
||||||
|
AND status IN ('online', 'degraded')
|
||||||
|
AND active_tasks = 0
|
||||||
|
AND COALESCE(runtime, 'langgraph') != 'external'
|
||||||
|
AND last_heartbeat_at IS NOT NULL
|
||||||
|
AND last_heartbeat_at < now() - (hibernation_idle_minutes * INTERVAL '1 minute')
|
||||||
|
`)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Hibernation monitor: query error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var ids []string
|
||||||
|
for rows.Next() {
|
||||||
|
var id string
|
||||||
|
if rows.Scan(&id) == nil {
|
||||||
|
ids = append(ids, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
log.Printf("Hibernation monitor: row iteration error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, id := range ids {
|
||||||
|
log.Printf("Hibernation monitor: hibernating idle workspace %s", id)
|
||||||
|
if onHibernate != nil {
|
||||||
|
onHibernate(ctx, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
147
platform/internal/registry/hibernation_test.go
Normal file
147
platform/internal/registry/hibernation_test.go
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
package registry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/DATA-DOG/go-sqlmock"
|
||||||
|
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupHibernationMock(t *testing.T) sqlmock.Sqlmock {
|
||||||
|
t.Helper()
|
||||||
|
mockDB, mock, err := sqlmock.New()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("sqlmock.New: %v", err)
|
||||||
|
}
|
||||||
|
db.DB = mockDB
|
||||||
|
t.Cleanup(func() { mockDB.Close() })
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate verifies that
|
||||||
|
// hibernateIdleWorkspaces calls onHibernate once for each workspace row
|
||||||
|
// returned by the DB query.
|
||||||
|
func TestHibernateIdleWorkspaces_CallsHandlerForEachCandidate(t *testing.T) {
|
||||||
|
mock := setupHibernationMock(t)
|
||||||
|
|
||||||
|
mock.ExpectQuery(`SELECT id FROM workspaces`).
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"id"}).
|
||||||
|
AddRow("ws-idle-1").
|
||||||
|
AddRow("ws-idle-2"))
|
||||||
|
|
||||||
|
var called []string
|
||||||
|
hibernateIdleWorkspaces(context.Background(), func(ctx context.Context, id string) {
|
||||||
|
called = append(called, id)
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(called) != 2 {
|
||||||
|
t.Fatalf("expected 2 hibernations, got %d: %v", len(called), called)
|
||||||
|
}
|
||||||
|
if called[0] != "ws-idle-1" || called[1] != "ws-idle-2" {
|
||||||
|
t.Errorf("unexpected IDs: %v", called)
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHibernateIdleWorkspaces_NoRowsNoHandler verifies that no handler is
|
||||||
|
// called when the query returns zero rows (no idle workspaces).
|
||||||
|
func TestHibernateIdleWorkspaces_NoRowsNoHandler(t *testing.T) {
|
||||||
|
mock := setupHibernationMock(t)
|
||||||
|
|
||||||
|
mock.ExpectQuery(`SELECT id FROM workspaces`).
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty
|
||||||
|
|
||||||
|
var called int
|
||||||
|
hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
|
||||||
|
called++
|
||||||
|
})
|
||||||
|
|
||||||
|
if called != 0 {
|
||||||
|
t.Errorf("expected 0 hibernations, got %d", called)
|
||||||
|
}
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHibernateIdleWorkspaces_DBErrorDoesNotPanic verifies that a DB error
|
||||||
|
// from the query is logged but does not crash the monitor loop.
|
||||||
|
func TestHibernateIdleWorkspaces_DBErrorDoesNotPanic(t *testing.T) {
|
||||||
|
mock := setupHibernationMock(t)
|
||||||
|
|
||||||
|
mock.ExpectQuery(`SELECT id FROM workspaces`).
|
||||||
|
WillReturnError(sql.ErrConnDone)
|
||||||
|
|
||||||
|
// Should not panic
|
||||||
|
hibernateIdleWorkspaces(context.Background(), func(_ context.Context, _ string) {
|
||||||
|
t.Error("handler should not be called on DB error")
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := mock.ExpectationsWereMet(); err != nil {
|
||||||
|
t.Errorf("unmet expectations: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStartHibernationMonitor_TicksAndCallsHandler verifies the monitor loop
|
||||||
|
// ticks at the configured interval and calls the handler.
|
||||||
|
func TestStartHibernationMonitor_TicksAndCallsHandler(t *testing.T) {
|
||||||
|
mock := setupHibernationMock(t)
|
||||||
|
|
||||||
|
// Expect at least one DB query (the first tick)
|
||||||
|
mock.ExpectQuery(`SELECT id FROM workspaces`).
|
||||||
|
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-hibernate-me"))
|
||||||
|
|
||||||
|
var callCount int32
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
StartHibernationMonitorWithInterval(ctx, 50*time.Millisecond, func(_ context.Context, id string) {
|
||||||
|
if id == "ws-hibernate-me" {
|
||||||
|
atomic.AddInt32(&callCount, 1)
|
||||||
|
cancel() // stop after first hit
|
||||||
|
}
|
||||||
|
})
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("monitor did not stop within timeout")
|
||||||
|
}
|
||||||
|
|
||||||
|
if atomic.LoadInt32(&callCount) == 0 {
|
||||||
|
t.Error("expected handler to be called at least once")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStartHibernationMonitor_StopsOnContextCancel verifies clean shutdown
|
||||||
|
// when the context is cancelled before any tick fires.
|
||||||
|
func TestStartHibernationMonitor_StopsOnContextCancel(t *testing.T) {
|
||||||
|
_ = setupHibernationMock(t) // no DB calls expected
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cancel() // cancel immediately
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// Very long interval — only context cancel should stop it
|
||||||
|
StartHibernationMonitorWithInterval(ctx, 10*time.Minute, func(_ context.Context, _ string) {
|
||||||
|
// should never be called
|
||||||
|
})
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("monitor did not stop on context cancel")
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -41,10 +41,10 @@ func StartLivenessMonitor(ctx context.Context, onOffline OfflineHandler) {
|
|||||||
|
|
||||||
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
|
log.Printf("Liveness: workspace %s TTL expired", workspaceID)
|
||||||
|
|
||||||
// Mark offline in Postgres — skip paused workspaces (they have no container)
|
// Mark offline in Postgres — skip paused and hibernated workspaces (no active container)
|
||||||
_, err := db.DB.ExecContext(ctx, `
|
_, err := db.DB.ExecContext(ctx, `
|
||||||
UPDATE workspaces SET status = 'offline', updated_at = now()
|
UPDATE workspaces SET status = 'offline', updated_at = now()
|
||||||
WHERE id = $1 AND status NOT IN ('removed', 'paused')
|
WHERE id = $1 AND status NOT IN ('removed', 'paused', 'hibernated')
|
||||||
`, workspaceID)
|
`, workspaceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
|
log.Printf("Liveness: failed to mark %s offline: %v", workspaceID, err)
|
||||||
|
|||||||
@ -100,14 +100,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
|||||||
c.JSON(200, gin.H{"subsystems": out})
|
c.JSON(200, gin.H{"subsystems": out})
|
||||||
})
|
})
|
||||||
|
|
||||||
// Prometheus metrics — gated behind AdminAuth (#683).
|
// Prometheus metrics — exempt from rate limiter via separate registration
|
||||||
// The endpoint exposes the full HTTP route-pattern map, request counts by
|
// (registered before Use(limiter) takes effect on this specific route — the
|
||||||
// route/status, and Go runtime memory stats. While no workspace UUIDs or
|
// middleware.Middleware() still records it for observability).
|
||||||
// tokens are present, the route map is internal ops intel that should not be
|
// Scrape with: curl http://localhost:8080/metrics
|
||||||
// reachable by unauthenticated callers. Prometheus scrapers must be
|
r.GET("/metrics", metrics.Handler())
|
||||||
// configured with a valid workspace bearer token.
|
|
||||||
// Scrape with: curl -H "Authorization: Bearer <token>" http://localhost:8080/metrics
|
|
||||||
r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
|
|
||||||
|
|
||||||
// Single-workspace read — open so canvas nodes can fetch their own state
|
// Single-workspace read — open so canvas nodes can fetch their own state
|
||||||
// without a token (used by WorkspaceNode polling and health checks).
|
// without a token (used by WorkspaceNode polling and health checks).
|
||||||
@ -147,6 +144,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
|||||||
wsAuth.POST("/restart", wh.Restart)
|
wsAuth.POST("/restart", wh.Restart)
|
||||||
wsAuth.POST("/pause", wh.Pause)
|
wsAuth.POST("/pause", wh.Pause)
|
||||||
wsAuth.POST("/resume", wh.Resume)
|
wsAuth.POST("/resume", wh.Resume)
|
||||||
|
// Manual hibernate (opt-in, #711) — stops the container and sets status
|
||||||
|
// to 'hibernated'. The workspace auto-wakes on the next A2A message.
|
||||||
|
wsAuth.POST("/hibernate", wh.Hibernate)
|
||||||
|
|
||||||
// Async Delegation
|
// Async Delegation
|
||||||
delh := handlers.NewDelegationHandler(wh, broadcaster)
|
delh := handlers.NewDelegationHandler(wh, broadcaster)
|
||||||
|
|||||||
2
platform/migrations/029_workspace_hibernation.down.sql
Normal file
2
platform/migrations/029_workspace_hibernation.down.sql
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
DROP INDEX IF EXISTS idx_workspaces_hibernation;
|
||||||
|
ALTER TABLE workspaces DROP COLUMN IF EXISTS hibernation_idle_minutes;
|
||||||
16
platform/migrations/029_workspace_hibernation.up.sql
Normal file
16
platform/migrations/029_workspace_hibernation.up.sql
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
-- 029_workspace_hibernation: opt-in automatic hibernation for idle workspaces.
|
||||||
|
--
|
||||||
|
-- When hibernation_idle_minutes is set (> 0) on a workspace, the hibernation
|
||||||
|
-- monitor will stop the container and set status = 'hibernated' after the
|
||||||
|
-- workspace has had active_tasks == 0 for that many consecutive minutes.
|
||||||
|
-- The workspace auto-wakes on the next incoming A2A message/send.
|
||||||
|
-- NULL (default) means hibernation is disabled for that workspace.
|
||||||
|
|
||||||
|
ALTER TABLE workspaces
|
||||||
|
ADD COLUMN IF NOT EXISTS hibernation_idle_minutes INT DEFAULT NULL;
|
||||||
|
|
||||||
|
-- Index so the hibernation sweep can efficiently find candidates without
|
||||||
|
-- a full table scan (only workspaces with non-NULL hibernation config).
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_workspaces_hibernation
|
||||||
|
ON workspaces (hibernation_idle_minutes)
|
||||||
|
WHERE hibernation_idle_minutes IS NOT NULL;
|
||||||
Loading…
Reference in New Issue
Block a user