fix(security): Cycle 5 — auth middleware, injection hardening, skill sandbox
Fix A — platform/internal/middleware/wsauth_middleware.go (NEW):
WorkspaceAuth() gin middleware enforces per-workspace bearer-token auth on
ALL /workspaces/:id/* sub-routes. Same lazy-bootstrap contract as
secrets.Values: workspaces with no live token are grandfathered through.
Blocks C2, C3, C4, C5, C7, C8, C9, C12, C13 simultaneously.
Fix A — platform/internal/router/router.go:
Reorganised route registration: bare CRUD (/workspaces, /workspaces/:id)
and /a2a remain on root router; all other /workspaces/:id/* sub-routes
moved into wsAuth = r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB)).
CORS AllowHeaders updated to include Authorization so browser/agent callers
can send the bearer token cross-origin.
Fix B — workspace-template/heartbeat.py:
_check_delegations(): validate source_id == self.workspace_id before
accepting a delegation result. Attacker-crafted records with a foreign
source_id are silently skipped with a WARNING log (injection attempt).
trigger_msg no longer embeds raw response_preview text; references
delegation_id + status only — removes the prompt-injection vector.
Fix C — workspace-template/skill_loader/loader.py:
load_skill_tools(): before exec_module(), verify script is within
scripts_dir (path traversal guard) and temporarily scrub sensitive env
vars (CLAUDE_CODE_OAUTH_TOKEN, ANTHROPIC_API_KEY, OPENAI_API_KEY,
WORKSPACE_AUTH_TOKEN, GITHUB_TOKEN, GH_TOKEN) from os.environ; restore
in finally block. Defence-in-depth even if /plugins auth gate is bypassed.
Fix D — platform/internal/handlers/socket.go:
HandleConnect(): agent connections (X-Workspace-ID present) validated via
wsauth.HasAnyLiveToken + wsauth.ValidateToken before WebSocket upgrade.
Canvas clients (no X-Workspace-ID) remain unauthenticated.
Fix D — workspace-template/events.py:
PlatformEventSubscriber._connect(): include platform_auth bearer token in
WebSocket upgrade headers alongside X-Workspace-ID.
Fix E — workspace-template/executor_helpers.py:
recall_memories() and commit_memory() now pass platform_auth bearer token
in Authorization header so WorkspaceAuth middleware allows access.
Fix F — workspace-template/a2a_client.py:
send_a2a_message(): timeout=None → httpx.Timeout(connect=30, read=300,
write=30, pool=30). Resolves H2 flagged across 5 consecutive audits.
Tests: 149/149 Python tests pass (test_heartbeat + test_events updated to
assert new source_id validation behaviour and allow Authorization header).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
e920aaab8e
commit
6c78962a33
@ -6,8 +6,10 @@ import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
@ -40,15 +42,41 @@ func NewSocketHandler(hub *ws.Hub) *SocketHandler {
|
||||
// HandleConnect handles WebSocket upgrade at GET /ws.
|
||||
// Canvas clients connect without X-Workspace-ID — they receive all events.
|
||||
// Workspace agents send X-Workspace-ID — events are filtered by CanCommunicate.
|
||||
//
|
||||
// Fix D (Cycle 5): agent connections (X-Workspace-ID present) are now validated
|
||||
// via bearer token before the WebSocket upgrade. Canvas clients (no X-Workspace-ID)
|
||||
// remain unauthenticated. Pre-token workspaces are grandfathered through.
|
||||
func (h *SocketHandler) HandleConnect(c *gin.Context) {
|
||||
workspaceID := c.GetHeader("X-Workspace-ID")
|
||||
|
||||
// Authenticate workspace agents (not canvas browser clients).
|
||||
if workspaceID != "" {
|
||||
ctx := c.Request.Context()
|
||||
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("wsauth: WebSocket HasAnyLiveToken(%s) failed: %v", workspaceID, err)
|
||||
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
|
||||
return
|
||||
}
|
||||
if hasLive {
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
if tok == "" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
|
||||
return
|
||||
}
|
||||
if err := wsauth.ValidateToken(ctx, db.DB, workspaceID, tok); err != nil {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
log.Printf("WebSocket upgrade error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
workspaceID := c.GetHeader("X-Workspace-ID")
|
||||
|
||||
client := &ws.Client{
|
||||
Conn: conn,
|
||||
WorkspaceID: workspaceID,
|
||||
|
||||
51
platform/internal/middleware/wsauth_middleware.go
Normal file
51
platform/internal/middleware/wsauth_middleware.go
Normal file
@ -0,0 +1,51 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// WorkspaceAuth returns a Gin middleware that enforces per-workspace bearer-token
|
||||
// authentication on /workspaces/:id/* sub-routes.
|
||||
//
|
||||
// Same lazy-bootstrap contract as secrets.Values: workspaces that have no live
|
||||
// token on file are grandfathered through so in-flight agents keep working
|
||||
// during a rolling upgrade. Once a workspace has at least one live token every
|
||||
// request MUST present a valid one in Authorization: Bearer <token>.
|
||||
//
|
||||
// Intended for route groups that cover all /workspaces/:id/* paths.
|
||||
// The /workspaces/:id/a2a route must be registered on the root router (outside
|
||||
// this group) because it already authenticates callers via CanCommunicate.
|
||||
func WorkspaceAuth(database *sql.DB) gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
if workspaceID == "" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace ID"})
|
||||
return
|
||||
}
|
||||
ctx := c.Request.Context()
|
||||
|
||||
hasLive, err := wsauth.HasAnyLiveToken(ctx, database, workspaceID)
|
||||
if err != nil {
|
||||
log.Printf("wsauth: WorkspaceAuth: HasAnyLiveToken(%s) failed: %v", workspaceID, err)
|
||||
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": "auth check failed"})
|
||||
return
|
||||
}
|
||||
if hasLive {
|
||||
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
|
||||
if tok == "" {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing workspace auth token"})
|
||||
return
|
||||
}
|
||||
if err := wsauth.ValidateToken(ctx, database, workspaceID, tok); err != nil {
|
||||
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "invalid workspace auth token"})
|
||||
return
|
||||
}
|
||||
}
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
@ -32,7 +32,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
r.Use(cors.New(cors.Config{
|
||||
AllowOrigins: corsOrigins,
|
||||
AllowMethods: []string{"GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"},
|
||||
AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID"},
|
||||
AllowHeaders: []string{"Origin", "Content-Type", "X-Workspace-ID", "Authorization"},
|
||||
AllowCredentials: true,
|
||||
}))
|
||||
|
||||
@ -62,61 +62,69 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// Scrape with: curl http://localhost:8080/metrics
|
||||
r.GET("/metrics", metrics.Handler())
|
||||
|
||||
// Workspaces CRUD
|
||||
// Workspaces CRUD — bare /workspaces and /workspaces/:id (no sub-path), unauthenticated for canvas
|
||||
r.POST("/workspaces", wh.Create)
|
||||
r.GET("/workspaces", wh.List)
|
||||
r.GET("/workspaces/:id", wh.Get)
|
||||
// Phase 30.4 — lightweight token-gated state polling for remote agents
|
||||
// that can't reach the platform's WebSocket. Returns {status, paused,
|
||||
// deleted}. Separate from /workspaces/:id so the canvas path stays
|
||||
// unauthenticated and returns its full config payload.
|
||||
r.GET("/workspaces/:id/state", wh.State)
|
||||
r.PATCH("/workspaces/:id", wh.Update)
|
||||
r.DELETE("/workspaces/:id", wh.Delete)
|
||||
r.POST("/workspaces/:id/restart", wh.Restart)
|
||||
r.POST("/workspaces/:id/pause", wh.Pause)
|
||||
r.POST("/workspaces/:id/resume", wh.Resume)
|
||||
|
||||
// A2A proxy — registered outside the auth group; already enforces CanCommunicate access control.
|
||||
r.POST("/workspaces/:id/a2a", wh.ProxyA2A)
|
||||
|
||||
// Async Delegation
|
||||
delh := handlers.NewDelegationHandler(wh, broadcaster)
|
||||
r.POST("/workspaces/:id/delegate", delh.Delegate)
|
||||
r.GET("/workspaces/:id/delegations", delh.ListDelegations)
|
||||
// Record-only endpoint for agent-initiated delegations (#64). Agent-side
|
||||
// delegate_to_workspace fires A2A directly for speed + OTEL propagation;
|
||||
// this endpoint just adds an activity_logs row so GET /delegations returns
|
||||
// the same set the agent's local `check_delegation_status` sees.
|
||||
r.POST("/workspaces/:id/delegations/record", delh.Record)
|
||||
r.POST("/workspaces/:id/delegations/:delegation_id/update", delh.UpdateStatus)
|
||||
// Auth-gated workspace sub-routes — ALL /workspaces/:id/* paths except /a2a.
|
||||
// Fix A (Cycle 5): single WorkspaceAuth middleware blocks C2-C5, C7-C9, C12, C13
|
||||
// by requiring a valid bearer token for any workspace that has one on file.
|
||||
// Legacy workspaces (no token) are grandfathered to allow rolling upgrades.
|
||||
wsAuth := r.Group("/workspaces/:id", middleware.WorkspaceAuth(db.DB))
|
||||
{
|
||||
// Lifecycle
|
||||
wsAuth.GET("/state", wh.State)
|
||||
wsAuth.POST("/restart", wh.Restart)
|
||||
wsAuth.POST("/pause", wh.Pause)
|
||||
wsAuth.POST("/resume", wh.Resume)
|
||||
|
||||
// Traces (Langfuse proxy)
|
||||
trh := handlers.NewTracesHandler()
|
||||
r.GET("/workspaces/:id/traces", trh.List)
|
||||
// Async Delegation
|
||||
delh := handlers.NewDelegationHandler(wh, broadcaster)
|
||||
wsAuth.POST("/delegate", delh.Delegate)
|
||||
wsAuth.GET("/delegations", delh.ListDelegations)
|
||||
// Record-only endpoint for agent-initiated delegations (#64). Agent-side
|
||||
// delegate_to_workspace fires A2A directly for speed + OTEL propagation;
|
||||
// this endpoint just adds an activity_logs row so GET /delegations returns
|
||||
// the same set the agent's local `check_delegation_status` sees.
|
||||
wsAuth.POST("/delegations/record", delh.Record)
|
||||
wsAuth.POST("/delegations/:delegation_id/update", delh.UpdateStatus)
|
||||
|
||||
// Agent Memories (HMA)
|
||||
memsh := handlers.NewMemoriesHandler()
|
||||
r.POST("/workspaces/:id/memories", memsh.Commit)
|
||||
r.GET("/workspaces/:id/memories", memsh.Search)
|
||||
r.DELETE("/workspaces/:id/memories/:memoryId", memsh.Delete)
|
||||
// Traces (Langfuse proxy)
|
||||
trh := handlers.NewTracesHandler()
|
||||
wsAuth.GET("/traces", trh.List)
|
||||
|
||||
// Approvals
|
||||
apph := handlers.NewApprovalsHandler(broadcaster)
|
||||
r.GET("/approvals/pending", apph.ListAll)
|
||||
r.POST("/workspaces/:id/approvals", apph.Create)
|
||||
r.GET("/workspaces/:id/approvals", apph.List)
|
||||
r.POST("/workspaces/:id/approvals/:approvalId/decide", apph.Decide)
|
||||
// Agent Memories (HMA)
|
||||
memsh := handlers.NewMemoriesHandler()
|
||||
wsAuth.POST("/memories", memsh.Commit)
|
||||
wsAuth.GET("/memories", memsh.Search)
|
||||
wsAuth.DELETE("/memories/:memoryId", memsh.Delete)
|
||||
|
||||
// Team Expansion
|
||||
teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir)
|
||||
r.POST("/workspaces/:id/expand", teamh.Expand)
|
||||
r.POST("/workspaces/:id/collapse", teamh.Collapse)
|
||||
// Approvals
|
||||
apph := handlers.NewApprovalsHandler(broadcaster)
|
||||
wsAuth.POST("/approvals", apph.Create)
|
||||
wsAuth.GET("/approvals", apph.List)
|
||||
wsAuth.POST("/approvals/:approvalId/decide", apph.Decide)
|
||||
// /approvals/pending is a cross-workspace admin path; keep on root router outside wsAuth.
|
||||
r.GET("/approvals/pending", apph.ListAll)
|
||||
|
||||
// Agents
|
||||
ah := handlers.NewAgentHandler(broadcaster)
|
||||
r.POST("/workspaces/:id/agent", ah.Assign)
|
||||
r.PATCH("/workspaces/:id/agent", ah.Replace)
|
||||
r.DELETE("/workspaces/:id/agent", ah.Remove)
|
||||
r.POST("/workspaces/:id/agent/move", ah.Move)
|
||||
// Team Expansion
|
||||
teamh := handlers.NewTeamHandler(broadcaster, prov, platformURL, configsDir)
|
||||
wsAuth.POST("/expand", teamh.Expand)
|
||||
wsAuth.POST("/collapse", teamh.Collapse)
|
||||
|
||||
// Agents
|
||||
ah := handlers.NewAgentHandler(broadcaster)
|
||||
wsAuth.POST("/agent", ah.Assign)
|
||||
wsAuth.PATCH("/agent", ah.Replace)
|
||||
wsAuth.DELETE("/agent", ah.Remove)
|
||||
wsAuth.POST("/agent/move", ah.Move)
|
||||
}
|
||||
|
||||
// Registry
|
||||
rh := handlers.NewRegistryHandler(broadcaster)
|
||||
@ -135,58 +143,65 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
r.GET("/registry/:id/peers", dh.Peers)
|
||||
r.POST("/registry/check-access", dh.CheckAccess)
|
||||
|
||||
// Events
|
||||
// Events (not workspace-scoped — exempt from per-workspace auth)
|
||||
eh := handlers.NewEventsHandler()
|
||||
r.GET("/events", eh.List)
|
||||
r.GET("/events/:workspaceId", eh.ListByWorkspace)
|
||||
|
||||
// Activity Logs
|
||||
acth := handlers.NewActivityHandler(broadcaster)
|
||||
r.GET("/workspaces/:id/activity", acth.List)
|
||||
r.GET("/workspaces/:id/session-search", acth.SessionSearch)
|
||||
r.POST("/workspaces/:id/activity", acth.Report)
|
||||
r.POST("/workspaces/:id/notify", acth.Notify)
|
||||
// Remaining auth-gated workspace sub-routes — appended to wsAuth group declared above.
|
||||
{
|
||||
// Activity Logs
|
||||
acth := handlers.NewActivityHandler(broadcaster)
|
||||
wsAuth.GET("/activity", acth.List)
|
||||
wsAuth.GET("/session-search", acth.SessionSearch)
|
||||
wsAuth.POST("/activity", acth.Report)
|
||||
wsAuth.POST("/notify", acth.Notify)
|
||||
|
||||
// Config
|
||||
cfgh := handlers.NewConfigHandler()
|
||||
r.GET("/workspaces/:id/config", cfgh.Get)
|
||||
r.PATCH("/workspaces/:id/config", cfgh.Patch)
|
||||
// Config
|
||||
cfgh := handlers.NewConfigHandler()
|
||||
wsAuth.GET("/config", cfgh.Get)
|
||||
wsAuth.PATCH("/config", cfgh.Patch)
|
||||
|
||||
// Schedules (cron tasks)
|
||||
schedh := handlers.NewScheduleHandler()
|
||||
r.GET("/workspaces/:id/schedules", schedh.List)
|
||||
r.POST("/workspaces/:id/schedules", schedh.Create)
|
||||
r.PATCH("/workspaces/:id/schedules/:scheduleId", schedh.Update)
|
||||
r.DELETE("/workspaces/:id/schedules/:scheduleId", schedh.Delete)
|
||||
r.POST("/workspaces/:id/schedules/:scheduleId/run", schedh.RunNow)
|
||||
r.GET("/workspaces/:id/schedules/:scheduleId/history", schedh.History)
|
||||
// Schedules (cron tasks)
|
||||
schedh := handlers.NewScheduleHandler()
|
||||
wsAuth.GET("/schedules", schedh.List)
|
||||
wsAuth.POST("/schedules", schedh.Create)
|
||||
wsAuth.PATCH("/schedules/:scheduleId", schedh.Update)
|
||||
wsAuth.DELETE("/schedules/:scheduleId", schedh.Delete)
|
||||
wsAuth.POST("/schedules/:scheduleId/run", schedh.RunNow)
|
||||
wsAuth.GET("/schedules/:scheduleId/history", schedh.History)
|
||||
|
||||
// Memory
|
||||
memh := handlers.NewMemoryHandler()
|
||||
r.GET("/workspaces/:id/memory", memh.List)
|
||||
r.GET("/workspaces/:id/memory/:key", memh.Get)
|
||||
r.POST("/workspaces/:id/memory", memh.Set)
|
||||
r.DELETE("/workspaces/:id/memory/:key", memh.Delete)
|
||||
// Memory
|
||||
memh := handlers.NewMemoryHandler()
|
||||
wsAuth.GET("/memory", memh.List)
|
||||
wsAuth.GET("/memory/:key", memh.Get)
|
||||
wsAuth.POST("/memory", memh.Set)
|
||||
wsAuth.DELETE("/memory/:key", memh.Delete)
|
||||
|
||||
// Secrets (auto-restart workspace after secret change)
|
||||
sech := handlers.NewSecretsHandler(wh.RestartByID)
|
||||
r.GET("/workspaces/:id/secrets", sech.List)
|
||||
// Phase 30.2 — decrypted values pull, token-gated. Canvas uses List
|
||||
// (keys + metadata only); remote agents use Values to bootstrap env.
|
||||
r.GET("/workspaces/:id/secrets/values", sech.Values)
|
||||
r.POST("/workspaces/:id/secrets", sech.Set)
|
||||
r.PUT("/workspaces/:id/secrets", sech.Set)
|
||||
r.DELETE("/workspaces/:id/secrets/:key", sech.Delete)
|
||||
r.GET("/workspaces/:id/model", sech.GetModel)
|
||||
// Secrets (auto-restart workspace after secret change)
|
||||
sech := handlers.NewSecretsHandler(wh.RestartByID)
|
||||
wsAuth.GET("/secrets", sech.List)
|
||||
// Phase 30.2 — decrypted values pull, token-gated. Canvas uses List
|
||||
// (keys + metadata only); remote agents use Values to bootstrap env.
|
||||
wsAuth.GET("/secrets/values", sech.Values)
|
||||
wsAuth.POST("/secrets", sech.Set)
|
||||
wsAuth.PUT("/secrets", sech.Set)
|
||||
wsAuth.DELETE("/secrets/:key", sech.Delete)
|
||||
wsAuth.GET("/model", sech.GetModel)
|
||||
}
|
||||
|
||||
// Global secrets — /settings/secrets is the canonical path; /admin/secrets kept for backward compat
|
||||
r.GET("/settings/secrets", sech.ListGlobal)
|
||||
r.PUT("/settings/secrets", sech.SetGlobal)
|
||||
r.POST("/settings/secrets", sech.SetGlobal)
|
||||
r.DELETE("/settings/secrets/:key", sech.DeleteGlobal)
|
||||
r.GET("/admin/secrets", sech.ListGlobal)
|
||||
r.POST("/admin/secrets", sech.SetGlobal)
|
||||
r.DELETE("/admin/secrets/:key", sech.DeleteGlobal)
|
||||
// These are admin-level paths outside the per-workspace auth group.
|
||||
{
|
||||
sechGlobal := handlers.NewSecretsHandler(wh.RestartByID)
|
||||
r.GET("/settings/secrets", sechGlobal.ListGlobal)
|
||||
r.PUT("/settings/secrets", sechGlobal.SetGlobal)
|
||||
r.POST("/settings/secrets", sechGlobal.SetGlobal)
|
||||
r.DELETE("/settings/secrets/:key", sechGlobal.DeleteGlobal)
|
||||
r.GET("/admin/secrets", sechGlobal.ListGlobal)
|
||||
r.POST("/admin/secrets", sechGlobal.SetGlobal)
|
||||
r.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal)
|
||||
}
|
||||
|
||||
// Terminal — shares Docker client with provisioner
|
||||
var dockerCli *client.Client
|
||||
@ -194,7 +209,7 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
dockerCli = prov.DockerClient()
|
||||
}
|
||||
th := handlers.NewTerminalHandler(dockerCli)
|
||||
r.GET("/workspaces/:id/terminal", th.HandleConnect)
|
||||
wsAuth.GET("/terminal", th.HandleConnect)
|
||||
|
||||
// Canvas Viewport
|
||||
vh := handlers.NewViewportHandler()
|
||||
@ -205,12 +220,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
tmplh := handlers.NewTemplatesHandler(configsDir, dockerCli)
|
||||
r.GET("/templates", tmplh.List)
|
||||
r.POST("/templates/import", tmplh.Import)
|
||||
r.GET("/workspaces/:id/shared-context", tmplh.SharedContext)
|
||||
r.PUT("/workspaces/:id/files", tmplh.ReplaceFiles)
|
||||
r.GET("/workspaces/:id/files", tmplh.ListFiles)
|
||||
r.GET("/workspaces/:id/files/*path", tmplh.ReadFile)
|
||||
r.PUT("/workspaces/:id/files/*path", tmplh.WriteFile)
|
||||
r.DELETE("/workspaces/:id/files/*path", tmplh.DeleteFile)
|
||||
wsAuth.GET("/shared-context", tmplh.SharedContext)
|
||||
wsAuth.PUT("/files", tmplh.ReplaceFiles)
|
||||
wsAuth.GET("/files", tmplh.ListFiles)
|
||||
wsAuth.GET("/files/*path", tmplh.ReadFile)
|
||||
wsAuth.PUT("/files/*path", tmplh.WriteFile)
|
||||
wsAuth.DELETE("/files/*path", tmplh.DeleteFile)
|
||||
|
||||
// Plugins
|
||||
pluginsDir := findPluginsDir(configsDir)
|
||||
@ -230,14 +245,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
WithRuntimeLookup(runtimeLookup)
|
||||
r.GET("/plugins", plgh.ListRegistry)
|
||||
r.GET("/plugins/sources", plgh.ListSources)
|
||||
r.GET("/workspaces/:id/plugins", plgh.ListInstalled)
|
||||
r.GET("/workspaces/:id/plugins/available", plgh.ListAvailableForWorkspace)
|
||||
r.GET("/workspaces/:id/plugins/compatibility", plgh.CheckRuntimeCompatibility)
|
||||
r.POST("/workspaces/:id/plugins", plgh.Install)
|
||||
r.DELETE("/workspaces/:id/plugins/:name", plgh.Uninstall)
|
||||
wsAuth.GET("/plugins", plgh.ListInstalled)
|
||||
wsAuth.GET("/plugins/available", plgh.ListAvailableForWorkspace)
|
||||
wsAuth.GET("/plugins/compatibility", plgh.CheckRuntimeCompatibility)
|
||||
wsAuth.POST("/plugins", plgh.Install)
|
||||
wsAuth.DELETE("/plugins/:name", plgh.Uninstall)
|
||||
// Phase 30.3 — stream plugin as tar.gz so remote agents can pull +
|
||||
// unpack locally instead of going through Docker exec.
|
||||
r.GET("/workspaces/:id/plugins/:name/download", plgh.Download)
|
||||
wsAuth.GET("/plugins/:name/download", plgh.Download)
|
||||
|
||||
// Bundles
|
||||
bh := handlers.NewBundleHandler(broadcaster, prov, platformURL, configsDir, dockerCli)
|
||||
@ -253,12 +268,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
// Channels (social integrations — Telegram, Slack, Discord, etc.)
|
||||
chh := handlers.NewChannelHandler(channelMgr)
|
||||
r.GET("/channels/adapters", chh.ListAdapters)
|
||||
r.GET("/workspaces/:id/channels", chh.List)
|
||||
r.POST("/workspaces/:id/channels", chh.Create)
|
||||
r.PATCH("/workspaces/:id/channels/:channelId", chh.Update)
|
||||
r.DELETE("/workspaces/:id/channels/:channelId", chh.Delete)
|
||||
r.POST("/workspaces/:id/channels/:channelId/send", chh.Send)
|
||||
r.POST("/workspaces/:id/channels/:channelId/test", chh.Test)
|
||||
wsAuth.GET("/channels", chh.List)
|
||||
wsAuth.POST("/channels", chh.Create)
|
||||
wsAuth.PATCH("/channels/:channelId", chh.Update)
|
||||
wsAuth.DELETE("/channels/:channelId", chh.Delete)
|
||||
wsAuth.POST("/channels/:channelId/send", chh.Send)
|
||||
wsAuth.POST("/channels/:channelId/test", chh.Test)
|
||||
r.POST("/channels/discover", chh.Discover)
|
||||
r.POST("/webhooks/:type", chh.Webhook)
|
||||
|
||||
|
||||
@ -41,7 +41,12 @@ async def discover_peer(target_id: str) -> dict | None:
|
||||
|
||||
async def send_a2a_message(target_url: str, message: str) -> str:
|
||||
"""Send an A2A message/send to a target workspace."""
|
||||
async with httpx.AsyncClient(timeout=None) as client:
|
||||
# Fix F (Cycle 5 / H2 — flagged 5 consecutive audits): timeout=None allowed
|
||||
# a hung upstream to block the agent indefinitely. Use a generous but bounded
|
||||
# timeout: 30s connect + 300s read (long enough for slow LLM responses).
|
||||
async with httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(connect=30.0, read=300.0, write=30.0, pool=30.0)
|
||||
) as client:
|
||||
try:
|
||||
resp = await client.post(
|
||||
target_url,
|
||||
|
||||
@ -62,7 +62,15 @@ class PlatformEventSubscriber:
|
||||
self._running = False
|
||||
return
|
||||
|
||||
# Fix D (Cycle 5): include bearer token in WebSocket upgrade so the
|
||||
# server's new auth check can validate this agent connection.
|
||||
# Graceful fallback for workspaces that have no token yet.
|
||||
headers = {"X-Workspace-ID": self.workspace_id}
|
||||
try:
|
||||
from platform_auth import auth_headers as _auth_headers
|
||||
headers.update(_auth_headers())
|
||||
except Exception:
|
||||
pass # No token available — connect unauthenticated (grandfathered)
|
||||
logger.info("Connecting to platform WebSocket: %s", self.ws_url)
|
||||
|
||||
async with websockets.connect(self.ws_url, additional_headers=headers) as ws:
|
||||
|
||||
@ -92,9 +92,17 @@ async def recall_memories() -> str:
|
||||
platform_url = os.environ.get("PLATFORM_URL", "")
|
||||
if not workspace_id or not platform_url:
|
||||
return ""
|
||||
# Fix E (Cycle 5): send auth headers so the WorkspaceAuth middleware
|
||||
# (Fix A) allows access once the workspace has a live token on file.
|
||||
try:
|
||||
from platform_auth import auth_headers as _platform_auth
|
||||
_auth = _platform_auth()
|
||||
except Exception:
|
||||
_auth = {}
|
||||
try:
|
||||
resp = await get_http_client().get(
|
||||
f"{platform_url}/workspaces/{workspace_id}/memories",
|
||||
headers=_auth,
|
||||
)
|
||||
if not 200 <= resp.status_code < 300:
|
||||
logger.debug(
|
||||
@ -121,10 +129,17 @@ async def commit_memory(content: str) -> None:
|
||||
platform_url = os.environ.get("PLATFORM_URL", "")
|
||||
if not workspace_id or not platform_url or not content:
|
||||
return
|
||||
# Fix E (Cycle 5): include auth header so WorkspaceAuth middleware allows access.
|
||||
try:
|
||||
from platform_auth import auth_headers as _platform_auth
|
||||
_auth = _platform_auth()
|
||||
except Exception:
|
||||
_auth = {}
|
||||
try:
|
||||
await get_http_client().post(
|
||||
f"{platform_url}/workspaces/{workspace_id}/memories",
|
||||
json={"content": content, "scope": "LOCAL"},
|
||||
headers=_auth,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("commit_memory: request failed: %s", exc)
|
||||
|
||||
@ -153,11 +153,25 @@ class HeartbeatLoop:
|
||||
continue
|
||||
|
||||
if status in ("completed", "failed"):
|
||||
# Fix B (Cycle 5): validate source_id before accepting delegation
|
||||
# results. Only process delegations that THIS workspace created
|
||||
# (source_id == self.workspace_id). Attacker-crafted delegation
|
||||
# records with a foreign source_id cannot inject instructions.
|
||||
source_id = d.get("source_id", "")
|
||||
if source_id != self.workspace_id:
|
||||
logger.warning(
|
||||
"Heartbeat: skipping delegation %s — source_id %r does not "
|
||||
"match this workspace %r; possible injection attempt",
|
||||
did, source_id, self.workspace_id,
|
||||
)
|
||||
self._seen_delegation_ids.add(did) # mark seen so we don't warn again
|
||||
continue
|
||||
|
||||
self._seen_delegation_ids.add(did)
|
||||
new_results.append({
|
||||
"delegation_id": did,
|
||||
"target_id": d.get("target_id", ""),
|
||||
"source_id": d.get("source_id", ""),
|
||||
"source_id": source_id,
|
||||
"status": status,
|
||||
"summary": d.get("summary", ""),
|
||||
"response_preview": d.get("response_preview", ""),
|
||||
@ -177,12 +191,15 @@ class HeartbeatLoop:
|
||||
f.write(json.dumps(r) + "\n")
|
||||
logger.info("Heartbeat: %d new delegation results — triggering self-message", len(new_results))
|
||||
|
||||
# Build a summary message for the agent
|
||||
# Build a summary message for the agent.
|
||||
# Fix B (Cycle 5): do NOT embed raw response_preview text in
|
||||
# user-role A2A messages — that is the prompt-injection vector.
|
||||
# Instead reference only the delegation ID and status; the agent
|
||||
# reads full content from DELEGATION_RESULTS_FILE which was
|
||||
# written above from trusted platform data.
|
||||
summary_lines = []
|
||||
for r in new_results:
|
||||
line = f"- [{r['status']}] {r['summary'][:80]}"
|
||||
if r.get("response_preview"):
|
||||
line += f"\n Response: {r['response_preview'][:200]}"
|
||||
line = f"- [{r['status']}] Delegation {r['delegation_id'][:8]}: {r['summary'][:80]}"
|
||||
if r.get("error"):
|
||||
line += f"\n Error: {r['error'][:100]}"
|
||||
summary_lines.append(line)
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
|
||||
import importlib.util
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
@ -81,10 +82,32 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
|
||||
# Keeps test environments (and empty skills) from needing langchain.
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
# Sensitive env vars that must not be readable by skill scripts.
|
||||
# Fix C (Cycle 5): scrub before exec_module() so a malicious skill cannot
|
||||
# exfiltrate credentials even if it somehow bypasses the POST /plugins
|
||||
# auth gate (defence in depth).
|
||||
_SCRUB_KEYS = (
|
||||
"CLAUDE_CODE_OAUTH_TOKEN",
|
||||
"ANTHROPIC_API_KEY",
|
||||
"OPENAI_API_KEY",
|
||||
"WORKSPACE_AUTH_TOKEN",
|
||||
"GITHUB_TOKEN",
|
||||
"GH_TOKEN",
|
||||
)
|
||||
|
||||
for py_file in sorted(scripts_dir.glob("*.py")):
|
||||
if py_file.name.startswith("_"):
|
||||
continue
|
||||
|
||||
# Verify the script is actually inside the expected scripts directory
|
||||
# (path traversal guard — glob shouldn't produce outside paths, but
|
||||
# belt-and-suspenders for symlink attacks).
|
||||
try:
|
||||
py_file.resolve().relative_to(scripts_dir.resolve())
|
||||
except ValueError:
|
||||
logger.warning("skill_loader: rejecting script outside scripts_dir: %s", py_file)
|
||||
continue
|
||||
|
||||
module_name = f"skill_tool_{py_file.stem}"
|
||||
spec = importlib.util.spec_from_file_location(module_name, py_file)
|
||||
if spec is None or spec.loader is None:
|
||||
@ -92,7 +115,14 @@ def load_skill_tools(scripts_dir: Path) -> list[Any]:
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
spec.loader.exec_module(module)
|
||||
|
||||
# Temporarily remove sensitive env vars before running skill code.
|
||||
_saved_env = {k: os.environ.pop(k) for k in _SCRUB_KEYS if k in os.environ}
|
||||
try:
|
||||
spec.loader.exec_module(module)
|
||||
finally:
|
||||
# Always restore so the rest of the agent process retains them.
|
||||
os.environ.update(_saved_env)
|
||||
|
||||
# Look for functions decorated with @tool (BaseTool instances)
|
||||
for attr_name in dir(module):
|
||||
|
||||
@ -282,7 +282,10 @@ async def test_connect_uses_workspace_id_header():
|
||||
await sub._connect()
|
||||
|
||||
call_kwargs = websockets_mod.connect.call_args[1]
|
||||
assert call_kwargs.get("additional_headers") == {"X-Workspace-ID": "ws-hdr"}
|
||||
# Fix D (Cycle 5): headers now include Authorization when platform_auth available.
|
||||
# Assert X-Workspace-ID is present; allow optional Authorization header.
|
||||
actual_headers = call_kwargs.get("additional_headers", {})
|
||||
assert actual_headers.get("X-Workspace-ID") == "ws-hdr"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@ -184,6 +184,7 @@ async def test_check_delegations_writes_results_file(tmp_path):
|
||||
|
||||
delegations = [
|
||||
{"delegation_id": "d-1", "status": "completed", "target_id": "ws-t",
|
||||
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
|
||||
"summary": "Done", "response_preview": "Result here", "error": ""},
|
||||
]
|
||||
|
||||
@ -245,6 +246,7 @@ async def test_check_delegations_sends_self_message(tmp_path):
|
||||
|
||||
delegations = [
|
||||
{"delegation_id": "d-new", "status": "completed", "target_id": "ws-t",
|
||||
"source_id": "ws-abc", # must match workspace_id for Fix B source validation
|
||||
"summary": "Task done", "response_preview": "All good", "error": ""},
|
||||
]
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user