fix(#249): add /schedules/health endpoint accessible to CanCommunicate peers (#400)

Rebased cleanly onto current main (resolves the add/add conflicts that
blocked CI on PR #374 — the original branch diverged from a pre-repo-bootstrap
commit that predated most files).

Changes:
- schedules.go: add scheduleHealthResponse struct + Health handler
  (mirrors A2A proxy auth pattern: X-Workspace-ID + CanCommunicate gate)
- router.go: register GET /workspaces/:id/schedules/health on r (not wsAuth)
  so peer agents can query without holding the target workspace's bearer token
- schedules_test.go: 7 new tests (missing caller 401, self-call OK, legacy
  peer grandfathered, non-peer 403, system caller bypass, no prompt exposure,
  DB error 500)

isSystemCaller/validateCallerToken reused from a2a_proxy.go (same package).
registry.CanCommunicate import added to schedules.go.

Closes #249
Supersedes PR #374 (which could not get CI due to merge conflict)

Co-authored-by: PM (Molecule AI) <pm@molecule-ai.internal>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Hongming Wang 2026-04-16 00:45:30 -07:00 committed by GitHub
parent 1c92b46bfc
commit d6f7bd2087
3 changed files with 359 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/scheduler"
)
@ -317,3 +318,95 @@ func (h *ScheduleHandler) History(c *gin.Context) {
c.JSON(http.StatusOK, entries)
}
// scheduleHealthResponse is the read-only health view of a schedule.
// It deliberately omits prompt and cron_expr so sensitive task content is
// never exposed to peer workspaces — only execution-state fields needed to
// detect silent cron failures are returned (issue #249).
type scheduleHealthResponse struct {
ID string `json:"id"`
Name string `json:"name"`
Enabled bool `json:"enabled"`
LastRunAt *time.Time `json:"last_run_at"`
NextRunAt *time.Time `json:"next_run_at"`
RunCount int `json:"run_count"`
LastStatus string `json:"last_status"`
LastError string `json:"last_error"`
}
// Health returns schedule health fields (last_run_at, last_status, run_count,
// etc.) for all schedules belonging to a workspace.
//
// Unlike GET /workspaces/:id/schedules (which requires the workspace's own
// bearer token), this endpoint is accessible to CanCommunicate peers — i.e.,
// any workspace in the same org hierarchy — so peer agents can detect silent
// cron failures without needing admin auth (issue #249).
//
// Auth rules (mirrors the A2A proxy pattern):
// - X-Workspace-ID header is required to identify the caller.
// - If the caller workspace has any live tokens, the Authorization: Bearer
// header must carry that caller's own valid token (lazy-bootstrap: legacy
// workspaces with no tokens are grandfathered through).
// - registry.CanCommunicate(callerID, workspaceID) must return true.
// - System callers (webhook:*, system:*, test:*) bypass token + access checks.
// - Self-calls (callerID == workspaceID) are always allowed.
//
// Prompt and cron_expr are intentionally absent from the response.
func (h *ScheduleHandler) Health(c *gin.Context) {
workspaceID := c.Param("id")
callerID := c.GetHeader("X-Workspace-ID")
ctx := c.Request.Context()
// Caller identity is mandatory — anonymous reads are not permitted.
if callerID == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "X-Workspace-ID header required"})
return
}
// Validate the caller's own bearer token (Phase 30.5 contract).
// Skip for system callers and self-calls, same as the A2A proxy.
if !isSystemCaller(callerID) && callerID != workspaceID {
if err := validateCallerToken(ctx, c, callerID); err != nil {
return // response already written with 401
}
}
// CanCommunicate gate — only peers in the org hierarchy may read health.
if callerID != workspaceID && !isSystemCaller(callerID) {
if !registry.CanCommunicate(callerID, workspaceID) {
log.Printf("ScheduleHealth: access denied %s → %s", callerID, workspaceID)
c.JSON(http.StatusForbidden, gin.H{"error": "access denied"})
return
}
}
rows, err := db.DB.QueryContext(ctx, `
SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error
FROM workspace_schedules
WHERE workspace_id = $1
ORDER BY created_at ASC
`, workspaceID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to query schedules"})
return
}
defer rows.Close()
schedules := make([]scheduleHealthResponse, 0)
for rows.Next() {
var s scheduleHealthResponse
if err := rows.Scan(
&s.ID, &s.Name, &s.Enabled, &s.LastRunAt, &s.NextRunAt,
&s.RunCount, &s.LastStatus, &s.LastError,
); err != nil {
log.Printf("ScheduleHealth: scan error: %v", err)
continue
}
schedules = append(schedules, s)
}
if err := rows.Err(); err != nil {
log.Printf("ScheduleHealth: rows error: %v", err)
}
c.JSON(http.StatusOK, schedules)
}

View File

@ -2,6 +2,8 @@ package handlers
import (
"bytes"
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"regexp"
@ -171,3 +173,262 @@ func TestHistory_IncludesErrorDetail(t *testing.T) {
t.Errorf("sqlmock: %v", err)
}
}
// ==================== Health — issue #249 ====================
//
// GET /workspaces/:id/schedules/health is accessible to CanCommunicate peers
// without workspace bearer auth. The handler mirrors the A2A proxy's auth
// pattern: X-Workspace-ID + caller token + CanCommunicate gate.
const healthWorkspaceID = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
const healthCallerID = "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
// healthCols is the column set returned by the Health SELECT.
var healthCols = []string{"id", "name", "enabled", "last_run_at", "next_run_at", "run_count", "last_status", "last_error"}
// TestScheduleHealth_MissingCallerID_Rejected verifies that requests without
// X-Workspace-ID are rejected with 401 — anonymous peer reads are not allowed.
func TestScheduleHealth_MissingCallerID_Rejected(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
c.Request = httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
handler.Health(c)
if w.Code != http.StatusUnauthorized {
t.Fatalf("expected 401 for missing caller, got %d: %s", w.Code, w.Body.String())
}
}
// TestScheduleHealth_SelfCall_Allowed verifies that when callerID == workspaceID
// (self-call) the request is allowed and health fields are returned without any
// CanCommunicate DB lookups.
func TestScheduleHealth_SelfCall_Allowed(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
now := time.Now().UTC().Truncate(time.Second)
// Self-call: no token check, no CanCommunicate queries.
// Expect only the health SELECT.
mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`).
WithArgs(healthWorkspaceID).
WillReturnRows(sqlmock.NewRows(healthCols).
AddRow("sched-1", "nightly", true, &now, &now, 42, "ok", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", healthWorkspaceID) // self-call
c.Request = req
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 for self-call, got %d: %s", w.Code, w.Body.String())
}
var resp []scheduleHealthResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 || resp[0].ID != "sched-1" || resp[0].RunCount != 42 {
t.Errorf("unexpected health response: %+v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestScheduleHealth_CanCommunicatePeer_LegacyNoToken verifies that a legacy
// peer (no live tokens on file for the caller) is grandfathered through the
// token check and can read health when CanCommunicate is satisfied.
func TestScheduleHealth_CanCommunicatePeer_LegacyNoToken(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
now := time.Now().UTC().Truncate(time.Second)
// 1. validateCallerToken: caller has zero live tokens → grandfather through.
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs(healthCallerID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// 2. CanCommunicate: caller and target share the same parent (siblings → allowed).
mockCanCommunicate(mock, healthCallerID, healthWorkspaceID, true)
// 3. Health SELECT.
mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`).
WithArgs(healthWorkspaceID).
WillReturnRows(sqlmock.NewRows(healthCols).
AddRow("sched-2", "hourly", true, &now, &now, 7, "ok", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", healthCallerID)
c.Request = req
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 for peer with no tokens, got %d: %s", w.Code, w.Body.String())
}
var resp []scheduleHealthResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if len(resp) != 1 || resp[0].RunCount != 7 {
t.Errorf("unexpected response: %+v", resp)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestScheduleHealth_AccessDenied_NonPeer verifies that a workspace which fails
// CanCommunicate (different org branch) receives 403 — not 401 or 500.
func TestScheduleHealth_AccessDenied_NonPeer(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
// 1. validateCallerToken: no live tokens → grandfather.
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs(healthCallerID).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// 2. CanCommunicate: different parents → denied.
mockCanCommunicate(mock, healthCallerID, healthWorkspaceID, false)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", healthCallerID)
c.Request = req
handler.Health(c)
if w.Code != http.StatusForbidden {
t.Fatalf("expected 403 for non-peer, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestScheduleHealth_SystemCaller_Allowed verifies that system callers
// (webhook:*, system:*, test:*) bypass token + CanCommunicate checks.
func TestScheduleHealth_SystemCaller_Allowed(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
now := time.Now().UTC().Truncate(time.Second)
// No token check, no CanCommunicate queries — just the health SELECT.
mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`).
WithArgs(healthWorkspaceID).
WillReturnRows(sqlmock.NewRows(healthCols).
AddRow("sched-3", "weekly", false, nil, &now, 0, "", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", "system:monitor")
c.Request = req
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200 for system caller, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestScheduleHealth_NoPromptExposed verifies that the health response never
// includes prompt or cron_expr — only execution-state fields are returned.
func TestScheduleHealth_NoPromptExposed(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
now := time.Now().UTC().Truncate(time.Second)
// No token check, no CanCommunicate queries for system caller.
mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`).
WithArgs(healthWorkspaceID).
WillReturnRows(sqlmock.NewRows(healthCols).
AddRow("sched-4", "daily", true, &now, &now, 3, "ok", ""))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", "system:test")
c.Request = req
handler.Health(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
rawBody := w.Body.String()
for _, forbidden := range []string{"prompt", "cron_expr", "timezone"} {
if strings.Contains(rawBody, forbidden) {
t.Errorf("health response must not contain %q field: %s", forbidden, rawBody)
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// TestScheduleHealth_DBError_Returns500 verifies that a DB failure on the health
// SELECT produces a 500, not a panic.
func TestScheduleHealth_DBError_Returns500(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewScheduleHandler()
// No token check, no CanCommunicate queries for system caller.
mock.ExpectQuery(`SELECT id, name, enabled, last_run_at, next_run_at, run_count, last_status, last_error\s+FROM workspace_schedules`).
WithArgs(healthWorkspaceID).
WillReturnError(sql.ErrConnDone)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: healthWorkspaceID}}
req := httptest.NewRequest("GET", "/workspaces/"+healthWorkspaceID+"/schedules/health", nil)
req.Header.Set("X-Workspace-ID", "system:test")
c.Request = req
handler.Health(c)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected 500 on DB error, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}

View File

@ -250,6 +250,11 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete)
wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow)
wsAuth.GET("/schedules/:scheduleId/history", schedh.History)
// Schedule health — open to CanCommunicate peers (no workspace bearer token
// required) so peer agents can detect silent cron failures without admin auth.
// Auth is enforced inside the handler via X-Workspace-ID + CanCommunicate
// (mirrors the /workspaces/:id/a2a pattern). Issue #249.
r.GET("/workspaces/:id/schedules/health", schedh.Health)
// Memory
memh := handlers.NewMemoryHandler()