fix(security): Cycle 5 — auth middleware, injection hardening, skill sandbox

Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
  WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
  ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
  secrets.Values: workspaces with no live token are grandfathered through.
  Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.

Fix A — platform/internal/router/router.go:
  Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
  and /a2a remain on root router; all other /workspaces/:id/* sub-routes
  moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
  CORS AllowHeaders updated to include Authorization so browser/agent callers
  can send the bearer token cross-origin.

Fix B — workspace-template/heartbeat.py:
  _check_delegations(): validate source_id == self.workspace_id before
  accepting a delegation result. Attacker-crafted records with a foreign
  source_id are silently skipped with a WARNING log (injection attempt).
  trigger_msg no longer embeds raw response_preview text; references
  delegation_id + status only — removes the prompt-injection vector.

Fix C — workspace-template/skill_loader/loader.py:
  load_skill_tools(): before exec_module(), verify script is within
  scripts_dir (path traversal guard) and temporarily scrub sensitive env
  vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
  WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
  in finally block. Defence-in-depth even if /plugins auth gate is bypassed.

Fix D — platform/internal/handlers/socket.go:
  HandleConnect(): agent connections (X-Workspace-ID present) validated via
  wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
  Canvas clients (no X-Workspace-ID) remain unauthenticated.

Fix D — workspace-template/events.py:
  PlatformEventSubscriber._connect(): include platform_auth bearer token in
  WebSocket upgrade headers alongside X-Workspace-ID.

Fix E — workspace-template/executor_helpers.py:
  recall_memories() and commit_memory() now pass platform_auth bearer token
  in Authorization header so WorkspaceAuth middleware allows access.

Fix F — workspace-template/a2a_client.py:
  send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
  write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.

Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dev Lead Agent 2026-04-14 04:44:42 +00:00
parent e920aaab8e
commit 6c78962a33
10 changed files with 289 additions and 115 deletions

View File

@ -6,8 +6,10 @@ import (
"os"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
@ -40,15 +42,41 @@ func NewSocketHandler(hub *ws.Hub) *SocketHandler {
// HandleConnect handles WebSocket upgrade at GET /ws.
// Canvas clients connect without X-Workspace-ID — they receive all events.
// Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate.
//
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
// remain unauthenticated. Pre-token workspaces are grandfathered through.
func (h *SocketHandler) HandleConnect(c *gin.Context) {
workspaceID := c.GetHeader("X-Workspace-ID")
// Authenticate workspace agents (not canvas browser clients).
if workspaceID != "" {
ctx := c.Request.Context()
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
if err != nil {
log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
return
}
if hasLive {
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
return
}
if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
return
}
}
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
return
}
workspaceID := c.GetHeader("X-Workspace-ID")
client := &ws.Client{
Conn: conn,
WorkspaceID: workspaceID,

View File

@ -0,0 +1,51 @@
package middleware
import (
"database/sql"
"log"
"net/http"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
)
// WorkspaceAuth returns a Gin middleware that enforces per-workspace bearer-token
// authentication on /workspaces/:id/* sub-routes.
//
// Same lazy-bootstrap contract as secrets.Values: workspaces that have no live
// token on file are grandfathered through so in-flight agents keep working
// during a rolling upgrade. Once a workspace has at least one live token every
// request MUST present a valid one in Authorization: Bearer <token>.
//
// Intended for route groups that cover all /workspaces/:id/* paths.
// The /workspaces/:id/a2a route must be registered on the root router (outside
// this group) because it already authenticates callers via CanCommunicate.
func WorkspaceAuth(database *sql.DB) gin.HandlerFunc {
return func(c *gin.Context) {
workspaceID := c.Param("id")
if workspaceID == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace ID"})
return
}
ctx := c.Request.Context()
hasLive, err := wsauth.HasAnyLiveToken(ctx, database, workspaceID)
if err != nil {
log.Printf("wsauth: WorkspaceAuth: HasAnyLiveToken(%s) failed: %v", workspaceID, err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
return
}
if hasLive {
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
return
}
if err := wsauth.ValidateToken(ctx, database, workspaceID, tok); err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
return
}
}
c.Next()
}
}

View File

@ -32,7 +32,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.Use(cors.New(cors.Config{
AllowOrigins: corsOrigins,
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID"},
AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID", "Authorization"},
AllowCredentials: true,
}))
@ -62,61 +62,69 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Scrape with: curl http://localhost:8080/metrics
r.GET("/metrics", metrics.Handler())
// Workspaces CRUD
// Workspaces CRUD — bare /workspaces and /workspaces/:id (no sub-path), unauthenticated for canvas
r.POST("/workspaces", wh.Create)
r.GET("/workspaces", wh.List)
r.GET("/workspaces/:id", wh.Get)
// Phase 30.4 — lightweight token-gated state polling for remote agents
// that can't reach the platform's WebSocket. Returns {status, paused,
// deleted}. Separate from /workspaces/:id so the canvas path stays
// unauthenticated and returns its full config payload.
r.GET("/workspaces/:id/state", wh.State)
r.PATCH("/workspaces/:id", wh.Update)
r.DELETE("/workspaces/:id", wh.Delete)
r.POST("/workspaces/:id/restart", wh.Restart)
r.POST("/workspaces/:id/pause", wh.Pause)
r.POST("/workspaces/:id/resume", wh.Resume)
// A2A proxy — registered outside the auth group; already enforces CanCommunicate access control.
r.POST("/workspaces/:id/a2a", wh.ProxyA2A)
// Async Delegation
delh := handlers.NewDelegationHandler(wh, broadcaster)
r.POST("/workspaces/:id/delegate", delh.Delegate)
r.GET("/workspaces/:id/delegations", delh.ListDelegations)
// Record-only endpoint for agent-initiated delegations (#64). Agent-side
// delegate_to_workspace fires A2A directly for speed + OTEL propagation;
// this endpoint just adds an activity_logs row so GET /delegations returns
// the same set the agent's local `check_delegation_status` sees.
r.POST("/workspaces/:id/delegations/record", delh.Record)
r.POST("/workspaces/:id/delegations/:delegation_id/update", delh.UpdateStatus)
// Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a.
// Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13
// by requiring a valid bearer token for any workspace that has one on file.
// Legacy workspaces (no token) are grandfathered to allow rolling upgrades.
wsAuth := r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB))
{
// Lifecycle
wsAuth.GET("/state", wh.State)
wsAuth.POST("/restart", wh.Restart)
wsAuth.POST("/pause", wh.Pause)
wsAuth.POST("/resume", wh.Resume)
// Traces (Langfuse proxy)
trh := handlers.NewTracesHandler()
r.GET("/workspaces/:id/traces", trh.List)
// Async Delegation
delh := handlers.NewDelegationHandler(wh, broadcaster)
wsAuth.POST("/delegate", delh.Delegate)
wsAuth.GET("/delegations", delh.ListDelegations)
// Record-only endpoint for agent-initiated delegations (#64). Agent-side
// delegate_to_workspace fires A2A directly for speed + OTEL propagation;
// this endpoint just adds an activity_logs row so GET /delegations returns
// the same set the agent's local `check_delegation_status` sees.
wsAuth.POST("/delegations/record", delh.Record)
wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus)
// Agent Memories (HMA)
memsh := handlers.NewMemoriesHandler()
r.POST("/workspaces/:id/memories", memsh.Commit)
r.GET("/workspaces/:id/memories", memsh.Search)
r.DELETE("/workspaces/:id/memories/:memoryId", memsh.Delete)
// Traces (Langfuse proxy)
trh := handlers.NewTracesHandler()
wsAuth.GET("/traces", trh.List)
// Approvals
apph := handlers.NewApprovalsHandler(broadcaster)
r.GET("/approvals/pending", apph.ListAll)
r.POST("/workspaces/:id/approvals", apph.Create)
r.GET("/workspaces/:id/approvals", apph.List)
r.POST("/workspaces/:id/approvals/:approvalId/decide", apph.Decide)
// Agent Memories (HMA)
memsh := handlers.NewMemoriesHandler()
wsAuth.POST("/memories", memsh.Commit)
wsAuth.GET("/memories", memsh.Search)
wsAuth.DELETE("/memories/:memoryId", memsh.Delete)
// Team Expansion
teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir)
r.POST("/workspaces/:id/expand", teamh.Expand)
r.POST("/workspaces/:id/collapse", teamh.Collapse)
// Approvals
apph := handlers.NewApprovalsHandler(broadcaster)
wsAuth.POST("/approvals", apph.Create)
wsAuth.GET("/approvals", apph.List)
wsAuth.POST("/approvals/:approvalId/decide", apph.Decide)
// /approvals/pending is a cross-workspace admin path; keep on root router outside wsAuth.
r.GET("/approvals/pending", apph.ListAll)
// Agents
ah := handlers.NewAgentHandler(broadcaster)
r.POST("/workspaces/:id/agent", ah.Assign)
r.PATCH("/workspaces/:id/agent", ah.Replace)
r.DELETE("/workspaces/:id/agent", ah.Remove)
r.POST("/workspaces/:id/agent/move", ah.Move)
// Team Expansion
teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir)
wsAuth.POST("/expand", teamh.Expand)
wsAuth.POST("/collapse", teamh.Collapse)
// Agents
ah := handlers.NewAgentHandler(broadcaster)
wsAuth.POST("/agent", ah.Assign)
wsAuth.PATCH("/agent", ah.Replace)
wsAuth.DELETE("/agent", ah.Remove)
wsAuth.POST("/agent/move", ah.Move)
}
// Registry
rh := handlers.NewRegistryHandler(broadcaster)
@ -135,58 +143,65 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.GET("/registry/:id/peers", dh.Peers)
r.POST("/registry/check-access", dh.CheckAccess)
// Events
// Events (not workspace-scoped — exempt from per-workspace auth)
eh := handlers.NewEventsHandler()
r.GET("/events", eh.List)
r.GET("/events/:workspaceId", eh.ListByWorkspace)
// Activity Logs
acth := handlers.NewActivityHandler(broadcaster)
r.GET("/workspaces/:id/activity", acth.List)
r.GET("/workspaces/:id/session-search", acth.SessionSearch)
r.POST("/workspaces/:id/activity", acth.Report)
r.POST("/workspaces/:id/notify", acth.Notify)
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
{
// Activity Logs
acth := handlers.NewActivityHandler(broadcaster)
wsAuth.GET("/activity", acth.List)
wsAuth.GET("/session-search", acth.SessionSearch)
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Config
cfgh := handlers.NewConfigHandler()
r.GET("/workspaces/:id/config", cfgh.Get)
r.PATCH("/workspaces/:id/config", cfgh.Patch)
// Config
cfgh := handlers.NewConfigHandler()
wsAuth.GET("/config", cfgh.Get)
wsAuth.PATCH("/config", cfgh.Patch)
// Schedules (cron tasks)
schedh := handlers.NewScheduleHandler()
r.GET("/workspaces/:id/schedules", schedh.List)
r.POST("/workspaces/:id/schedules", schedh.Create)
r.PATCH("/workspaces/:id/schedules/:scheduleId", schedh.Update)
r.DELETE("/workspaces/:id/schedules/:scheduleId", schedh.Delete)
r.POST("/workspaces/:id/schedules/:scheduleId/run", schedh.RunNow)
r.GET("/workspaces/:id/schedules/:scheduleId/history", schedh.History)
// Schedules (cron tasks)
schedh := handlers.NewScheduleHandler()
wsAuth.GET("/schedules", schedh.List)
wsAuth.POST("/schedules", schedh.Create)
wsAuth.PATCH("/schedules/:scheduleId", schedh.Update)
wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete)
wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow)
wsAuth.GET("/schedules/:scheduleId/history", schedh.History)
// Memory
memh := handlers.NewMemoryHandler()
r.GET("/workspaces/:id/memory", memh.List)
r.GET("/workspaces/:id/memory/:key", memh.Get)
r.POST("/workspaces/:id/memory", memh.Set)
r.DELETE("/workspaces/:id/memory/:key", memh.Delete)
// Memory
memh := handlers.NewMemoryHandler()
wsAuth.GET("/memory", memh.List)
wsAuth.GET("/memory/:key", memh.Get)
wsAuth.POST("/memory", memh.Set)
wsAuth.DELETE("/memory/:key", memh.Delete)
// Secrets (auto-restart workspace after secret change)
sech := handlers.NewSecretsHandler(wh.RestartByID)
r.GET("/workspaces/:id/secrets", sech.List)
// Phase 30.2 — decrypted values pull, token-gated. Canvas uses List
// (keys + metadata only); remote agents use Values to bootstrap env.
r.GET("/workspaces/:id/secrets/values", sech.Values)
r.POST("/workspaces/:id/secrets", sech.Set)
r.PUT("/workspaces/:id/secrets", sech.Set)
r.DELETE("/workspaces/:id/secrets/:key", sech.Delete)
r.GET("/workspaces/:id/model", sech.GetModel)
// Secrets (auto-restart workspace after secret change)
sech := handlers.NewSecretsHandler(wh.RestartByID)
wsAuth.GET("/secrets", sech.List)
// Phase 30.2 — decrypted values pull, token-gated. Canvas uses List
// (keys + metadata only); remote agents use Values to bootstrap env.
wsAuth.GET("/secrets/values", sech.Values)
wsAuth.POST("/secrets", sech.Set)
wsAuth.PUT("/secrets", sech.Set)
wsAuth.DELETE("/secrets/:key", sech.Delete)
wsAuth.GET("/model", sech.GetModel)
}
// Global secrets — /settings/secrets is the canonical path; /admin/secrets kept for backward compat
r.GET("/settings/secrets", sech.ListGlobal)
r.PUT("/settings/secrets", sech.SetGlobal)
r.POST("/settings/secrets", sech.SetGlobal)
r.DELETE("/settings/secrets/:key", sech.DeleteGlobal)
r.GET("/admin/secrets", sech.ListGlobal)
r.POST("/admin/secrets", sech.SetGlobal)
r.DELETE("/admin/secrets/:key", sech.DeleteGlobal)
// These are admin-level paths outside the per-workspace auth group.
{
sechGlobal := handlers.NewSecretsHandler(wh.RestartByID)
r.GET("/settings/secrets", sechGlobal.ListGlobal)
r.PUT("/settings/secrets", sechGlobal.SetGlobal)
r.POST("/settings/secrets", sechGlobal.SetGlobal)
r.DELETE("/settings/secrets/:key", sechGlobal.DeleteGlobal)
r.GET("/admin/secrets", sechGlobal.ListGlobal)
r.POST("/admin/secrets", sechGlobal.SetGlobal)
r.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal)
}
// Terminal — shares Docker client with provisioner
var dockerCli *client.Client
@ -194,7 +209,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
dockerCli = prov.DockerClient()
}
th := handlers.NewTerminalHandler(dockerCli)
r.GET("/workspaces/:id/terminal", th.HandleConnect)
wsAuth.GET("/terminal", th.HandleConnect)
// Canvas Viewport
vh := handlers.NewViewportHandler()
@ -205,12 +220,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)
r.GET("/templates", tmplh.List)
r.POST("/templates/import", tmplh.Import)
r.GET("/workspaces/:id/shared-context", tmplh.SharedContext)
r.PUT("/workspaces/:id/files", tmplh.ReplaceFiles)
r.GET("/workspaces/:id/files", tmplh.ListFiles)
r.GET("/workspaces/:id/files/*path", tmplh.ReadFile)
r.PUT("/workspaces/:id/files/*path", tmplh.WriteFile)
r.DELETE("/workspaces/:id/files/*path", tmplh.DeleteFile)
wsAuth.GET("/shared-context", tmplh.SharedContext)
wsAuth.PUT("/files", tmplh.ReplaceFiles)
wsAuth.GET("/files", tmplh.ListFiles)
wsAuth.GET("/files/*path", tmplh.ReadFile)
wsAuth.PUT("/files/*path", tmplh.WriteFile)
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
// Plugins
pluginsDir := findPluginsDir(configsDir)
@ -230,14 +245,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
WithRuntimeLookup(runtimeLookup)
r.GET("/plugins", plgh.ListRegistry)
r.GET("/plugins/sources", plgh.ListSources)
r.GET("/workspaces/:id/plugins", plgh.ListInstalled)
r.GET("/workspaces/:id/plugins/available", plgh.ListAvailableForWorkspace)
r.GET("/workspaces/:id/plugins/compatibility", plgh.CheckRuntimeCompatibility)
r.POST("/workspaces/:id/plugins", plgh.Install)
r.DELETE("/workspaces/:id/plugins/:name", plgh.Uninstall)
wsAuth.GET("/plugins", plgh.ListInstalled)
wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace)
wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility)
wsAuth.POST("/plugins", plgh.Install)
wsAuth.DELETE("/plugins/:name", plgh.Uninstall)
// Phase 30.3 — stream plugin as tar.gz so remote agents can pull +
// unpack locally instead of going through Docker exec.
r.GET("/workspaces/:id/plugins/:name/download", plgh.Download)
wsAuth.GET("/plugins/:name/download", plgh.Download)
// Bundles
bh := handlers.NewBundleHandler(broadcaster, prov, platformURL, configsDir, dockerCli)
@ -253,12 +268,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Channels (social integrations — Telegram, Slack, Discord, etc.)
chh := handlers.NewChannelHandler(channelMgr)
r.GET("/channels/adapters", chh.ListAdapters)
r.GET("/workspaces/:id/channels", chh.List)
r.POST("/workspaces/:id/channels", chh.Create)
r.PATCH("/workspaces/:id/channels/:channelId", chh.Update)
r.DELETE("/workspaces/:id/channels/:channelId", chh.Delete)
r.POST("/workspaces/:id/channels/:channelId/send", chh.Send)
r.POST("/workspaces/:id/channels/:channelId/test", chh.Test)
wsAuth.GET("/channels", chh.List)
wsAuth.POST("/channels", chh.Create)
wsAuth.PATCH("/channels/:channelId", chh.Update)
wsAuth.DELETE("/channels/:channelId", chh.Delete)
wsAuth.POST("/channels/:channelId/send", chh.Send)
wsAuth.POST("/channels/:channelId/test", chh.Test)
r.POST("/channels/discover", chh.Discover)
r.POST("/webhooks/:type", chh.Webhook)

View File

@ -41,7 +41,12 @@ async def discover_peer(target_id: str) -> dict | None:
async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace."""
async with httpx.AsyncClient(timeout=None) as client:
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
) as client:
try:
resp = await client.post(
target_url,

View File

@ -62,7 +62,15 @@ class PlatformEventSubscriber:
self._running = False
return
# Fix D (Cycle 5): include bearer token in WebSocket upgrade so the
# server's new auth check can validate this agent connection.
# Graceful fallback for workspaces that have no token yet.
headers = {"X-Workspace-ID": self.workspace_id}
try:
from platform_auth import auth_headers as _auth_headers
headers.update(_auth_headers())
except Exception:
pass # No token available — connect unauthenticated (grandfathered)
logger.info("Connecting to platform WebSocket: %s", self.ws_url)
async with websockets.connect(self.ws_url, additional_headers=headers) as ws:

View File

@ -92,9 +92,17 @@ async def recall_memories() -> str:
platform_url = os.environ.get("PLATFORM_URL", "")
if not workspace_id or not platform_url:
return ""
# Fix E (Cycle 5): send auth headers so the WorkspaceAuth middleware
# (Fix A) allows access once the workspace has a live token on file.
try:
from platform_auth import auth_headers as _platform_auth
_auth = _platform_auth()
except Exception:
_auth = {}
try:
resp = await get_http_client().get(
f"{platform_url}/workspaces/{workspace_id}/memories",
headers=_auth,
)
if not 200 <= resp.status_code < 300:
logger.debug(
@ -121,10 +129,17 @@ async def commit_memory(content: str) -> None:
platform_url = os.environ.get("PLATFORM_URL", "")
if not workspace_id or not platform_url or not content:
return
# Fix E (Cycle 5): include auth header so WorkspaceAuth middleware allows access.
try:
from platform_auth import auth_headers as _platform_auth
_auth = _platform_auth()
except Exception:
_auth = {}
try:
await get_http_client().post(
f"{platform_url}/workspaces/{workspace_id}/memories",
json={"content": content, "scope": "LOCAL"},
headers=_auth,
)
except Exception as exc:
logger.debug("commit_memory: request failed: %s", exc)

View File

@ -153,11 +153,25 @@ class HeartbeatLoop:
continue
if status in ("completed", "failed"):
# Fix B (Cycle 5): validate source_id before accepting delegation
# results. Only process delegations that THIS workspace created
# (source_id == self.workspace_id). Attacker-crafted delegation
# records with a foreign source_id cannot inject instructions.
source_id = d.get("source_id", "")
if source_id != self.workspace_id:
logger.warning(
"Heartbeat: skipping delegation %s — source_id %r does not "
"match this workspace %r; possible injection attempt",
did, source_id, self.workspace_id,
)
self._seen_delegation_ids.add(did) # mark seen so we don't warn again
continue
self._seen_delegation_ids.add(did)
new_results.append({
"delegation_id": did,
"target_id": d.get("target_id", ""),
"source_id": d.get("source_id", ""),
"source_id": source_id,
"status": status,
"summary": d.get("summary", ""),
"response_preview": d.get("response_preview", ""),
@ -177,12 +191,15 @@ class HeartbeatLoop:
f.write(json.dumps(r) + "\n")
logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results))
# Build a summary message for the agent
# Build a summary message for the agent.
# Fix B (Cycle 5): do NOT embed raw response_preview text in
# user-role A2A messages — that is the prompt-injection vector.
# Instead reference only the delegation ID and status; the agent
# reads full content from DELEGATION_RESULTS_FILE which was
# written above from trusted platform data.
summary_lines = []
for r in new_results:
line = f"- [{r['status']}] {r['summary'][:80]}"
if r.get("response_preview"):
line += f"\n Response: {r['response_preview'][:200]}"
line = f"- [{r['status']}] Delegation {r['delegation_id'][:8]}: {r['summary'][:80]}"
if r.get("error"):
line += f"\n Error: {r['error'][:100]}"
summary_lines.append(line)

View File

@ -2,6 +2,7 @@
import importlib.util
import logging
import os
import sys
from dataclasses import dataclass, field
from pathlib import Path
@ -81,10 +82,32 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
# Keeps test environments (and empty skills) from needing langchain.
from langchain_core.tools import BaseTool
# Sensitive env vars that must not be readable by skill scripts.
# Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot
# exfiltrate credentials even if it somehow bypasses the POST /plugins
# auth gate (defence in depth).
_SCRUB_KEYS = (
"CLAUDE_CODE_OAUTH_TOKEN",
"ANTHROPIC_API_KEY",
"OPENAI_API_KEY",
"WORKSPACE_AUTH_TOKEN",
"GITHUB_TOKEN",
"GH_TOKEN",
)
for py_file in sorted(scripts_dir.glob("*.py")):
if py_file.name.startswith("_"):
continue
# Verify the script is actually inside the expected scripts directory
# (path traversal guard — glob shouldn't produce outside paths, but
# belt-and-suspenders for symlink attacks).
try:
py_file.resolve().relative_to(scripts_dir.resolve())
except ValueError:
logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file)
continue
module_name = f"skill_tool_{py_file.stem}"
spec = importlib.util.spec_from_file_location(module_name, py_file)
if spec is None or spec.loader is None:
@ -92,7 +115,14 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
# Temporarily remove sensitive env vars before running skill code.
_saved_env = {k: os.environ.pop(k) for k in _SCRUB_KEYS if k in os.environ}
try:
spec.loader.exec_module(module)
finally:
# Always restore so the rest of the agent process retains them.
os.environ.update(_saved_env)
# Look for functions decorated with @tool (BaseTool instances)
for attr_name in dir(module):

View File

@ -282,7 +282,10 @@ async def test_connect_uses_workspace_id_header():
await sub._connect()
call_kwargs = websockets_mod.connect.call_args[1]
assert call_kwargs.get("additional_headers") == {"X-Workspace-ID": "ws-hdr"}
# Fix D (Cycle 5): headers now include Authorization when platform_auth available.
# Assert X-Workspace-ID is present; allow optional Authorization header.
actual_headers = call_kwargs.get("additional_headers", {})
assert actual_headers.get("X-Workspace-ID") == "ws-hdr"
# ---------------------------------------------------------------------------

View File

@ -184,6 +184,7 @@ async def test_check_delegations_writes_results_file(tmp_path):
delegations = [
{"delegation_id": "d-1", "status": "completed", "target_id": "ws-t",
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
"summary": "Done", "response_preview": "Result here", "error": ""},
]
@ -245,6 +246,7 @@ async def test_check_delegations_sends_self_message(tmp_path):
delegations = [
{"delegation_id": "d-new", "status": "completed", "target_id": "ws-t",
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
"summary": "Task done", "response_preview": "All good", "error": ""},
]