fix(review): address code review blockers on tool-trace + instructions

BLOCKERS fixed:
- instructions.go: Drop team-scope queries (teams/team_members tables don't
  exist in any migration). Schema column kept for future. Restored Resolve
  to /workspaces/:id/instructions/resolve under wsAuth — closes auth gap
  that allowed cross-workspace enumeration of operator policy.
- migration 040: Add CHECK constraints on title (<=200) and content (<=8192)
  to prevent token-budget DoS via oversized instructions.
- a2a_executor.py: Pair on_tool_start/on_tool_end via run_id instead of
  list-position so parallel tool calls don't drop or clobber outputs. Cap
  tool_trace at 200 entries to prevent runaway loops bloating JSONB.

HIGH fixes:
- instructions.go: Add length validation in Create + Update handlers.
  Removed dead rows_ shadow variable. Replaced string concatenation in
  Resolve with strings.Builder.
- prompt.py: Drop httpx timeout 10s -> 3s (boot hot path). Switch print
  to logger.warning. Add Authorization bearer header from
  MOLECULE_WORKSPACE_TOKEN env var.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rabbitblood 2026-04-22 16:18:06 -07:00
parent e1d77a1625
commit ed26f2733a
5 changed files with 109 additions and 71 deletions

View File

@ -3,12 +3,17 @@ package handlers
import (
"log"
"net/http"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// maxInstructionContentLen caps content size to prevent token-budget DoS via
// oversized instructions being prepended to every agent's system prompt.
const maxInstructionContentLen = 8192
type InstructionsHandler struct{}
func NewInstructionsHandler() *InstructionsHandler {
@ -28,51 +33,36 @@ type Instruction struct {
}
// List returns instructions filtered by scope. Agents call this at startup
// to fetch their full instruction set (global + team + workspace).
// to fetch their full instruction set (global + workspace).
//
// GET /instructions?scope=global
// GET /instructions?workspace_id=<uuid> (returns global + team + workspace)
// GET /instructions?workspace_id=<uuid> (returns global + workspace)
//
// Team scope is reserved in the schema but not yet wired — teams/team_members
// tables don't exist in any migration. Adding team support requires a new
// migration first.
func (h *InstructionsHandler) List(c *gin.Context) {
ctx := c.Request.Context()
scope := c.Query("scope")
workspaceID := c.Query("workspace_id")
var rows_ interface{ Close() error }
var err error
if workspaceID != "" {
// Agent bootstrap: fetch all applicable instructions (global + team + workspace)
// ordered by scope priority (global first) then user priority descending.
var teamSlug *string
db.DB.QueryRowContext(ctx,
`SELECT t.slug FROM teams t
JOIN team_members tm ON tm.team_id = t.id
WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug)
query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at
FROM platform_instructions
WHERE enabled = true AND (
scope = 'global'
OR (scope = 'team' AND scope_target = $1)
OR (scope = 'workspace' AND scope_target = $2)
OR (scope = 'workspace' AND scope_target = $1)
)
ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END,
ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END,
priority DESC`
teamTarget := ""
if teamSlug != nil {
teamTarget = *teamSlug
}
r, qErr := db.DB.QueryContext(ctx, query, teamTarget, workspaceID)
r, qErr := db.DB.QueryContext(ctx, query, workspaceID)
if qErr != nil {
log.Printf("Instructions list error: %v", qErr)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
rows_ = r
defer r.Close()
instructions := scanInstructions(r)
c.JSON(http.StatusOK, instructions)
c.JSON(http.StatusOK, scanInstructions(r))
return
}
@ -92,8 +82,6 @@ func (h *InstructionsHandler) List(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
rows_ = r
_ = rows_
defer r.Close()
c.JSON(http.StatusOK, scanInstructions(r))
}
@ -112,12 +100,20 @@ func (h *InstructionsHandler) Create(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "scope, title, and content are required"})
return
}
if body.Scope != "global" && body.Scope != "team" && body.Scope != "workspace" {
c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global, team, or workspace"})
if body.Scope != "global" && body.Scope != "workspace" {
c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global or workspace (team scope not yet supported)"})
return
}
if body.Scope != "global" && (body.ScopeTarget == nil || *body.ScopeTarget == "") {
c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for team/workspace scope"})
if body.Scope == "workspace" && (body.ScopeTarget == nil || *body.ScopeTarget == "") {
c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for workspace scope"})
return
}
if len(body.Content) > maxInstructionContentLen {
c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"})
return
}
if len(body.Title) > 200 {
c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"})
return
}
@ -149,6 +145,14 @@ func (h *InstructionsHandler) Update(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"})
return
}
if body.Content != nil && len(*body.Content) > maxInstructionContentLen {
c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"})
return
}
if body.Title != nil && len(*body.Title) > 200 {
c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"})
return
}
result, err := db.DB.ExecContext(c.Request.Context(),
`UPDATE platform_instructions SET
@ -190,41 +194,38 @@ func (h *InstructionsHandler) Delete(c *gin.Context) {
}
// Resolve returns the merged instruction text for a workspace — all enabled
// instructions across global → team → workspace scope, concatenated in order.
// instructions across global → workspace scope, concatenated in order.
// This is what the Python runtime calls to get the full instruction set.
//
// GET /instructions/resolve?workspace_id=<uuid>
// GET /workspaces/:id/instructions/resolve
//
// Mounted under wsAuth so the caller must hold a valid bearer token for
// :id, preventing cross-workspace enumeration of operator policy.
func (h *InstructionsHandler) Resolve(c *gin.Context) {
workspaceID := c.Query("workspace_id")
workspaceID := c.Param("id")
if workspaceID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace_id required"})
c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"})
return
}
ctx := c.Request.Context()
var teamSlug string
db.DB.QueryRowContext(ctx,
`SELECT COALESCE(t.slug, '') FROM teams t
JOIN team_members tm ON tm.team_id = t.id
WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug)
rows, err := db.DB.QueryContext(ctx,
`SELECT scope, title, content FROM platform_instructions
WHERE enabled = true AND (
scope = 'global'
OR (scope = 'team' AND scope_target = $1)
OR (scope = 'workspace' AND scope_target = $2)
OR (scope = 'workspace' AND scope_target = $1)
)
ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END,
ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END,
priority DESC`,
teamSlug, workspaceID)
workspaceID)
if err != nil {
log.Printf("Instructions resolve error: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
return
}
defer rows.Close()
var merged string
var b strings.Builder
currentScope := ""
for rows.Next() {
var scope, title, content string
@ -232,21 +233,25 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) {
continue
}
if scope != currentScope {
scopeLabel := map[string]string{
"global": "Platform-Wide Rules",
"team": "Team Rules",
"workspace": "Role-Specific Rules",
}[scope]
merged += "\n## " + scopeLabel + "\n\n"
scopeLabel := "Platform-Wide Rules"
if scope == "workspace" {
scopeLabel = "Role-Specific Rules"
}
b.WriteString("\n## ")
b.WriteString(scopeLabel)
b.WriteString("\n\n")
currentScope = scope
}
merged += "### " + title + "\n" + content + "\n\n"
b.WriteString("### ")
b.WriteString(title)
b.WriteString("\n")
b.WriteString(content)
b.WriteString("\n\n")
}
c.JSON(http.StatusOK, gin.H{
"workspace_id": workspaceID,
"team_slug": teamSlug,
"instructions": merged,
"instructions": b.String(),
})
}

View File

@ -364,8 +364,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal)
}
// Platform instructions — configurable rules with global/team/workspace scope.
// Platform instructions — configurable rules with global/workspace scope.
// Admin endpoints for CRUD; workspace-facing resolve endpoint for agent bootstrap.
// (Team scope is reserved in the schema but not yet wired — needs teams/team_members
// migration first.)
{
instrH := handlers.NewInstructionsHandler()
adminInstr := r.Group("", middleware.AdminAuth(db.DB))
@ -373,8 +375,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
adminInstr.POST("/instructions", instrH.Create)
adminInstr.PUT("/instructions/:id", instrH.Update)
adminInstr.DELETE("/instructions/:id", instrH.Delete)
// Resolve endpoint is open to workspace auth (agents call it at startup)
r.GET("/instructions/resolve", instrH.Resolve)
// Resolve mounted under wsAuth — caller must hold a valid bearer token
// for :id, preventing cross-workspace enumeration of operator policy.
wsAuth.GET("/instructions/resolve", instrH.Resolve)
}
// Admin — cross-workspace schedule health monitoring (issue #618).

View File

@ -6,8 +6,10 @@ CREATE TABLE IF NOT EXISTS platform_instructions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
scope TEXT NOT NULL CHECK (scope IN ('global', 'team', 'workspace')),
scope_target TEXT, -- NULL for global, team slug for team, workspace_id for workspace
title TEXT NOT NULL,
content TEXT NOT NULL,
title TEXT NOT NULL CHECK (length(title) <= 200),
-- Cap content at 8KB so an oversized instruction can't blow past LLM
-- prompt-size limits when prepended to every agent's system prompt.
content TEXT NOT NULL CHECK (length(content) <= 8192),
priority INT DEFAULT 0, -- higher = shown first within scope
enabled BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),

View File

@ -306,7 +306,13 @@ class LangGraphA2AExecutor(AgentExecutor):
# ── Tool trace: collect every tool invocation for
# platform-level observability ────────────────────
# Keyed by run_id so parallel tool calls (LangGraph
# supports them) pair start→end correctly. Capped at
# MAX_TOOL_TRACE entries to prevent runaway loops from
# ballooning the JSONB payload.
MAX_TOOL_TRACE = 200
tool_trace: list[dict] = []
tool_trace_by_run: dict[str, dict] = {}
async for event in self.agent.astream_events(
{"messages": messages},
@ -339,11 +345,16 @@ class LangGraphA2AExecutor(AgentExecutor):
elif kind == "on_tool_start":
tool_name = event.get("name", "?")
tool_input = event.get("data", {}).get("input", "")
tool_run_id = event.get("run_id", "")
logger.debug("SSE: tool start — %s", tool_name)
tool_trace.append({
"tool": tool_name,
"input": str(tool_input)[:500] if tool_input else "",
})
if len(tool_trace) < MAX_TOOL_TRACE:
entry = {
"tool": tool_name,
"input": str(tool_input)[:500] if tool_input else "",
}
tool_trace.append(entry)
if tool_run_id:
tool_trace_by_run[tool_run_id] = entry
if _agency is not None:
_agency.on_tool_call(
tool_name=tool_name,
@ -353,9 +364,12 @@ class LangGraphA2AExecutor(AgentExecutor):
elif kind == "on_tool_end":
tool_end_name = event.get("name", "?")
tool_output = event.get("data", {}).get("output", "")
tool_run_id = event.get("run_id", "")
logger.debug("SSE: tool end — %s", tool_end_name)
if tool_trace and tool_trace[-1]["tool"] == tool_end_name:
tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else ""
# Pair via run_id so parallel tool calls don't clobber each other.
entry = tool_trace_by_run.get(tool_run_id) if tool_run_id else None
if entry is not None:
entry["output_preview"] = str(tool_output)[:300] if tool_output else ""
elif kind == "on_chat_model_end":
# Capture the last completed AIMessage for token telemetry

View File

@ -1,11 +1,14 @@
"""Build the system prompt for the workspace agent."""
import logging
import os
from pathlib import Path
from skill_loader.loader import LoadedSkill
from shared_runtime import build_peer_section
logger = logging.getLogger(__name__)
DEFAULT_MEMORY_SNAPSHOT_FILES = ("MEMORY.md", "USER.md")
@ -27,20 +30,31 @@ async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[di
async def get_platform_instructions(platform_url: str, workspace_id: str) -> str:
"""Fetch resolved platform instructions (global + team + workspace scope)."""
"""Fetch resolved platform instructions (global + workspace scope).
Endpoint is gated by WorkspaceAuth the workspace token (read from env)
is sent as a bearer header. Fails open (returns "") on any error so a
platform outage doesn't block agent startup. Short timeout (3s) because
this runs in the boot hot path.
"""
try:
import httpx
async with httpx.AsyncClient(timeout=10.0) as client:
token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "")
headers = {"X-Workspace-ID": workspace_id}
if token:
headers["Authorization"] = f"Bearer {token}"
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(
f"{platform_url}/instructions/resolve",
params={"workspace_id": workspace_id},
f"{platform_url}/workspaces/{workspace_id}/instructions/resolve",
headers=headers,
)
if resp.status_code == 200:
data = resp.json()
return data.get("instructions", "")
except Exception as e:
print(f"Warning: could not fetch platform instructions: {e}")
logger.warning("could not fetch platform instructions: %s", e)
return ""