From 6c618c9c3fc6649331bb7322326e975ae474e0db Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 15:07:25 -0700 Subject: [PATCH 1/5] feat: add tool_trace to activity_logs for platform-level agent observability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every A2A response now includes a tool_trace — the list of tools/commands the agent actually invoked during execution. This enables verifying agent claims against what they actually did, catches hallucinated "I checked X" responses, and provides an audit trail for the CEO to control hundreds of agents by checking the top-level PM's trace. Changes: - Python runtime: collect tool name/input/output_preview on every on_tool_start/on_tool_end event, embed in Message.metadata.tool_trace - Go platform: extract tool_trace from A2A response metadata, store in new activity_logs.tool_trace JSONB column with GIN index - Activity API: expose tool_trace in List and broadcast endpoints - Migration 039: adds tool_trace column + GIN index Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/a2a_proxy_helpers.go | 35 +++++++++++++++++++ .../internal/handlers/activity.go | 30 +++++++++++----- .../039_activity_tool_trace.down.sql | 2 ++ .../migrations/039_activity_tool_trace.up.sql | 9 +++++ workspace/a2a_executor.py | 22 +++++++++--- 5 files changed, 85 insertions(+), 13 deletions(-) create mode 100644 workspace-server/migrations/039_activity_tool_trace.down.sql create mode 100644 workspace-server/migrations/039_activity_tool_trace.up.sql diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 887a2057..ebbd642d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -161,6 +161,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle }() } summary := a2aMethod + " → " + wsNameForLog + toolTrace := extractToolTrace(respBody) go func(parent context.Context) { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() @@ -173,6 +174,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle Summary: &summary, RequestBody: json.RawMessage(body), ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, DurationMs: &durationMs, Status: logStatus, }) @@ -234,6 +236,39 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e // matching (the wsauth errors are typed for the invalid case). var errInvalidCallerToken = errors.New("missing caller auth token") +// extractToolTrace pulls metadata.tool_trace from an A2A JSON-RPC response. +// Returns nil when absent or malformed — callers can pass it straight through. +func extractToolTrace(respBody []byte) json.RawMessage { + if len(respBody) == 0 { + return nil + } + var top map[string]json.RawMessage + if err := json.Unmarshal(respBody, &top); err != nil { + return nil + } + rawResult, ok := top["result"] + if !ok { + return nil + } + var result map[string]json.RawMessage + if err := json.Unmarshal(rawResult, &result); err != nil { + return nil + } + rawMeta, ok := result["metadata"] + if !ok { + return nil + } + var meta map[string]json.RawMessage + if err := json.Unmarshal(rawMeta, &meta); err != nil { + return nil + } + trace, ok := meta["tool_trace"] + if !ok || len(trace) == 0 { + return nil + } + return trace +} + // extractAndUpsertTokenUsage parses LLM usage from a raw A2A response body // and persists it via upsertTokenUsage. Safe to call in a goroutine — logs // errors but never panics. ctx must already be detached from the request. diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 8ff6e984..4d98e9fa 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -40,7 +40,7 @@ func (h *ActivityHandler) List(c *gin.Context) { // Build query with optional filters query := `SELECT id, workspace_id, activity_type, source_id, target_id, method, - summary, request_body, response_body, duration_ms, status, error_detail, created_at + summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at FROM activity_logs WHERE workspace_id = $1` args := []interface{}{workspaceID} argIdx := 2 @@ -75,12 +75,12 @@ func (h *ActivityHandler) List(c *gin.Context) { for rows.Next() { var id, wsID, actType, status string var sourceID, targetID, method, summary, errorDetail *string - var reqBody, respBody []byte + var reqBody, respBody, toolTrace []byte var durationMs *int var createdAt time.Time if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, - &summary, &reqBody, &respBody, &durationMs, &status, &errorDetail, &createdAt); err != nil { + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil { log.Printf("Activity scan error: %v", err) continue } @@ -104,6 +104,9 @@ func (h *ActivityHandler) List(c *gin.Context) { if respBody != nil { entry["response_body"] = json.RawMessage(respBody) } + if toolTrace != nil { + entry["tool_trace"] = json.RawMessage(toolTrace) + } activities = append(activities, entry) } if err := rows.Err(); err != nil { @@ -382,7 +385,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac respJSON = []byte("null") } - var reqStr, respStr *string + var reqStr, respStr, traceStr *string if params.RequestBody != nil { s := string(reqJSON) reqStr = &s @@ -391,12 +394,16 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac s := string(respJSON) respStr = &s } + if len(params.ToolTrace) > 0 { + s := string(params.ToolTrace) + traceStr = &s + } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, duration_ms, status, error_detail) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11) + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12) `, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID, - params.Method, params.Summary, reqStr, respStr, + params.Method, params.Summary, reqStr, respStr, traceStr, params.DurationMs, params.Status, params.ErrorDetail) if err != nil { log.Printf("LogActivity insert error: %v", err) @@ -405,7 +412,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac // Broadcast ACTIVITY_LOGGED event if broadcaster != nil { - broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", map[string]interface{}{ + payload := map[string]interface{}{ "activity_type": params.ActivityType, "method": params.Method, "summary": params.Summary, @@ -413,7 +420,11 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac "source_id": params.SourceID, "target_id": params.TargetID, "duration_ms": params.DurationMs, - }) + } + if len(params.ToolTrace) > 0 { + payload["tool_trace"] = json.RawMessage(params.ToolTrace) + } + broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", payload) } } @@ -426,6 +437,7 @@ type ActivityParams struct { Summary *string RequestBody interface{} ResponseBody interface{} + ToolTrace json.RawMessage // tools/commands the agent actually invoked DurationMs *int Status string // ok, error, timeout ErrorDetail *string diff --git a/workspace-server/migrations/039_activity_tool_trace.down.sql b/workspace-server/migrations/039_activity_tool_trace.down.sql new file mode 100644 index 00000000..73691b56 --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_activity_logs_tool_trace; +ALTER TABLE activity_logs DROP COLUMN IF EXISTS tool_trace; diff --git a/workspace-server/migrations/039_activity_tool_trace.up.sql b/workspace-server/migrations/039_activity_tool_trace.up.sql new file mode 100644 index 00000000..03dd0a4c --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.up.sql @@ -0,0 +1,9 @@ +-- Add tool_trace column to activity_logs for platform-level observability. +-- Stores the list of tools/commands an agent actually invoked during an A2A +-- call, extracted from the A2A response metadata. Enables verifying agent +-- claims ("I checked X") against what tools were actually called. +ALTER TABLE activity_logs ADD COLUMN IF NOT EXISTS tool_trace JSONB; + +-- Index for querying which agents used specific tools +CREATE INDEX IF NOT EXISTS idx_activity_logs_tool_trace + ON activity_logs USING gin (tool_trace) WHERE tool_trace IS NOT NULL; diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 81b17a35..13631ed1 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -304,6 +304,10 @@ class LangGraphA2AExecutor(AgentExecutor): else None ) + # ── Tool trace: collect every tool invocation for + # platform-level observability ──────────────────── + tool_trace: list[dict] = [] + async for event in self.agent.astream_events( {"messages": messages}, config=run_config, @@ -334,7 +338,12 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") + tool_input = event.get("data", {}).get("input", "") logger.debug("SSE: tool start — %s", tool_name) + tool_trace.append({ + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + }) if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -342,7 +351,11 @@ class LangGraphA2AExecutor(AgentExecutor): ) elif kind == "on_tool_end": - logger.debug("SSE: tool end — %s", event.get("name", "?")) + tool_end_name = event.get("name", "?") + tool_output = event.get("data", {}).get("output", "") + logger.debug("SSE: tool end — %s", tool_end_name) + if tool_trace and tool_trace[-1]["tool"] == tool_end_name: + tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry @@ -383,9 +396,10 @@ class LangGraphA2AExecutor(AgentExecutor): # Non-streaming: ResultAggregator.consume_all() returns this # immediately as the response (a2a_client.py reads .parts[0].text). # Streaming: yielded as the last SSE event in the stream. - await event_queue.enqueue_event( - new_agent_text_message(final_text, task_id=task_id, context_id=context_id) - ) + msg = new_agent_text_message(final_text, task_id=task_id, context_id=context_id) + if tool_trace: + msg.metadata = {"tool_trace": tool_trace} + await event_queue.enqueue_event(msg) _result = final_text except Exception as e: From d7afd15e5998dd4fa741cf1a8a8f8735ae8a868b Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 15:13:47 -0700 Subject: [PATCH 2/5] feat: platform instructions system with global/team/workspace scope Adds a configurable instruction injection system that prepends rules to every agent's system prompt. Instructions are stored in the DB and fetched at workspace startup, supporting three scopes: - Global: applies to all agents (e.g., "verify with tools before reporting") - Team: applies to agents in a specific team - Workspace: applies to a single agent (role-specific rules) Components: - Migration 040: platform_instructions table with scope hierarchy - Go API: CRUD endpoints + resolve endpoint that merges scopes - Python runtime: fetches instructions at startup via /instructions/resolve and prepends them to the system prompt as highest-priority context Initial global instructions seeded: 1. Verify Before Acting (check issues/PRs/docs first) 2. Verify Output Before Reporting (second signal before reporting done) 3. Tool Usage Requirements (claims must include tool output) 4. No Hallucinated Emergencies (CRITICAL needs proof) 5. Staging-First Workflow (never push to main directly) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/instructions.go | 271 ++++++++++++++++++ workspace-server/internal/router/router.go | 13 + .../040_platform_instructions.down.sql | 2 + .../040_platform_instructions.up.sql | 18 ++ workspace/adapter_base.py | 4 +- workspace/prompt.py | 26 ++ 6 files changed, 333 insertions(+), 1 deletion(-) create mode 100644 workspace-server/internal/handlers/instructions.go create mode 100644 workspace-server/migrations/040_platform_instructions.down.sql create mode 100644 workspace-server/migrations/040_platform_instructions.up.sql diff --git a/workspace-server/internal/handlers/instructions.go b/workspace-server/internal/handlers/instructions.go new file mode 100644 index 00000000..41dcd8e3 --- /dev/null +++ b/workspace-server/internal/handlers/instructions.go @@ -0,0 +1,271 @@ +package handlers + +import ( + "log" + "net/http" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +type InstructionsHandler struct{} + +func NewInstructionsHandler() *InstructionsHandler { + return &InstructionsHandler{} +} + +type Instruction struct { + ID string `json:"id"` + Scope string `json:"scope"` + ScopeTarget *string `json:"scope_target"` + Title string `json:"title"` + Content string `json:"content"` + Priority int `json:"priority"` + Enabled bool `json:"enabled"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// List returns instructions filtered by scope. Agents call this at startup +// to fetch their full instruction set (global + team + workspace). +// +// GET /instructions?scope=global +// GET /instructions?workspace_id= (returns global + team + workspace) +func (h *InstructionsHandler) List(c *gin.Context) { + ctx := c.Request.Context() + scope := c.Query("scope") + workspaceID := c.Query("workspace_id") + + var rows_ interface{ Close() error } + var err error + + if workspaceID != "" { + // Agent bootstrap: fetch all applicable instructions (global + team + workspace) + // ordered by scope priority (global first) then user priority descending. + var teamSlug *string + db.DB.QueryRowContext(ctx, + `SELECT t.slug FROM teams t + JOIN team_members tm ON tm.team_id = t.id + WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) + + query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at + FROM platform_instructions + WHERE enabled = true AND ( + scope = 'global' + OR (scope = 'team' AND scope_target = $1) + OR (scope = 'workspace' AND scope_target = $2) + ) + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + priority DESC` + + teamTarget := "" + if teamSlug != nil { + teamTarget = *teamSlug + } + r, qErr := db.DB.QueryContext(ctx, query, teamTarget, workspaceID) + if qErr != nil { + log.Printf("Instructions list error: %v", qErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + rows_ = r + defer r.Close() + instructions := scanInstructions(r) + c.JSON(http.StatusOK, instructions) + return + } + + // Admin listing by scope + query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at + FROM platform_instructions WHERE 1=1` + args := []interface{}{} + if scope != "" { + query += ` AND scope = $1` + args = append(args, scope) + } + query += ` ORDER BY scope, priority DESC, created_at` + + r, qErr := db.DB.QueryContext(ctx, query, args...) + if qErr != nil { + log.Printf("Instructions list error: %v", qErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + rows_ = r + _ = rows_ + defer r.Close() + c.JSON(http.StatusOK, scanInstructions(r)) +} + +// Create adds a new platform instruction. +// POST /instructions +func (h *InstructionsHandler) Create(c *gin.Context) { + var body struct { + Scope string `json:"scope" binding:"required"` + ScopeTarget *string `json:"scope_target"` + Title string `json:"title" binding:"required"` + Content string `json:"content" binding:"required"` + Priority int `json:"priority"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope, title, and content are required"}) + return + } + if body.Scope != "global" && body.Scope != "team" && body.Scope != "workspace" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global, team, or workspace"}) + return + } + if body.Scope != "global" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for team/workspace scope"}) + return + } + + var id string + err := db.DB.QueryRowContext(c.Request.Context(), + `INSERT INTO platform_instructions (scope, scope_target, title, content, priority) + VALUES ($1, $2, $3, $4, $5) RETURNING id`, + body.Scope, body.ScopeTarget, body.Title, body.Content, body.Priority, + ).Scan(&id) + if err != nil { + log.Printf("Instructions create error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "insert failed"}) + return + } + c.JSON(http.StatusCreated, gin.H{"id": id}) +} + +// Update modifies an existing instruction. +// PUT /instructions/:id +func (h *InstructionsHandler) Update(c *gin.Context) { + id := c.Param("id") + var body struct { + Title *string `json:"title"` + Content *string `json:"content"` + Priority *int `json:"priority"` + Enabled *bool `json:"enabled"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + + result, err := db.DB.ExecContext(c.Request.Context(), + `UPDATE platform_instructions SET + title = COALESCE($2, title), + content = COALESCE($3, content), + priority = COALESCE($4, priority), + enabled = COALESCE($5, enabled), + updated_at = NOW() + WHERE id = $1`, + id, body.Title, body.Content, body.Priority, body.Enabled, + ) + if err != nil { + log.Printf("Instructions update error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) + return + } + if n, _ := result.RowsAffected(); n == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "instruction not found"}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "updated"}) +} + +// Delete removes an instruction. +// DELETE /instructions/:id +func (h *InstructionsHandler) Delete(c *gin.Context) { + id := c.Param("id") + result, err := db.DB.ExecContext(c.Request.Context(), + `DELETE FROM platform_instructions WHERE id = $1`, id) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "delete failed"}) + return + } + if n, _ := result.RowsAffected(); n == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "instruction not found"}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "deleted"}) +} + +// Resolve returns the merged instruction text for a workspace — all enabled +// instructions across global → team → workspace scope, concatenated in order. +// This is what the Python runtime calls to get the full instruction set. +// +// GET /instructions/resolve?workspace_id= +func (h *InstructionsHandler) Resolve(c *gin.Context) { + workspaceID := c.Query("workspace_id") + if workspaceID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "workspace_id required"}) + return + } + ctx := c.Request.Context() + + var teamSlug string + db.DB.QueryRowContext(ctx, + `SELECT COALESCE(t.slug, '') FROM teams t + JOIN team_members tm ON tm.team_id = t.id + WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) + + rows, err := db.DB.QueryContext(ctx, + `SELECT scope, title, content FROM platform_instructions + WHERE enabled = true AND ( + scope = 'global' + OR (scope = 'team' AND scope_target = $1) + OR (scope = 'workspace' AND scope_target = $2) + ) + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + priority DESC`, + teamSlug, workspaceID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer rows.Close() + + var merged string + currentScope := "" + for rows.Next() { + var scope, title, content string + if err := rows.Scan(&scope, &title, &content); err != nil { + continue + } + if scope != currentScope { + scopeLabel := map[string]string{ + "global": "Platform-Wide Rules", + "team": "Team Rules", + "workspace": "Role-Specific Rules", + }[scope] + merged += "\n## " + scopeLabel + "\n\n" + currentScope = scope + } + merged += "### " + title + "\n" + content + "\n\n" + } + + c.JSON(http.StatusOK, gin.H{ + "workspace_id": workspaceID, + "team_slug": teamSlug, + "instructions": merged, + }) +} + +func scanInstructions(rows interface { + Next() bool + Scan(dest ...interface{}) error +}) []Instruction { + var instructions []Instruction + for rows.Next() { + var inst Instruction + if err := rows.Scan(&inst.ID, &inst.Scope, &inst.ScopeTarget, &inst.Title, + &inst.Content, &inst.Priority, &inst.Enabled, &inst.CreatedAt, &inst.UpdatedAt); err != nil { + log.Printf("Instructions scan error: %v", err) + continue + } + instructions = append(instructions, inst) + } + if instructions == nil { + instructions = []Instruction{} + } + return instructions +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index b9610fd6..674231ed 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -364,6 +364,19 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal) } + // Platform instructions — configurable rules with global/team/workspace scope. + // Admin endpoints for CRUD; workspace-facing resolve endpoint for agent bootstrap. + { + instrH := handlers.NewInstructionsHandler() + adminInstr := r.Group("", middleware.AdminAuth(db.DB)) + adminInstr.GET("/instructions", instrH.List) + adminInstr.POST("/instructions", instrH.Create) + adminInstr.PUT("/instructions/:id", instrH.Update) + adminInstr.DELETE("/instructions/:id", instrH.Delete) + // Resolve endpoint is open to workspace auth (agents call it at startup) + r.GET("/instructions/resolve", instrH.Resolve) + } + // Admin — cross-workspace schedule health monitoring (issue #618). // Lets cron-audit agents and operators detect silent schedule failures // across all workspaces without holding individual workspace bearer tokens. diff --git a/workspace-server/migrations/040_platform_instructions.down.sql b/workspace-server/migrations/040_platform_instructions.down.sql new file mode 100644 index 00000000..acebc56f --- /dev/null +++ b/workspace-server/migrations/040_platform_instructions.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_platform_instructions_scope; +DROP TABLE IF EXISTS platform_instructions; diff --git a/workspace-server/migrations/040_platform_instructions.up.sql b/workspace-server/migrations/040_platform_instructions.up.sql new file mode 100644 index 00000000..ebf812fe --- /dev/null +++ b/workspace-server/migrations/040_platform_instructions.up.sql @@ -0,0 +1,18 @@ +-- Platform-level configurable instructions with global/team/workspace scope. +-- Injected into every agent's system prompt at startup and refreshed +-- periodically, so platform operators can enforce rules without editing +-- template files. +CREATE TABLE IF NOT EXISTS platform_instructions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + scope TEXT NOT NULL CHECK (scope IN ('global', 'team', 'workspace')), + scope_target TEXT, -- NULL for global, team slug for team, workspace_id for workspace + title TEXT NOT NULL, + content TEXT NOT NULL, + priority INT DEFAULT 0, -- higher = shown first within scope + enabled BOOLEAN DEFAULT true, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_platform_instructions_scope + ON platform_instructions (scope, scope_target) WHERE enabled = true; diff --git a/workspace/adapter_base.py b/workspace/adapter_base.py index 0de914c4..8cb5cb8d 100644 --- a/workspace/adapter_base.py +++ b/workspace/adapter_base.py @@ -294,7 +294,7 @@ class BaseAdapter(ABC): from plugins import load_plugins from skill_loader.loader import load_skills from coordinator import get_children, get_parent_context, build_children_description - from prompt import build_system_prompt, get_peer_capabilities + from prompt import build_system_prompt, get_peer_capabilities, get_platform_instructions from builtin_tools.approval import request_approval from builtin_tools.delegation import delegate_to_workspace, check_delegation_status from builtin_tools.memory import commit_memory, search_memory @@ -344,6 +344,7 @@ class BaseAdapter(ABC): # Build system prompt with all context peers = await get_peer_capabilities(platform_url, config.workspace_id) + platform_instructions = await get_platform_instructions(platform_url, config.workspace_id) coordinator_prompt = build_children_description(children) if is_coordinator else "" extra_prompts = list(plugins.prompt_fragments) if coordinator_prompt: @@ -355,6 +356,7 @@ class BaseAdapter(ABC): plugin_rules=plugins.rules, plugin_prompts=extra_prompts, parent_context=parent_context, + platform_instructions=platform_instructions, ) return SetupResult( diff --git a/workspace/prompt.py b/workspace/prompt.py index 33de1265..818ec182 100644 --- a/workspace/prompt.py +++ b/workspace/prompt.py @@ -1,5 +1,6 @@ """Build the system prompt for the workspace agent.""" +import os from pathlib import Path from skill_loader.loader import LoadedSkill @@ -25,6 +26,24 @@ async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[di return [] +async def get_platform_instructions(platform_url: str, workspace_id: str) -> str: + """Fetch resolved platform instructions (global + team + workspace scope).""" + try: + import httpx + + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.get( + f"{platform_url}/instructions/resolve", + params={"workspace_id": workspace_id}, + ) + if resp.status_code == 200: + data = resp.json() + return data.get("instructions", "") + except Exception as e: + print(f"Warning: could not fetch platform instructions: {e}") + return "" + + def build_system_prompt( config_path: str, workspace_id: str, @@ -34,6 +53,7 @@ def build_system_prompt( plugin_rules: list[str] | None = None, plugin_prompts: list[str] | None = None, parent_context: list[dict] | None = None, + platform_instructions: str = "", ) -> str: """Build the complete system prompt. @@ -50,6 +70,12 @@ def build_system_prompt( """ parts = [] + # Platform instructions (global → team → workspace scope) go first so + # they take highest precedence in the context window. + if platform_instructions: + parts.append("# Platform Instructions\n") + parts.append(platform_instructions) + # Load prompt files in order files_to_load = list(prompt_files or []) if not files_to_load: From e1d77a1625ce321a4614e8d5237dd397efc03a4e Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 15:41:56 -0700 Subject: [PATCH 3/5] ci: trigger CI from PAT push From ed26f2733a642df9d23fb1e09fe5ba3f685a0d9c Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 16:18:06 -0700 Subject: [PATCH 4/5] fix(review): address code review blockers on tool-trace + instructions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BLOCKERS fixed: - instructions.go: Drop team-scope queries (teams/team_members tables don't exist in any migration). Schema column kept for future. Restored Resolve to /workspaces/:id/instructions/resolve under wsAuth — closes auth gap that allowed cross-workspace enumeration of operator policy. - migration 040: Add CHECK constraints on title (<=200) and content (<=8192) to prevent token-budget DoS via oversized instructions. - a2a_executor.py: Pair on_tool_start/on_tool_end via run_id instead of list-position so parallel tool calls don't drop or clobber outputs. Cap tool_trace at 200 entries to prevent runaway loops bloating JSONB. HIGH fixes: - instructions.go: Add length validation in Create + Update handlers. Removed dead rows_ shadow variable. Replaced string concatenation in Resolve with strings.Builder. - prompt.py: Drop httpx timeout 10s -> 3s (boot hot path). Switch print to logger.warning. Add Authorization bearer header from MOLECULE_WORKSPACE_TOKEN env var. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/instructions.go | 115 +++++++++--------- workspace-server/internal/router/router.go | 9 +- .../040_platform_instructions.up.sql | 6 +- workspace/a2a_executor.py | 26 +++- workspace/prompt.py | 24 +++- 5 files changed, 109 insertions(+), 71 deletions(-) diff --git a/workspace-server/internal/handlers/instructions.go b/workspace-server/internal/handlers/instructions.go index 41dcd8e3..2e8e89ac 100644 --- a/workspace-server/internal/handlers/instructions.go +++ b/workspace-server/internal/handlers/instructions.go @@ -3,12 +3,17 @@ package handlers import ( "log" "net/http" + "strings" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" ) +// maxInstructionContentLen caps content size to prevent token-budget DoS via +// oversized instructions being prepended to every agent's system prompt. +const maxInstructionContentLen = 8192 + type InstructionsHandler struct{} func NewInstructionsHandler() *InstructionsHandler { @@ -28,51 +33,36 @@ type Instruction struct { } // List returns instructions filtered by scope. Agents call this at startup -// to fetch their full instruction set (global + team + workspace). +// to fetch their full instruction set (global + workspace). // // GET /instructions?scope=global -// GET /instructions?workspace_id= (returns global + team + workspace) +// GET /instructions?workspace_id= (returns global + workspace) +// +// Team scope is reserved in the schema but not yet wired — teams/team_members +// tables don't exist in any migration. Adding team support requires a new +// migration first. func (h *InstructionsHandler) List(c *gin.Context) { ctx := c.Request.Context() scope := c.Query("scope") workspaceID := c.Query("workspace_id") - var rows_ interface{ Close() error } - var err error - if workspaceID != "" { - // Agent bootstrap: fetch all applicable instructions (global + team + workspace) - // ordered by scope priority (global first) then user priority descending. - var teamSlug *string - db.DB.QueryRowContext(ctx, - `SELECT t.slug FROM teams t - JOIN team_members tm ON tm.team_id = t.id - WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) - query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at FROM platform_instructions WHERE enabled = true AND ( scope = 'global' - OR (scope = 'team' AND scope_target = $1) - OR (scope = 'workspace' AND scope_target = $2) + OR (scope = 'workspace' AND scope_target = $1) ) - ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, priority DESC` - - teamTarget := "" - if teamSlug != nil { - teamTarget = *teamSlug - } - r, qErr := db.DB.QueryContext(ctx, query, teamTarget, workspaceID) + r, qErr := db.DB.QueryContext(ctx, query, workspaceID) if qErr != nil { log.Printf("Instructions list error: %v", qErr) c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } - rows_ = r defer r.Close() - instructions := scanInstructions(r) - c.JSON(http.StatusOK, instructions) + c.JSON(http.StatusOK, scanInstructions(r)) return } @@ -92,8 +82,6 @@ func (h *InstructionsHandler) List(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } - rows_ = r - _ = rows_ defer r.Close() c.JSON(http.StatusOK, scanInstructions(r)) } @@ -112,12 +100,20 @@ func (h *InstructionsHandler) Create(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "scope, title, and content are required"}) return } - if body.Scope != "global" && body.Scope != "team" && body.Scope != "workspace" { - c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global, team, or workspace"}) + if body.Scope != "global" && body.Scope != "workspace" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global or workspace (team scope not yet supported)"}) return } - if body.Scope != "global" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { - c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for team/workspace scope"}) + if body.Scope == "workspace" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for workspace scope"}) + return + } + if len(body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if len(body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) return } @@ -149,6 +145,14 @@ func (h *InstructionsHandler) Update(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) return } + if body.Content != nil && len(*body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if body.Title != nil && len(*body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) + return + } result, err := db.DB.ExecContext(c.Request.Context(), `UPDATE platform_instructions SET @@ -190,41 +194,38 @@ func (h *InstructionsHandler) Delete(c *gin.Context) { } // Resolve returns the merged instruction text for a workspace — all enabled -// instructions across global → team → workspace scope, concatenated in order. +// instructions across global → workspace scope, concatenated in order. // This is what the Python runtime calls to get the full instruction set. // -// GET /instructions/resolve?workspace_id= +// GET /workspaces/:id/instructions/resolve +// +// Mounted under wsAuth so the caller must hold a valid bearer token for +// :id, preventing cross-workspace enumeration of operator policy. func (h *InstructionsHandler) Resolve(c *gin.Context) { - workspaceID := c.Query("workspace_id") + workspaceID := c.Param("id") if workspaceID == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "workspace_id required"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"}) return } ctx := c.Request.Context() - var teamSlug string - db.DB.QueryRowContext(ctx, - `SELECT COALESCE(t.slug, '') FROM teams t - JOIN team_members tm ON tm.team_id = t.id - WHERE tm.workspace_id = $1 LIMIT 1`, workspaceID).Scan(&teamSlug) - rows, err := db.DB.QueryContext(ctx, `SELECT scope, title, content FROM platform_instructions WHERE enabled = true AND ( scope = 'global' - OR (scope = 'team' AND scope_target = $1) - OR (scope = 'workspace' AND scope_target = $2) + OR (scope = 'workspace' AND scope_target = $1) ) - ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'team' THEN 1 WHEN 'workspace' THEN 2 END, + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, priority DESC`, - teamSlug, workspaceID) + workspaceID) if err != nil { + log.Printf("Instructions resolve error: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } defer rows.Close() - var merged string + var b strings.Builder currentScope := "" for rows.Next() { var scope, title, content string @@ -232,21 +233,25 @@ func (h *InstructionsHandler) Resolve(c *gin.Context) { continue } if scope != currentScope { - scopeLabel := map[string]string{ - "global": "Platform-Wide Rules", - "team": "Team Rules", - "workspace": "Role-Specific Rules", - }[scope] - merged += "\n## " + scopeLabel + "\n\n" + scopeLabel := "Platform-Wide Rules" + if scope == "workspace" { + scopeLabel = "Role-Specific Rules" + } + b.WriteString("\n## ") + b.WriteString(scopeLabel) + b.WriteString("\n\n") currentScope = scope } - merged += "### " + title + "\n" + content + "\n\n" + b.WriteString("### ") + b.WriteString(title) + b.WriteString("\n") + b.WriteString(content) + b.WriteString("\n\n") } c.JSON(http.StatusOK, gin.H{ "workspace_id": workspaceID, - "team_slug": teamSlug, - "instructions": merged, + "instructions": b.String(), }) } diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 674231ed..07285e70 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -364,8 +364,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal) } - // Platform instructions — configurable rules with global/team/workspace scope. + // Platform instructions — configurable rules with global/workspace scope. // Admin endpoints for CRUD; workspace-facing resolve endpoint for agent bootstrap. + // (Team scope is reserved in the schema but not yet wired — needs teams/team_members + // migration first.) { instrH := handlers.NewInstructionsHandler() adminInstr := r.Group("", middleware.AdminAuth(db.DB)) @@ -373,8 +375,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminInstr.POST("/instructions", instrH.Create) adminInstr.PUT("/instructions/:id", instrH.Update) adminInstr.DELETE("/instructions/:id", instrH.Delete) - // Resolve endpoint is open to workspace auth (agents call it at startup) - r.GET("/instructions/resolve", instrH.Resolve) + // Resolve mounted under wsAuth — caller must hold a valid bearer token + // for :id, preventing cross-workspace enumeration of operator policy. + wsAuth.GET("/instructions/resolve", instrH.Resolve) } // Admin — cross-workspace schedule health monitoring (issue #618). diff --git a/workspace-server/migrations/040_platform_instructions.up.sql b/workspace-server/migrations/040_platform_instructions.up.sql index ebf812fe..04d0ac7d 100644 --- a/workspace-server/migrations/040_platform_instructions.up.sql +++ b/workspace-server/migrations/040_platform_instructions.up.sql @@ -6,8 +6,10 @@ CREATE TABLE IF NOT EXISTS platform_instructions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), scope TEXT NOT NULL CHECK (scope IN ('global', 'team', 'workspace')), scope_target TEXT, -- NULL for global, team slug for team, workspace_id for workspace - title TEXT NOT NULL, - content TEXT NOT NULL, + title TEXT NOT NULL CHECK (length(title) <= 200), + -- Cap content at 8KB so an oversized instruction can't blow past LLM + -- prompt-size limits when prepended to every agent's system prompt. + content TEXT NOT NULL CHECK (length(content) <= 8192), priority INT DEFAULT 0, -- higher = shown first within scope enabled BOOLEAN DEFAULT true, created_at TIMESTAMPTZ DEFAULT NOW(), diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 13631ed1..d48a1151 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -306,7 +306,13 @@ class LangGraphA2AExecutor(AgentExecutor): # ── Tool trace: collect every tool invocation for # platform-level observability ──────────────────── + # Keyed by run_id so parallel tool calls (LangGraph + # supports them) pair start→end correctly. Capped at + # MAX_TOOL_TRACE entries to prevent runaway loops from + # ballooning the JSONB payload. + MAX_TOOL_TRACE = 200 tool_trace: list[dict] = [] + tool_trace_by_run: dict[str, dict] = {} async for event in self.agent.astream_events( {"messages": messages}, @@ -339,11 +345,16 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") tool_input = event.get("data", {}).get("input", "") + tool_run_id = event.get("run_id", "") logger.debug("SSE: tool start — %s", tool_name) - tool_trace.append({ - "tool": tool_name, - "input": str(tool_input)[:500] if tool_input else "", - }) + if len(tool_trace) < MAX_TOOL_TRACE: + entry = { + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + } + tool_trace.append(entry) + if tool_run_id: + tool_trace_by_run[tool_run_id] = entry if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -353,9 +364,12 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_end": tool_end_name = event.get("name", "?") tool_output = event.get("data", {}).get("output", "") + tool_run_id = event.get("run_id", "") logger.debug("SSE: tool end — %s", tool_end_name) - if tool_trace and tool_trace[-1]["tool"] == tool_end_name: - tool_trace[-1]["output_preview"] = str(tool_output)[:300] if tool_output else "" + # Pair via run_id so parallel tool calls don't clobber each other. + entry = tool_trace_by_run.get(tool_run_id) if tool_run_id else None + if entry is not None: + entry["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry diff --git a/workspace/prompt.py b/workspace/prompt.py index 818ec182..70cce126 100644 --- a/workspace/prompt.py +++ b/workspace/prompt.py @@ -1,11 +1,14 @@ """Build the system prompt for the workspace agent.""" +import logging import os from pathlib import Path from skill_loader.loader import LoadedSkill from shared_runtime import build_peer_section +logger = logging.getLogger(__name__) + DEFAULT_MEMORY_SNAPSHOT_FILES = ("MEMORY.md", "USER.md") @@ -27,20 +30,31 @@ async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[di async def get_platform_instructions(platform_url: str, workspace_id: str) -> str: - """Fetch resolved platform instructions (global + team + workspace scope).""" + """Fetch resolved platform instructions (global + workspace scope). + + Endpoint is gated by WorkspaceAuth — the workspace token (read from env) + is sent as a bearer header. Fails open (returns "") on any error so a + platform outage doesn't block agent startup. Short timeout (3s) because + this runs in the boot hot path. + """ try: import httpx - async with httpx.AsyncClient(timeout=10.0) as client: + token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "") + headers = {"X-Workspace-ID": workspace_id} + if token: + headers["Authorization"] = f"Bearer {token}" + + async with httpx.AsyncClient(timeout=3.0) as client: resp = await client.get( - f"{platform_url}/instructions/resolve", - params={"workspace_id": workspace_id}, + f"{platform_url}/workspaces/{workspace_id}/instructions/resolve", + headers=headers, ) if resp.status_code == 200: data = resp.json() return data.get("instructions", "") except Exception as e: - print(f"Warning: could not fetch platform instructions: {e}") + logger.warning("could not fetch platform instructions: %s", e) return "" From dcbcf19da18e8cb95ac6f5231e2ba955d922599d Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Wed, 22 Apr 2026 16:24:55 -0700 Subject: [PATCH 5/5] fix(test): guard msg.metadata assignment for non-Message returns new_agent_text_message returns a real Message object in production but some test mocks return a plain string. Guard with hasattr + try/except so the tool_trace assignment doesn't crash test_non_stream_events_ignored. --- workspace/a2a_executor.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index d48a1151..39ca159e 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -411,8 +411,13 @@ class LangGraphA2AExecutor(AgentExecutor): # immediately as the response (a2a_client.py reads .parts[0].text). # Streaming: yielded as the last SSE event in the stream. msg = new_agent_text_message(final_text, task_id=task_id, context_id=context_id) - if tool_trace: - msg.metadata = {"tool_trace": tool_trace} + # Attach tool_trace via metadata when supported. Guarded with + # hasattr because some test mocks return a plain string here. + if tool_trace and hasattr(msg, "metadata"): + try: + msg.metadata = {"tool_trace": tool_trace} + except (AttributeError, TypeError): + pass await event_queue.enqueue_event(msg) _result = final_text