Merge pull request #750 from Molecule-AI/test/issue-711-hibernation-integration
test(hibernation): integration tests for workspace hibernation (#711)
Commit: 0fd0effb70
platform/internal/handlers/a2a_proxy_test.go
@@ -1237,3 +1237,81 @@ func TestLogA2ASuccess_ErrorStatus(t *testing.T) {
    handler.logA2ASuccess(context.Background(), "ws-err", "ws-caller", []byte(`{}`), []byte(`{}`), "message/send", 500, 10)
    time.Sleep(80 * time.Millisecond)
}

// ──────────────────────────────────────────────────────────────────────────────
// A2A auto-wake: hibernated workspace (#711)
// ──────────────────────────────────────────────────────────────────────────────

// TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking verifies the
// auto-wake path added in PR #724: when resolveAgentURL finds a workspace with
// status='hibernated' and no URL, it must:
// - Return a proxyA2AError with Status 503
// - Set Retry-After: 15 in Headers
// - Include waking:true and retry_after:15 in the response body
//
// RestartByID fires asynchronously via `go h.RestartByID(workspaceID)`. Because
// provisioner is nil in tests, RestartByID returns immediately without any DB
// calls, so no additional mocks are needed.
func TestResolveAgentURL_HibernatedWorkspace_Returns503WithWaking(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t) // empty Redis → GetCachedURL returns error → DB fallback

    handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

    // DB fallback: workspace exists but has no URL and is hibernated.
    mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`).
        WithArgs("ws-hibernated").
        WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow("", "hibernated"))

    _, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated")

    if perr == nil {
        t.Fatal("expected proxyA2AError, got nil")
    }
    if perr.Status != http.StatusServiceUnavailable {
        t.Errorf("expected status 503, got %d", perr.Status)
    }
    if perr.Headers["Retry-After"] != "15" {
        t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"])
    }

    if perr.Response["waking"] != true {
        t.Errorf("expected waking:true in body, got %v", perr.Response["waking"])
    }
    if perr.Response["retry_after"] != 15 {
        t.Errorf("expected retry_after:15 in body, got %v", perr.Response["retry_after"])
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// TestResolveAgentURL_HibernatedWorkspace_NullURLVariant verifies the same
// auto-wake behaviour when the DB returns a SQL NULL for the url column
// (rather than an empty string). Both forms represent "no URL assigned".
func TestResolveAgentURL_HibernatedWorkspace_NullURLVariant(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    handler := NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())

    mock.ExpectQuery(`SELECT url, status FROM workspaces WHERE id =`).
        WithArgs("ws-hibernated-null").
        WillReturnRows(sqlmock.NewRows([]string{"url", "status"}).AddRow(nil, "hibernated"))

    _, perr := handler.resolveAgentURL(context.Background(), "ws-hibernated-null")

    if perr == nil {
        t.Fatal("expected proxyA2AError, got nil")
    }
    if perr.Status != http.StatusServiceUnavailable {
        t.Errorf("expected status 503, got %d", perr.Status)
    }
    if perr.Headers["Retry-After"] != "15" {
        t.Errorf("expected Retry-After: 15, got %q", perr.Headers["Retry-After"])
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}
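For orientation, the production branch these two tests exercise plausibly looks like the sketch below. This is a reconstruction from the assertions above, not the code from PR #724: the proxyA2AError shape (Status, Headers, Response) and the `go h.RestartByID(workspaceID)` kick-off come from the test comments, while the query scaffolding and return values are assumptions.

// Minimal sketch of the hibernated-workspace branch inside resolveAgentURL,
// reconstructed from the tests above; everything not asserted there is assumed.
var url sql.NullString
var status string
if err := db.DB.QueryRowContext(ctx,
    `SELECT url, status FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&url, &status); err != nil {
    return "", &proxyA2AError{Status: http.StatusNotFound,
        Response: map[string]interface{}{"error": "workspace not found"}}
}
if status == "hibernated" && url.String == "" {
    go h.RestartByID(workspaceID) // async auto-wake; returns immediately when provisioner is nil
    return "", &proxyA2AError{
        Status:  http.StatusServiceUnavailable,
        Headers: map[string]string{"Retry-After": "15"},
        Response: map[string]interface{}{
            "waking":      true, // caller should retry; the workspace is being woken
            "retry_after": 15,
        },
    }
}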
platform/internal/handlers/hibernation_test.go (new file, 266 lines)
@@ -0,0 +1,266 @@
package handlers

// Integration tests for the workspace hibernation feature (issue #711 / PR #724).
//
// Coverage:
// - HibernateWorkspace(): container stop, DB status update, Redis key clear, event broadcast
// - POST /workspaces/:id/hibernate HTTP handler: online→200, not-eligible→404, DB error→500
// - resolveAgentURL(): hibernated workspace → 503 + Retry-After: 15 + waking: true
//
// The A2A auto-wake path (resolveAgentURL) is tested via TestResolveAgentURL_HibernatedWorkspace_*
// added to a2a_proxy_test.go to keep related resolveAgentURL tests co-located.

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "net/http"
    "net/http/httptest"
    "strings"
    "testing"

    sqlmock "github.com/DATA-DOG/go-sqlmock"
    "github.com/gin-gonic/gin"
)

// ──────────────────────────────────────────────────────────────────────────────
// HibernateWorkspace unit tests
// ──────────────────────────────────────────────────────────────────────────────

// TestHibernateWorkspace_OnlineWorkspace_Success verifies the happy-path:
// - DB returns the workspace (online/degraded)
// - provisioner is nil — no Stop() call needed (test-safe guard in production code)
// - UPDATE sets status='hibernated', url=''
// - Redis keys ws:{id}, ws:{id}:url, ws:{id}:internal_url are deleted
// - WORKSPACE_HIBERNATED event is broadcast (INSERT INTO structure_events)
func TestHibernateWorkspace_OnlineWorkspace_Success(t *testing.T) {
    mock := setupTestDB(t)
    mr := setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-idle-online"

    // Pre-populate Redis keys that ClearWorkspaceKeys should remove.
    mr.Set(fmt.Sprintf("ws:%s", wsID), "some-value")
    mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://agent.internal:8000")
    mr.Set(fmt.Sprintf("ws:%s:internal_url", wsID), "http://172.17.0.5:8000")

    // HibernateWorkspace does a SELECT first.
    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Idle Agent", 1))

    // Then UPDATE status.
    mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
        WithArgs(wsID).
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Broadcaster inserts a structure_events row.
    mock.ExpectExec(`INSERT INTO structure_events`).
        WillReturnResult(sqlmock.NewResult(0, 1))

    handler.HibernateWorkspace(context.Background(), wsID)

    // All DB expectations were exercised.
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }

    // Redis keys must all be gone.
    for _, suffix := range []string{"", ":url", ":internal_url"} {
        key := fmt.Sprintf("ws:%s%s", wsID, suffix)
        if _, err := mr.Get(key); err == nil {
            t.Errorf("expected Redis key %q to be deleted, but it still exists", key)
        }
    }
}
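Read together, the expectations above imply a HibernateWorkspace shaped roughly like the sketch below. It is a sketch only, inferred from what the test asserts: `h.provisioner`, `h.broadcaster.Broadcast`, and `cache.ClearWorkspaceKeys` are assumed names standing in for whatever the real handler uses.

// Hedged sketch, not the actual implementation. The SELECT/UPDATE/INSERT
// sequence matches the sqlmock expectations in the test above.
func (h *WorkspaceHandler) HibernateWorkspace(ctx context.Context, id string) {
    var name string
    var tier int
    err := db.DB.QueryRowContext(ctx,
        `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id).
        Scan(&name, &tier)
    if err != nil {
        return // not eligible (sql.ErrNoRows) or DB error: early no-op, nothing else runs
    }
    if h.provisioner != nil {
        h.provisioner.Stop(ctx, id) // skipped in tests, where provisioner is nil
    }
    if _, err := db.DB.ExecContext(ctx,
        `UPDATE workspaces SET status = 'hibernated', url = '' WHERE id = $1`, id); err != nil {
        log.Printf("hibernate %s: update failed: %v", id, err)
        return // log and return silently: the tests assert no panic and no broadcast
    }
    cache.ClearWorkspaceKeys(ctx, id) // deletes ws:{id}, ws:{id}:url, ws:{id}:internal_url
    h.broadcaster.Broadcast(ctx, "WORKSPACE_HIBERNATED", id)
}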

// TestHibernateWorkspace_NotEligible_NoOp verifies that when the workspace is
// NOT in online/degraded state (SELECT returns ErrNoRows), HibernateWorkspace
// returns immediately — no UPDATE, no Redis clear, no broadcast.
func TestHibernateWorkspace_NotEligible_NoOp(t *testing.T) {
    mock := setupTestDB(t)
    mr := setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-already-offline"

    // Simulate workspace not in eligible state (offline, paused, removed …)
    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnError(sql.ErrNoRows)

    // Set a Redis key to confirm it is NOT cleared by early return.
    mr.Set(fmt.Sprintf("ws:%s:url", wsID), "http://still-here:8000")

    handler.HibernateWorkspace(context.Background(), wsID)

    // No further DB operations should have happened.
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }

    // Redis key must still exist — HibernateWorkspace returned early.
    if _, err := mr.Get(fmt.Sprintf("ws:%s:url", wsID)); err != nil {
        t.Errorf("expected Redis key to still exist after no-op, but it was deleted: %v", err)
    }
}

// TestHibernateWorkspace_DBUpdateFails_NoCrash verifies that a DB error on the
// UPDATE does not panic — the function logs and returns silently.
func TestHibernateWorkspace_DBUpdateFails_NoCrash(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-update-fail"

    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Flaky Agent", 2))

    mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
        WithArgs(wsID).
        WillReturnError(fmt.Errorf("db: connection refused"))

    // Must not panic — test will catch a panic via t.Fatal.
    defer func() {
        if r := recover(); r != nil {
            t.Fatalf("HibernateWorkspace panicked on UPDATE error: %v", r)
        }
    }()

    handler.HibernateWorkspace(context.Background(), wsID)

    // SELECT + UPDATE expectations met; no INSERT INTO structure_events expected.
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// ──────────────────────────────────────────────────────────────────────────────
// POST /workspaces/:id/hibernate HTTP handler tests
// ──────────────────────────────────────────────────────────────────────────────

// hibernateRequest fires POST /workspaces/{id}/hibernate against the handler
// and returns the response recorder.
func hibernateRequest(t *testing.T, handler *WorkspaceHandler, wsID string) *httptest.ResponseRecorder {
    t.Helper()
    w := httptest.NewRecorder()
    c, _ := gin.CreateTestContext(w)
    c.Params = gin.Params{{Key: "id", Value: wsID}}
    c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/hibernate", nil)
    handler.Hibernate(c)
    return w
}

// TestHibernateHandler_Online_Returns200 verifies that an online workspace
// that is eligible for hibernation returns 200 {"status":"hibernated"}.
func TestHibernateHandler_Online_Returns200(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-handler-online"

    // Hibernate() handler SELECT — verifies workspace is online/degraded.
    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1))

    // HibernateWorkspace() SELECT — same query, checks state again before acting.
    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnRows(sqlmock.NewRows([]string{"name", "tier"}).AddRow("Online Bot", 1))

    // HibernateWorkspace() UPDATE.
    mock.ExpectExec(`UPDATE workspaces SET status = 'hibernated'`).
        WithArgs(wsID).
        WillReturnResult(sqlmock.NewResult(0, 1))

    // Broadcaster INSERT.
    mock.ExpectExec(`INSERT INTO structure_events`).
        WillReturnResult(sqlmock.NewResult(0, 1))

    w := hibernateRequest(t, handler, wsID)

    if w.Code != http.StatusOK {
        t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
    }

    var resp map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
        t.Fatalf("failed to decode response: %v", err)
    }
    if resp["status"] != "hibernated" {
        t.Errorf(`expected {"status":"hibernated"}, got %v`, resp)
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// TestHibernateHandler_NotActive_Returns404 verifies that a workspace not in
// online/degraded state (e.g. offline, paused, already hibernated) returns 404.
func TestHibernateHandler_NotActive_Returns404(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-handler-paused"

    // Handler's eligibility SELECT returns no rows — workspace is not online/degraded.
    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnError(sql.ErrNoRows)

    w := hibernateRequest(t, handler, wsID)

    if w.Code != http.StatusNotFound {
        t.Fatalf("expected 404, got %d: %s", w.Code, w.Body.String())
    }

    var resp map[string]interface{}
    if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
        t.Fatalf("failed to decode response: %v", err)
    }
    if !strings.Contains(fmt.Sprint(resp["error"]), "not found") {
        t.Errorf("expected error mentioning 'not found', got %v", resp)
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// TestHibernateHandler_DBError_Returns500 verifies that an unexpected DB error
// on the eligibility SELECT returns 500.
func TestHibernateHandler_DBError_Returns500(t *testing.T) {
    mock := setupTestDB(t)
    setupTestRedis(t)
    broadcaster := newTestBroadcaster()
    handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())

    wsID := "ws-handler-dberror"

    mock.ExpectQuery(`SELECT name, tier FROM workspaces WHERE id = .* AND status IN`).
        WithArgs(wsID).
        WillReturnError(fmt.Errorf("db: connection reset"))

    w := hibernateRequest(t, handler, wsID)

    if w.Code != http.StatusInternalServerError {
        t.Fatalf("expected 500, got %d: %s", w.Code, w.Body.String())
    }

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}
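The three handler tests pin down the status-code mapping (200 on success, 404 when not eligible, 500 on unexpected DB error, with "not found" in the 404 body). A minimal sketch of a Hibernate handler satisfying that mapping; the production code is not shown in this diff, so the query reuse and error strings here are assumptions.

// Hedged sketch of the Hibernate HTTP handler, inferred from the tests above.
func (h *WorkspaceHandler) Hibernate(c *gin.Context) {
    id := c.Param("id")
    var name string
    var tier int
    err := db.DB.QueryRowContext(c.Request.Context(),
        `SELECT name, tier FROM workspaces WHERE id = $1 AND status IN ('online', 'degraded')`, id).
        Scan(&name, &tier)
    switch {
    case errors.Is(err, sql.ErrNoRows):
        // Not online/degraded (offline, paused, already hibernated, or missing).
        c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found or not active"})
        return
    case err != nil:
        c.JSON(http.StatusInternalServerError, gin.H{"error": "db error"})
        return
    }
    h.HibernateWorkspace(c.Request.Context(), id) // re-checks eligibility itself
    c.JSON(http.StatusOK, gin.H{"status": "hibernated"})
}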

@@ -643,7 +643,14 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa
            log.Printf("Org import: schedule '%s' on %s has empty prompt (neither prompt nor prompt_file set) — skipping insert", sched.Name, ws.Name)
            continue
        }
        nextRun, _ := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now())
        // #722: surface the error rather than silently using time.Time{} (zero)
        // which lib/pq stores as 0001-01-01 and may confuse the fire query.
        nextRun, nextRunErr := scheduler.ComputeNextRun(sched.CronExpr, tz, time.Now())
        if nextRunErr != nil {
            log.Printf("Org import: invalid cron expression for schedule '%s' on %s: %v — skipping insert",
                sched.Name, ws.Name, nextRunErr)
            continue
        }
        if _, err := db.DB.ExecContext(context.Background(), orgImportScheduleSQL,
            id, sched.Name, sched.CronExpr, tz, prompt, enabled, nextRun); err != nil {
            log.Printf("Org import: failed to upsert schedule '%s' for %s: %v", sched.Name, ws.Name, err)

@@ -3,8 +3,11 @@ package handlers
import (
    "strings"
    "testing"
    "time"

    "gopkg.in/yaml.v3"

    "github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
)

func TestOrgDefaults_InitialPrompt_YAMLParsing(t *testing.T) {
@@ -602,3 +605,48 @@ func TestPlugins_BackwardCompat(t *testing.T) {
        t.Fatalf("got %v, want %v", got, want)
    }
}

// ── TestOrgImport_ScheduleComputeError (#722 Bug 2) ───────────────────────────
//
// The org importer previously used `nextRun, _ := scheduler.ComputeNextRun(...)`,
// discarding the error and passing time.Time{} (zero value) to the INSERT.
// After fix #722 it surfaces the error and skips the INSERT via `continue`.
//
// This test verifies that the inputs an org.yaml schedule can supply (bad cron
// expression, invalid timezone) DO cause ComputeNextRun to return a non-nil
// error — confirming that the fix is meaningful and the skip path is reachable.

func TestOrgImport_ScheduleComputeError(t *testing.T) {
    now := time.Now()
    cases := []struct {
        name     string
        cronExpr string
        tz       string
    }{
        {
            name:     "invalid cron expression",
            cronExpr: "not-a-cron-expr",
            tz:       "UTC",
        },
        {
            name:     "invalid timezone",
            cronExpr: "0 9 * * 1",
            tz:       "Not/A/Valid/Timezone",
        },
        {
            name:     "both invalid",
            cronExpr: "every monday",
            tz:       "Moon/Far_Side",
        },
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            _, err := scheduler.ComputeNextRun(tc.cronExpr, tc.tz, now)
            if err == nil {
                t.Errorf("ComputeNextRun(%q, %q) returned nil error — "+
                    "org importer would silently insert zero next_run_at; #722 fix requires non-nil",
                    tc.cronExpr, tc.tz)
            }
        })
    }
}
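The diff never shows ComputeNextRun itself, only its contract: a bad cron expression or an invalid timezone must yield a non-nil error, and a valid pair yields the next fire time. A minimal sketch satisfying that contract, assuming the robfig/cron/v3 parser; the real implementation in internal/scheduler may use something else.

package scheduler

import (
    "time"

    "github.com/robfig/cron/v3"
)

// ComputeNextRun (sketch): parse tz and cronExpr, then ask the schedule for
// the first fire time strictly after now. Both parse steps can fail, which is
// exactly the error path TestOrgImport_ScheduleComputeError exercises.
func ComputeNextRun(cronExpr, tz string, now time.Time) (time.Time, error) {
    loc, err := time.LoadLocation(tz) // fails for "Not/A/Valid/Timezone"
    if err != nil {
        return time.Time{}, err
    }
    sched, err := cron.ParseStandard(cronExpr) // fails for "not-a-cron-expr"
    if err != nil {
        return time.Time{}, err
    }
    return sched.Next(now.In(loc)), nil
}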

@@ -1,6 +1,7 @@
package middleware

import (
    "crypto/subtle"
    "database/sql"
    "log"
    "net/http"
@@ -64,20 +65,31 @@ func WorkspaceAuth(database *sql.DB) gin.HandlerFunc {
// AdminAuth returns a Gin middleware for global/admin routes (e.g.
// /settings/secrets, /admin/secrets) that have no per-workspace scope.
//
// Same lazy-bootstrap contract as WorkspaceAuth: if no live token exists
// anywhere on the platform (fresh install / pre-Phase-30 upgrade), requests
// are let through so existing deployments keep working. Once any workspace
// has a live token every request to these routes MUST present a valid bearer
// token — no Origin-based bypass. (#623)
// # Credential tier (evaluated in order)
//
// Any valid workspace bearer token is accepted — the route is not scoped to
// a specific workspace so we only verify the token is live and unrevoked.
// 1. Lazy-bootstrap fail-open: if no live workspace token exists anywhere on
// the platform (fresh install / pre-Phase-30 upgrade), every request passes
// through so existing deployments keep working.
//
// 2. ADMIN_TOKEN env var (recommended, closes #684): when set, the bearer
// MUST equal this value exactly (constant-time comparison). Workspace
// bearer tokens are intentionally rejected even if valid — a compromised
// workspace agent must not be able to read global secrets, steal GitHub App
// installation tokens, or enumerate pending approvals across the platform.
// Set ADMIN_TOKEN to a strong random secret (e.g. openssl rand -base64 32).
//
// 3. Fallback — workspace token (deprecated, backward-compat): when
// ADMIN_TOKEN is not set and workspace tokens do exist globally, any valid
// workspace bearer token is still accepted. This preserves existing
// behaviour for deployments that have not yet configured ADMIN_TOKEN, but
// it leaves the blast-radius isolation gap described in #684 open. Set
// ADMIN_TOKEN to eliminate this fallback.
//
// NOTE: canvasOriginAllowed / isSameOriginCanvas are intentionally NOT called
// here. The Origin header is trivially forgeable by any container on the
// Docker network; using it as an auth bypass would let an attacker reach
// /settings/secrets, /bundles/import, /events, etc. without a bearer token.
// Those short-circuits belong ONLY in CanvasOrBearer (cosmetic routes).
// Those short-circuits belong ONLY in CanvasOrBearer (cosmetic routes). (#623)
func AdminAuth(database *sql.DB) gin.HandlerFunc {
    return func(c *gin.Context) {
        ctx := c.Request.Context()
@@ -88,18 +100,34 @@ func AdminAuth(database *sql.DB) gin.HandlerFunc {
            c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
            return
        }
        if hasLive {
            // Bearer token is the ONLY accepted credential for admin routes.
            tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
            if tok != "" {
                if err := wsauth.ValidateAnyToken(ctx, database, tok); err != nil {
                    c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid admin auth token"})
                    return
                }
                c.Next()
        if !hasLive {
            // Tier 1: fail-open on fresh install / pre-Phase-30 upgrade.
            c.Next()
            return
        }

        // Bearer token is the ONLY accepted credential for admin routes.
        tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
        if tok == "" {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "admin auth required"})
            return
        }

        // Tier 2 (#684 fix): dedicated ADMIN_TOKEN — workspace bearer tokens
        // must not grant access to admin routes.
        if adminSecret := os.Getenv("ADMIN_TOKEN"); adminSecret != "" {
            if subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) != 1 {
                c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid admin auth token"})
                return
            }
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "admin auth required"})
            c.Next()
            return
        }

        // Tier 3 (deprecated): ADMIN_TOKEN not configured — fall back to any
        // valid workspace token. Operators should set ADMIN_TOKEN to close #684.
        if err := wsauth.ValidateAnyToken(ctx, database, tok); err != nil {
            c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid admin auth token"})
            return
        }
        c.Next()
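For scale, this is roughly how the middleware would be attached when routes are registered. The grouping and handler names below are hypothetical; only the paths themselves appear in the doc comment above.

// Illustrative wiring only — a sketch, not the project's actual router setup.
func registerAdminRoutes(r *gin.Engine, database *sql.DB) {
    admin := r.Group("/")
    admin.Use(middleware.AdminAuth(database)) // tiers 1-3 evaluated per request
    admin.GET("/settings/secrets", listSecrets)
    admin.GET("/approvals/pending", listPendingApprovals)
}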

@@ -940,3 +940,233 @@ func TestAdminAuth_623_ValidBearer_WithOrigin_Passes(t *testing.T) {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// ── Issue #684 — AdminAuth accepts any workspace bearer as admin credential ──
//
// Root cause: AdminAuth called ValidateAnyToken which matched any live
// workspace token. A compromised workspace agent could present its own bearer
// and reach /admin/github-installation-token, /approvals/pending, etc.
//
// Fix: when ADMIN_TOKEN env var is set the middleware verifies the bearer
// against that secret exclusively (constant-time). Workspace tokens are
// rejected even if valid. When ADMIN_TOKEN is not set the old behaviour is
// preserved for backward-compat (deprecated fallback, tier 3).

// TestAdminAuth_684_AdminTokenSet_WorkspaceTokenRejected — the primary
// regression test: when ADMIN_TOKEN is configured, a valid workspace bearer
// token MUST be rejected with 401 on admin routes (#684).
func TestAdminAuth_684_AdminTokenSet_WorkspaceTokenRejected(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    t.Setenv("ADMIN_TOKEN", "super-secret-admin-token-xyz")

    // Platform has live workspace tokens — AdminAuth is active.
    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

    // ValidateAnyToken must NOT be called — workspace tokens must be rejected
    // before any DB lookup when ADMIN_TOKEN is set.

    r := gin.New()
    r.GET("/admin/github-installation-token", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"token": "ghp_live_token"})
    })

    w := httptest.NewRecorder()
    // #684 attack: compromised workspace agent sends its own bearer.
    req, _ := http.NewRequest(http.MethodGet, "/admin/github-installation-token", nil)
    req.Header.Set("Authorization", "Bearer some-valid-workspace-bearer-token")
    r.ServeHTTP(w, req)

    if w.Code != http.StatusUnauthorized {
        t.Errorf("#684 workspace token w/ ADMIN_TOKEN set: expected 401, got %d: %s",
            w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestAdminAuth_684_AdminTokenSet_CorrectAdminTokenAccepted — when ADMIN_TOKEN
// is set, presenting the exact ADMIN_TOKEN value must grant access (200).
func TestAdminAuth_684_AdminTokenSet_CorrectAdminTokenAccepted(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    const adminSecret = "super-secret-admin-token-xyz"
    t.Setenv("ADMIN_TOKEN", adminSecret)

    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

    // No DB token lookup — ADMIN_TOKEN check is env-only, no DB round-trip.

    r := gin.New()
    r.GET("/admin/github-installation-token", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"token": "ghp_live_token"})
    })

    w := httptest.NewRecorder()
    req, _ := http.NewRequest(http.MethodGet, "/admin/github-installation-token", nil)
    req.Header.Set("Authorization", "Bearer "+adminSecret)
    r.ServeHTTP(w, req)

    if w.Code != http.StatusOK {
        t.Errorf("#684 correct ADMIN_TOKEN: expected 200, got %d: %s", w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestAdminAuth_684_AdminTokenSet_WrongAdminToken_Returns401 — when ADMIN_TOKEN
// is set, presenting a different value must return 401.
func TestAdminAuth_684_AdminTokenSet_WrongAdminToken_Returns401(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    t.Setenv("ADMIN_TOKEN", "correct-admin-secret")

    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

    r := gin.New()
    r.GET("/admin/liveness", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"subsystems": gin.H{}})
    })

    w := httptest.NewRecorder()
    req, _ := http.NewRequest(http.MethodGet, "/admin/liveness", nil)
    req.Header.Set("Authorization", "Bearer wrong-admin-secret")
    r.ServeHTTP(w, req)

    if w.Code != http.StatusUnauthorized {
        t.Errorf("#684 wrong ADMIN_TOKEN: expected 401, got %d: %s", w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestAdminAuth_684_AdminTokenSet_NoBearer_Returns401 — when ADMIN_TOKEN is
// set, a request with no bearer must still return 401.
func TestAdminAuth_684_AdminTokenSet_NoBearer_Returns401(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    t.Setenv("ADMIN_TOKEN", "correct-admin-secret")

    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

    r := gin.New()
    r.GET("/approvals/pending", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"approvals": []interface{}{}})
    })

    w := httptest.NewRecorder()
    req, _ := http.NewRequest(http.MethodGet, "/approvals/pending", nil)
    r.ServeHTTP(w, req)

    if w.Code != http.StatusUnauthorized {
        t.Errorf("#684 no bearer w/ ADMIN_TOKEN set: expected 401, got %d: %s",
            w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestAdminAuth_684_AdminTokenNotSet_FallsBackToWorkspaceToken — when
// ADMIN_TOKEN is NOT set, a valid workspace token is still accepted (deprecated
// tier-3 fallback for backward compatibility).
func TestAdminAuth_684_AdminTokenNotSet_FallsBackToWorkspaceToken(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    // ADMIN_TOKEN explicitly unset — tier-3 fallback active.
    t.Setenv("ADMIN_TOKEN", "")

    workspaceToken := "any-live-workspace-token"
    tokenHash := sha256.Sum256([]byte(workspaceToken))

    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

    mock.ExpectQuery(validateAnyTokenSelectQuery).
        WithArgs(tokenHash[:]).
        WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("tok-ws-1"))

    mock.ExpectExec(validateTokenUpdateQuery).
        WithArgs("tok-ws-1").
        WillReturnResult(sqlmock.NewResult(0, 1))

    r := gin.New()
    r.GET("/admin/secrets", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"ok": true})
    })

    w := httptest.NewRecorder()
    req, _ := http.NewRequest(http.MethodGet, "/admin/secrets", nil)
    req.Header.Set("Authorization", "Bearer "+workspaceToken)
    r.ServeHTTP(w, req)

    if w.Code != http.StatusOK {
        t.Errorf("#684 fallback (no ADMIN_TOKEN): expected 200, got %d: %s",
            w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}

// TestAdminAuth_684_FailOpen_AdminTokenSet_NoGlobalTokens — even when
// ADMIN_TOKEN is set, a fresh install (no tokens globally) must still
// fail-open (tier-1 contract unchanged).
func TestAdminAuth_684_FailOpen_AdminTokenSet_NoGlobalTokens(t *testing.T) {
    mockDB, mock, err := sqlmock.New()
    if err != nil {
        t.Fatalf("sqlmock.New: %v", err)
    }
    defer mockDB.Close()

    t.Setenv("ADMIN_TOKEN", "some-admin-secret")

    // HasAnyLiveTokenGlobal returns 0 — fresh install.
    mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
        WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

    r := gin.New()
    r.GET("/admin/secrets", AdminAuth(mockDB), func(c *gin.Context) {
        c.JSON(http.StatusOK, gin.H{"ok": true})
    })

    w := httptest.NewRecorder()
    req, _ := http.NewRequest(http.MethodGet, "/admin/secrets", nil)
    // No bearer — but fail-open should still pass.
    r.ServeHTTP(w, req)

    if w.Code != http.StatusOK {
        t.Errorf("#684 fail-open w/ ADMIN_TOKEN set (no global tokens): expected 200, got %d: %s",
            w.Code, w.Body.String())
    }
    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet sqlmock expectations: %v", err)
    }
}
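The doc comment recommends minting ADMIN_TOKEN with `openssl rand -base64 32`. The Go equivalent, for a bootstrap tool that prefers not to shell out (the program itself is illustrative; only the 32-byte/base64 recipe comes from the source):

package main

import (
    "crypto/rand"
    "encoding/base64"
    "fmt"
)

func main() {
    // 32 bytes from the OS CSPRNG, base64-encoded — same entropy as
    // `openssl rand -base64 32`.
    buf := make([]byte, 32)
    if _, err := rand.Read(buf); err != nil {
        panic(err) // a failing kernel CSPRNG is unrecoverable here
    }
    fmt.Println(base64.StdEncoding.EncodeToString(buf))
}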

@@ -112,6 +112,11 @@ func (s *Scheduler) Start(ctx context.Context) {
        s.mu.Unlock()
    }

    // #722 — startup repair: find any enabled schedule whose next_run_at was
    // NULL'd by the pre-fix bug and recompute it now. Without this pass those
    // schedules would never fire again even after the binary is updated.
    s.repairNullNextRunAt(ctx)

    // Heartbeat + initial lastTickAt so /admin/liveness and Healthy() both
    // pass during the first 30s interval after startup.
    supervised.Heartbeat("scheduler")
@@ -279,12 +284,19 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
    var nextRunPtr *time.Time
    if nextErr == nil {
        nextRunPtr = &nextRun
    } else {
        // #722: if ComputeNextRun fails, keep the existing next_run_at so the
        // schedule is not silently removed from the fire query (NULL next_run_at
        // is excluded by the tick WHERE clause). COALESCE($2, next_run_at) does
        // this: when $2 is NULL the DB column value is preserved as-is.
        log.Printf("Scheduler: ComputeNextRun error for '%s' (%s) — preserving existing next_run_at: %v",
            sched.Name, sched.ID, nextErr)
    }

    _, err := db.DB.ExecContext(ctx, `
        UPDATE workspace_schedules
        SET last_run_at = now(),
            next_run_at = $2,
            next_run_at = COALESCE($2, next_run_at),
            run_count = run_count + 1,
            last_status = $3,
            last_error = $4,
@@ -334,6 +346,11 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
    var nextRunPtr *time.Time
    if nextErr == nil {
        nextRunPtr = &nextRun
    } else {
        // #722: same guard as in fireSchedule — preserve existing next_run_at
        // rather than writing NULL when the cron expression cannot be parsed.
        log.Printf("Scheduler: ComputeNextRun error in recordSkipped for '%s' (%s) — preserving existing next_run_at: %v",
            sched.Name, sched.ID, nextErr)
    }

    // Advance next_run_at + bump run_count so the liveness view reflects
@@ -342,7 +359,7 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
    _, _ = db.DB.ExecContext(ctx, `
        UPDATE workspace_schedules
        SET last_run_at = now(),
            next_run_at = $2,
            next_run_at = COALESCE($2, next_run_at),
            run_count = run_count + 1,
            last_status = 'skipped',
            last_error = $3,
@@ -371,6 +388,60 @@ func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, active
    }
}

// repairNullNextRunAt is called once during Start() to recompute next_run_at
// for any enabled schedule where it is NULL — a state left by the pre-#722 bug
// where a ComputeNextRun error caused an UPDATE that wrote NULL.
// Without this repair those schedules would never appear in the tick query
// (which requires next_run_at IS NOT NULL) even after the binary is patched.
func (s *Scheduler) repairNullNextRunAt(ctx context.Context) {
    rows, err := db.DB.QueryContext(ctx, `
        SELECT id, cron_expr, timezone
        FROM workspace_schedules
        WHERE enabled = true AND next_run_at IS NULL
    `)
    if err != nil {
        log.Printf("Scheduler: startup repair query error: %v", err)
        return
    }
    defer rows.Close()

    type repairRow struct {
        ID       string
        CronExpr string
        Timezone string
    }

    var repaired, failed int
    for rows.Next() {
        var r repairRow
        if err := rows.Scan(&r.ID, &r.CronExpr, &r.Timezone); err != nil {
            log.Printf("Scheduler: startup repair scan error: %v", err)
            continue
        }
        nextRun, err := ComputeNextRun(r.CronExpr, r.Timezone, time.Now())
        if err != nil {
            log.Printf("Scheduler: startup repair: cannot compute next_run_at for schedule %s (%s): %v — leaving NULL",
                r.ID, r.CronExpr, err)
            failed++
            continue
        }
        if _, err := db.DB.ExecContext(ctx, `
            UPDATE workspace_schedules SET next_run_at = $2, updated_at = now() WHERE id = $1
        `, r.ID, nextRun); err != nil {
            log.Printf("Scheduler: startup repair: update failed for schedule %s: %v", r.ID, err)
            failed++
        } else {
            repaired++
        }
    }
    if err := rows.Err(); err != nil {
        log.Printf("Scheduler: startup repair rows error: %v", err)
    }
    if repaired > 0 || failed > 0 {
        log.Printf("Scheduler: startup repair: %d schedule(s) repaired, %d skipped (bad cron/tz)", repaired, failed)
    }
}
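Both COALESCE call sites lean on how database/sql binds parameters: a nil *time.Time is sent as SQL NULL, so COALESCE($2, next_run_at) keeps the stored value exactly when ComputeNextRun failed, and overwrites it otherwise. Condensed to its essentials (identifiers abbreviated; this is the pattern, not the exact production statement):

// nextRunPtr stays nil on ComputeNextRun failure → bound as SQL NULL →
// COALESCE($2, next_run_at) evaluates to the column's existing value.
var nextRunPtr *time.Time
if nextErr == nil {
    nextRunPtr = &nextRun // non-nil → $2 carries the new timestamp
}
_, err := db.DB.ExecContext(ctx,
    `UPDATE workspace_schedules
     SET next_run_at = COALESCE($2, next_run_at)
     WHERE id = $1`,
    sched.ID, nextRunPtr)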

func truncate(s string, maxLen int) string {
    if len(s) <= maxLen {
        return s

@@ -2,6 +2,7 @@ package scheduler

import (
    "context"
    "database/sql"
    "testing"
    "time"

@@ -10,6 +11,9 @@ import (
    "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// errDBDown is a sentinel error used by tests to simulate a DB connection failure.
var errDBDown = sql.ErrConnDone

// setupTestDB replaces the global db.DB with a sqlmock and returns the mock
// handle. The real DB is restored (by closing the mock conn) via t.Cleanup.
func setupTestDB(t *testing.T) sqlmock.Sqlmock {
@@ -237,6 +241,187 @@ func TestRecordSkipped_writesSkippedStatus(t *testing.T) {
    }
}

// ── successProxy ─────────────────────────────────────────────────────────────

// successProxy is a test double whose ProxyA2ARequest always returns HTTP 200
// with no error, simulating a healthy A2A round-trip.
type successProxy struct{}

func (p *successProxy) ProxyA2ARequest(
    _ context.Context, _ string, _ []byte, _ string, _ bool,
) (int, []byte, error) {
    return 200, []byte(`{"ok":true}`), nil
}

// ── TestFireSchedule_ComputeNextRunError (#722 Bug 1) ─────────────────────────
//
// When ComputeNextRun fails (bad cron expression), fireSchedule must NOT write
// NULL to next_run_at — it must use COALESCE so the existing DB value is kept.
// Proof: the UPDATE ExecContext must still be called (schedule not abandoned)
// and sqlmock satisfies all expectations (no unexpected SQL).

func TestFireSchedule_ComputeNextRunError(t *testing.T) {
    mock := setupTestDB(t)

    sched := scheduleRow{
        ID:          "11111111-dead-beef-0000-000000000001",
        WorkspaceID: "22222222-dead-beef-0000-000000000002",
        Name:        "bad-cron-job",
        CronExpr:    "not-a-valid-cron", // guaranteed to fail ComputeNextRun
        Timezone:    "UTC",
        Prompt:      "do something",
    }

    // active_tasks check → 0 (workspace is idle; proceed to fire)
    mock.ExpectQuery(`SELECT COALESCE`).
        WillReturnRows(sqlmock.NewRows([]string{"coalesce"}).AddRow(0))

    // UPDATE must fire — COALESCE($2, next_run_at) keeps existing value when $2 is nil.
    // AnyArg for $2 because it will be nil (ComputeNextRun failed).
    mock.ExpectExec(`UPDATE workspace_schedules`).
        WithArgs(sched.ID, sqlmock.AnyArg(), "ok", "").
        WillReturnResult(sqlmock.NewResult(0, 1))

    // activity_logs INSERT always fires
    mock.ExpectExec(`INSERT INTO activity_logs`).
        WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), "ok", "").
        WillReturnResult(sqlmock.NewResult(0, 1))

    s := New(&successProxy{}, nil)
    s.fireSchedule(context.Background(), sched)

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations — schedule update was skipped or next_run_at not preserved: %v", err)
    }
}

// ── TestRecordSkipped_ComputeNextRunError (#722 Bug 1 — skipped path) ─────────
//
// Same invariant as TestFireSchedule_ComputeNextRunError but for the
// recordSkipped path: a bad cron expression must not NULL out next_run_at.

func TestRecordSkipped_ComputeNextRunError(t *testing.T) {
    mock := setupTestDB(t)

    sched := scheduleRow{
        ID:          "33333333-dead-beef-0000-000000000003",
        WorkspaceID: "44444444-dead-beef-0000-000000000004",
        Name:        "bad-cron-skip",
        CronExpr:    "not-a-valid-cron",
        Timezone:    "UTC",
        Prompt:      "skipped task",
    }

    mock.ExpectExec(`UPDATE workspace_schedules`).
        WithArgs(sched.ID, sqlmock.AnyArg(), sqlmock.AnyArg()).
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectExec(`INSERT INTO activity_logs`).
        WithArgs(sched.WorkspaceID, sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()).
        WillReturnResult(sqlmock.NewResult(0, 1))

    s := New(nil, nil)
    s.recordSkipped(context.Background(), sched, 2)

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// ── TestRepairNullNextRunAt_RepairsRows (#722 Bug 3) ──────────────────────────
//
// repairNullNextRunAt must SELECT enabled schedules with NULL next_run_at,
// compute the next fire time, and UPDATE each row.

func TestRepairNullNextRunAt_RepairsRows(t *testing.T) {
    mock := setupTestDB(t)

    // Two schedules whose next_run_at is NULL and whose cron exprs are valid.
    mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
        WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}).
            AddRow("sched-repair-01", "0 * * * *", "UTC").
            AddRow("sched-repair-02", "30 9 * * 1", "America/New_York"))

    // Expect one UPDATE per repaired row.
    mock.ExpectExec(`UPDATE workspace_schedules`).
        WithArgs("sched-repair-01", sqlmock.AnyArg()).
        WillReturnResult(sqlmock.NewResult(0, 1))
    mock.ExpectExec(`UPDATE workspace_schedules`).
        WithArgs("sched-repair-02", sqlmock.AnyArg()).
        WillReturnResult(sqlmock.NewResult(0, 1))

    s := New(nil, nil)
    s.repairNullNextRunAt(context.Background())

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// ── TestRepairNullNextRunAt_DBError_NoPanic (#722 Bug 3) ──────────────────────
//
// A DB error from the SELECT must be logged but must not panic — the scheduler
// startup should proceed normally.

func TestRepairNullNextRunAt_DBError_NoPanic(t *testing.T) {
    mock := setupTestDB(t)

    mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
        WillReturnError(errDBDown)

    s := New(nil, nil)
    // Must not panic:
    s.repairNullNextRunAt(context.Background())

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v", err)
    }
}

// ──────────────────────────────────────────────────────────────────────────────
// repairNullNextRunAt + hibernation (#711 + #722 integration)
// ──────────────────────────────────────────────────────────────────────────────

// TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired verifies that
// repairNullNextRunAt() repairs schedules belonging to hibernated workspaces.
//
// Context: the repair query is:
//
//	SELECT id, cron_expr, timezone
//	FROM workspace_schedules
//	WHERE enabled = true AND next_run_at IS NULL
//
// Critically, there is NO "AND workspace.status != 'hibernated'" filter.
// This is intentional — a hibernated workspace should wake up on schedule
// (via the auto-wake A2A path). If the repair skipped hibernated workspaces,
// any schedule whose next_run_at was NULL'd before hibernation would never
// fire again even after the workspace wakes.
//
// This test simulates a schedule with a NULL next_run_at whose owning workspace
// is currently hibernated, and asserts the UPDATE fires to set next_run_at.
func TestRepairNullNextRunAt_HibernatedWorkspace_ScheduleRepaired(t *testing.T) {
    mock := setupTestDB(t)

    // The repair SELECT has no workspace status filter — a hibernated workspace's
    // schedule appears in the result set normally.
    mock.ExpectQuery(`SELECT id, cron_expr, timezone`).
        WillReturnRows(sqlmock.NewRows([]string{"id", "cron_expr", "timezone"}).
            AddRow("sched-hibernated-01", "0 9 * * *", "UTC"))

    // Repair must attempt the UPDATE (next_run_at computed from valid cron expr).
    mock.ExpectExec(`UPDATE workspace_schedules`).
        WithArgs("sched-hibernated-01", sqlmock.AnyArg()).
        WillReturnResult(sqlmock.NewResult(0, 1))

    s := New(nil, nil)
    s.repairNullNextRunAt(context.Background())

    if err := mock.ExpectationsWereMet(); err != nil {
        t.Errorf("unmet DB expectations: %v\n"+
            "repairNullNextRunAt must not filter out hibernated workspaces — "+
            "their schedules must still be repaired so they fire on wake", err)
    }
}

// ── TestRecordSkipped_shortWorkspaceIDNoPanic ─────────────────────────────────
// Guards against the short() regression: recordSkipped must not panic if
// WorkspaceID is unexpectedly shorter than the 12-char prefix used in logs.
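The test body is cut off at this point in the diff, but the comment pins down the invariant. A guard of roughly this shape would satisfy it (a sketch; the real short() helper may differ):

// short returns at most the first 12 characters of id, tolerating IDs
// shorter than the prefix instead of panicking on the slice expression.
func short(id string) string {
    if len(id) < 12 {
        return id
    }
    return id[:12]
}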