fix(security): Cycle 5 — auth middleware, injection hardening, skill sandbox

Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
  WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
  ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
  secrets.Values: workspaces with no live token are grandfathered through.
  Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.

Fix A — platform/internal/router/router.go:
  Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
  and /a2a remain on root router; all other /workspaces/:id/* sub-routes
  moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
  CORS AllowHeaders updated to include Authorization so browser/agent callers
  can send the bearer token cross-origin.

Fix B — workspace-template/heartbeat.py:
  _check_delegations(): validate source_id == self.workspace_id before
  accepting a delegation result. Attacker-crafted records with a foreign
  source_id are silently skipped with a WARNING log (injection attempt).
  trigger_msg no longer embeds raw response_preview text; references
  delegation_id + status only — removes the prompt-injection vector.

Fix C — workspace-template/skill_loader/loader.py:
  load_skill_tools(): before exec_module(), verify script is within
  scripts_dir (path traversal guard) and temporarily scrub sensitive env
  vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
  WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
  in finally block. Defence-in-depth even if /plugins auth gate is bypassed.

Fix D — platform/internal/handlers/socket.go:
  HandleConnect(): agent connections (X-Workspace-ID present) validated via
  wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
  Canvas clients (no X-Workspace-ID) remain unauthenticated.

Fix D — workspace-template/events.py:
  PlatformEventSubscriber._connect(): include platform_auth bearer token in
  WebSocket upgrade headers alongside X-Workspace-ID.

Fix E — workspace-template/executor_helpers.py:
  recall_memories() and commit_memory() now pass platform_auth bearer token
  in Authorization header so WorkspaceAuth middleware allows access.

Fix F — workspace-template/a2a_client.py:
  send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
  write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.

Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Dev Lead Agent 2026-04-14 04:44:42 +00:00
parent 26992d6ba9
commit bea0e96a86
10 changed files with 289 additions and 115 deletions

View File

@ -6,8 +6,10 @@ import (
"os" "os"
"strings" "strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws" "github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -40,15 +42,41 @@ func NewSocketHandler(hub *ws.Hub) *SocketHandler {
// HandleConnect handles WebSocket upgrade at GET /ws. // HandleConnect handles WebSocket upgrade at GET /ws.
// Canvas clients connect without X-Workspace-ID — they receive all events. // Canvas clients connect without X-Workspace-ID — they receive all events.
// Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate. // Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate.
//
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
// remain unauthenticated. Pre-token workspaces are grandfathered through.
func (h *SocketHandler) HandleConnect(c *gin.Context) { func (h *SocketHandler) HandleConnect(c *gin.Context) {
workspaceID := c.GetHeader("X-Workspace-ID")
// Authenticate workspace agents (not canvas browser clients).
if workspaceID != "" {
ctx := c.Request.Context()
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
if err != nil {
log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
return
}
if hasLive {
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
return
}
if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
return
}
}
}
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil { if err != nil {
log.Printf("WebSocket upgrade error: %v", err) log.Printf("WebSocket upgrade error: %v", err)
return return
} }
workspaceID := c.GetHeader("X-Workspace-ID")
client := &ws.Client{ client := &ws.Client{
Conn: conn, Conn: conn,
WorkspaceID: workspaceID, WorkspaceID: workspaceID,

View File

@ -0,0 +1,51 @@
package middleware
import (
"database/sql"
"log"
"net/http"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
)
// WorkspaceAuth returns a Gin middleware that enforces per-workspace bearer-token
// authentication on /workspaces/:id/* sub-routes.
//
// Same lazy-bootstrap contract as secrets.Values: workspaces that have no live
// token on file are grandfathered through so in-flight agents keep working
// during a rolling upgrade. Once a workspace has at least one live token every
// request MUST present a valid one in Authorization: Bearer <token>.
//
// Intended for route groups that cover all /workspaces/:id/* paths.
// The /workspaces/:id/a2a route must be registered on the root router (outside
// this group) because it already authenticates callers via CanCommunicate.
func WorkspaceAuth(database *sql.DB) gin.HandlerFunc {
return func(c *gin.Context) {
workspaceID := c.Param("id")
if workspaceID == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace ID"})
return
}
ctx := c.Request.Context()
hasLive, err := wsauth.HasAnyLiveToken(ctx, database, workspaceID)
if err != nil {
log.Printf("wsauth: WorkspaceAuth: HasAnyLiveToken(%s) failed: %v", workspaceID, err)
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
return
}
if hasLive {
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
return
}
if err := wsauth.ValidateToken(ctx, database, workspaceID, tok); err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
return
}
}
c.Next()
}
}

View File

@ -32,7 +32,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.Use(cors.New(cors.Config{ r.Use(cors.New(cors.Config{
AllowOrigins: corsOrigins, AllowOrigins: corsOrigins,
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"}, AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID"}, AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID", "Authorization"},
AllowCredentials: true, AllowCredentials: true,
})) }))
@ -62,61 +62,69 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Scrape with: curl http://localhost:8080/metrics // Scrape with: curl http://localhost:8080/metrics
r.GET("/metrics", metrics.Handler()) r.GET("/metrics", metrics.Handler())
// Workspaces CRUD // Workspaces CRUD — bare /workspaces and /workspaces/:id (no sub-path), unauthenticated for canvas
r.POST("/workspaces", wh.Create) r.POST("/workspaces", wh.Create)
r.GET("/workspaces", wh.List) r.GET("/workspaces", wh.List)
r.GET("/workspaces/:id", wh.Get) r.GET("/workspaces/:id", wh.Get)
// Phase 30.4 — lightweight token-gated state polling for remote agents
// that can't reach the platform's WebSocket. Returns {status, paused,
// deleted}. Separate from /workspaces/:id so the canvas path stays
// unauthenticated and returns its full config payload.
r.GET("/workspaces/:id/state", wh.State)
r.PATCH("/workspaces/:id", wh.Update) r.PATCH("/workspaces/:id", wh.Update)
r.DELETE("/workspaces/:id", wh.Delete) r.DELETE("/workspaces/:id", wh.Delete)
r.POST("/workspaces/:id/restart", wh.Restart)
r.POST("/workspaces/:id/pause", wh.Pause) // A2A proxy — registered outside the auth group; already enforces CanCommunicate access control.
r.POST("/workspaces/:id/resume", wh.Resume)
r.POST("/workspaces/:id/a2a", wh.ProxyA2A) r.POST("/workspaces/:id/a2a", wh.ProxyA2A)
// Async Delegation // Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a.
delh := handlers.NewDelegationHandler(wh, broadcaster) // Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13
r.POST("/workspaces/:id/delegate", delh.Delegate) // by requiring a valid bearer token for any workspace that has one on file.
r.GET("/workspaces/:id/delegations", delh.ListDelegations) // Legacy workspaces (no token) are grandfathered to allow rolling upgrades.
// Record-only endpoint for agent-initiated delegations (#64). Agent-side wsAuth := r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB))
// delegate_to_workspace fires A2A directly for speed + OTEL propagation; {
// this endpoint just adds an activity_logs row so GET /delegations returns // Lifecycle
// the same set the agent's local `check_delegation_status` sees. wsAuth.GET("/state", wh.State)
r.POST("/workspaces/:id/delegations/record", delh.Record) wsAuth.POST("/restart", wh.Restart)
r.POST("/workspaces/:id/delegations/:delegation_id/update", delh.UpdateStatus) wsAuth.POST("/pause", wh.Pause)
wsAuth.POST("/resume", wh.Resume)
// Traces (Langfuse proxy) // Async Delegation
trh := handlers.NewTracesHandler() delh := handlers.NewDelegationHandler(wh, broadcaster)
r.GET("/workspaces/:id/traces", trh.List) wsAuth.POST("/delegate", delh.Delegate)
wsAuth.GET("/delegations", delh.ListDelegations)
// Record-only endpoint for agent-initiated delegations (#64). Agent-side
// delegate_to_workspace fires A2A directly for speed + OTEL propagation;
// this endpoint just adds an activity_logs row so GET /delegations returns
// the same set the agent's local `check_delegation_status` sees.
wsAuth.POST("/delegations/record", delh.Record)
wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus)
// Agent Memories (HMA) // Traces (Langfuse proxy)
memsh := handlers.NewMemoriesHandler() trh := handlers.NewTracesHandler()
r.POST("/workspaces/:id/memories", memsh.Commit) wsAuth.GET("/traces", trh.List)
r.GET("/workspaces/:id/memories", memsh.Search)
r.DELETE("/workspaces/:id/memories/:memoryId", memsh.Delete)
// Approvals // Agent Memories (HMA)
apph := handlers.NewApprovalsHandler(broadcaster) memsh := handlers.NewMemoriesHandler()
r.GET("/approvals/pending", apph.ListAll) wsAuth.POST("/memories", memsh.Commit)
r.POST("/workspaces/:id/approvals", apph.Create) wsAuth.GET("/memories", memsh.Search)
r.GET("/workspaces/:id/approvals", apph.List) wsAuth.DELETE("/memories/:memoryId", memsh.Delete)
r.POST("/workspaces/:id/approvals/:approvalId/decide", apph.Decide)
// Team Expansion // Approvals
teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir) apph := handlers.NewApprovalsHandler(broadcaster)
r.POST("/workspaces/:id/expand", teamh.Expand) wsAuth.POST("/approvals", apph.Create)
r.POST("/workspaces/:id/collapse", teamh.Collapse) wsAuth.GET("/approvals", apph.List)
wsAuth.POST("/approvals/:approvalId/decide", apph.Decide)
// /approvals/pending is a cross-workspace admin path; keep on root router outside wsAuth.
r.GET("/approvals/pending", apph.ListAll)
// Agents // Team Expansion
ah := handlers.NewAgentHandler(broadcaster) teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir)
r.POST("/workspaces/:id/agent", ah.Assign) wsAuth.POST("/expand", teamh.Expand)
r.PATCH("/workspaces/:id/agent", ah.Replace) wsAuth.POST("/collapse", teamh.Collapse)
r.DELETE("/workspaces/:id/agent", ah.Remove)
r.POST("/workspaces/:id/agent/move", ah.Move) // Agents
ah := handlers.NewAgentHandler(broadcaster)
wsAuth.POST("/agent", ah.Assign)
wsAuth.PATCH("/agent", ah.Replace)
wsAuth.DELETE("/agent", ah.Remove)
wsAuth.POST("/agent/move", ah.Move)
}
// Registry // Registry
rh := handlers.NewRegistryHandler(broadcaster) rh := handlers.NewRegistryHandler(broadcaster)
@ -135,58 +143,65 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
r.GET("/registry/:id/peers", dh.Peers) r.GET("/registry/:id/peers", dh.Peers)
r.POST("/registry/check-access", dh.CheckAccess) r.POST("/registry/check-access", dh.CheckAccess)
// Events // Events (not workspace-scoped — exempt from per-workspace auth)
eh := handlers.NewEventsHandler() eh := handlers.NewEventsHandler()
r.GET("/events", eh.List) r.GET("/events", eh.List)
r.GET("/events/:workspaceId", eh.ListByWorkspace) r.GET("/events/:workspaceId", eh.ListByWorkspace)
// Activity Logs // Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
acth := handlers.NewActivityHandler(broadcaster) {
r.GET("/workspaces/:id/activity", acth.List) // Activity Logs
r.GET("/workspaces/:id/session-search", acth.SessionSearch) acth := handlers.NewActivityHandler(broadcaster)
r.POST("/workspaces/:id/activity", acth.Report) wsAuth.GET("/activity", acth.List)
r.POST("/workspaces/:id/notify", acth.Notify) wsAuth.GET("/session-search", acth.SessionSearch)
wsAuth.POST("/activity", acth.Report)
wsAuth.POST("/notify", acth.Notify)
// Config // Config
cfgh := handlers.NewConfigHandler() cfgh := handlers.NewConfigHandler()
r.GET("/workspaces/:id/config", cfgh.Get) wsAuth.GET("/config", cfgh.Get)
r.PATCH("/workspaces/:id/config", cfgh.Patch) wsAuth.PATCH("/config", cfgh.Patch)
// Schedules (cron tasks) // Schedules (cron tasks)
schedh := handlers.NewScheduleHandler() schedh := handlers.NewScheduleHandler()
r.GET("/workspaces/:id/schedules", schedh.List) wsAuth.GET("/schedules", schedh.List)
r.POST("/workspaces/:id/schedules", schedh.Create) wsAuth.POST("/schedules", schedh.Create)
r.PATCH("/workspaces/:id/schedules/:scheduleId", schedh.Update) wsAuth.PATCH("/schedules/:scheduleId", schedh.Update)
r.DELETE("/workspaces/:id/schedules/:scheduleId", schedh.Delete) wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete)
r.POST("/workspaces/:id/schedules/:scheduleId/run", schedh.RunNow) wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow)
r.GET("/workspaces/:id/schedules/:scheduleId/history", schedh.History) wsAuth.GET("/schedules/:scheduleId/history", schedh.History)
// Memory // Memory
memh := handlers.NewMemoryHandler() memh := handlers.NewMemoryHandler()
r.GET("/workspaces/:id/memory", memh.List) wsAuth.GET("/memory", memh.List)
r.GET("/workspaces/:id/memory/:key", memh.Get) wsAuth.GET("/memory/:key", memh.Get)
r.POST("/workspaces/:id/memory", memh.Set) wsAuth.POST("/memory", memh.Set)
r.DELETE("/workspaces/:id/memory/:key", memh.Delete) wsAuth.DELETE("/memory/:key", memh.Delete)
// Secrets (auto-restart workspace after secret change) // Secrets (auto-restart workspace after secret change)
sech := handlers.NewSecretsHandler(wh.RestartByID) sech := handlers.NewSecretsHandler(wh.RestartByID)
r.GET("/workspaces/:id/secrets", sech.List) wsAuth.GET("/secrets", sech.List)
// Phase 30.2 — decrypted values pull, token-gated. Canvas uses List // Phase 30.2 — decrypted values pull, token-gated. Canvas uses List
// (keys + metadata only); remote agents use Values to bootstrap env. // (keys + metadata only); remote agents use Values to bootstrap env.
r.GET("/workspaces/:id/secrets/values", sech.Values) wsAuth.GET("/secrets/values", sech.Values)
r.POST("/workspaces/:id/secrets", sech.Set) wsAuth.POST("/secrets", sech.Set)
r.PUT("/workspaces/:id/secrets", sech.Set) wsAuth.PUT("/secrets", sech.Set)
r.DELETE("/workspaces/:id/secrets/:key", sech.Delete) wsAuth.DELETE("/secrets/:key", sech.Delete)
r.GET("/workspaces/:id/model", sech.GetModel) wsAuth.GET("/model", sech.GetModel)
}
// Global secrets — /settings/secrets is the canonical path; /admin/secrets kept for backward compat // Global secrets — /settings/secrets is the canonical path; /admin/secrets kept for backward compat
r.GET("/settings/secrets", sech.ListGlobal) // These are admin-level paths outside the per-workspace auth group.
r.PUT("/settings/secrets", sech.SetGlobal) {
r.POST("/settings/secrets", sech.SetGlobal) sechGlobal := handlers.NewSecretsHandler(wh.RestartByID)
r.DELETE("/settings/secrets/:key", sech.DeleteGlobal) r.GET("/settings/secrets", sechGlobal.ListGlobal)
r.GET("/admin/secrets", sech.ListGlobal) r.PUT("/settings/secrets", sechGlobal.SetGlobal)
r.POST("/admin/secrets", sech.SetGlobal) r.POST("/settings/secrets", sechGlobal.SetGlobal)
r.DELETE("/admin/secrets/:key", sech.DeleteGlobal) r.DELETE("/settings/secrets/:key", sechGlobal.DeleteGlobal)
r.GET("/admin/secrets", sechGlobal.ListGlobal)
r.POST("/admin/secrets", sechGlobal.SetGlobal)
r.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal)
}
// Terminal — shares Docker client with provisioner // Terminal — shares Docker client with provisioner
var dockerCli *client.Client var dockerCli *client.Client
@ -194,7 +209,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
dockerCli = prov.DockerClient() dockerCli = prov.DockerClient()
} }
th := handlers.NewTerminalHandler(dockerCli) th := handlers.NewTerminalHandler(dockerCli)
r.GET("/workspaces/:id/terminal", th.HandleConnect) wsAuth.GET("/terminal", th.HandleConnect)
// Canvas Viewport // Canvas Viewport
vh := handlers.NewViewportHandler() vh := handlers.NewViewportHandler()
@ -205,12 +220,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli) tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)
r.GET("/templates", tmplh.List) r.GET("/templates", tmplh.List)
r.POST("/templates/import", tmplh.Import) r.POST("/templates/import", tmplh.Import)
r.GET("/workspaces/:id/shared-context", tmplh.SharedContext) wsAuth.GET("/shared-context", tmplh.SharedContext)
r.PUT("/workspaces/:id/files", tmplh.ReplaceFiles) wsAuth.PUT("/files", tmplh.ReplaceFiles)
r.GET("/workspaces/:id/files", tmplh.ListFiles) wsAuth.GET("/files", tmplh.ListFiles)
r.GET("/workspaces/:id/files/*path", tmplh.ReadFile) wsAuth.GET("/files/*path", tmplh.ReadFile)
r.PUT("/workspaces/:id/files/*path", tmplh.WriteFile) wsAuth.PUT("/files/*path", tmplh.WriteFile)
r.DELETE("/workspaces/:id/files/*path", tmplh.DeleteFile) wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
// Plugins // Plugins
pluginsDir := findPluginsDir(configsDir) pluginsDir := findPluginsDir(configsDir)
@ -230,14 +245,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
WithRuntimeLookup(runtimeLookup) WithRuntimeLookup(runtimeLookup)
r.GET("/plugins", plgh.ListRegistry) r.GET("/plugins", plgh.ListRegistry)
r.GET("/plugins/sources", plgh.ListSources) r.GET("/plugins/sources", plgh.ListSources)
r.GET("/workspaces/:id/plugins", plgh.ListInstalled) wsAuth.GET("/plugins", plgh.ListInstalled)
r.GET("/workspaces/:id/plugins/available", plgh.ListAvailableForWorkspace) wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace)
r.GET("/workspaces/:id/plugins/compatibility", plgh.CheckRuntimeCompatibility) wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility)
r.POST("/workspaces/:id/plugins", plgh.Install) wsAuth.POST("/plugins", plgh.Install)
r.DELETE("/workspaces/:id/plugins/:name", plgh.Uninstall) wsAuth.DELETE("/plugins/:name", plgh.Uninstall)
// Phase 30.3 — stream plugin as tar.gz so remote agents can pull + // Phase 30.3 — stream plugin as tar.gz so remote agents can pull +
// unpack locally instead of going through Docker exec. // unpack locally instead of going through Docker exec.
r.GET("/workspaces/:id/plugins/:name/download", plgh.Download) wsAuth.GET("/plugins/:name/download", plgh.Download)
// Bundles // Bundles
bh := handlers.NewBundleHandler(broadcaster, prov, platformURL, configsDir, dockerCli) bh := handlers.NewBundleHandler(broadcaster, prov, platformURL, configsDir, dockerCli)
@ -253,12 +268,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Channels (social integrations — Telegram, Slack, Discord, etc.) // Channels (social integrations — Telegram, Slack, Discord, etc.)
chh := handlers.NewChannelHandler(channelMgr) chh := handlers.NewChannelHandler(channelMgr)
r.GET("/channels/adapters", chh.ListAdapters) r.GET("/channels/adapters", chh.ListAdapters)
r.GET("/workspaces/:id/channels", chh.List) wsAuth.GET("/channels", chh.List)
r.POST("/workspaces/:id/channels", chh.Create) wsAuth.POST("/channels", chh.Create)
r.PATCH("/workspaces/:id/channels/:channelId", chh.Update) wsAuth.PATCH("/channels/:channelId", chh.Update)
r.DELETE("/workspaces/:id/channels/:channelId", chh.Delete) wsAuth.DELETE("/channels/:channelId", chh.Delete)
r.POST("/workspaces/:id/channels/:channelId/send", chh.Send) wsAuth.POST("/channels/:channelId/send", chh.Send)
r.POST("/workspaces/:id/channels/:channelId/test", chh.Test) wsAuth.POST("/channels/:channelId/test", chh.Test)
r.POST("/channels/discover", chh.Discover) r.POST("/channels/discover", chh.Discover)
r.POST("/webhooks/:type", chh.Webhook) r.POST("/webhooks/:type", chh.Webhook)

View File

@ -41,7 +41,12 @@ async def discover_peer(target_id: str) -> dict | None:
async def send_a2a_message(target_url: str, message: str) -> str: async def send_a2a_message(target_url: str, message: str) -> str:
"""Send an A2A message/send to a target workspace.""" """Send an A2A message/send to a target workspace."""
async with httpx.AsyncClient(timeout=None) as client: # Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
# a hung upstream to block the agent indefinitely. Use a generous but bounded
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
) as client:
try: try:
resp = await client.post( resp = await client.post(
target_url, target_url,

View File

@ -62,7 +62,15 @@ class PlatformEventSubscriber:
self._running = False self._running = False
return return
# Fix D (Cycle 5): include bearer token in WebSocket upgrade so the
# server's new auth check can validate this agent connection.
# Graceful fallback for workspaces that have no token yet.
headers = {"X-Workspace-ID": self.workspace_id} headers = {"X-Workspace-ID": self.workspace_id}
try:
from platform_auth import auth_headers as _auth_headers
headers.update(_auth_headers())
except Exception:
pass # No token available — connect unauthenticated (grandfathered)
logger.info("Connecting to platform WebSocket: %s", self.ws_url) logger.info("Connecting to platform WebSocket: %s", self.ws_url)
async with websockets.connect(self.ws_url, additional_headers=headers) as ws: async with websockets.connect(self.ws_url, additional_headers=headers) as ws:

View File

@ -92,9 +92,17 @@ async def recall_memories() -> str:
platform_url = os.environ.get("PLATFORM_URL", "") platform_url = os.environ.get("PLATFORM_URL", "")
if not workspace_id or not platform_url: if not workspace_id or not platform_url:
return "" return ""
# Fix E (Cycle 5): send auth headers so the WorkspaceAuth middleware
# (Fix A) allows access once the workspace has a live token on file.
try:
from platform_auth import auth_headers as _platform_auth
_auth = _platform_auth()
except Exception:
_auth = {}
try: try:
resp = await get_http_client().get( resp = await get_http_client().get(
f"{platform_url}/workspaces/{workspace_id}/memories", f"{platform_url}/workspaces/{workspace_id}/memories",
headers=_auth,
) )
if not 200 <= resp.status_code < 300: if not 200 <= resp.status_code < 300:
logger.debug( logger.debug(
@ -121,10 +129,17 @@ async def commit_memory(content: str) -> None:
platform_url = os.environ.get("PLATFORM_URL", "") platform_url = os.environ.get("PLATFORM_URL", "")
if not workspace_id or not platform_url or not content: if not workspace_id or not platform_url or not content:
return return
# Fix E (Cycle 5): include auth header so WorkspaceAuth middleware allows access.
try:
from platform_auth import auth_headers as _platform_auth
_auth = _platform_auth()
except Exception:
_auth = {}
try: try:
await get_http_client().post( await get_http_client().post(
f"{platform_url}/workspaces/{workspace_id}/memories", f"{platform_url}/workspaces/{workspace_id}/memories",
json={"content": content, "scope": "LOCAL"}, json={"content": content, "scope": "LOCAL"},
headers=_auth,
) )
except Exception as exc: except Exception as exc:
logger.debug("commit_memory: request failed: %s", exc) logger.debug("commit_memory: request failed: %s", exc)

View File

@ -153,11 +153,25 @@ class HeartbeatLoop:
continue continue
if status in ("completed", "failed"): if status in ("completed", "failed"):
# Fix B (Cycle 5): validate source_id before accepting delegation
# results. Only process delegations that THIS workspace created
# (source_id == self.workspace_id). Attacker-crafted delegation
# records with a foreign source_id cannot inject instructions.
source_id = d.get("source_id", "")
if source_id != self.workspace_id:
logger.warning(
"Heartbeat: skipping delegation %s — source_id %r does not "
"match this workspace %r; possible injection attempt",
did, source_id, self.workspace_id,
)
self._seen_delegation_ids.add(did) # mark seen so we don't warn again
continue
self._seen_delegation_ids.add(did) self._seen_delegation_ids.add(did)
new_results.append({ new_results.append({
"delegation_id": did, "delegation_id": did,
"target_id": d.get("target_id", ""), "target_id": d.get("target_id", ""),
"source_id": d.get("source_id", ""), "source_id": source_id,
"status": status, "status": status,
"summary": d.get("summary", ""), "summary": d.get("summary", ""),
"response_preview": d.get("response_preview", ""), "response_preview": d.get("response_preview", ""),
@ -177,12 +191,15 @@ class HeartbeatLoop:
f.write(json.dumps(r) + "\n") f.write(json.dumps(r) + "\n")
logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results)) logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results))
# Build a summary message for the agent # Build a summary message for the agent.
# Fix B (Cycle 5): do NOT embed raw response_preview text in
# user-role A2A messages — that is the prompt-injection vector.
# Instead reference only the delegation ID and status; the agent
# reads full content from DELEGATION_RESULTS_FILE which was
# written above from trusted platform data.
summary_lines = [] summary_lines = []
for r in new_results: for r in new_results:
line = f"- [{r['status']}] {r['summary'][:80]}" line = f"- [{r['status']}] Delegation {r['delegation_id'][:8]}: {r['summary'][:80]}"
if r.get("response_preview"):
line += f"\n Response: {r['response_preview'][:200]}"
if r.get("error"): if r.get("error"):
line += f"\n Error: {r['error'][:100]}" line += f"\n Error: {r['error'][:100]}"
summary_lines.append(line) summary_lines.append(line)

View File

@ -2,6 +2,7 @@
import importlib.util import importlib.util
import logging import logging
import os
import sys import sys
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
@ -81,10 +82,32 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
# Keeps test environments (and empty skills) from needing langchain. # Keeps test environments (and empty skills) from needing langchain.
from langchain_core.tools import BaseTool from langchain_core.tools import BaseTool
# Sensitive env vars that must not be readable by skill scripts.
# Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot
# exfiltrate credentials even if it somehow bypasses the POST /plugins
# auth gate (defence in depth).
_SCRUB_KEYS = (
"CLAUDE_CODE_OAUTH_TOKEN",
"ANTHROPIC_API_KEY",
"OPENAI_API_KEY",
"WORKSPACE_AUTH_TOKEN",
"GITHUB_TOKEN",
"GH_TOKEN",
)
for py_file in sorted(scripts_dir.glob("*.py")): for py_file in sorted(scripts_dir.glob("*.py")):
if py_file.name.startswith("_"): if py_file.name.startswith("_"):
continue continue
# Verify the script is actually inside the expected scripts directory
# (path traversal guard — glob shouldn't produce outside paths, but
# belt-and-suspenders for symlink attacks).
try:
py_file.resolve().relative_to(scripts_dir.resolve())
except ValueError:
logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file)
continue
module_name = f"skill_tool_{py_file.stem}" module_name = f"skill_tool_{py_file.stem}"
spec = importlib.util.spec_from_file_location(module_name, py_file) spec = importlib.util.spec_from_file_location(module_name, py_file)
if spec is None or spec.loader is None: if spec is None or spec.loader is None:
@ -92,7 +115,14 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module sys.modules[module_name] = module
spec.loader.exec_module(module)
# Temporarily remove sensitive env vars before running skill code.
_saved_env = {k: os.environ.pop(k) for k in _SCRUB_KEYS if k in os.environ}
try:
spec.loader.exec_module(module)
finally:
# Always restore so the rest of the agent process retains them.
os.environ.update(_saved_env)
# Look for functions decorated with @tool (BaseTool instances) # Look for functions decorated with @tool (BaseTool instances)
for attr_name in dir(module): for attr_name in dir(module):

View File

@ -282,7 +282,10 @@ async def test_connect_uses_workspace_id_header():
await sub._connect() await sub._connect()
call_kwargs = websockets_mod.connect.call_args[1] call_kwargs = websockets_mod.connect.call_args[1]
assert call_kwargs.get("additional_headers") == {"X-Workspace-ID": "ws-hdr"} # Fix D (Cycle 5): headers now include Authorization when platform_auth available.
# Assert X-Workspace-ID is present; allow optional Authorization header.
actual_headers = call_kwargs.get("additional_headers", {})
assert actual_headers.get("X-Workspace-ID") == "ws-hdr"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@ -184,6 +184,7 @@ async def test_check_delegations_writes_results_file(tmp_path):
delegations = [ delegations = [
{"delegation_id": "d-1", "status": "completed", "target_id": "ws-t", {"delegation_id": "d-1", "status": "completed", "target_id": "ws-t",
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
"summary": "Done", "response_preview": "Result here", "error": ""}, "summary": "Done", "response_preview": "Result here", "error": ""},
] ]
@ -245,6 +246,7 @@ async def test_check_delegations_sends_self_message(tmp_path):
delegations = [ delegations = [
{"delegation_id": "d-new", "status": "completed", "target_id": "ws-t", {"delegation_id": "d-new", "status": "completed", "target_id": "ws-t",
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
"summary": "Task done", "response_preview": "All good", "error": ""}, "summary": "Task done", "response_preview": "All good", "error": ""},
] ]