molecule-core/workspace/prompt.py
rabbitblood ed26f2733a fix(review): address code review blockers on tool-trace + instructions
BLOCKERS fixed:
- instructions.go: Drop team-scope queries (teams/team_members tables don't
  exist in any migration). Schema column kept for future. Restored Resolve
  to /workspaces/:id/instructions/resolve under wsAuth — closes auth gap
  that allowed cross-workspace enumeration of operator policy.
- migration 040: Add CHECK constraints on title (<=200) and content (<=8192)
  to prevent token-budget DoS via oversized instructions.
- a2a_executor.py: Pair on_tool_start/on_tool_end via run_id instead of
  list-position so parallel tool calls don't drop or clobber outputs. Cap
  tool_trace at 200 entries to prevent runaway loops bloating JSONB.

HIGH fixes:
- instructions.go: Add length validation in Create + Update handlers.
  Removed dead rows_ shadow variable. Replaced string concatenation in
  Resolve with strings.Builder.
- prompt.py: Drop httpx timeout 10s -> 3s (boot hot path). Switch print
  to logger.warning. Add Authorization bearer header from
  MOLECULE_WORKSPACE_TOKEN env var.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-22 16:18:06 -07:00

173 lines
6.0 KiB
Python

"""Build the system prompt for the workspace agent."""
import logging
import os
from pathlib import Path
from skill_loader.loader import LoadedSkill
from shared_runtime import build_peer_section
logger = logging.getLogger(__name__)
DEFAULT_MEMORY_SNAPSHOT_FILES = ("MEMORY.md", "USER.md")
async def get_peer_capabilities(platform_url: str, workspace_id: str) -> list[dict]:
    """Fetch peer workspace capabilities from the platform.

    Issues ``GET {platform_url}/registry/{workspace_id}/peers`` with an
    ``X-Workspace-ID`` header and returns the decoded JSON body on HTTP 200.

    Fails open: a non-200 status or any exception (httpx not installed,
    connection error, timeout, undecodable body) is logged as a warning and
    yields an empty list, so the caller can continue without peer data.

    Args:
        platform_url: Base URL of the platform; the request path is appended
            directly to it.
        workspace_id: ID of the workspace whose peers are requested. Used in
            both the URL path and the ``X-Workspace-ID`` header.

    Returns:
        The decoded JSON response body on HTTP 200, otherwise ``[]``.
    """
    try:
        # Imported inside the try so a missing httpx is handled like any other
        # fetch failure instead of breaking module import.
        import httpx

        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.get(
                f"{platform_url}/registry/{workspace_id}/peers",
                headers={"X-Workspace-ID": workspace_id},
            )
            if resp.status_code == 200:
                return resp.json()
            # Surface rejected requests instead of silently returning nothing.
            logger.warning("could not fetch peers: HTTP %s", resp.status_code)
    except Exception as e:
        # Best-effort by design: log through the module logger (not print) so
        # the message reaches the configured log handlers.
        logger.warning("could not fetch peers: %s", e)
    return []
async def get_platform_instructions(platform_url: str, workspace_id: str) -> str:
    """Fetch resolved platform instructions (global + workspace scope).

    Issues ``GET {platform_url}/workspaces/{workspace_id}/instructions/resolve``.
    The endpoint is gated by WorkspaceAuth, so the workspace token read from
    the ``MOLECULE_WORKSPACE_TOKEN`` environment variable is sent as a bearer
    header when it is set.

    Fails open: a non-200 status or any exception is logged as a warning and
    yields ``""`` so a platform outage doesn't block agent startup. The
    timeout is short (3s) because this runs in the boot hot path.

    Args:
        platform_url: Base URL of the platform; the request path is appended
            directly to it.
        workspace_id: ID of the workspace whose instructions are resolved.

    Returns:
        The ``instructions`` string from the JSON response on HTTP 200,
        otherwise ``""``.
    """
    try:
        # Imported inside the try so a missing httpx is handled like any other
        # fetch failure.
        import httpx

        headers = {"X-Workspace-ID": workspace_id}
        token = os.environ.get("MOLECULE_WORKSPACE_TOKEN", "")
        if token:
            headers["Authorization"] = f"Bearer {token}"

        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(
                f"{platform_url}/workspaces/{workspace_id}/instructions/resolve",
                headers=headers,
            )
            if resp.status_code == 200:
                instructions = resp.json().get("instructions", "")
                # A null or non-string payload must not leak through the
                # declared ``str`` return type.
                return instructions if isinstance(instructions, str) else ""
            # E.g. an auth rejection: make it visible that no platform
            # instructions were applied.
            logger.warning(
                "could not fetch platform instructions: HTTP %s", resp.status_code
            )
    except Exception as e:
        logger.warning("could not fetch platform instructions: %s", e)
    return ""
def _read_prompt_file(file_path: Path) -> str | None:
    """Return the stripped UTF-8 text of ``file_path``, or None if it does not exist."""
    if not file_path.exists():
        return None
    # Explicit encoding: prompt files should not depend on the host locale.
    return file_path.read_text(encoding="utf-8").strip()


def build_system_prompt(
    config_path: str,
    workspace_id: str,
    loaded_skills: list[LoadedSkill],
    peers: list[dict],
    prompt_files: list[str] | None = None,
    plugin_rules: list[str] | None = None,
    plugin_prompts: list[str] | None = None,
    parent_context: list[dict] | None = None,
    platform_instructions: str = "",
) -> str:
    """Build the complete system prompt.

    Loads prompt files in order from config_path. If prompt_files is specified
    in config.yaml, those files are loaded in order. Otherwise falls back to
    system-prompt.md for backwards compatibility.

    If MEMORY.md or USER.md exist alongside the config, they are appended as a
    frozen memory snapshot without needing to list them explicitly.

    This allows different agent frameworks to use their own file structures:
    - OpenClaw: SOUL.md, BOOTSTRAP.md, AGENTS.md, HEARTBEAT.md, TOOLS.md, USER.md
    - Claude Code: CLAUDE.md
    - Default: system-prompt.md

    Sections are emitted in this order: platform instructions, prompt files,
    memory snapshot files, parent context, plugin rules, plugin prompt
    fragments, skills, peer section, delegation-failure guidance.

    Args:
        config_path: Directory containing the prompt and memory files.
        workspace_id: Not used when assembling the prompt; kept in the
            signature for callers.
        loaded_skills: Skills to describe; each contributes its
            ``metadata.name``, optional ``metadata.description`` and
            ``instructions``.
        peers: Peer descriptors handed to ``build_peer_section``; its output
            is appended when non-empty.
        prompt_files: File names (relative to ``config_path``) to load in
            order. Defaults to ``["system-prompt.md"]`` when empty or None.
        plugin_rules: Rule texts appended under "Platform Rules".
        plugin_prompts: Prompt fragments appended under "Platform Guidelines".
        parent_context: Dicts with ``path`` and ``content`` keys shared by a
            parent workspace; entries with blank content are skipped.
        platform_instructions: Pre-resolved platform instructions; placed at
            the very top of the prompt when non-empty.

    Returns:
        All sections joined with newlines.
    """
    parts: list[str] = []
    config_dir = Path(config_path)

    # Platform instructions (global + workspace scope) go first so
    # they take highest precedence in the context window.
    if platform_instructions:
        parts.append("# Platform Instructions\n")
        parts.append(platform_instructions)

    # Load prompt files in order; fall back to system-prompt.md for
    # backwards compatibility when none are configured.
    files_to_load = list(prompt_files or []) or ["system-prompt.md"]
    for filename in files_to_load:
        file_path = config_dir / filename
        content = _read_prompt_file(file_path)
        if content is None:
            logger.warning("prompt file not found: %s", file_path)
        elif content:
            parts.append(content)

    # Hermes-style memory snapshot files: load automatically when present.
    # These stay as thin markdown files so the runtime does not need a new
    # storage layer. Files already listed explicitly are not loaded twice.
    already_loaded = set(files_to_load)
    for filename in DEFAULT_MEMORY_SNAPSHOT_FILES:
        if filename in already_loaded:
            continue
        content = _read_prompt_file(config_dir / filename)
        if content:
            parts.append(content)

    # Inject parent's shared context (if this workspace is a child)
    if parent_context:
        parts.append("\n## Parent Context\n")
        parts.append("The following context was shared by your parent workspace:\n")
        for ctx_file in parent_context:
            path = ctx_file.get("path", "unknown")
            # ``or ""`` tolerates an explicit null content value.
            content = (ctx_file.get("content") or "").strip()
            if content:
                parts.append(f"### {path}")
                parts.append(content)
                parts.append("")

    # Inject plugin rules (always-on guidelines from ECC, Superpowers, etc.)
    if plugin_rules:
        parts.append("\n## Platform Rules\n")
        for rule in plugin_rules:
            parts.append(rule)
            parts.append("")

    # Inject plugin prompt fragments
    if plugin_prompts:
        parts.append("\n## Platform Guidelines\n")
        for fragment in plugin_prompts:
            parts.append(fragment)
            parts.append("")

    # Add skill instructions
    if loaded_skills:
        parts.append("\n## Your Skills\n")
        for skill in loaded_skills:
            parts.append(f"### {skill.metadata.name}")
            if skill.metadata.description:
                parts.append(skill.metadata.description)
            parts.append(skill.instructions)
            parts.append("")

    # Add peer capabilities with a single shared renderer.
    peer_section = build_peer_section(peers)
    if peer_section:
        parts.append(peer_section)

    # Add delegation failure handling
    parts.append("""
## Handling delegation failures
If a delegation fails:
1. Check if the task is blocking — if not, continue other work
2. Retry transient failures (connection errors) after 30 seconds
3. For persistent failures, report to the caller with context
4. Never silently drop a failed task
""")
    return "\n".join(parts)