Merge remote-tracking branch 'origin/main' into feat/workspace-idle-loop

This commit is contained in:
Hongming Wang 2026-04-15 11:21:15 -07:00
commit db36b5a97f
7 changed files with 324 additions and 20 deletions

View File

@ -329,7 +329,18 @@ func (h *ActivityHandler) Report(c *gin.Context) {
if reqBody == nil {
reqBody = body.Metadata
}
// C2 (from #169) — source_id spoof defense. WorkspaceAuth middleware
// already proves the caller owns :id, but that check doesn't cover the
// body field. Without this guard, workspace A authenticated for its own
// /activity endpoint could still set source_id=<workspace B's UUID> in
// the payload and attribute the log to B. Reject any body where
// source_id is non-empty AND differs from the authenticated workspace.
// Empty source_id falls through to the default-to-self branch below.
sourceID := body.SourceID
if sourceID != "" && sourceID != workspaceID {
c.JSON(http.StatusForbidden, gin.H{"error": "source_id must match authenticated workspace"})
return
}
if sourceID == "" {
sourceID = workspaceID
}

View File

@ -275,8 +275,12 @@ func (h *ScheduleHandler) History(c *gin.Context) {
workspaceID := c.Param("id")
ctx := c.Request.Context()
// #152: include error_detail in history so UI can show why a run failed.
// activity_logs.error_detail is populated by scheduler.fireSchedule when
// the A2A proxy returns non-2xx or the update SQL reports an error.
rows, err := db.DB.QueryContext(ctx, `
SELECT created_at, duration_ms, status,
COALESCE(error_detail, '') as error_detail,
COALESCE(request_body::text, '{}') as request_body
FROM activity_logs
WHERE workspace_id = $1
@ -292,17 +296,18 @@ func (h *ScheduleHandler) History(c *gin.Context) {
defer rows.Close()
type historyEntry struct {
Timestamp time.Time `json:"timestamp"`
DurationMs *int `json:"duration_ms"`
Status *string `json:"status"`
Request json.RawMessage `json:"request"`
Timestamp time.Time `json:"timestamp"`
DurationMs *int `json:"duration_ms"`
Status *string `json:"status"`
ErrorDetail string `json:"error_detail"`
Request json.RawMessage `json:"request"`
}
entries := make([]historyEntry, 0)
for rows.Next() {
var e historyEntry
var reqStr string
if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &reqStr); err != nil {
if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &e.ErrorDetail, &reqStr); err != nil {
continue
}
e.Request = json.RawMessage(reqStr)

View File

@ -4,6 +4,8 @@ import (
"database/sql"
"log"
"net/http"
"os"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
@ -84,3 +86,82 @@ func AdminAuth(database *sql.DB) gin.HandlerFunc {
c.Next()
}
}
// CanvasOrBearer is a relaxed admin-auth middleware reserved for cosmetic
// canvas routes (PUT /canvas/viewport), where a forged request can at worst
// reset the shared viewport position — recoverable with a page refresh.
//
// A request is admitted when either:
//
//  1. it carries a bearer token that validates exactly like AdminAuth
//     (molecli, agent-to-platform calls, anyone using the API directly), or
//  2. its Origin header matches the configured canvas origins. Origin is
//     trivially forgeable from curl, so this path is acceptable ONLY for
//     cosmetic-only routes; see the #194 review for why applying it to
//     /bundles/import would re-open #164 CRITICAL.
//
// Lazy-bootstrap fail-open is preserved: installs with zero live tokens
// pass everything through so fresh self-hosted / dev sessions aren't
// bricked.
func CanvasOrBearer(database *sql.DB) gin.HandlerFunc {
	return func(c *gin.Context) {
		ctx := c.Request.Context()
		switch hasLive, err := wsauth.HasAnyLiveTokenGlobal(ctx, database); {
		case err != nil:
			// Fail open on a DB error so a transient outage can't lock
			// everyone out of a cosmetic route.
			log.Printf("wsauth: CanvasOrBearer HasAnyLiveTokenGlobal failed: %v — allowing request", err)
			c.Next()
			return
		case !hasLive:
			// Lazy bootstrap: no tokens exist yet, nothing to enforce.
			c.Next()
			return
		}
		// Path 1: a valid bearer token (same contract as AdminAuth).
		tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
		if tok != "" && wsauth.ValidateAnyToken(ctx, database, tok) == nil {
			c.Next()
			return
		}
		// Path 2: canvas Origin match. CORS_ORIGINS is read at request time
		// so tests can override it via t.Setenv. canvasOriginAllowed is
		// true iff Origin is non-empty AND exactly matches a configured
		// origin; an empty Origin (same-origin / server-to-server) never
		// passes — those callers must present a bearer.
		if canvasOriginAllowed(c.GetHeader("Origin")) {
			c.Next()
			return
		}
		c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "admin auth required"})
	}
}
// canvasOriginAllowed reports whether origin exactly matches one of the
// built-in localhost defaults or an entry in the comma-separated
// CORS_ORIGINS env var. Exact-match only — no prefix or wildcard logic;
// that belongs to the real CORS middleware upstream. The intent is the
// binary question "did this request come from the canvas page the user is
// already logged into?".
func canvasOriginAllowed(origin string) bool {
	// Same-origin and server-to-server requests send no Origin header;
	// they must authenticate via the bearer path instead.
	if origin == "" {
		return false
	}
	switch origin {
	case "http://localhost:3000", "http://localhost:3001":
		return true
	}
	for _, entry := range strings.Split(os.Getenv("CORS_ORIGINS"), ",") {
		if strings.TrimSpace(entry) == origin {
			return true
		}
	}
	return false
}

View File

@ -678,3 +678,126 @@ func TestAdminAuth_Issue120_PatchWorkspace_NoBearer_Returns401(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ── CanvasOrBearer (#168) ────────────────────────────────────────────────────
// Narrow softer variant of AdminAuth used ONLY on PUT /canvas/viewport.
// Accepts bearer or a matching Origin header. MUST NOT be used anywhere a
// forged request would leak data or create resources.

// TestCanvasOrBearer_NoTokens_FailOpen pins the lazy-bootstrap path: with
// zero live tokens in the DB, a credential-less request is admitted so
// fresh installs aren't bricked.
func TestCanvasOrBearer_NoTokens_FailOpen(t *testing.T) {
	database, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock: %v", err)
	}
	defer database.Close()
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))

	router := gin.New()
	router.PUT("/canvas/viewport", CanvasOrBearer(database), func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})

	rec := httptest.NewRecorder()
	request, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
	router.ServeHTTP(rec, request)
	if rec.Code != http.StatusOK {
		t.Errorf("bootstrap fail-open: got %d, want 200 (%s)", rec.Code, rec.Body.String())
	}
}
// TestCanvasOrBearer_TokensExist_NoCreds_Returns401: once any live token
// exists, a request carrying neither a bearer nor a matching Origin is
// rejected with 401.
func TestCanvasOrBearer_TokensExist_NoCreds_Returns401(t *testing.T) {
	database, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock: %v", err)
	}
	defer database.Close()
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))

	router := gin.New()
	router.PUT("/canvas/viewport", CanvasOrBearer(database), func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})

	rec := httptest.NewRecorder()
	request, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
	router.ServeHTTP(rec, request)
	if rec.Code != http.StatusUnauthorized {
		t.Errorf("no creds: got %d, want 401", rec.Code)
	}
}
// TestCanvasOrBearer_TokensExist_CanvasOrigin_Passes: with live tokens
// present, a request whose Origin exactly matches a CORS_ORIGINS entry is
// admitted without a bearer (the browser-canvas path).
func TestCanvasOrBearer_TokensExist_CanvasOrigin_Passes(t *testing.T) {
	database, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock: %v", err)
	}
	defer database.Close()
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
	t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app,https://bob.moleculesai.app")

	router := gin.New()
	router.PUT("/canvas/viewport", CanvasOrBearer(database), func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})

	rec := httptest.NewRecorder()
	request, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
	request.Header.Set("Origin", "https://acme.moleculesai.app")
	router.ServeHTTP(rec, request)
	if rec.Code != http.StatusOK {
		t.Errorf("canvas origin: got %d, want 200 (%s)", rec.Code, rec.Body.String())
	}
}
// TestCanvasOrBearer_TokensExist_WrongOrigin_Returns401: an Origin that is
// not in CORS_ORIGINS (nor a localhost default) must not substitute for a
// bearer — exact-match only, no fail-open.
func TestCanvasOrBearer_TokensExist_WrongOrigin_Returns401(t *testing.T) {
	database, mock, err := sqlmock.New()
	if err != nil {
		t.Fatalf("sqlmock: %v", err)
	}
	defer database.Close()
	mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
		WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
	t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app")

	router := gin.New()
	router.PUT("/canvas/viewport", CanvasOrBearer(database), func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"ok": true})
	})

	rec := httptest.NewRecorder()
	request, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
	request.Header.Set("Origin", "https://evil.example.com")
	router.ServeHTTP(rec, request)
	if rec.Code != http.StatusUnauthorized {
		t.Errorf("wrong origin: got %d, want 401", rec.Code)
	}
}
// TestCanvasOriginAllowed_EmptyOriginRejected pins the rule that a missing
// Origin header (same-origin / server-to-server) never satisfies the
// canvas-origin path.
func TestCanvasOriginAllowed_EmptyOriginRejected(t *testing.T) {
	if got := canvasOriginAllowed(""); got {
		t.Error("empty Origin must not pass")
	}
}
// TestCanvasOriginAllowed_LocalhostDefault checks the built-in localhost
// allow-list with CORS_ORIGINS pinned empty, and that arbitrary origins
// stay rejected.
func TestCanvasOriginAllowed_LocalhostDefault(t *testing.T) {
	t.Setenv("CORS_ORIGINS", "")
	cases := []struct {
		origin  string
		allowed bool
		msg     string
	}{
		{"http://localhost:3000", true, "localhost:3000 should be allowed by default"},
		{"http://evil.example.com", false, "random origin should not be allowed"},
	}
	for _, tc := range cases {
		if canvasOriginAllowed(tc.origin) != tc.allowed {
			t.Error(tc.msg)
		}
	}
}

View File

@ -294,16 +294,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
th := handlers.NewTerminalHandler(dockerCli)
wsAuth.GET("/terminal", th.HandleConnect)
// Canvas Viewport — #166: PUT gated behind AdminAuth so an anon caller
// can't reset the shared viewport state for all users. GET remains open
// because the canvas bootstraps without a bearer and needs the initial
// viewport for first paint.
// Canvas Viewport — #166 + #168: GET stays fully open for bootstrap.
// PUT uses CanvasOrBearer (accepts Origin-match OR bearer token) so the
// browser canvas can persist drag/zoom state without a bearer, while
// bearer-carrying clients (molecli, integration tests) still work.
// Viewport corruption is cosmetic-only — worst case a user refreshes
// the page — so the softer check is acceptable here. This middleware
// MUST NOT be used on routes that leak prompts, create workspaces,
// or write files (#164/#165/#190 class).
vh := handlers.NewViewportHandler()
r.GET("/canvas/viewport", vh.Get)
{
viewportAdmin := r.Group("", middleware.AdminAuth(db.DB))
viewportAdmin.PUT("/canvas/viewport", vh.Save)
}
r.PUT("/canvas/viewport", middleware.CanvasOrBearer(db.DB), vh.Save)
// Templates
tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)

View File

@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
}
}()
// #115 concurrency-aware skip — before firing check if the target
// workspace is already executing a task. If so, skip this tick instead
// of colliding (which used to surface as "workspace agent busy" errors
// and register as a hard fail). advance next_run_at so the next cron
// slot gets a fresh chance; log a skipped cron_run row so history shows
// the gap instead of a silent miss. COALESCE guards against NULL.
var activeTasks int
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks); err == nil && activeTasks > 0 {
wsID := sched.WorkspaceID
if len(wsID) > 12 {
wsID = wsID[:12]
}
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
sched.Name, wsID, activeTasks)
s.recordSkipped(ctx, sched, activeTasks)
return
}
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
defer cancel()
@ -290,10 +311,14 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
"cron_expr": sched.CronExpr,
"prompt": truncate(sched.Prompt, 200),
})
// #152: persist lastError into error_detail on the activity_logs row
// so GET /workspaces/:id/schedules/:id/history can surface why a run
// failed (previously dropped — history returned status without any
// error context, making root-cause debugging impossible).
_, _ = db.DB.ExecContext(ctx, `
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, now())
`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus)
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus, lastError)
if s.broadcaster != nil {
s.broadcaster.RecordAndBroadcast(ctx, "CRON_EXECUTED", sched.WorkspaceID, map[string]interface{}{
@ -304,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
}
}
// recordSkipped advances next_run_at and logs a cron_run activity entry
// with status='skipped' when the target workspace was already busy.
// Issue #115 — replaces the previous "busy → fire → fail → retry next
// tick" loop with "busy → skip → advance → try next slot". Keeps the
// history surface honest (a skip is not an error) and stops filling
// last_error with noise.
//
// Both DB writes remain best-effort — a failure to record a skip must
// never block the scheduler loop — but failures are now logged so
// operators can see when skip bookkeeping is silently degrading
// (previously the errors were discarded without a trace).
func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
	reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)

	// Compute the next slot; on a parse error leave next_run_at NULL rather
	// than guessing (the expression was validated when the schedule was
	// created, so this should be unreachable in practice).
	var nextRunPtr *time.Time
	if nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now()); nextErr == nil {
		nextRunPtr = &nextRun
	}

	// Advance next_run_at + bump run_count so the liveness view reflects
	// that we're still ticking. last_status='skipped', last_error carries
	// the reason for operators debugging via the schedule history API.
	if _, err := db.DB.ExecContext(ctx, `
		UPDATE workspace_schedules
		SET last_run_at = now(),
		    next_run_at = $2,
		    run_count = run_count + 1,
		    last_status = 'skipped',
		    last_error = $3,
		    updated_at = now()
		WHERE id = $1
	`, sched.ID, nextRunPtr, reason); err != nil {
		log.Printf("Scheduler: recordSkipped schedule update failed for %s: %v", sched.ID, err)
	}

	// Marshal of a map of JSON-safe scalar values cannot fail; error ignored.
	cronMeta, _ := json.Marshal(map[string]interface{}{
		"schedule_id":   sched.ID,
		"schedule_name": sched.Name,
		"cron_expr":     sched.CronExpr,
		"skipped":       true,
		"active_tasks":  activeTasks,
	})
	if _, err := db.DB.ExecContext(ctx, `
		INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
		VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
	`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason); err != nil {
		log.Printf("Scheduler: recordSkipped activity log insert failed for %s: %v", sched.ID, err)
	}

	if s.broadcaster != nil {
		// Fire-and-forget: the scheduler must not stall on broadcast errors.
		_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
			"schedule_id":   sched.ID,
			"schedule_name": sched.Name,
			"reason":        reason,
		})
	}
}
func truncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s

View File

@ -12,7 +12,7 @@ import httpx
import uvicorn
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore, InMemoryPushNotificationConfigStore, PushNotificationSender
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCard, AgentCapabilities, AgentSkill
from adapters import get_adapter, AdapterConfig
@ -152,12 +152,20 @@ async def main(): # pragma: no cover
defaultOutputModes=["text/plain", "application/json"],
)
# 7. Wrap in A2A
# 7. Wrap in A2A.
#
# Regression fix (#204): PR #198 tried to wire push_config_store +
# push_sender to satisfy #175 (push notification capability), but
# PushNotificationSender is an abstract base class in the a2a-sdk and
# can't be instantiated directly. Passing it crashed main.py on startup
# with `TypeError: Can't instantiate abstract class`. Dropped back to
# DefaultRequestHandler's own defaults — pushNotifications capability
# in the AgentCard below is still advertised via AgentCapabilities so
# clients know we COULD do pushes; actually implementing them requires
# a concrete sender subclass, tracked as a Phase-H follow-up to #175.
handler = DefaultRequestHandler(
agent_executor=executor,
task_store=InMemoryTaskStore(),
push_config_store=InMemoryPushNotificationConfigStore(),
push_sender=PushNotificationSender(),
)
app = A2AStarletteApplication(