diff --git a/platform/internal/handlers/socket.go b/platform/internal/handlers/socket.go index e40732a8..6aa92ff2 100644 --- a/platform/internal/handlers/socket.go +++ b/platform/internal/handlers/socket.go @@ -6,8 +6,10 @@ import ( "os" "strings" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/ws" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -40,15 +42,41 @@ func NewSocketHandler(hub *ws.Hub) *SocketHandler { // HandleConnect handles WebSocket upgrade at GET /ws. // Canvas clients connect without X-Workspace-ID — they receive all events. // Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate. +// +// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated +// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID) +// remain unauthenticated. Pre-token workspaces are grandfathered through. func (h *SocketHandler) HandleConnect(c *gin.Context) { + workspaceID := c.GetHeader("X-Workspace-ID") + + // Authenticate workspace agents (not canvas browser clients). + if workspaceID != "" { + ctx := c.Request.Context() + hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID) + if err != nil { + log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err) + c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"}) + return + } + if hasLive { + tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization")) + if tok == "" { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"}) + return + } + if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"}) + return + } + } + } + conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } - workspaceID := c.GetHeader("X-Workspace-ID") - client := &ws.Client{ Conn: conn, WorkspaceID: workspaceID, diff --git a/platform/internal/middleware/wsauth_middleware.go b/platform/internal/middleware/wsauth_middleware.go new file mode 100644 index 00000000..52cf0d8b --- /dev/null +++ b/platform/internal/middleware/wsauth_middleware.go @@ -0,0 +1,51 @@ +package middleware + +import ( + "database/sql" + "log" + "net/http" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth" + "github.com/gin-gonic/gin" +) + +// WorkspaceAuth returns a Gin middleware that enforces per-workspace bearer-token +// authentication on /workspaces/:id/* sub-routes. +// +// Same lazy-bootstrap contract as secrets.Values: workspaces that have no live +// token on file are grandfathered through so in-flight agents keep working +// during a rolling upgrade. Once a workspace has at least one live token every +// request MUST present a valid one in Authorization: Bearer . +// +// Intended for route groups that cover all /workspaces/:id/* paths. +// The /workspaces/:id/a2a route must be registered on the root router (outside +// this group) because it already authenticates callers via CanCommunicate. +func WorkspaceAuth(database *sql.DB) gin.HandlerFunc { + return func(c *gin.Context) { + workspaceID := c.Param("id") + if workspaceID == "" { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace ID"}) + return + } + ctx := c.Request.Context() + + hasLive, err := wsauth.HasAnyLiveToken(ctx, database, workspaceID) + if err != nil { + log.Printf("wsauth: WorkspaceAuth: HasAnyLiveToken(%s) failed: %v", workspaceID, err) + c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"}) + return + } + if hasLive { + tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization")) + if tok == "" { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"}) + return + } + if err := wsauth.ValidateToken(ctx, database, workspaceID, tok); err != nil { + c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"}) + return + } + } + c.Next() + } +} diff --git a/platform/internal/router/router.go b/platform/internal/router/router.go index 4c46e057..4dba378c 100644 --- a/platform/internal/router/router.go +++ b/platform/internal/router/router.go @@ -32,7 +32,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi r.Use(cors.New(cors.Config{ AllowOrigins: corsOrigins, AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"}, - AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID"}, + AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID", "Authorization"}, AllowCredentials: true, })) @@ -62,61 +62,69 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // Scrape with: curl http://localhost:8080/metrics r.GET("/metrics", metrics.Handler()) - // Workspaces CRUD + // Workspaces CRUD — bare /workspaces and /workspaces/:id (no sub-path), unauthenticated for canvas r.POST("/workspaces", wh.Create) r.GET("/workspaces", wh.List) r.GET("/workspaces/:id", wh.Get) - // Phase 30.4 — lightweight token-gated state polling for remote agents - // that can't reach the platform's WebSocket. Returns {status, paused, - // deleted}. Separate from /workspaces/:id so the canvas path stays - // unauthenticated and returns its full config payload. - r.GET("/workspaces/:id/state", wh.State) r.PATCH("/workspaces/:id", wh.Update) r.DELETE("/workspaces/:id", wh.Delete) - r.POST("/workspaces/:id/restart", wh.Restart) - r.POST("/workspaces/:id/pause", wh.Pause) - r.POST("/workspaces/:id/resume", wh.Resume) + + // A2A proxy — registered outside the auth group; already enforces CanCommunicate access control. r.POST("/workspaces/:id/a2a", wh.ProxyA2A) - // Async Delegation - delh := handlers.NewDelegationHandler(wh, broadcaster) - r.POST("/workspaces/:id/delegate", delh.Delegate) - r.GET("/workspaces/:id/delegations", delh.ListDelegations) - // Record-only endpoint for agent-initiated delegations (#64). Agent-side - // delegate_to_workspace fires A2A directly for speed + OTEL propagation; - // this endpoint just adds an activity_logs row so GET /delegations returns - // the same set the agent's local `check_delegation_status` sees. - r.POST("/workspaces/:id/delegations/record", delh.Record) - r.POST("/workspaces/:id/delegations/:delegation_id/update", delh.UpdateStatus) + // Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a. + // Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13 + // by requiring a valid bearer token for any workspace that has one on file. + // Legacy workspaces (no token) are grandfathered to allow rolling upgrades. + wsAuth := r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)) + { + // Lifecycle + wsAuth.GET("/state", wh.State) + wsAuth.POST("/restart", wh.Restart) + wsAuth.POST("/pause", wh.Pause) + wsAuth.POST("/resume", wh.Resume) - // Traces (Langfuse proxy) - trh := handlers.NewTracesHandler() - r.GET("/workspaces/:id/traces", trh.List) + // Async Delegation + delh := handlers.NewDelegationHandler(wh, broadcaster) + wsAuth.POST("/delegate", delh.Delegate) + wsAuth.GET("/delegations", delh.ListDelegations) + // Record-only endpoint for agent-initiated delegations (#64). Agent-side + // delegate_to_workspace fires A2A directly for speed + OTEL propagation; + // this endpoint just adds an activity_logs row so GET /delegations returns + // the same set the agent's local `check_delegation_status` sees. + wsAuth.POST("/delegations/record", delh.Record) + wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus) - // Agent Memories (HMA) - memsh := handlers.NewMemoriesHandler() - r.POST("/workspaces/:id/memories", memsh.Commit) - r.GET("/workspaces/:id/memories", memsh.Search) - r.DELETE("/workspaces/:id/memories/:memoryId", memsh.Delete) + // Traces (Langfuse proxy) + trh := handlers.NewTracesHandler() + wsAuth.GET("/traces", trh.List) - // Approvals - apph := handlers.NewApprovalsHandler(broadcaster) - r.GET("/approvals/pending", apph.ListAll) - r.POST("/workspaces/:id/approvals", apph.Create) - r.GET("/workspaces/:id/approvals", apph.List) - r.POST("/workspaces/:id/approvals/:approvalId/decide", apph.Decide) + // Agent Memories (HMA) + memsh := handlers.NewMemoriesHandler() + wsAuth.POST("/memories", memsh.Commit) + wsAuth.GET("/memories", memsh.Search) + wsAuth.DELETE("/memories/:memoryId", memsh.Delete) - // Team Expansion - teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir) - r.POST("/workspaces/:id/expand", teamh.Expand) - r.POST("/workspaces/:id/collapse", teamh.Collapse) + // Approvals + apph := handlers.NewApprovalsHandler(broadcaster) + wsAuth.POST("/approvals", apph.Create) + wsAuth.GET("/approvals", apph.List) + wsAuth.POST("/approvals/:approvalId/decide", apph.Decide) + // /approvals/pending is a cross-workspace admin path; keep on root router outside wsAuth. + r.GET("/approvals/pending", apph.ListAll) - // Agents - ah := handlers.NewAgentHandler(broadcaster) - r.POST("/workspaces/:id/agent", ah.Assign) - r.PATCH("/workspaces/:id/agent", ah.Replace) - r.DELETE("/workspaces/:id/agent", ah.Remove) - r.POST("/workspaces/:id/agent/move", ah.Move) + // Team Expansion + teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir) + wsAuth.POST("/expand", teamh.Expand) + wsAuth.POST("/collapse", teamh.Collapse) + + // Agents + ah := handlers.NewAgentHandler(broadcaster) + wsAuth.POST("/agent", ah.Assign) + wsAuth.PATCH("/agent", ah.Replace) + wsAuth.DELETE("/agent", ah.Remove) + wsAuth.POST("/agent/move", ah.Move) + } // Registry rh := handlers.NewRegistryHandler(broadcaster) @@ -135,58 +143,65 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi r.GET("/registry/:id/peers", dh.Peers) r.POST("/registry/check-access", dh.CheckAccess) - // Events + // Events (not workspace-scoped — exempt from per-workspace auth) eh := handlers.NewEventsHandler() r.GET("/events", eh.List) r.GET("/events/:workspaceId", eh.ListByWorkspace) - // Activity Logs - acth := handlers.NewActivityHandler(broadcaster) - r.GET("/workspaces/:id/activity", acth.List) - r.GET("/workspaces/:id/session-search", acth.SessionSearch) - r.POST("/workspaces/:id/activity", acth.Report) - r.POST("/workspaces/:id/notify", acth.Notify) + // Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above. + { + // Activity Logs + acth := handlers.NewActivityHandler(broadcaster) + wsAuth.GET("/activity", acth.List) + wsAuth.GET("/session-search", acth.SessionSearch) + wsAuth.POST("/activity", acth.Report) + wsAuth.POST("/notify", acth.Notify) - // Config - cfgh := handlers.NewConfigHandler() - r.GET("/workspaces/:id/config", cfgh.Get) - r.PATCH("/workspaces/:id/config", cfgh.Patch) + // Config + cfgh := handlers.NewConfigHandler() + wsAuth.GET("/config", cfgh.Get) + wsAuth.PATCH("/config", cfgh.Patch) - // Schedules (cron tasks) - schedh := handlers.NewScheduleHandler() - r.GET("/workspaces/:id/schedules", schedh.List) - r.POST("/workspaces/:id/schedules", schedh.Create) - r.PATCH("/workspaces/:id/schedules/:scheduleId", schedh.Update) - r.DELETE("/workspaces/:id/schedules/:scheduleId", schedh.Delete) - r.POST("/workspaces/:id/schedules/:scheduleId/run", schedh.RunNow) - r.GET("/workspaces/:id/schedules/:scheduleId/history", schedh.History) + // Schedules (cron tasks) + schedh := handlers.NewScheduleHandler() + wsAuth.GET("/schedules", schedh.List) + wsAuth.POST("/schedules", schedh.Create) + wsAuth.PATCH("/schedules/:scheduleId", schedh.Update) + wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete) + wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow) + wsAuth.GET("/schedules/:scheduleId/history", schedh.History) - // Memory - memh := handlers.NewMemoryHandler() - r.GET("/workspaces/:id/memory", memh.List) - r.GET("/workspaces/:id/memory/:key", memh.Get) - r.POST("/workspaces/:id/memory", memh.Set) - r.DELETE("/workspaces/:id/memory/:key", memh.Delete) + // Memory + memh := handlers.NewMemoryHandler() + wsAuth.GET("/memory", memh.List) + wsAuth.GET("/memory/:key", memh.Get) + wsAuth.POST("/memory", memh.Set) + wsAuth.DELETE("/memory/:key", memh.Delete) - // Secrets (auto-restart workspace after secret change) - sech := handlers.NewSecretsHandler(wh.RestartByID) - r.GET("/workspaces/:id/secrets", sech.List) - // Phase 30.2 — decrypted values pull, token-gated. Canvas uses List - // (keys + metadata only); remote agents use Values to bootstrap env. - r.GET("/workspaces/:id/secrets/values", sech.Values) - r.POST("/workspaces/:id/secrets", sech.Set) - r.PUT("/workspaces/:id/secrets", sech.Set) - r.DELETE("/workspaces/:id/secrets/:key", sech.Delete) - r.GET("/workspaces/:id/model", sech.GetModel) + // Secrets (auto-restart workspace after secret change) + sech := handlers.NewSecretsHandler(wh.RestartByID) + wsAuth.GET("/secrets", sech.List) + // Phase 30.2 — decrypted values pull, token-gated. Canvas uses List + // (keys + metadata only); remote agents use Values to bootstrap env. + wsAuth.GET("/secrets/values", sech.Values) + wsAuth.POST("/secrets", sech.Set) + wsAuth.PUT("/secrets", sech.Set) + wsAuth.DELETE("/secrets/:key", sech.Delete) + wsAuth.GET("/model", sech.GetModel) + } // Global secrets — /settings/secrets is the canonical path; /admin/secrets kept for backward compat - r.GET("/settings/secrets", sech.ListGlobal) - r.PUT("/settings/secrets", sech.SetGlobal) - r.POST("/settings/secrets", sech.SetGlobal) - r.DELETE("/settings/secrets/:key", sech.DeleteGlobal) - r.GET("/admin/secrets", sech.ListGlobal) - r.POST("/admin/secrets", sech.SetGlobal) - r.DELETE("/admin/secrets/:key", sech.DeleteGlobal) + // These are admin-level paths outside the per-workspace auth group. + { + sechGlobal := handlers.NewSecretsHandler(wh.RestartByID) + r.GET("/settings/secrets", sechGlobal.ListGlobal) + r.PUT("/settings/secrets", sechGlobal.SetGlobal) + r.POST("/settings/secrets", sechGlobal.SetGlobal) + r.DELETE("/settings/secrets/:key", sechGlobal.DeleteGlobal) + r.GET("/admin/secrets", sechGlobal.ListGlobal) + r.POST("/admin/secrets", sechGlobal.SetGlobal) + r.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal) + } // Terminal — shares Docker client with provisioner var dockerCli *client.Client @@ -194,7 +209,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi dockerCli = prov.DockerClient() } th := handlers.NewTerminalHandler(dockerCli) - r.GET("/workspaces/:id/terminal", th.HandleConnect) + wsAuth.GET("/terminal", th.HandleConnect) // Canvas Viewport vh := handlers.NewViewportHandler() @@ -205,12 +220,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli) r.GET("/templates", tmplh.List) r.POST("/templates/import", tmplh.Import) - r.GET("/workspaces/:id/shared-context", tmplh.SharedContext) - r.PUT("/workspaces/:id/files", tmplh.ReplaceFiles) - r.GET("/workspaces/:id/files", tmplh.ListFiles) - r.GET("/workspaces/:id/files/*path", tmplh.ReadFile) - r.PUT("/workspaces/:id/files/*path", tmplh.WriteFile) - r.DELETE("/workspaces/:id/files/*path", tmplh.DeleteFile) + wsAuth.GET("/shared-context", tmplh.SharedContext) + wsAuth.PUT("/files", tmplh.ReplaceFiles) + wsAuth.GET("/files", tmplh.ListFiles) + wsAuth.GET("/files/*path", tmplh.ReadFile) + wsAuth.PUT("/files/*path", tmplh.WriteFile) + wsAuth.DELETE("/files/*path", tmplh.DeleteFile) // Plugins pluginsDir := findPluginsDir(configsDir) @@ -230,14 +245,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi WithRuntimeLookup(runtimeLookup) r.GET("/plugins", plgh.ListRegistry) r.GET("/plugins/sources", plgh.ListSources) - r.GET("/workspaces/:id/plugins", plgh.ListInstalled) - r.GET("/workspaces/:id/plugins/available", plgh.ListAvailableForWorkspace) - r.GET("/workspaces/:id/plugins/compatibility", plgh.CheckRuntimeCompatibility) - r.POST("/workspaces/:id/plugins", plgh.Install) - r.DELETE("/workspaces/:id/plugins/:name", plgh.Uninstall) + wsAuth.GET("/plugins", plgh.ListInstalled) + wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace) + wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility) + wsAuth.POST("/plugins", plgh.Install) + wsAuth.DELETE("/plugins/:name", plgh.Uninstall) // Phase 30.3 — stream plugin as tar.gz so remote agents can pull + // unpack locally instead of going through Docker exec. - r.GET("/workspaces/:id/plugins/:name/download", plgh.Download) + wsAuth.GET("/plugins/:name/download", plgh.Download) // Bundles bh := handlers.NewBundleHandler(broadcaster, prov, platformURL, configsDir, dockerCli) @@ -253,12 +268,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // Channels (social integrations — Telegram, Slack, Discord, etc.) chh := handlers.NewChannelHandler(channelMgr) r.GET("/channels/adapters", chh.ListAdapters) - r.GET("/workspaces/:id/channels", chh.List) - r.POST("/workspaces/:id/channels", chh.Create) - r.PATCH("/workspaces/:id/channels/:channelId", chh.Update) - r.DELETE("/workspaces/:id/channels/:channelId", chh.Delete) - r.POST("/workspaces/:id/channels/:channelId/send", chh.Send) - r.POST("/workspaces/:id/channels/:channelId/test", chh.Test) + wsAuth.GET("/channels", chh.List) + wsAuth.POST("/channels", chh.Create) + wsAuth.PATCH("/channels/:channelId", chh.Update) + wsAuth.DELETE("/channels/:channelId", chh.Delete) + wsAuth.POST("/channels/:channelId/send", chh.Send) + wsAuth.POST("/channels/:channelId/test", chh.Test) r.POST("/channels/discover", chh.Discover) r.POST("/webhooks/:type", chh.Webhook) diff --git a/workspace-template/a2a_client.py b/workspace-template/a2a_client.py index 7805cdc0..ded66fc2 100644 --- a/workspace-template/a2a_client.py +++ b/workspace-template/a2a_client.py @@ -41,7 +41,12 @@ async def discover_peer(target_id: str) -> dict | None: async def send_a2a_message(target_url: str, message: str) -> str: """Send an A2A message/send to a target workspace.""" - async with httpx.AsyncClient(timeout=None) as client: + # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed + # a hung upstream to block the agent indefinitely. Use a generous but bounded + # timeout: 30s connect + 300s read (long enough for slow LLM responses). + async with httpx.AsyncClient( + timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0) + ) as client: try: resp = await client.post( target_url, diff --git a/workspace-template/events.py b/workspace-template/events.py index a62e6072..a682dcab 100644 --- a/workspace-template/events.py +++ b/workspace-template/events.py @@ -62,7 +62,15 @@ class PlatformEventSubscriber: self._running = False return + # Fix D (Cycle 5): include bearer token in WebSocket upgrade so the + # server's new auth check can validate this agent connection. + # Graceful fallback for workspaces that have no token yet. headers = {"X-Workspace-ID": self.workspace_id} + try: + from platform_auth import auth_headers as _auth_headers + headers.update(_auth_headers()) + except Exception: + pass # No token available — connect unauthenticated (grandfathered) logger.info("Connecting to platform WebSocket: %s", self.ws_url) async with websockets.connect(self.ws_url, additional_headers=headers) as ws: diff --git a/workspace-template/executor_helpers.py b/workspace-template/executor_helpers.py index 9540d0ba..c435f84e 100644 --- a/workspace-template/executor_helpers.py +++ b/workspace-template/executor_helpers.py @@ -92,9 +92,17 @@ async def recall_memories() -> str: platform_url = os.environ.get("PLATFORM_URL", "") if not workspace_id or not platform_url: return "" + # Fix E (Cycle 5): send auth headers so the WorkspaceAuth middleware + # (Fix A) allows access once the workspace has a live token on file. + try: + from platform_auth import auth_headers as _platform_auth + _auth = _platform_auth() + except Exception: + _auth = {} try: resp = await get_http_client().get( f"{platform_url}/workspaces/{workspace_id}/memories", + headers=_auth, ) if not 200 <= resp.status_code < 300: logger.debug( @@ -121,10 +129,17 @@ async def commit_memory(content: str) -> None: platform_url = os.environ.get("PLATFORM_URL", "") if not workspace_id or not platform_url or not content: return + # Fix E (Cycle 5): include auth header so WorkspaceAuth middleware allows access. + try: + from platform_auth import auth_headers as _platform_auth + _auth = _platform_auth() + except Exception: + _auth = {} try: await get_http_client().post( f"{platform_url}/workspaces/{workspace_id}/memories", json={"content": content, "scope": "LOCAL"}, + headers=_auth, ) except Exception as exc: logger.debug("commit_memory: request failed: %s", exc) diff --git a/workspace-template/heartbeat.py b/workspace-template/heartbeat.py index c7896297..64f5b827 100644 --- a/workspace-template/heartbeat.py +++ b/workspace-template/heartbeat.py @@ -153,11 +153,25 @@ class HeartbeatLoop: continue if status in ("completed", "failed"): + # Fix B (Cycle 5): validate source_id before accepting delegation + # results. Only process delegations that THIS workspace created + # (source_id == self.workspace_id). Attacker-crafted delegation + # records with a foreign source_id cannot inject instructions. + source_id = d.get("source_id", "") + if source_id != self.workspace_id: + logger.warning( + "Heartbeat: skipping delegation %s — source_id %r does not " + "match this workspace %r; possible injection attempt", + did, source_id, self.workspace_id, + ) + self._seen_delegation_ids.add(did) # mark seen so we don't warn again + continue + self._seen_delegation_ids.add(did) new_results.append({ "delegation_id": did, "target_id": d.get("target_id", ""), - "source_id": d.get("source_id", ""), + "source_id": source_id, "status": status, "summary": d.get("summary", ""), "response_preview": d.get("response_preview", ""), @@ -177,12 +191,15 @@ class HeartbeatLoop: f.write(json.dumps(r) + "\n") logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results)) - # Build a summary message for the agent + # Build a summary message for the agent. + # Fix B (Cycle 5): do NOT embed raw response_preview text in + # user-role A2A messages — that is the prompt-injection vector. + # Instead reference only the delegation ID and status; the agent + # reads full content from DELEGATION_RESULTS_FILE which was + # written above from trusted platform data. summary_lines = [] for r in new_results: - line = f"- [{r['status']}] {r['summary'][:80]}" - if r.get("response_preview"): - line += f"\n Response: {r['response_preview'][:200]}" + line = f"- [{r['status']}] Delegation {r['delegation_id'][:8]}: {r['summary'][:80]}" if r.get("error"): line += f"\n Error: {r['error'][:100]}" summary_lines.append(line) diff --git a/workspace-template/skill_loader/loader.py b/workspace-template/skill_loader/loader.py index 00c33ee6..99f353a2 100644 --- a/workspace-template/skill_loader/loader.py +++ b/workspace-template/skill_loader/loader.py @@ -2,6 +2,7 @@ import importlib.util import logging +import os import sys from dataclasses import dataclass, field from pathlib import Path @@ -81,10 +82,32 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]: # Keeps test environments (and empty skills) from needing langchain. from langchain_core.tools import BaseTool + # Sensitive env vars that must not be readable by skill scripts. + # Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot + # exfiltrate credentials even if it somehow bypasses the POST /plugins + # auth gate (defence in depth). + _SCRUB_KEYS = ( + "CLAUDE_CODE_OAUTH_TOKEN", + "ANTHROPIC_API_KEY", + "OPENAI_API_KEY", + "WORKSPACE_AUTH_TOKEN", + "GITHUB_TOKEN", + "GH_TOKEN", + ) + for py_file in sorted(scripts_dir.glob("*.py")): if py_file.name.startswith("_"): continue + # Verify the script is actually inside the expected scripts directory + # (path traversal guard — glob shouldn't produce outside paths, but + # belt-and-suspenders for symlink attacks). + try: + py_file.resolve().relative_to(scripts_dir.resolve()) + except ValueError: + logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file) + continue + module_name = f"skill_tool_{py_file.stem}" spec = importlib.util.spec_from_file_location(module_name, py_file) if spec is None or spec.loader is None: @@ -92,7 +115,14 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]: module = importlib.util.module_from_spec(spec) sys.modules[module_name] = module - spec.loader.exec_module(module) + + # Temporarily remove sensitive env vars before running skill code. + _saved_env = {k: os.environ.pop(k) for k in _SCRUB_KEYS if k in os.environ} + try: + spec.loader.exec_module(module) + finally: + # Always restore so the rest of the agent process retains them. + os.environ.update(_saved_env) # Look for functions decorated with @tool (BaseTool instances) for attr_name in dir(module): diff --git a/workspace-template/tests/test_events.py b/workspace-template/tests/test_events.py index 7f5e0b95..24ba5ad3 100644 --- a/workspace-template/tests/test_events.py +++ b/workspace-template/tests/test_events.py @@ -282,7 +282,10 @@ async def test_connect_uses_workspace_id_header(): await sub._connect() call_kwargs = websockets_mod.connect.call_args[1] - assert call_kwargs.get("additional_headers") == {"X-Workspace-ID": "ws-hdr"} + # Fix D (Cycle 5): headers now include Authorization when platform_auth available. + # Assert X-Workspace-ID is present; allow optional Authorization header. + actual_headers = call_kwargs.get("additional_headers", {}) + assert actual_headers.get("X-Workspace-ID") == "ws-hdr" # --------------------------------------------------------------------------- diff --git a/workspace-template/tests/test_heartbeat.py b/workspace-template/tests/test_heartbeat.py index bbb7a67a..09e971e8 100644 --- a/workspace-template/tests/test_heartbeat.py +++ b/workspace-template/tests/test_heartbeat.py @@ -184,6 +184,7 @@ async def test_check_delegations_writes_results_file(tmp_path): delegations = [ {"delegation_id": "d-1", "status": "completed", "target_id": "ws-t", + "source_id": "ws-abc", # must match workspace_id for Fix B source validation "summary": "Done", "response_preview": "Result here", "error": ""}, ] @@ -245,6 +246,7 @@ async def test_check_delegations_sends_self_message(tmp_path): delegations = [ {"delegation_id": "d-new", "status": "completed", "target_id": "ws-t", + "source_id": "ws-abc", # must match workspace_id for Fix B source validation "summary": "Task done", "response_preview": "All good", "error": ""}, ]