From ed26f2733a642df9d23fb1e09fe5ba3f685a0d9c Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 16:18:06 -0700 Subject: [PATCH] fix(review): address code review blockers on tool-trace + instructions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BLOCKERS fixed: - instructions.go: Drop team-scope queries (teams/team_members tables don't exist in any migration). Schema column kept for future. Restored Resolve to /workspaces/:id/instructions/resolve under wsAuth — closes auth gap that allowed cross-workspace enumeration of operator policy. - migration 040: Add CHECK constraints on title (<=200) and content (<=8192) to prevent token-budget DoS via oversized instructions. - a2a_executor.py: Pair on_tool_start/on_tool_end via run_id instead of list-position so parallel tool calls don't drop or clobber outputs. Cap tool_trace at 200 entries to prevent runaway loops bloating JSONB. HIGH fixes: - instructions.go: Add length validation in Create + Update handlers. Removed dead rows_ shadow variable. Replaced string concatenation in Resolve with strings.Builder. - prompt.py: Drop httpx timeout 10s -> 3s (boot hot path). Switch print to logger.warning. Add Authorization bearer header from MOLECULE_WORKSPACE_TOKEN env var. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/instructions.go | 115 +++++++++--------- workspace-server/internal/router/router.go | 9 +- .../040_platform_instructions.up.sql | 6 +- workspace/a2a_executor.py | 26 +++- workspace/prompt.py | 24 +++- 5 files changed, 109 insertions(+), 71 deletions(-) diff --git a/workspace-server/internal/handlers/instructions.go b/workspace-server/internal/handlers/instructions.go index 41dcd8e3..2e8e89ac 100644 --- a/workspace-server/internal/handlers/instructions.go +++ b/workspace-server/internal/handlers/instructions.go @@ -3,12 +3,17 @@ package handlers import ( "log" "net/http" + "strings" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" ) +// maxInstructionContentLen caps content size to prevent token-budget DoS via +// oversized instructions being prepended to every agent's system prompt. +const maxInstructionContentLen = 8192 + type InstructionsHandler struct{} func NewInstructionsHandler() *InstructionsHandler { @@ -28,51 +33,36 @@ type Instruction struct { } // List returns instructions filtered by scope. Agents call this at startup -// to fetch their full instruction set (global + team + workspace). +// to fetch their full instruction set (global + workspace). // // GET /instructions?scope=global -// GET /instructions?workspace_id= (returns global + team + workspace) +// GET /instructions?workspace_id= (returns global + workspace) +// +// Team scope is reserved in the schema but not yet wired — teams/team_members +// tables don't exist in any migration. Adding team support requires a new +// migration first. func (h *InstructionsHandler) List(c *gin.Context) { ctx := c.Request.Context() scope := c.Query("scope") workspaceID := c.Query("workspace_id") - var rows_ interface{ Close() error } - var err error - if workspaceID != "" { - // Agent bootstrap: fetch all applicable instructions (global + team + workspace) - // ordered by scope priority (global first) then user priority descending. - var teamSlug *string - db.DB.QueryRowContext(ctx, - `SELECT t.slug FROM teams t - JOIN team_members tm ON tm.team_id = t.id - WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) - query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE enabled = true AND ( scope = 'global' - OR (scope = 'team' AND scope_target = $1) - OR (scope = 'workspace' AND scope_target = $2) + OR (scope = 'workspace' AND scope_target = $1) ) - ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, priority DESC` - - teamTarget := "" - if teamSlug != nil { - teamTarget = *teamSlug - } - r, qErr := db.DB.QueryContext(ctx, query, teamTarget, workspaceID) + r, qErr := db.DB.QueryContext(ctx, query, workspaceID) if qErr != nil { log.Printf("Instructions list error: %v", qErr) c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } - rows_ = r defer r.Close() - instructions := scanInstructions(r) - c.JSON(http.StatusOK, instructions) + c.JSON(http.StatusOK, scanInstructions(r)) return } @@ -92,8 +82,6 @@ func (h *InstructionsHandler) List(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } - rows_ = r - _ = rows_ defer r.Close() c.JSON(http.StatusOK, scanInstructions(r)) } @@ -112,12 +100,20 @@ func (h *InstructionsHandler) Create(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "scope, title, and content are required"}) return } - if body.Scope != "global" && body.Scope != "team" && body.Scope != "workspace" { - c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global, team, or workspace"}) + if body.Scope != "global" && body.Scope != "workspace" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global or workspace (team scope not yet supported)"}) return } - if body.Scope != "global" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { - c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for team/workspace scope"}) + if body.Scope == "workspace" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for workspace scope"}) + return + } + if len(body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if len(body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) return } @@ -149,6 +145,14 @@ func (h *InstructionsHandler) Update(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) return } + if body.Content != nil && len(*body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if body.Title != nil && len(*body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) + return + } result, err := db.DB.ExecContext(c.Request.Context(), `UPDATE platform_instructions SET @@ -190,41 +194,38 @@ func (h *InstructionsHandler) Delete(c *gin.Context) { } // Resolve returns the merged instruction text for a workspace — all enabled -// instructions across global → team → workspace scope, concatenated in order. +// instructions across global → workspace scope, concatenated in order. // This is what the Python runtime calls to get the full instruction set. // -// GET /instructions/resolve?workspace_id= +// GET /workspaces/:id/instructions/resolve +// +// Mounted under wsAuth so the caller must hold a valid bearer token for +// :id, preventing cross-workspace enumeration of operator policy. func (h *InstructionsHandler) Resolve(c *gin.Context) { - workspaceID := c.Query("workspace_id") + workspaceID := c.Param("id") if workspaceID == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "workspace_id required"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"}) return } ctx := c.Request.Context() - var teamSlug string - db.DB.QueryRowContext(ctx, - `SELECT COALESCE(t.slug, '') FROM teams t - JOIN team_members tm ON tm.team_id = t.id - WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) - rows, err := db.DB.QueryContext(ctx, `SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND ( scope = 'global' - OR (scope = 'team' AND scope_target = $1) - OR (scope = 'workspace' AND scope_target = $2) + OR (scope = 'workspace' AND scope_target = $1) ) - ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, priority DESC`, - teamSlug, workspaceID) + workspaceID) if err != nil { + log.Printf("Instructions resolve error: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } defer rows.Close() - var merged string + var b strings.Builder currentScope := "" for rows.Next() { var scope, title, content string @@ -232,21 +233,25 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) { continue } if scope != currentScope { - scopeLabel := map[string]string{ - "global": "Platform-Wide Rules", - "team": "Team Rules", - "workspace": "Role-Specific Rules", - }[scope] - merged += "\n## " + scopeLabel + "\n\n" + scopeLabel := "Platform-Wide Rules" + if scope == "workspace" { + scopeLabel = "Role-Specific Rules" + } + b.WriteString("\n## ") + b.WriteString(scopeLabel) + b.WriteString("\n\n") currentScope = scope } - merged += "### " + title + "\n" + content + "\n\n" + b.WriteString("### ") + b.WriteString(title) + b.WriteString("\n") + b.WriteString(content) + b.WriteString("\n\n") } c.JSON(http.StatusOK, gin.H{ "workspace_id": workspaceID, - "team_slug": teamSlug, - "instructions": merged, + "instructions": b.String(), }) } diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 674231ed..07285e70 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -364,8 +364,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal) } - // Platform instructions — configurable rules with global/team/workspace scope. + // Platform instructions — configurable rules with global/workspace scope. // Admin endpoints for CRUD; workspace-facing resolve endpoint for agent bootstrap. + // (Team scope is reserved in the schema but not yet wired — needs teams/team_members + // migration first.) { instrH := handlers.NewInstructionsHandler() adminInstr := r.Group("", middleware.AdminAuth(db.DB)) @@ -373,8 +375,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminInstr.POST("/instructions", instrH.Create) adminInstr.PUT("/instructions/:id", instrH.Update) adminInstr.DELETE("/instructions/:id", instrH.Delete) - // Resolve endpoint is open to workspace auth (agents call it at startup) - r.GET("/instructions/resolve", instrH.Resolve) + // Resolve mounted under wsAuth — caller must hold a valid bearer token + // for :id, preventing cross-workspace enumeration of operator policy. + wsAuth.GET("/instructions/resolve", instrH.Resolve) } // Admin — cross-workspace schedule health monitoring (issue #618). diff --git a/workspace-server/migrations/040_platform_instructions.up.sql b/workspace-server/migrations/040_platform_instructions.up.sql index ebf812fe..04d0ac7d 100644 --- a/workspace-server/migrations/040_platform_instructions.up.sql +++ b/workspace-server/migrations/040_platform_instructions.up.sql @@ -6,8 +6,10 @@ CREATE TABLE IF NOT EXISTS platform_instructions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), scope TEXT NOT NULL CHECK (scope IN ('global', 'team', 'workspace')), scope_target TEXT, -- NULL for global, team slug for team, workspace_id for workspace - title TEXT NOT NULL, - content TEXT NOT NULL, + title TEXT NOT NULL CHECK (length(title) <= 200), + -- Cap content at 8KB so an oversized instruction can't blow past LLM + -- prompt-size limits when prepended to every agent's system prompt. + content TEXT NOT NULL CHECK (length(content) <= 8192), priority INT DEFAULT 0, -- higher = shown first within scope enabled BOOLEAN DEFAULT true, created_at TIMESTAMPTZ DEFAULT NOW(), diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 13631ed1..d48a1151 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -306,7 +306,13 @@ class LangGraphA2AExecutor(AgentExecutor): # ── Tool trace: collect every tool invocation for # platform-level observability ──────────────────── + # Keyed by run_id so parallel tool calls (LangGraph + # supports them) pair start→end correctly. Capped at + # MAX_TOOL_TRACE entries to prevent runaway loops from + # ballooning the JSONB payload. + MAX_TOOL_TRACE = 200 tool_trace: list[dict] = [] + tool_trace_by_run: dict[str, dict] = {} async for event in self.agent.astream_events( {"messages": messages}, @@ -339,11 +345,16 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") tool_input = event.get("data", {}).get("input", "") + tool_run_id = event.get("run_id", "") logger.debug("SSE: tool start — %s", tool_name) - tool_trace.append({ - "tool": tool_name, - "input": str(tool_input)[:500] if tool_input else "", - }) + if len(tool_trace) < MAX_TOOL_TRACE: + entry = { + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + } + tool_trace.append(entry) + if tool_run_id: + tool_trace_by_run[tool_run_id] = entry if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -353,9 +364,12 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_end": tool_end_name = event.get("name", "?") tool_output = event.get("data", {}).get("output", "") + tool_run_id = event.get("run_id", "") logger.debug("SSE: tool end — %s", tool_end_name) - if tool_trace and tool_trace[-1]["tool"] == tool_end_name: - tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else "" + # Pair via run_id so parallel tool calls don't clobber each other. + entry = tool_trace_by_run.get(tool_run_id) if tool_run_id else None + if entry is not None: + entry["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry diff --git a/workspace/prompt.py b/workspace/prompt.py index 818ec182..70cce126 100644 --- a/workspace/prompt.py +++ b/workspace/prompt.py @@ -1,11 +1,14 @@ """Build the system prompt for the workspace agent.""" +import logging import os from pathlib import Path from skill_loader.loader import LoadedSkill from shared_runtime import build_peer_section +logger = logging.getLogger(__name__) + DEFAULT_MEMORY_SNAPSHOT_FILES = ("MEMORY.md", "USER.md") @@ -27,20 +30,31 @@ async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[di async def get_platform_instructions(platform_url: str, workspace_id: str) -> str: - """Fetch resolved platform instructions (global + team + workspace scope).""" + """Fetch resolved platform instructions (global + workspace scope). + + Endpoint is gated by WorkspaceAuth — the workspace token (read from env) + is sent as a bearer header. Fails open (returns "") on any error so a + platform outage doesn't block agent startup. Short timeout (3s) because + this runs in the boot hot path. + """ try: import httpx - async with httpx.AsyncClient(timeout=10.0) as client: + token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "") + headers = {"X-Workspace-ID": workspace_id} + if token: + headers["Authorization"] = f"Bearer {token}" + + async with httpx.AsyncClient(timeout=3.0) as client: resp = await client.get( - f"{platform_url}/instructions/resolve", - params={"workspace_id": workspace_id}, + f"{platform_url}/workspaces/{workspace_id}/instructions/resolve", + headers=headers, ) if resp.status_code == 200: data = resp.json() return data.get("instructions", "") except Exception as e: - print(f"Warning: could not fetch platform instructions: {e}") + logger.warning("could not fetch platform instructions: %s", e) return ""