From d6f7bd208725c7a8896a04d3e3d70722bf7ca2e3 Mon Sep 17 00:00:00 2001
From: Hongming Wang
Date: Thu, 16 Apr 2026 00:45:30 -0700
Subject: [PATCH] fix(#249): add /schedules/health endpoint accessible to CanCommunicate peers (#400)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Rebased cleanly onto current main (resolves the add/add conflicts that
blocked CI on PR #374 — the original branch diverged from a
pre-repo-bootstrap commit that predated most files).

Changes:
- schedules.go: add scheduleHealthResponse struct + Health handler
  (mirrors A2A proxy auth pattern: X-Workspace-ID + CanCommunicate gate)
- router.go: register GET /workspaces/:id/schedules/health on r (not wsAuth)
  so peer agents can query without holding the target workspace's bearer token
- schedules_test.go: 7 new tests (missing caller 401, self-call OK,
  legacy peer grandfathered, non-peer 403, system caller bypass,
  no prompt exposure, DB error 500)

isSystemCaller/validateCallerToken reused from a2a_proxy.go (same package).
registry.CanCommunicate import added to schedules.go.
Closes #249 Supersedes PR #374 (which could not get CI due to merge conflict) Co-authored-by: PM (Molecule AI) Co-authored-by: Claude Sonnet 4.6 --- platform/internal/handlers/schedules.go | 93 +++++++ platform/internal/handlers/schedules_test.go | 261 +++++++++++++++++++ platform/internal/router/router.go | 5 + 3 files changed, 359 insertions(+) diff --git a/platform/internal/handlers/schedules.go b/platform/internal/handlers/schedules.go index 281f471b8..c11d74cc2 100644 --- a/platform/internal/handlers/schedules.go +++ b/platform/internal/handlers/schedules.go @@ -10,6 +10,7 @@ import ( "github.com/gin-gonic/gin" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry" "github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler" ) @@ -317,3 +318,95 @@ func (h *ScheduleHandler) History(c *gin.Context) { c.JSON(http.StatusOK, entries) } +// scheduleHealthResponse is the read-only health view of a schedule. +// It deliberately omits prompt and cron_expr so sensitive task content is +// never exposed to peer workspaces — only execution-state fields needed to +// detect silent cron failures are returned (issue #249). +type scheduleHealthResponse struct { + ID string `json:"id"` + Name string `json:"name"` + Enabled bool `json:"enabled"` + LastRunAt *time.Time `json:"last_run_at"` + NextRunAt *time.Time `json:"next_run_at"` + RunCount int `json:"run_count"` + LastStatus string `json:"last_status"` + LastError string `json:"last_error"` +} + +// Health returns schedule health fields (last_run_at, last_status, run_count, +// etc.) for all schedules belonging to a workspace. +// +// Unlike GET /workspaces/:id/schedules (which requires the workspace's own +// bearer token), this endpoint is accessible to CanCommunicate peers — i.e., +// any workspace in the same org hierarchy — so peer agents can detect silent +// cron failures without needing admin auth (issue #249). 
+// +// Auth rules (mirrors the A2A proxy pattern): +// - X-Workspace-ID header is required to identify the caller. +// - If the caller workspace has any live tokens, the Authorization: Bearer +// header must carry that caller's own valid token (lazy-bootstrap: legacy +// workspaces with no tokens are grandfathered through). +// - registry.CanCommunicate(callerID, workspaceID) must return true. +// - System callers (webhook:*, system:*, test:*) bypass token + access checks. +// - Self-calls (callerID == workspaceID) are always allowed. +// +// Prompt and cron_expr are intentionally absent from the response. +func (h *ScheduleHandler) Health(c *gin.Context) { + workspaceID := c.Param("id") + callerID := c.GetHeader("X-Workspace-ID") + ctx := c.Request.Context() + + // Caller identity is mandatory — anonymous reads are not permitted. + if callerID == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "X-Workspace-ID header required"}) + return + } + + // Validate the caller's own bearer token (Phase 30.5 contract). + // Skip for system callers and self-calls, same as the A2A proxy. + if !isSystemCaller(callerID) && callerID != workspaceID { + if err := validateCallerToken(ctx, c, callerID); err != nil { + return // response already written with 401 + } + } + + // CanCommunicate gate — only peers in the org hierarchy may read health. 
+ if callerID != workspaceID && !isSystemCaller(callerID) { + if !registry.CanCommunicate(callerID, workspaceID) { + log.Printf("ScheduleHealth: access denied %s → %s", callerID, workspaceID) + c.JSON(http.StatusForbidden, gin.H{"error": "access denied"}) + return + } + } + + rows, err := db.DB.QueryContext(ctx, ` + SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error + FROM workspace_schedules + WHERE workspace_id = $1 + ORDER BY created_at ASC + `, workspaceID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query schedules"}) + return + } + defer rows.Close() + + schedules := make([]scheduleHealthResponse, 0) + for rows.Next() { + var s scheduleHealthResponse + if err := rows.Scan( + &s.ID, &s.Name, &s.Enabled, &s.LastRunAt, &s.NextRunAt, + &s.RunCount, &s.LastStatus, &s.LastError, + ); err != nil { + log.Printf("ScheduleHealth: scan error: %v", err) + continue + } + schedules = append(schedules, s) + } + if err := rows.Err(); err != nil { + log.Printf("ScheduleHealth: rows error: %v", err) + } + + c.JSON(http.StatusOK, schedules) +} + diff --git a/platform/internal/handlers/schedules_test.go b/platform/internal/handlers/schedules_test.go index a3d307f6b..335a61210 100644 --- a/platform/internal/handlers/schedules_test.go +++ b/platform/internal/handlers/schedules_test.go @@ -2,6 +2,8 @@ package handlers import ( "bytes" + "database/sql" + "encoding/json" "net/http" "net/http/httptest" "regexp" @@ -171,3 +173,262 @@ func TestHistory_IncludesErrorDetail(t *testing.T) { t.Errorf("sqlmock: %v", err) } } + +// ==================== Health — issue #249 ==================== +// +// GET /workspaces/:id/schedules/health is accessible to CanCommunicate peers +// without workspace bearer auth. The handler mirrors the A2A proxy's auth +// pattern: X-Workspace-ID + caller token + CanCommunicate gate. 
+ +const healthWorkspaceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa" +const healthCallerID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb" + +// healthCols is the column set returned by the Health SELECT. +var healthCols = []string{"id", "name", "enabled", "last_run_at", "next_run_at", "run_count", "last_status", "last_error"} + +// TestScheduleHealth_MissingCallerID_Rejected verifies that requests without +// X-Workspace-ID are rejected with 401 — anonymous peer reads are not allowed. +func TestScheduleHealth_MissingCallerID_Rejected(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + c.Request = httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + + handler.Health(c) + + if w.Code != http.StatusUnauthorized { + t.Fatalf("expected 401 for missing caller, got %d: %s", w.Code, w.Body.String()) + } +} + +// TestScheduleHealth_SelfCall_Allowed verifies that when callerID == workspaceID +// (self-call) the request is allowed and health fields are returned without any +// CanCommunicate DB lookups. +func TestScheduleHealth_SelfCall_Allowed(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + now := time.Now().UTC().Truncate(time.Second) + // Self-call: no token check, no CanCommunicate queries. + // Expect only the health SELECT. + mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`). + WithArgs(healthWorkspaceID). + WillReturnRows(sqlmock.NewRows(healthCols). 
+ AddRow("sched-1", "nightly", true, &now, &now, 42, "ok", "")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", healthWorkspaceID) // self-call + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 for self-call, got %d: %s", w.Code, w.Body.String()) + } + + var resp []scheduleHealthResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to parse response: %v", err) + } + if len(resp) != 1 || resp[0].ID != "sched-1" || resp[0].RunCount != 42 { + t.Errorf("unexpected health response: %+v", resp) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestScheduleHealth_CanCommunicatePeer_LegacyNoToken verifies that a legacy +// peer (no live tokens on file for the caller) is grandfathered through the +// token check and can read health when CanCommunicate is satisfied. +func TestScheduleHealth_CanCommunicatePeer_LegacyNoToken(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + now := time.Now().UTC().Truncate(time.Second) + + // 1. validateCallerToken: caller has zero live tokens → grandfather through. + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`). + WithArgs(healthCallerID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // 2. CanCommunicate: caller and target share the same parent (siblings → allowed). + mockCanCommunicate(mock, healthCallerID, healthWorkspaceID, true) + + // 3. Health SELECT. + mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`). + WithArgs(healthWorkspaceID). + WillReturnRows(sqlmock.NewRows(healthCols). 
+ AddRow("sched-2", "hourly", true, &now, &now, 7, "ok", "")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", healthCallerID) + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 for peer with no tokens, got %d: %s", w.Code, w.Body.String()) + } + + var resp []scheduleHealthResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("failed to parse response: %v", err) + } + if len(resp) != 1 || resp[0].RunCount != 7 { + t.Errorf("unexpected response: %+v", resp) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestScheduleHealth_AccessDenied_NonPeer verifies that a workspace which fails +// CanCommunicate (different org branch) receives 403 — not 401 or 500. +func TestScheduleHealth_AccessDenied_NonPeer(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + // 1. validateCallerToken: no live tokens → grandfather. + mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`). + WithArgs(healthCallerID). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + // 2. CanCommunicate: different parents → denied. 
+ mockCanCommunicate(mock, healthCallerID, healthWorkspaceID, false) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", healthCallerID) + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusForbidden { + t.Fatalf("expected 403 for non-peer, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestScheduleHealth_SystemCaller_Allowed verifies that system callers +// (webhook:*, system:*, test:*) bypass token + CanCommunicate checks. +func TestScheduleHealth_SystemCaller_Allowed(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + now := time.Now().UTC().Truncate(time.Second) + + // No token check, no CanCommunicate queries — just the health SELECT. + mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`). + WithArgs(healthWorkspaceID). + WillReturnRows(sqlmock.NewRows(healthCols). + AddRow("sched-3", "weekly", false, nil, &now, 0, "", "")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", "system:monitor") + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200 for system caller, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestScheduleHealth_NoPromptExposed verifies that the health response never +// includes prompt or cron_expr — only execution-state fields are returned. 
+func TestScheduleHealth_NoPromptExposed(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + now := time.Now().UTC().Truncate(time.Second) + + // No token check, no CanCommunicate queries for system caller. + mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`). + WithArgs(healthWorkspaceID). + WillReturnRows(sqlmock.NewRows(healthCols). + AddRow("sched-4", "daily", true, &now, &now, 3, "ok", "")) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", "system:test") + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + rawBody := w.Body.String() + for _, forbidden := range []string{"prompt", "cron_expr", "timezone"} { + if strings.Contains(rawBody, forbidden) { + t.Errorf("health response must not contain %q field: %s", forbidden, rawBody) + } + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// TestScheduleHealth_DBError_Returns500 verifies that a DB failure on the health +// SELECT produces a 500, not a panic. +func TestScheduleHealth_DBError_Returns500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewScheduleHandler() + + // No token check, no CanCommunicate queries for system caller. + mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`). + WithArgs(healthWorkspaceID). 
+ WillReturnError(sql.ErrConnDone) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}} + req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil) + req.Header.Set("X-Workspace-ID", "system:test") + c.Request = req + + handler.Health(c) + + if w.Code != http.StatusInternalServerError { + t.Fatalf("expected 500 on DB error, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go index fe6dbb7a9..63c95e413 100644 --- a/platform/internal/router/router.go +++ b/platform/internal/router/router.go @@ -250,6 +250,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete) wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow) wsAuth.GET("/schedules/:scheduleId/history", schedh.History) + // Schedule health — open to CanCommunicate peers (no workspace bearer token + // required) so peer agents can detect silent cron failures without admin auth. + // Auth is enforced inside the handler via X-Workspace-ID + CanCommunicate + // (mirrors the /workspaces/:id/a2a pattern). Issue #249. + r.GET("/workspaces/:id/schedules/health", schedh.Health) // Memory memh := handlers.NewMemoryHandler()