diff --git a/platform/internal/handlers/activity.go b/platform/internal/handlers/activity.go
index 9be1daf8..b92538d8 100644
--- a/platform/internal/handlers/activity.go
+++ b/platform/internal/handlers/activity.go
@@ -329,7 +329,18 @@ func (h *ActivityHandler) Report(c *gin.Context) {
 	if reqBody == nil {
 		reqBody = body.Metadata
 	}
+	// C2 (from #169) — source_id spoof defense. WorkspaceAuth middleware
+	// already proves the caller owns :id, but that check doesn't cover the
+	// body field. Without this guard, workspace A authenticated for its own
+	// /activity endpoint could still set source_id to B's ID in the
+	// payload and attribute the log to B. Reject any body where
+	// source_id is non-empty AND differs from the authenticated workspace.
+	// Empty source_id falls through to the default-to-self branch below.
 	sourceID := body.SourceID
+	if sourceID != "" && sourceID != workspaceID {
+		c.JSON(http.StatusForbidden, gin.H{"error": "source_id must match authenticated workspace"})
+		return
+	}
 	if sourceID == "" {
 		sourceID = workspaceID
 	}
diff --git a/platform/internal/handlers/schedules.go b/platform/internal/handlers/schedules.go
index 5c9e7319..281f471b 100644
--- a/platform/internal/handlers/schedules.go
+++ b/platform/internal/handlers/schedules.go
@@ -275,8 +275,12 @@ func (h *ScheduleHandler) History(c *gin.Context) {
 	workspaceID := c.Param("id")
 	ctx := c.Request.Context()
 
+	// #152: include error_detail in history so UI can show why a run failed.
+	// activity_logs.error_detail is populated by scheduler.fireSchedule when
+	// the A2A proxy returns non-2xx or the update SQL reports an error.
 	rows, err := db.DB.QueryContext(ctx, `
 		SELECT created_at, duration_ms, status,
+		       COALESCE(error_detail, '') as error_detail,
 		       COALESCE(request_body::text, '{}') as request_body
 		FROM activity_logs
 		WHERE workspace_id = $1
@@ -292,17 +296,18 @@ func (h *ScheduleHandler) History(c *gin.Context) {
 	defer rows.Close()
 
 	type historyEntry struct {
-		Timestamp  time.Time       `json:"timestamp"`
-		DurationMs *int            `json:"duration_ms"`
-		Status     *string         `json:"status"`
-		Request    json.RawMessage `json:"request"`
+		Timestamp   time.Time       `json:"timestamp"`
+		DurationMs  *int            `json:"duration_ms"`
+		Status      *string         `json:"status"`
+		ErrorDetail string          `json:"error_detail"`
+		Request     json.RawMessage `json:"request"`
 	}
 
 	entries := make([]historyEntry, 0)
 	for rows.Next() {
 		var e historyEntry
 		var reqStr string
-		if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &reqStr); err != nil {
+		if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &e.ErrorDetail, &reqStr); err != nil {
 			continue
 		}
 		e.Request = json.RawMessage(reqStr)
diff --git a/platform/internal/middleware/wsauth_middleware.go b/platform/internal/middleware/wsauth_middleware.go
index e74aefac..9eee64f7 100644
--- a/platform/internal/middleware/wsauth_middleware.go
+++ b/platform/internal/middleware/wsauth_middleware.go
@@ -4,6 +4,8 @@ import (
 	"database/sql"
 	"log"
 	"net/http"
+	"os"
+	"strings"
 
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
 	"github.com/gin-gonic/gin"
@@ -84,3 +86,82 @@ func AdminAuth(database *sql.DB) gin.HandlerFunc {
 		c.Next()
 	}
 }
+
+// CanvasOrBearer is a softer admin-auth variant used ONLY for cosmetic
+// canvas routes where forging the request has zero security impact (PUT
+// /canvas/viewport: worst case an attacker resets the shared viewport
+// position, user refreshes the page, problem solved).
+//
+// Accepts either:
+//
+//  1. A valid bearer token (same contract as AdminAuth) — covers molecli,
+//     agent-to-platform calls, and anyone using the API directly.
+//  2. A browser Origin header that matches CORS_ORIGINS (canvas itself).
+//     This is NOT a strict auth boundary — curl can forge Origin — but for
+//     cosmetic-only routes the trade-off is acceptable. Non-cosmetic routes
+//     MUST NOT use this middleware (see #194 review on why it would re-open
+//     #164 CRITICAL if applied to /bundles/import).
+//
+// Lazy-bootstrap fail-open preserved: zero-token installs pass everything
+// through so fresh self-hosted / dev sessions aren't bricked.
+func CanvasOrBearer(database *sql.DB) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		ctx := c.Request.Context()
+
+		hasLive, err := wsauth.HasAnyLiveTokenGlobal(ctx, database)
+		if err != nil {
+			log.Printf("wsauth: CanvasOrBearer HasAnyLiveTokenGlobal failed: %v — allowing request", err)
+			c.Next()
+			return
+		}
+		if !hasLive {
+			c.Next()
+			return
+		}
+
+		// Path 1: valid bearer.
+		if tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization")); tok != "" {
+			if err := wsauth.ValidateAnyToken(ctx, database, tok); err == nil {
+				c.Next()
+				return
+			}
+		}
+
+		// Path 2: canvas origin match. Read CORS_ORIGINS at request time so
+		// tests can override via t.Setenv. canvasOriginAllowed returns true
+		// iff Origin is non-empty AND exactly matches one of the configured
+		// origins. Empty Origin (same-origin / server-to-server) does NOT
+		// pass this check — those callers must use the bearer path.
+		if canvasOriginAllowed(c.GetHeader("Origin")) {
+			c.Next()
+			return
+		}
+
+		c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "admin auth required"})
+	}
+}
+
+// canvasOriginAllowed returns true if origin matches any entry in the
+// CORS_ORIGINS env var (comma-separated) or the localhost defaults.
+// Exact-match only; no prefix or wildcard logic — that's handled by the
+// real CORS middleware upstream. The intent here is "did this request come
+// from the canvas page the user is already logged into?" — a binary check.
+func canvasOriginAllowed(origin string) bool {
+	if origin == "" {
+		return false
+	}
+	allowed := []string{"http://localhost:3000", "http://localhost:3001"}
+	if v := os.Getenv("CORS_ORIGINS"); v != "" {
+		for _, o := range strings.Split(v, ",") {
+			if o = strings.TrimSpace(o); o != "" {
+				allowed = append(allowed, o)
+			}
+		}
+	}
+	for _, a := range allowed {
+		if a == origin {
+			return true
+		}
+	}
+	return false
+}
diff --git a/platform/internal/middleware/wsauth_middleware_test.go b/platform/internal/middleware/wsauth_middleware_test.go
index 5fbb861f..783065a4 100644
--- a/platform/internal/middleware/wsauth_middleware_test.go
+++ b/platform/internal/middleware/wsauth_middleware_test.go
@@ -678,3 +678,126 @@ func TestAdminAuth_Issue120_PatchWorkspace_NoBearer_Returns401(t *testing.T) {
 		t.Errorf("unmet sqlmock expectations: %v", err)
 	}
 }
+
+// ── CanvasOrBearer (#168) ────────────────────────────────────────────────────
+// Narrow softer variant of AdminAuth used ONLY on PUT /canvas/viewport.
+// Accepts bearer or a matching Origin header. MUST NOT be used anywhere a
+// forged request would leak data or create resources.
+
+func TestCanvasOrBearer_NoTokens_FailOpen(t *testing.T) {
+	mockDB, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("sqlmock: %v", err)
+	}
+	defer mockDB.Close()
+
+	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
+		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
+
+	r := gin.New()
+	r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
+		c.JSON(http.StatusOK, gin.H{"ok": true})
+	})
+
+	w := httptest.NewRecorder()
+	req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Errorf("bootstrap fail-open: got %d, want 200 (%s)", w.Code, w.Body.String())
+	}
+}
+
+func TestCanvasOrBearer_TokensExist_NoCreds_Returns401(t *testing.T) {
+	mockDB, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("sqlmock: %v", err)
+	}
+	defer mockDB.Close()
+
+	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
+		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
+
+	r := gin.New()
+	r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
+		c.JSON(http.StatusOK, gin.H{"ok": true})
+	})
+
+	w := httptest.NewRecorder()
+	req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusUnauthorized {
+		t.Errorf("no creds: got %d, want 401", w.Code)
+	}
+}
+
+func TestCanvasOrBearer_TokensExist_CanvasOrigin_Passes(t *testing.T) {
+	mockDB, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("sqlmock: %v", err)
+	}
+	defer mockDB.Close()
+
+	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
+		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
+
+	t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app,https://bob.moleculesai.app")
+
+	r := gin.New()
+	r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
+		c.JSON(http.StatusOK, gin.H{"ok": true})
+	})
+
+	w := httptest.NewRecorder()
+	req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
+	req.Header.Set("Origin", "https://acme.moleculesai.app")
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Errorf("canvas origin: got %d, want 200 (%s)", w.Code, w.Body.String())
+	}
+}
+
+func TestCanvasOrBearer_TokensExist_WrongOrigin_Returns401(t *testing.T) {
+	mockDB, mock, err := sqlmock.New()
+	if err != nil {
+		t.Fatalf("sqlmock: %v", err)
+	}
+	defer mockDB.Close()
+
+	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
+		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
+
+	t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app")
+
+	r := gin.New()
+	r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
+		c.JSON(http.StatusOK, gin.H{"ok": true})
+	})
+
+	w := httptest.NewRecorder()
+	req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
+	req.Header.Set("Origin", "https://evil.example.com")
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusUnauthorized {
+		t.Errorf("wrong origin: got %d, want 401", w.Code)
+	}
+}
+
+func TestCanvasOriginAllowed_EmptyOriginRejected(t *testing.T) {
+	if canvasOriginAllowed("") {
+		t.Error("empty Origin must not pass")
+	}
+}
+
+func TestCanvasOriginAllowed_LocalhostDefault(t *testing.T) {
+	t.Setenv("CORS_ORIGINS", "")
+	if !canvasOriginAllowed("http://localhost:3000") {
+		t.Error("localhost:3000 should be allowed by default")
+	}
+	if canvasOriginAllowed("http://evil.example.com") {
+		t.Error("random origin should not be allowed")
+	}
+}
diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go
index fc3e579c..9db76af4 100644
--- a/platform/internal/router/router.go
+++ b/platform/internal/router/router.go
@@ -294,16 +294,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 	th := handlers.NewTerminalHandler(dockerCli)
 	wsAuth.GET("/terminal", th.HandleConnect)
 
-	// Canvas Viewport — #166: PUT gated behind AdminAuth so an anon caller
-	// can't reset the shared viewport state for all users. GET remains open
-	// because the canvas bootstraps without a bearer and needs the initial
-	// viewport for first paint.
+	// Canvas Viewport — #166 + #168: GET stays fully open for bootstrap.
+	// PUT uses CanvasOrBearer (accepts Origin-match OR bearer token) so the
+	// browser canvas can persist drag/zoom state without a bearer, while
+	// bearer-carrying clients (molecli, integration tests) still work.
+	// Viewport corruption is cosmetic-only — worst case a user refreshes
+	// the page — so the softer check is acceptable here. This middleware
+	// MUST NOT be used on routes that leak prompts, create workspaces,
+	// or write files (#164/#165/#190 class).
 	vh := handlers.NewViewportHandler()
 	r.GET("/canvas/viewport", vh.Get)
-	{
-		viewportAdmin := r.Group("", middleware.AdminAuth(db.DB))
-		viewportAdmin.PUT("/canvas/viewport", vh.Save)
-	}
+	r.PUT("/canvas/viewport", middleware.CanvasOrBearer(db.DB), vh.Save)
 
 	// Templates
 	tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)
diff --git a/platform/internal/scheduler/scheduler.go b/platform/internal/scheduler/scheduler.go
index c9fd7da1..43285f47 100644
--- a/platform/internal/scheduler/scheduler.go
+++ b/platform/internal/scheduler/scheduler.go
@@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
 		}
 	}()
 
+	// #115 concurrency-aware skip — before firing, check if the target
+	// workspace is already executing a task. If so, skip this tick instead
+	// of colliding (which used to surface as "workspace agent busy" errors
+	// and register as a hard fail). Advance next_run_at so the next cron
+	// slot gets a fresh chance; log a skipped cron_run row so history shows
+	// the gap instead of a silent miss. COALESCE guards against NULL.
+	var activeTasks int
+	if err := db.DB.QueryRowContext(ctx,
+		`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
+		sched.WorkspaceID,
+	).Scan(&activeTasks); err == nil && activeTasks > 0 {
+		wsID := sched.WorkspaceID
+		if len(wsID) > 12 {
+			wsID = wsID[:12]
+		}
+		log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
+			sched.Name, wsID, activeTasks)
+		s.recordSkipped(ctx, sched, activeTasks)
+		return
+	}
+
 	fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
 	defer cancel()
 
@@ -290,10 +311,14 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
 		"cron_expr":     sched.CronExpr,
 		"prompt":        truncate(sched.Prompt, 200),
 	})
+	// #152: persist lastError into error_detail on the activity_logs row
+	// so GET /workspaces/:id/schedules/:id/history can surface why a run
+	// failed (previously dropped — history returned status without any
+	// error context, making root-cause debugging impossible).
 	_, _ = db.DB.ExecContext(ctx, `
-		INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, created_at)
-		VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, now())
-	`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus)
+		INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
+		VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
+	`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus, lastError)
 
 	if s.broadcaster != nil {
 		s.broadcaster.RecordAndBroadcast(ctx, "CRON_EXECUTED", sched.WorkspaceID, map[string]interface{}{
@@ -304,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
 	}
 }
 
+// recordSkipped advances next_run_at and logs a cron_run activity entry
+// with status='skipped' when the target workspace was already busy.
+// Issue #115 — replaces the previous "busy → fire → fail → retry next
+// tick" loop with "busy → skip → advance → try next slot". Keeps the
+// history surface honest (a skip is not an error) and stops filling
+// last_error with noise.
+func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
+	reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)
+
+	nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
+	var nextRunPtr *time.Time
+	if nextErr == nil {
+		nextRunPtr = &nextRun
+	}
+
+	// Advance next_run_at + bump run_count so the liveness view reflects
+	// that we're still ticking. last_status='skipped', last_error carries
+	// the reason for operators debugging via the schedule history API.
+	_, _ = db.DB.ExecContext(ctx, `
+		UPDATE workspace_schedules
+		SET last_run_at = now(),
+		    next_run_at = $2,
+		    run_count = run_count + 1,
+		    last_status = 'skipped',
+		    last_error = $3,
+		    updated_at = now()
+		WHERE id = $1
+	`, sched.ID, nextRunPtr, reason)
+
+	cronMeta, _ := json.Marshal(map[string]interface{}{
+		"schedule_id":   sched.ID,
+		"schedule_name": sched.Name,
+		"cron_expr":     sched.CronExpr,
+		"skipped":       true,
+		"active_tasks":  activeTasks,
+	})
+	_, _ = db.DB.ExecContext(ctx, `
+		INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
+		VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
+	`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason)
+
+	if s.broadcaster != nil {
+		_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
+			"schedule_id":   sched.ID,
+			"schedule_name": sched.Name,
+			"reason":        reason,
+		})
+	}
+}
+
 func truncate(s string, maxLen int) string {
 	if len(s) <= maxLen {
 		return s
diff --git a/workspace-template/main.py b/workspace-template/main.py
index 77894997..23782e9d 100644
--- a/workspace-template/main.py
+++ b/workspace-template/main.py
@@ -12,7 +12,7 @@
 import httpx
 import uvicorn
 from a2a.server.apps import A2AStarletteApplication
 from a2a.server.request_handlers import DefaultRequestHandler
-from a2a.server.tasks import InMemoryTaskStore, InMemoryPushNotificationConfigStore, PushNotificationSender
+from a2a.server.tasks import InMemoryTaskStore
 from a2a.types import AgentCard, AgentCapabilities, AgentSkill
 from adapters import get_adapter, AdapterConfig
@@ -152,12 +152,20 @@ async def main():  # pragma: no cover
         defaultOutputModes=["text/plain", "application/json"],
     )
 
-    # 7. Wrap in A2A
+    # 7. Wrap in A2A.
+    #
+    # Regression fix (#204): PR #198 tried to wire push_config_store +
+    # push_sender to satisfy #175 (push notification capability), but
+    # PushNotificationSender is an abstract base class in the a2a-sdk and
+    # can't be instantiated directly. Passing it crashed main.py on startup
+    # with `TypeError: Can't instantiate abstract class`. Dropped back to
+    # DefaultRequestHandler's own defaults — pushNotifications capability
+    # in the AgentCard below is still advertised via AgentCapabilities so
+    # clients know we COULD do pushes; actually implementing them requires
+    # a concrete sender subclass, tracked as a Phase-H follow-up to #175.
     handler = DefaultRequestHandler(
         agent_executor=executor,
         task_store=InMemoryTaskStore(),
-        push_config_store=InMemoryPushNotificationConfigStore(),
-        push_sender=PushNotificationSender(),
     )
 
     app = A2AStarletteApplication(