diff --git a/workspace-server/internal/handlers/a2a_proxy_helpers.go b/workspace-server/internal/handlers/a2a_proxy_helpers.go index 887a2057..ebbd642d 100644 --- a/workspace-server/internal/handlers/a2a_proxy_helpers.go +++ b/workspace-server/internal/handlers/a2a_proxy_helpers.go @@ -161,6 +161,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle }() } summary := a2aMethod + " → " + wsNameForLog + toolTrace := extractToolTrace(respBody) go func(parent context.Context) { logCtx, cancel := context.WithTimeout(context.WithoutCancel(parent), 30*time.Second) defer cancel() @@ -173,6 +174,7 @@ func (h *WorkspaceHandler) logA2ASuccess(ctx context.Context, workspaceID, calle Summary: &summary, RequestBody: json.RawMessage(body), ResponseBody: json.RawMessage(respBody), + ToolTrace: toolTrace, DurationMs: &durationMs, Status: logStatus, }) @@ -234,6 +236,39 @@ func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) e // matching (the wsauth errors are typed for the invalid case). var errInvalidCallerToken = errors.New("missing caller auth token") +// extractToolTrace pulls metadata.tool_trace from an A2A JSON-RPC response. +// Returns nil when absent or malformed — callers can pass it straight through. +func extractToolTrace(respBody []byte) json.RawMessage { + if len(respBody) == 0 { + return nil + } + var top map[string]json.RawMessage + if err := json.Unmarshal(respBody, &top); err != nil { + return nil + } + rawResult, ok := top["result"] + if !ok { + return nil + } + var result map[string]json.RawMessage + if err := json.Unmarshal(rawResult, &result); err != nil { + return nil + } + rawMeta, ok := result["metadata"] + if !ok { + return nil + } + var meta map[string]json.RawMessage + if err := json.Unmarshal(rawMeta, &meta); err != nil { + return nil + } + trace, ok := meta["tool_trace"] + if !ok || len(trace) == 0 { + return nil + } + return trace +} + // extractAndUpsertTokenUsage parses LLM usage from a raw A2A response body // and persists it via upsertTokenUsage. Safe to call in a goroutine — logs // errors but never panics. ctx must already be detached from the request. diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 8ff6e984..4d98e9fa 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -40,7 +40,7 @@ func (h *ActivityHandler) List(c *gin.Context) { // Build query with optional filters query := `SELECT id, workspace_id, activity_type, source_id, target_id, method, - summary, request_body, response_body, duration_ms, status, error_detail, created_at + summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at FROM activity_logs WHERE workspace_id = $1` args := []interface{}{workspaceID} argIdx := 2 @@ -75,12 +75,12 @@ func (h *ActivityHandler) List(c *gin.Context) { for rows.Next() { var id, wsID, actType, status string var sourceID, targetID, method, summary, errorDetail *string - var reqBody, respBody []byte + var reqBody, respBody, toolTrace []byte var durationMs *int var createdAt time.Time if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, - &summary, &reqBody, &respBody, &durationMs, &status, &errorDetail, &createdAt); err != nil { + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil { log.Printf("Activity scan error: %v", err) continue } @@ -104,6 +104,9 @@ func (h *ActivityHandler) List(c *gin.Context) { if respBody != nil { entry["response_body"] = json.RawMessage(respBody) } + if toolTrace != nil { + entry["tool_trace"] = json.RawMessage(toolTrace) + } activities = append(activities, entry) } if err := rows.Err(); err != nil { @@ -382,7 +385,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac respJSON = []byte("null") } - var reqStr, respStr *string + var reqStr, respStr, traceStr *string if params.RequestBody != nil { s := string(reqJSON) reqStr = &s @@ -391,12 +394,16 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac s := string(respJSON) respStr = &s } + if len(params.ToolTrace) > 0 { + s := string(params.ToolTrace) + traceStr = &s + } _, err := db.DB.ExecContext(ctx, ` - INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, duration_ms, status, error_detail) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9, $10, $11) + INSERT INTO activity_logs (workspace_id, activity_type, source_id, target_id, method, summary, request_body, response_body, tool_trace, duration_ms, status, error_detail) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9::jsonb, $10, $11, $12) `, params.WorkspaceID, params.ActivityType, params.SourceID, params.TargetID, - params.Method, params.Summary, reqStr, respStr, + params.Method, params.Summary, reqStr, respStr, traceStr, params.DurationMs, params.Status, params.ErrorDetail) if err != nil { log.Printf("LogActivity insert error: %v", err) @@ -405,7 +412,7 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac // Broadcast ACTIVITY_LOGGED event if broadcaster != nil { - broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", map[string]interface{}{ + payload := map[string]interface{}{ "activity_type": params.ActivityType, "method": params.Method, "summary": params.Summary, @@ -413,7 +420,11 @@ func LogActivity(ctx context.Context, broadcaster *events.Broadcaster, params Ac "source_id": params.SourceID, "target_id": params.TargetID, "duration_ms": params.DurationMs, - }) + } + if len(params.ToolTrace) > 0 { + payload["tool_trace"] = json.RawMessage(params.ToolTrace) + } + broadcaster.BroadcastOnly(params.WorkspaceID, "ACTIVITY_LOGGED", payload) } } @@ -426,6 +437,7 @@ type ActivityParams struct { Summary *string RequestBody interface{} ResponseBody interface{} + ToolTrace json.RawMessage // tools/commands the agent actually invoked DurationMs *int Status string // ok, error, timeout ErrorDetail *string diff --git a/workspace-server/internal/handlers/instructions.go b/workspace-server/internal/handlers/instructions.go new file mode 100644 index 00000000..2e8e89ac --- /dev/null +++ b/workspace-server/internal/handlers/instructions.go @@ -0,0 +1,276 @@ +package handlers + +import ( + "log" + "net/http" + "strings" + "time" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/gin-gonic/gin" +) + +// maxInstructionContentLen caps content size to prevent token-budget DoS via +// oversized instructions being prepended to every agent's system prompt. +const maxInstructionContentLen = 8192 + +type InstructionsHandler struct{} + +func NewInstructionsHandler() *InstructionsHandler { + return &InstructionsHandler{} +} + +type Instruction struct { + ID string `json:"id"` + Scope string `json:"scope"` + ScopeTarget *string `json:"scope_target"` + Title string `json:"title"` + Content string `json:"content"` + Priority int `json:"priority"` + Enabled bool `json:"enabled"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// List returns instructions filtered by scope. Agents call this at startup +// to fetch their full instruction set (global + workspace). +// +// GET /instructions?scope=global +// GET /instructions?workspace_id= (returns global + workspace) +// +// Team scope is reserved in the schema but not yet wired — teams/team_members +// tables don't exist in any migration. Adding team support requires a new +// migration first. +func (h *InstructionsHandler) List(c *gin.Context) { + ctx := c.Request.Context() + scope := c.Query("scope") + workspaceID := c.Query("workspace_id") + + if workspaceID != "" { + query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at + FROM platform_instructions + WHERE enabled = true AND ( + scope = 'global' + OR (scope = 'workspace' AND scope_target = $1) + ) + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, + priority DESC` + r, qErr := db.DB.QueryContext(ctx, query, workspaceID) + if qErr != nil { + log.Printf("Instructions list error: %v", qErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer r.Close() + c.JSON(http.StatusOK, scanInstructions(r)) + return + } + + // Admin listing by scope + query := `SELECT id, scope, scope_target, title, content, priority, enabled, created_at, updated_at + FROM platform_instructions WHERE 1=1` + args := []interface{}{} + if scope != "" { + query += ` AND scope = $1` + args = append(args, scope) + } + query += ` ORDER BY scope, priority DESC, created_at` + + r, qErr := db.DB.QueryContext(ctx, query, args...) + if qErr != nil { + log.Printf("Instructions list error: %v", qErr) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer r.Close() + c.JSON(http.StatusOK, scanInstructions(r)) +} + +// Create adds a new platform instruction. +// POST /instructions +func (h *InstructionsHandler) Create(c *gin.Context) { + var body struct { + Scope string `json:"scope" binding:"required"` + ScopeTarget *string `json:"scope_target"` + Title string `json:"title" binding:"required"` + Content string `json:"content" binding:"required"` + Priority int `json:"priority"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope, title, and content are required"}) + return + } + if body.Scope != "global" && body.Scope != "workspace" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope must be global or workspace (team scope not yet supported)"}) + return + } + if body.Scope == "workspace" && (body.ScopeTarget == nil || *body.ScopeTarget == "") { + c.JSON(http.StatusBadRequest, gin.H{"error": "scope_target required for workspace scope"}) + return + } + if len(body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if len(body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) + return + } + + var id string + err := db.DB.QueryRowContext(c.Request.Context(), + `INSERT INTO platform_instructions (scope, scope_target, title, content, priority) + VALUES ($1, $2, $3, $4, $5) RETURNING id`, + body.Scope, body.ScopeTarget, body.Title, body.Content, body.Priority, + ).Scan(&id) + if err != nil { + log.Printf("Instructions create error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "insert failed"}) + return + } + c.JSON(http.StatusCreated, gin.H{"id": id}) +} + +// Update modifies an existing instruction. +// PUT /instructions/:id +func (h *InstructionsHandler) Update(c *gin.Context) { + id := c.Param("id") + var body struct { + Title *string `json:"title"` + Content *string `json:"content"` + Priority *int `json:"priority"` + Enabled *bool `json:"enabled"` + } + if err := c.ShouldBindJSON(&body); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request"}) + return + } + if body.Content != nil && len(*body.Content) > maxInstructionContentLen { + c.JSON(http.StatusBadRequest, gin.H{"error": "content exceeds 8192 chars"}) + return + } + if body.Title != nil && len(*body.Title) > 200 { + c.JSON(http.StatusBadRequest, gin.H{"error": "title exceeds 200 chars"}) + return + } + + result, err := db.DB.ExecContext(c.Request.Context(), + `UPDATE platform_instructions SET + title = COALESCE($2, title), + content = COALESCE($3, content), + priority = COALESCE($4, priority), + enabled = COALESCE($5, enabled), + updated_at = NOW() + WHERE id = $1`, + id, body.Title, body.Content, body.Priority, body.Enabled, + ) + if err != nil { + log.Printf("Instructions update error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) + return + } + if n, _ := result.RowsAffected(); n == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "instruction not found"}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "updated"}) +} + +// Delete removes an instruction. +// DELETE /instructions/:id +func (h *InstructionsHandler) Delete(c *gin.Context) { + id := c.Param("id") + result, err := db.DB.ExecContext(c.Request.Context(), + `DELETE FROM platform_instructions WHERE id = $1`, id) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "delete failed"}) + return + } + if n, _ := result.RowsAffected(); n == 0 { + c.JSON(http.StatusNotFound, gin.H{"error": "instruction not found"}) + return + } + c.JSON(http.StatusOK, gin.H{"status": "deleted"}) +} + +// Resolve returns the merged instruction text for a workspace — all enabled +// instructions across global → workspace scope, concatenated in order. +// This is what the Python runtime calls to get the full instruction set. +// +// GET /workspaces/:id/instructions/resolve +// +// Mounted under wsAuth so the caller must hold a valid bearer token for +// :id, preventing cross-workspace enumeration of operator policy. +func (h *InstructionsHandler) Resolve(c *gin.Context) { + workspaceID := c.Param("id") + if workspaceID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "workspace id required"}) + return + } + ctx := c.Request.Context() + + rows, err := db.DB.QueryContext(ctx, + `SELECT scope, title, content FROM platform_instructions + WHERE enabled = true AND ( + scope = 'global' + OR (scope = 'workspace' AND scope_target = $1) + ) + ORDER BY CASE scope WHEN 'global' THEN 0 WHEN 'workspace' THEN 2 END, + priority DESC`, + workspaceID) + if err != nil { + log.Printf("Instructions resolve error: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + defer rows.Close() + + var b strings.Builder + currentScope := "" + for rows.Next() { + var scope, title, content string + if err := rows.Scan(&scope, &title, &content); err != nil { + continue + } + if scope != currentScope { + scopeLabel := "Platform-Wide Rules" + if scope == "workspace" { + scopeLabel = "Role-Specific Rules" + } + b.WriteString("\n## ") + b.WriteString(scopeLabel) + b.WriteString("\n\n") + currentScope = scope + } + b.WriteString("### ") + b.WriteString(title) + b.WriteString("\n") + b.WriteString(content) + b.WriteString("\n\n") + } + + c.JSON(http.StatusOK, gin.H{ + "workspace_id": workspaceID, + "instructions": b.String(), + }) +} + +func scanInstructions(rows interface { + Next() bool + Scan(dest ...interface{}) error +}) []Instruction { + var instructions []Instruction + for rows.Next() { + var inst Instruction + if err := rows.Scan(&inst.ID, &inst.Scope, &inst.ScopeTarget, &inst.Title, + &inst.Content, &inst.Priority, &inst.Enabled, &inst.CreatedAt, &inst.UpdatedAt); err != nil { + log.Printf("Instructions scan error: %v", err) + continue + } + instructions = append(instructions, inst) + } + if instructions == nil { + instructions = []Instruction{} + } + return instructions +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index b9610fd6..07285e70 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -364,6 +364,22 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi adminAuth.DELETE("/admin/secrets/:key", sechGlobal.DeleteGlobal) } + // Platform instructions — configurable rules with global/workspace scope. + // Admin endpoints for CRUD; workspace-facing resolve endpoint for agent bootstrap. + // (Team scope is reserved in the schema but not yet wired — needs teams/team_members + // migration first.) + { + instrH := handlers.NewInstructionsHandler() + adminInstr := r.Group("", middleware.AdminAuth(db.DB)) + adminInstr.GET("/instructions", instrH.List) + adminInstr.POST("/instructions", instrH.Create) + adminInstr.PUT("/instructions/:id", instrH.Update) + adminInstr.DELETE("/instructions/:id", instrH.Delete) + // Resolve mounted under wsAuth — caller must hold a valid bearer token + // for :id, preventing cross-workspace enumeration of operator policy. + wsAuth.GET("/instructions/resolve", instrH.Resolve) + } + // Admin — cross-workspace schedule health monitoring (issue #618). // Lets cron-audit agents and operators detect silent schedule failures // across all workspaces without holding individual workspace bearer tokens. diff --git a/workspace-server/migrations/039_activity_tool_trace.down.sql b/workspace-server/migrations/039_activity_tool_trace.down.sql new file mode 100644 index 00000000..73691b56 --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_activity_logs_tool_trace; +ALTER TABLE activity_logs DROP COLUMN IF EXISTS tool_trace; diff --git a/workspace-server/migrations/039_activity_tool_trace.up.sql b/workspace-server/migrations/039_activity_tool_trace.up.sql new file mode 100644 index 00000000..03dd0a4c --- /dev/null +++ b/workspace-server/migrations/039_activity_tool_trace.up.sql @@ -0,0 +1,9 @@ +-- Add tool_trace column to activity_logs for platform-level observability. +-- Stores the list of tools/commands an agent actually invoked during an A2A +-- call, extracted from the A2A response metadata. Enables verifying agent +-- claims ("I checked X") against what tools were actually called. +ALTER TABLE activity_logs ADD COLUMN IF NOT EXISTS tool_trace JSONB; + +-- Index for querying which agents used specific tools +CREATE INDEX IF NOT EXISTS idx_activity_logs_tool_trace + ON activity_logs USING gin (tool_trace) WHERE tool_trace IS NOT NULL; diff --git a/workspace-server/migrations/040_platform_instructions.down.sql b/workspace-server/migrations/040_platform_instructions.down.sql new file mode 100644 index 00000000..acebc56f --- /dev/null +++ b/workspace-server/migrations/040_platform_instructions.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS idx_platform_instructions_scope; +DROP TABLE IF EXISTS platform_instructions; diff --git a/workspace-server/migrations/040_platform_instructions.up.sql b/workspace-server/migrations/040_platform_instructions.up.sql new file mode 100644 index 00000000..04d0ac7d --- /dev/null +++ b/workspace-server/migrations/040_platform_instructions.up.sql @@ -0,0 +1,20 @@ +-- Platform-level configurable instructions with global/team/workspace scope. +-- Injected into every agent's system prompt at startup and refreshed +-- periodically, so platform operators can enforce rules without editing +-- template files. +CREATE TABLE IF NOT EXISTS platform_instructions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + scope TEXT NOT NULL CHECK (scope IN ('global', 'team', 'workspace')), + scope_target TEXT, -- NULL for global, team slug for team, workspace_id for workspace + title TEXT NOT NULL CHECK (length(title) <= 200), + -- Cap content at 8KB so an oversized instruction can't blow past LLM + -- prompt-size limits when prepended to every agent's system prompt. + content TEXT NOT NULL CHECK (length(content) <= 8192), + priority INT DEFAULT 0, -- higher = shown first within scope + enabled BOOLEAN DEFAULT true, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_platform_instructions_scope + ON platform_instructions (scope, scope_target) WHERE enabled = true; diff --git a/workspace/a2a_executor.py b/workspace/a2a_executor.py index 81b17a35..39ca159e 100644 --- a/workspace/a2a_executor.py +++ b/workspace/a2a_executor.py @@ -304,6 +304,16 @@ class LangGraphA2AExecutor(AgentExecutor): else None ) + # ── Tool trace: collect every tool invocation for + # platform-level observability ──────────────────── + # Keyed by run_id so parallel tool calls (LangGraph + # supports them) pair start→end correctly. Capped at + # MAX_TOOL_TRACE entries to prevent runaway loops from + # ballooning the JSONB payload. + MAX_TOOL_TRACE = 200 + tool_trace: list[dict] = [] + tool_trace_by_run: dict[str, dict] = {} + async for event in self.agent.astream_events( {"messages": messages}, config=run_config, @@ -334,7 +344,17 @@ class LangGraphA2AExecutor(AgentExecutor): elif kind == "on_tool_start": tool_name = event.get("name", "?") + tool_input = event.get("data", {}).get("input", "") + tool_run_id = event.get("run_id", "") logger.debug("SSE: tool start — %s", tool_name) + if len(tool_trace) < MAX_TOOL_TRACE: + entry = { + "tool": tool_name, + "input": str(tool_input)[:500] if tool_input else "", + } + tool_trace.append(entry) + if tool_run_id: + tool_trace_by_run[tool_run_id] = entry if _agency is not None: _agency.on_tool_call( tool_name=tool_name, @@ -342,7 +362,14 @@ class LangGraphA2AExecutor(AgentExecutor): ) elif kind == "on_tool_end": - logger.debug("SSE: tool end — %s", event.get("name", "?")) + tool_end_name = event.get("name", "?") + tool_output = event.get("data", {}).get("output", "") + tool_run_id = event.get("run_id", "") + logger.debug("SSE: tool end — %s", tool_end_name) + # Pair via run_id so parallel tool calls don't clobber each other. + entry = tool_trace_by_run.get(tool_run_id) if tool_run_id else None + if entry is not None: + entry["output_preview"] = str(tool_output)[:300] if tool_output else "" elif kind == "on_chat_model_end": # Capture the last completed AIMessage for token telemetry @@ -383,9 +410,15 @@ class LangGraphA2AExecutor(AgentExecutor): # Non-streaming: ResultAggregator.consume_all() returns this # immediately as the response (a2a_client.py reads .parts[0].text). # Streaming: yielded as the last SSE event in the stream. - await event_queue.enqueue_event( - new_agent_text_message(final_text, task_id=task_id, context_id=context_id) - ) + msg = new_agent_text_message(final_text, task_id=task_id, context_id=context_id) + # Attach tool_trace via metadata when supported. Guarded with + # hasattr because some test mocks return a plain string here. + if tool_trace and hasattr(msg, "metadata"): + try: + msg.metadata = {"tool_trace": tool_trace} + except (AttributeError, TypeError): + pass + await event_queue.enqueue_event(msg) _result = final_text except Exception as e: diff --git a/workspace/adapter_base.py b/workspace/adapter_base.py index 0de914c4..8cb5cb8d 100644 --- a/workspace/adapter_base.py +++ b/workspace/adapter_base.py @@ -294,7 +294,7 @@ class BaseAdapter(ABC): from plugins import load_plugins from skill_loader.loader import load_skills from coordinator import get_children, get_parent_context, build_children_description - from prompt import build_system_prompt, get_peer_capabilities + from prompt import build_system_prompt, get_peer_capabilities, get_platform_instructions from builtin_tools.approval import request_approval from builtin_tools.delegation import delegate_to_workspace, check_delegation_status from builtin_tools.memory import commit_memory, search_memory @@ -344,6 +344,7 @@ class BaseAdapter(ABC): # Build system prompt with all context peers = await get_peer_capabilities(platform_url, config.workspace_id) + platform_instructions = await get_platform_instructions(platform_url, config.workspace_id) coordinator_prompt = build_children_description(children) if is_coordinator else "" extra_prompts = list(plugins.prompt_fragments) if coordinator_prompt: @@ -355,6 +356,7 @@ class BaseAdapter(ABC): plugin_rules=plugins.rules, plugin_prompts=extra_prompts, parent_context=parent_context, + platform_instructions=platform_instructions, ) return SetupResult( diff --git a/workspace/prompt.py b/workspace/prompt.py index 33de1265..70cce126 100644 --- a/workspace/prompt.py +++ b/workspace/prompt.py @@ -1,10 +1,14 @@ """Build the system prompt for the workspace agent.""" +import logging +import os from pathlib import Path from skill_loader.loader import LoadedSkill from shared_runtime import build_peer_section +logger = logging.getLogger(__name__) + DEFAULT_MEMORY_SNAPSHOT_FILES = ("MEMORY.md", "USER.md") @@ -25,6 +29,35 @@ async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[di return [] +async def get_platform_instructions(platform_url: str, workspace_id: str) -> str: + """Fetch resolved platform instructions (global + workspace scope). + + Endpoint is gated by WorkspaceAuth — the workspace token (read from env) + is sent as a bearer header. Fails open (returns "") on any error so a + platform outage doesn't block agent startup. Short timeout (3s) because + this runs in the boot hot path. + """ + try: + import httpx + + token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "") + headers = {"X-Workspace-ID": workspace_id} + if token: + headers["Authorization"] = f"Bearer {token}" + + async with httpx.AsyncClient(timeout=3.0) as client: + resp = await client.get( + f"{platform_url}/workspaces/{workspace_id}/instructions/resolve", + headers=headers, + ) + if resp.status_code == 200: + data = resp.json() + return data.get("instructions", "") + except Exception as e: + logger.warning("could not fetch platform instructions: %s", e) + return "" + + def build_system_prompt( config_path: str, workspace_id: str, @@ -34,6 +67,7 @@ def build_system_prompt( plugin_rules: list[str] | None = None, plugin_prompts: list[str] | None = None, parent_context: list[dict] | None = None, + platform_instructions: str = "", ) -> str: """Build the complete system prompt. @@ -50,6 +84,12 @@ def build_system_prompt( """ parts = [] + # Platform instructions (global → team → workspace scope) go first so + # they take highest precedence in the context window. + if platform_instructions: + parts.append("# Platform Instructions\n") + parts.append(platform_instructions) + # Load prompt files in order files_to_load = list(prompt_files or []) if not files_to_load: