Merge branch 'main' into feat/hermes-phase1-provider-registry
This commit is contained in:
commit
c2fee56d59
@ -329,7 +329,18 @@ func (h *ActivityHandler) Report(c *gin.Context) {
|
||||
if reqBody == nil {
|
||||
reqBody = body.Metadata
|
||||
}
|
||||
// C2 (from #169) — source_id spoof defense. WorkspaceAuth middleware
|
||||
// already proves the caller owns :id, but that check doesn't cover the
|
||||
// body field. Without this guard, workspace A authenticated for its own
|
||||
// /activity endpoint could still set source_id=<workspace B's UUID> in
|
||||
// the payload and attribute the log to B. Reject any body where
|
||||
// source_id is non-empty AND differs from the authenticated workspace.
|
||||
// Empty source_id falls through to the default-to-self branch below.
|
||||
sourceID := body.SourceID
|
||||
if sourceID != "" && sourceID != workspaceID {
|
||||
c.JSON(http.StatusForbidden, gin.H{"error": "source_id must match authenticated workspace"})
|
||||
return
|
||||
}
|
||||
if sourceID == "" {
|
||||
sourceID = workspaceID
|
||||
}
|
||||
|
||||
@ -275,8 +275,12 @@ func (h *ScheduleHandler) History(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// #152: include error_detail in history so UI can show why a run failed.
|
||||
// activity_logs.error_detail is populated by scheduler.fireSchedule when
|
||||
// the A2A proxy returns non-2xx or the update SQL reports an error.
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT created_at, duration_ms, status,
|
||||
COALESCE(error_detail, '') as error_detail,
|
||||
COALESCE(request_body::text, '{}') as request_body
|
||||
FROM activity_logs
|
||||
WHERE workspace_id = $1
|
||||
@ -292,17 +296,18 @@ func (h *ScheduleHandler) History(c *gin.Context) {
|
||||
defer rows.Close()
|
||||
|
||||
type historyEntry struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
DurationMs *int `json:"duration_ms"`
|
||||
Status *string `json:"status"`
|
||||
Request json.RawMessage `json:"request"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
DurationMs *int `json:"duration_ms"`
|
||||
Status *string `json:"status"`
|
||||
ErrorDetail string `json:"error_detail"`
|
||||
Request json.RawMessage `json:"request"`
|
||||
}
|
||||
|
||||
entries := make([]historyEntry, 0)
|
||||
for rows.Next() {
|
||||
var e historyEntry
|
||||
var reqStr string
|
||||
if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &reqStr); err != nil {
|
||||
if err := rows.Scan(&e.Timestamp, &e.DurationMs, &e.Status, &e.ErrorDetail, &reqStr); err != nil {
|
||||
continue
|
||||
}
|
||||
e.Request = json.RawMessage(reqStr)
|
||||
|
||||
@ -4,6 +4,8 @@ import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -84,3 +86,82 @@ func AdminAuth(database *sql.DB) gin.HandlerFunc {
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
// CanvasOrBearer is a softer admin-auth variant used ONLY for cosmetic
|
||||
// canvas routes where forging the request has zero security impact (PUT
|
||||
// /canvas/viewport: worst case an attacker resets the shared viewport
|
||||
// position, user refreshes the page, problem solved).
|
||||
//
|
||||
// Accepts either:
|
||||
//
|
||||
// 1. A valid bearer token (same contract as AdminAuth) — covers molecli,
|
||||
// agent-to-platform calls, and anyone using the API directly.
|
||||
// 2. A browser Origin header that matches CORS_ORIGINS (canvas itself).
|
||||
// This is NOT a strict auth boundary — curl can forge Origin — but for
|
||||
// cosmetic-only routes the trade-off is acceptable. Non-cosmetic routes
|
||||
// MUST NOT use this middleware (see #194 review on why it would re-open
|
||||
// #164 CRITICAL if applied to /bundles/import).
|
||||
//
|
||||
// Lazy-bootstrap fail-open preserved: zero-token installs pass everything
|
||||
// through so fresh self-hosted / dev sessions aren't bricked.
|
||||
func CanvasOrBearer(database *sql.DB) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
hasLive, err := wsauth.HasAnyLiveTokenGlobal(ctx, database)
|
||||
if err != nil {
|
||||
log.Printf("wsauth: CanvasOrBearer HasAnyLiveTokenGlobal failed: %v — allowing request", err)
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
if !hasLive {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
// Path 1: valid bearer.
|
||||
if tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization")); tok != "" {
|
||||
if err := wsauth.ValidateAnyToken(ctx, database, tok); err == nil {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Path 2: canvas origin match. Read CORS_ORIGINS at request time so
|
||||
// tests can override via t.Setenv. canvasOriginAllowed returns true
|
||||
// iff Origin is non-empty AND exactly matches one of the configured
|
||||
// origins. Empty Origin (same-origin / server-to-server) does NOT
|
||||
// pass this check — those callers must use the bearer path.
|
||||
if canvasOriginAllowed(c.GetHeader("Origin")) {
|
||||
c.Next()
|
||||
return
|
||||
}
|
||||
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "admin auth required"})
|
||||
}
|
||||
}
|
||||
|
||||
// canvasOriginAllowed returns true if origin matches any entry in the
|
||||
// CORS_ORIGINS env var (comma-separated) or the localhost defaults.
|
||||
// Exact-match only; no prefix or wildcard logic — that's handled by the
|
||||
// real CORS middleware upstream. The intent here is "did this request come
|
||||
// from the canvas page the user is already logged into?" — a binary check.
|
||||
func canvasOriginAllowed(origin string) bool {
|
||||
if origin == "" {
|
||||
return false
|
||||
}
|
||||
allowed := []string{"http://localhost:3000", "http://localhost:3001"}
|
||||
if v := os.Getenv("CORS_ORIGINS"); v != "" {
|
||||
for _, o := range strings.Split(v, ",") {
|
||||
if o = strings.TrimSpace(o); o != "" {
|
||||
allowed = append(allowed, o)
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, a := range allowed {
|
||||
if a == origin {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -678,3 +678,126 @@ func TestAdminAuth_Issue120_PatchWorkspace_NoBearer_Returns401(t *testing.T) {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ── CanvasOrBearer (#168) ────────────────────────────────────────────────────
|
||||
// Narrow softer variant of AdminAuth used ONLY on PUT /canvas/viewport.
|
||||
// Accepts bearer or a matching Origin header. MUST NOT be used anywhere a
|
||||
// forged request would leak data or create resources.
|
||||
|
||||
func TestCanvasOrBearer_NoTokens_FailOpen(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock: %v", err)
|
||||
}
|
||||
defer mockDB.Close()
|
||||
|
||||
mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
|
||||
|
||||
r := gin.New()
|
||||
r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("bootstrap fail-open: got %d, want 200 (%s)", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanvasOrBearer_TokensExist_NoCreds_Returns401(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock: %v", err)
|
||||
}
|
||||
defer mockDB.Close()
|
||||
|
||||
mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
|
||||
|
||||
r := gin.New()
|
||||
r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("no creds: got %d, want 401", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanvasOrBearer_TokensExist_CanvasOrigin_Passes(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock: %v", err)
|
||||
}
|
||||
defer mockDB.Close()
|
||||
|
||||
mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
|
||||
|
||||
t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app,https://bob.moleculesai.app")
|
||||
|
||||
r := gin.New()
|
||||
r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
|
||||
req.Header.Set("Origin", "https://acme.moleculesai.app")
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("canvas origin: got %d, want 200 (%s)", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanvasOrBearer_TokensExist_WrongOrigin_Returns401(t *testing.T) {
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock: %v", err)
|
||||
}
|
||||
defer mockDB.Close()
|
||||
|
||||
mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
|
||||
|
||||
t.Setenv("CORS_ORIGINS", "https://acme.moleculesai.app")
|
||||
|
||||
r := gin.New()
|
||||
r.PUT("/canvas/viewport", CanvasOrBearer(mockDB), func(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"ok": true})
|
||||
})
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req, _ := http.NewRequest(http.MethodPut, "/canvas/viewport", nil)
|
||||
req.Header.Set("Origin", "https://evil.example.com")
|
||||
r.ServeHTTP(w, req)
|
||||
|
||||
if w.Code != http.StatusUnauthorized {
|
||||
t.Errorf("wrong origin: got %d, want 401", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanvasOriginAllowed_EmptyOriginRejected(t *testing.T) {
|
||||
if canvasOriginAllowed("") {
|
||||
t.Error("empty Origin must not pass")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCanvasOriginAllowed_LocalhostDefault(t *testing.T) {
|
||||
t.Setenv("CORS_ORIGINS", "")
|
||||
if !canvasOriginAllowed("http://localhost:3000") {
|
||||
t.Error("localhost:3000 should be allowed by default")
|
||||
}
|
||||
if canvasOriginAllowed("http://evil.example.com") {
|
||||
t.Error("random origin should not be allowed")
|
||||
}
|
||||
}
|
||||
|
||||
@ -294,16 +294,17 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
th := handlers.NewTerminalHandler(dockerCli)
|
||||
wsAuth.GET("/terminal", th.HandleConnect)
|
||||
|
||||
// Canvas Viewport — #166: PUT gated behind AdminAuth so an anon caller
|
||||
// can't reset the shared viewport state for all users. GET remains open
|
||||
// because the canvas bootstraps without a bearer and needs the initial
|
||||
// viewport for first paint.
|
||||
// Canvas Viewport — #166 + #168: GET stays fully open for bootstrap.
|
||||
// PUT uses CanvasOrBearer (accepts Origin-match OR bearer token) so the
|
||||
// browser canvas can persist drag/zoom state without a bearer, while
|
||||
// bearer-carrying clients (molecli, integration tests) still work.
|
||||
// Viewport corruption is cosmetic-only — worst case a user refreshes
|
||||
// the page — so the softer check is acceptable here. This middleware
|
||||
// MUST NOT be used on routes that leak prompts, create workspaces,
|
||||
// or write files (#164/#165/#190 class).
|
||||
vh := handlers.NewViewportHandler()
|
||||
r.GET("/canvas/viewport", vh.Get)
|
||||
{
|
||||
viewportAdmin := r.Group("", middleware.AdminAuth(db.DB))
|
||||
viewportAdmin.PUT("/canvas/viewport", vh.Save)
|
||||
}
|
||||
r.PUT("/canvas/viewport", middleware.CanvasOrBearer(db.DB), vh.Save)
|
||||
|
||||
// Templates
|
||||
tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)
|
||||
|
||||
@ -222,6 +222,27 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
}
|
||||
}()
|
||||
|
||||
// #115 concurrency-aware skip — before firing check if the target
|
||||
// workspace is already executing a task. If so, skip this tick instead
|
||||
// of colliding (which used to surface as "workspace agent busy" errors
|
||||
// and register as a hard fail). advance next_run_at so the next cron
|
||||
// slot gets a fresh chance; log a skipped cron_run row so history shows
|
||||
// the gap instead of a silent miss. COALESCE guards against NULL.
|
||||
var activeTasks int
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
|
||||
sched.WorkspaceID,
|
||||
).Scan(&activeTasks); err == nil && activeTasks > 0 {
|
||||
wsID := sched.WorkspaceID
|
||||
if len(wsID) > 12 {
|
||||
wsID = wsID[:12]
|
||||
}
|
||||
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
|
||||
sched.Name, wsID, activeTasks)
|
||||
s.recordSkipped(ctx, sched, activeTasks)
|
||||
return
|
||||
}
|
||||
|
||||
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
|
||||
defer cancel()
|
||||
|
||||
@ -290,10 +311,14 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
"cron_expr": sched.CronExpr,
|
||||
"prompt": truncate(sched.Prompt, 200),
|
||||
})
|
||||
// #152: persist lastError into error_detail on the activity_logs row
|
||||
// so GET /workspaces/:id/schedules/:id/history can surface why a run
|
||||
// failed (previously dropped — history returned status without any
|
||||
// error context, making root-cause debugging impossible).
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, now())
|
||||
`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus)
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, $4, $5, now())
|
||||
`, sched.WorkspaceID, "Cron: "+sched.Name, string(cronMeta), lastStatus, lastError)
|
||||
|
||||
if s.broadcaster != nil {
|
||||
s.broadcaster.RecordAndBroadcast(ctx, "CRON_EXECUTED", sched.WorkspaceID, map[string]interface{}{
|
||||
@ -304,6 +329,56 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
|
||||
}
|
||||
}
|
||||
|
||||
// recordSkipped advances next_run_at and logs a cron_run activity entry
|
||||
// with status='skipped' when the target workspace was already busy.
|
||||
// Issue #115 — replaces the previous "busy → fire → fail → retry next
|
||||
// tick" loop with "busy → skip → advance → try next slot". Keeps the
|
||||
// history surface honest (a skip is not an error) and stops filling
|
||||
// last_error with noise.
|
||||
func (s *Scheduler) recordSkipped(ctx context.Context, sched scheduleRow, activeTasks int) {
|
||||
reason := fmt.Sprintf("skipped: workspace busy (active_tasks=%d)", activeTasks)
|
||||
|
||||
nextRun, nextErr := ComputeNextRun(sched.CronExpr, sched.Timezone, time.Now())
|
||||
var nextRunPtr *time.Time
|
||||
if nextErr == nil {
|
||||
nextRunPtr = &nextRun
|
||||
}
|
||||
|
||||
// Advance next_run_at + bump run_count so the liveness view reflects
|
||||
// that we're still ticking. last_status='skipped', last_error carries
|
||||
// the reason for operators debugging via the schedule history API.
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_schedules
|
||||
SET last_run_at = now(),
|
||||
next_run_at = $2,
|
||||
run_count = run_count + 1,
|
||||
last_status = 'skipped',
|
||||
last_error = $3,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
`, sched.ID, nextRunPtr, reason)
|
||||
|
||||
cronMeta, _ := json.Marshal(map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"cron_expr": sched.CronExpr,
|
||||
"skipped": true,
|
||||
"active_tasks": activeTasks,
|
||||
})
|
||||
_, _ = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO activity_logs (workspace_id, activity_type, source_id, method, summary, request_body, status, error_detail, created_at)
|
||||
VALUES ($1, 'cron_run', NULL, 'cron', $2, $3::jsonb, 'skipped', $4, now())
|
||||
`, sched.WorkspaceID, "Cron skipped: "+sched.Name, string(cronMeta), reason)
|
||||
|
||||
if s.broadcaster != nil {
|
||||
_ = s.broadcaster.RecordAndBroadcast(ctx, "CRON_SKIPPED", sched.WorkspaceID, map[string]interface{}{
|
||||
"schedule_id": sched.ID,
|
||||
"schedule_name": sched.Name,
|
||||
"reason": reason,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func truncate(s string, maxLen int) string {
|
||||
if len(s) <= maxLen {
|
||||
return s
|
||||
|
||||
Loading…
Reference in New Issue
Block a user