diff --git a/workspace-template/adapters/smolagents/__init__.py b/workspace-template/adapters/smolagents/__init__.py new file mode 100644 index 00000000..8b4b6d1b --- /dev/null +++ b/workspace-template/adapters/smolagents/__init__.py @@ -0,0 +1,32 @@ +"""Smolagents adapter for Molecule AI workspace runtime. + +Provides env sanitization and safe executor/messaging primitives for use +with HuggingFace's smolagents library. + +Two env-sanitization strategies are available: + +* **Allowlist** (recommended) — :mod:`adapters.smolagents.env_sanitize`: + only explicitly-safe variables pass through. Stricter but requires keeping + the allowlist up-to-date as new safe vars are needed. + +* **Denylist** (simple) — :mod:`adapters.smolagents.safe_env`: + well-known secret names plus ``*_API_KEY`` / ``*_TOKEN`` suffix patterns + are stripped. Easier to start with; less exhaustive. + +Quick start:: + + # Allowlist approach (stricter) + from adapters.smolagents.env_sanitize import make_safe_env, SafeLocalPythonExecutor + + # Denylist approach (simpler) + from adapters.smolagents.safe_env import make_safe_env + + # Safe messaging + from adapters.smolagents.send_message_wrapper import safe_send_message +""" + +# Re-export the allowlist-based make_safe_env as the default (most secure). +from adapters.smolagents.env_sanitize import SafeLocalPythonExecutor, make_safe_env +from adapters.smolagents.send_message_wrapper import safe_send_message + +__all__ = ["make_safe_env", "SafeLocalPythonExecutor", "safe_send_message"] diff --git a/workspace-template/adapters/smolagents/env_sanitize.py b/workspace-template/adapters/smolagents/env_sanitize.py new file mode 100644 index 00000000..a8dc92d1 --- /dev/null +++ b/workspace-template/adapters/smolagents/env_sanitize.py @@ -0,0 +1,226 @@ +"""Allowlist-based environment sanitization for smolagents (#826 — C3 CRITICAL). + +Security model +-------------- +We use an **allowlist** (not a denylist) — only variables explicitly +enumerated as safe are passed through to agent-executed code. Any key not +on the list is silently dropped. + +This is intentionally strict: adding a new safe variable is a deliberate +engineering act that surfaces in code review, rather than hoping a regex +denylist catches every new secret name. + +Thread safety +------------- +``SafeLocalPythonExecutor.__call__`` mutates ``os.environ`` temporarily. +``_ENV_PATCH_LOCK`` serialises concurrent calls so simultaneous executions +do not see each other's env patches. + +Extending the allowlist +----------------------- +Set ``SMOLAGENTS_ENV_EXTRA_ALLOWLIST`` to a comma-separated list of +additional uppercase env var names that should be passed through. This is +intended for workspace-specific non-secret variables (e.g. ``WORKSPACE_ID`` +that you know are safe): + + SMOLAGENTS_ENV_EXTRA_ALLOWLIST="MY_COMPANY_ENV,REGION" + +Never add secret names here — use workspace secrets injection instead. +""" + +from __future__ import annotations + +import os +import threading +from typing import Any, Dict, List, Optional + +# --------------------------------------------------------------------------- +# Allowlist configuration +# --------------------------------------------------------------------------- + +# Core safe env variables — non-secret system and runtime variables that +# agent code may legitimately need (e.g. PATH for subprocess-free tools, +# PYTHONPATH for module resolution, TZ for datetime ops). +_SAFE_ENV_ALLOWLIST: frozenset = frozenset( + [ + # Shell / system fundamentals + "PATH", + "HOME", + "USER", + "LOGNAME", + "SHELL", + "TERM", + "TZ", + "TMPDIR", + "TEMP", + "TMP", + # Language / locale + "LANG", + "LANGUAGE", + "LC_ALL", + "LC_CTYPE", + "LC_MESSAGES", + "LC_NUMERIC", + "LC_TIME", + # Python runtime + "PYTHONPATH", + "PYTHONHOME", + "PYTHONDONTWRITEBYTECODE", + "PYTHONUNBUFFERED", + "PYTHONIOENCODING", + # Molecule workspace non-secret identity vars + "WORKSPACE_ID", + "WORKSPACE_NAME", + "PLATFORM_URL", + ] +) + +# Imports permanently excluded from the executor's authorized list. +# These are well-known sandbox-escape vectors. +_BANNED_IMPORTS: frozenset = frozenset( + ["subprocess", "socket", "ctypes", "importlib", "importlib.util"] +) + +# Baseline imports every SafeLocalPythonExecutor allows — pure-computation +# modules with no I/O escape surface. +_BASELINE_SAFE_IMPORTS: List[str] = [ + "math", + "json", + "re", + "datetime", + "collections", + "itertools", + "functools", + "typing", + "string", + "textwrap", + "decimal", + "fractions", + "statistics", + "random", + "hashlib", + "base64", + "urllib.parse", + "copy", + "dataclasses", + "enum", + "abc", + "io", +] + +# Thread lock for env patching +_ENV_PATCH_LOCK = threading.Lock() + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def make_safe_env( + extra_allowed: Optional[List[str]] = None, +) -> Dict[str, str]: + """Return a *copy* of the environment containing only allowlisted keys. + + ``os.environ`` is **never mutated** by this function. + + Parameters + ---------- + extra_allowed: + Additional variable names to include beyond the built-in allowlist. + Also merged with the ``SMOLAGENTS_ENV_EXTRA_ALLOWLIST`` env var. + + Returns + ------- + dict + A copy of ``os.environ`` filtered to allowlisted keys only. + Keys not on the list are silently dropped. + """ + allowed = set(_SAFE_ENV_ALLOWLIST) + + # Merge caller-provided extras + if extra_allowed: + allowed.update(k.upper() for k in extra_allowed) + + # Merge env-var-configured extras + env_extra = os.environ.get("SMOLAGENTS_ENV_EXTRA_ALLOWLIST", "") + if env_extra: + for key in env_extra.split(","): + key = key.strip().upper() + if key: + allowed.add(key) + + return {k: v for k, v in os.environ.items() if k in allowed} + + +class SafeLocalPythonExecutor: + """Allowlist-gated wrapper around smolagents ``LocalPythonExecutor``. + + Guarantees that agent-generated code cannot read secret environment + variables (``ANTHROPIC_API_KEY``, ``GH_TOKEN``, ``DATABASE_URL``, etc.) + because they are absent from ``os.environ`` during execution. + + Parameters + ---------- + additional_imports: + Extra module names to allow beyond ``_BASELINE_SAFE_IMPORTS``. + ``_BANNED_IMPORTS`` takes precedence — listed names are silently + removed. + extra_allowed_env: + Extra variable names to pass through beyond the core allowlist. + _inner: + Inject a mock ``LocalPythonExecutor`` for tests. When ``None``, + the real smolagents executor is constructed lazily. + """ + + def __init__( + self, + additional_imports: Optional[List[str]] = None, + extra_allowed_env: Optional[List[str]] = None, + *, + _inner: Any = None, + ) -> None: + # Compute final import list (baseline + extras − banned) + combined = list(_BASELINE_SAFE_IMPORTS) + if additional_imports: + for imp in additional_imports: + if imp not in _BANNED_IMPORTS: + combined.append(imp) + + self._authorized_imports: List[str] = combined + self._extra_allowed_env: Optional[List[str]] = extra_allowed_env + self._inner = _inner # may be None until first call + + def _get_inner(self) -> Any: + """Lazy-construct the real executor on first use (avoids import errors in tests).""" + if self._inner is None: + from smolagents import LocalPythonExecutor # type: ignore[import] + + self._inner = LocalPythonExecutor( + additional_authorized_imports=self._authorized_imports + ) + return self._inner + + def __call__(self, code: str, *args: Any, **kwargs: Any) -> Any: + """Execute ``code`` with only allowlisted env vars visible. + + All keys not on the allowlist are removed from ``os.environ`` for + the duration of execution and restored afterward, even on exception. + The lock ensures thread safety across concurrent calls. + """ + safe_env = make_safe_env(self._extra_allowed_env) + inner = self._get_inner() + + with _ENV_PATCH_LOCK: + # Snapshot full current env + original_env = dict(os.environ) + # Remove everything not in the safe set + keys_to_remove = [k for k in os.environ if k not in safe_env] + for k in keys_to_remove: + del os.environ[k] + try: + return inner(code, *args, **kwargs) + finally: + # Always restore + os.environ.clear() + os.environ.update(original_env) diff --git a/workspace-template/adapters/smolagents/safe_env.py b/workspace-template/adapters/smolagents/safe_env.py new file mode 100644 index 00000000..5664f1e8 --- /dev/null +++ b/workspace-template/adapters/smolagents/safe_env.py @@ -0,0 +1,61 @@ +"""Denylist-based environment sanitization for smolagents (issue #826 — C3 CRITICAL). + +This module provides a simple denylist approach: well-known secret variable +names plus ``*_API_KEY`` and ``*_TOKEN`` suffix patterns are stripped before +env is passed to agent-executed code. + +For a stricter allowlist-based alternative that only passes explicitly-safe +variables through, see :mod:`adapters.smolagents.env_sanitize`. + +Usage:: + + from adapters.smolagents.safe_env import make_safe_env + + executor = LocalPythonExecutor(...) + # Pass only the sanitised env to the subprocess / exec context: + safe = make_safe_env() +""" + +import copy +import os + +# Named API keys and tokens known to be used by smolagents / LLM clients. +# These are removed regardless of the suffix-pattern below. +SMOLAGENTS_ENV_DENYLIST: frozenset = frozenset( + { + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "GROQ_API_KEY", + "CEREBRAS_API_KEY", + "QIANFAN_API_KEY", + "LANGFUSE_SECRET_KEY", + "LANGFUSE_PUBLIC_KEY", + "HF_TOKEN", + } +) + + +def make_safe_env() -> dict: + """Return a sanitised copy of ``os.environ`` with secrets removed. + + Removes any key that: + - Is in :data:`SMOLAGENTS_ENV_DENYLIST`, OR + - Ends with ``_API_KEY``, OR + - Ends with ``_TOKEN`` + + ``os.environ`` is **never mutated** — a fresh ``dict`` copy is returned. + + Returns + ------- + dict + A copy of the current environment with secret keys removed. + """ + env = copy.copy(dict(os.environ)) + for key in list(env.keys()): + if ( + key in SMOLAGENTS_ENV_DENYLIST + or key.endswith("_API_KEY") + or key.endswith("_TOKEN") + ): + del env[key] + return env diff --git a/workspace-template/adapters/smolagents/send_message_wrapper.py b/workspace-template/adapters/smolagents/send_message_wrapper.py new file mode 100644 index 00000000..01bf053e --- /dev/null +++ b/workspace-template/adapters/smolagents/send_message_wrapper.py @@ -0,0 +1,71 @@ +"""Safe send_message wrapper for smolagents (issue #827 — C1 HIGH). + +Prevents social-engineering attacks where agent-generated content could +impersonate platform messages, inject HTML, or flood the user chat. + +Guarantees +---------- +1. Every message is prefixed with ``[smolagents]`` so recipients can + attribute it to the agent and cannot be mistaken for platform UI. +2. Truncated to 2000 characters to prevent log/UI floods. +3. HTML entities (``<``, ``>``, ``&``, ``"``, ``'``) are escaped so + rendered UIs that interpret HTML cannot be injected into. + +Usage:: + + from adapters.smolagents.send_message_wrapper import safe_send_message + + safe_send_message("Hello world", send_fn=platform_client.send) +""" + +from __future__ import annotations + +import html +import logging + +logger = logging.getLogger(__name__) + +# Maximum character length for the *user-visible* portion of the message +# (label prefix does not count toward this cap). +_MAX_TEXT_LEN: int = 2000 + +# Label prepended to every outbound message. +_LABEL: str = "[smolagents]" + + +def safe_send_message(text: str, send_fn) -> None: + """Sanitise *text* and deliver it via *send_fn*. + + Parameters + ---------- + text: + The raw message text produced by the agent. + send_fn: + Callable that delivers the message (e.g. ``platform_client.send`` + or a WebSocket broadcast function). Called with the final, + sanitised string as its sole positional argument. + + Side effects + ------------ + - Logs a warning when truncation occurs. + - Logs a debug entry with the final payload length. + """ + if not isinstance(text, str): + text = str(text) + + # Strip HTML entities to prevent injection into rendered UIs. + sanitised = html.escape(text, quote=True) + + # Truncate to cap (before adding label so cap applies to content). + if len(sanitised) > _MAX_TEXT_LEN: + logger.warning( + "safe_send_message: truncating message from %d to %d chars", + len(sanitised), + _MAX_TEXT_LEN, + ) + sanitised = sanitised[:_MAX_TEXT_LEN] + + payload = f"{_LABEL} {sanitised}" + + logger.debug("safe_send_message: delivering %d-char payload", len(payload)) + send_fn(payload) diff --git a/workspace-template/tests/adapters/__init__.py b/workspace-template/tests/adapters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workspace-template/tests/adapters/smolagents/__init__.py b/workspace-template/tests/adapters/smolagents/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/workspace-template/tests/adapters/smolagents/test_env_sanitize.py b/workspace-template/tests/adapters/smolagents/test_env_sanitize.py new file mode 100644 index 00000000..905ac0bc --- /dev/null +++ b/workspace-template/tests/adapters/smolagents/test_env_sanitize.py @@ -0,0 +1,446 @@ +"""Tests for allowlist-based env sanitization (issue #826 — C3 CRITICAL). + +All tests patch os.environ directly — the module under test must never +mutate the real process env outside of SafeLocalPythonExecutor.__call__, +and even there it must restore the original env on exit. +""" + +from __future__ import annotations + +import os +import threading +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +# Import directly from submodule to avoid any sys.modules stub side-effects +from adapters.smolagents.env_sanitize import ( + SafeLocalPythonExecutor, + _BANNED_IMPORTS, + _BASELINE_SAFE_IMPORTS, + _SAFE_ENV_ALLOWLIST, + make_safe_env, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +class _MockInner: + """Captures the code string passed to it; returns a configurable result.""" + + def __init__(self, return_value: Any = None): + self.calls: list[str] = [] + self.return_value = return_value + + def __call__(self, code: str, *args: Any, **kwargs: Any) -> Any: + self.calls.append(code) + return self.return_value + + +# --------------------------------------------------------------------------- +# make_safe_env() — pure function tests (os.environ never mutated) +# --------------------------------------------------------------------------- + + +class TestMakeSafeEnv: + def test_strips_anthropic_api_key(self): + with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-secret"}, clear=False): + result = make_safe_env() + assert "ANTHROPIC_API_KEY" not in result + + def test_strips_gh_token(self): + with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False): + result = make_safe_env() + assert "GH_TOKEN" not in result + + def test_strips_openai_api_key(self): + with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False): + result = make_safe_env() + assert "OPENAI_API_KEY" not in result + + def test_strips_database_url(self): + with patch.dict(os.environ, {"DATABASE_URL": "postgres://secret"}, clear=False): + result = make_safe_env() + assert "DATABASE_URL" not in result + + def test_strips_redis_url(self): + with patch.dict(os.environ, {"REDIS_URL": "redis://secret"}, clear=False): + result = make_safe_env() + assert "REDIS_URL" not in result + + def test_strips_aws_access_key(self): + with patch.dict(os.environ, {"AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE"}, clear=False): + result = make_safe_env() + assert "AWS_ACCESS_KEY_ID" not in result + + def test_strips_slack_token(self): + with patch.dict(os.environ, {"SLACK_BOT_TOKEN": "xoxb-secret"}, clear=False): + result = make_safe_env() + assert "SLACK_BOT_TOKEN" not in result + + def test_strips_generic_password(self): + with patch.dict(os.environ, {"DB_PASSWORD": "hunter2"}, clear=False): + result = make_safe_env() + assert "DB_PASSWORD" not in result + + def test_strips_generic_secret(self): + with patch.dict(os.environ, {"JWT_SECRET": "supersecret"}, clear=False): + result = make_safe_env() + assert "JWT_SECRET" not in result + + def test_passes_path(self): + with patch.dict(os.environ, {"PATH": "/usr/bin:/bin"}, clear=False): + result = make_safe_env() + assert result.get("PATH") == "/usr/bin:/bin" + + def test_passes_home(self): + with patch.dict(os.environ, {"HOME": "/root"}, clear=False): + result = make_safe_env() + assert result.get("HOME") == "/root" + + def test_passes_lang(self): + with patch.dict(os.environ, {"LANG": "en_US.UTF-8"}, clear=False): + result = make_safe_env() + assert result.get("LANG") == "en_US.UTF-8" + + def test_passes_pythonpath(self): + with patch.dict(os.environ, {"PYTHONPATH": "/app"}, clear=False): + result = make_safe_env() + assert result.get("PYTHONPATH") == "/app" + + def test_passes_workspace_id(self): + with patch.dict(os.environ, {"WORKSPACE_ID": "ws-123"}, clear=False): + result = make_safe_env() + assert result.get("WORKSPACE_ID") == "ws-123" + + def test_passes_workspace_name(self): + with patch.dict(os.environ, {"WORKSPACE_NAME": "my-agent"}, clear=False): + result = make_safe_env() + assert result.get("WORKSPACE_NAME") == "my-agent" + + def test_passes_platform_url(self): + with patch.dict(os.environ, {"PLATFORM_URL": "http://platform:8080"}, clear=False): + result = make_safe_env() + assert result.get("PLATFORM_URL") == "http://platform:8080" + + def test_does_not_mutate_os_environ(self): + """make_safe_env() must be a pure read — os.environ unchanged after call.""" + with patch.dict( + os.environ, + {"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"}, + clear=False, + ): + before = dict(os.environ) + make_safe_env() + after = dict(os.environ) + assert before == after + + def test_returns_dict(self): + result = make_safe_env() + assert isinstance(result, dict) + + def test_extra_allowed_via_parameter(self): + with patch.dict(os.environ, {"MY_SAFE_VAR": "value"}, clear=False): + result = make_safe_env(extra_allowed=["MY_SAFE_VAR"]) + assert result.get("MY_SAFE_VAR") == "value" + + def test_extra_allowed_via_env_var(self): + with patch.dict( + os.environ, + { + "SMOLAGENTS_ENV_EXTRA_ALLOWLIST": "REGION,CLUSTER_NAME", + "REGION": "us-east-1", + "CLUSTER_NAME": "prod", + "ANTHROPIC_API_KEY": "sk-ant-secret", + }, + clear=False, + ): + result = make_safe_env() + assert result.get("REGION") == "us-east-1" + assert result.get("CLUSTER_NAME") == "prod" + assert "ANTHROPIC_API_KEY" not in result + + def test_extra_allowed_env_var_is_case_normalized(self): + """Names in SMOLAGENTS_ENV_EXTRA_ALLOWLIST are uppercased automatically.""" + with patch.dict( + os.environ, + {"SMOLAGENTS_ENV_EXTRA_ALLOWLIST": "my_safe_var", "MY_SAFE_VAR": "hello"}, + clear=False, + ): + result = make_safe_env() + assert result.get("MY_SAFE_VAR") == "hello" + + +# --------------------------------------------------------------------------- +# SafeLocalPythonExecutor — allowlist enforcement during execution +# --------------------------------------------------------------------------- + + +class TestSafeLocalPythonExecutorAllowlist: + """Core security guarantee: secrets absent from os.environ during execution.""" + + def test_secret_absent_during_execution_anthropic(self): + """Injected ANTHROPIC_API_KEY must not be visible to executed code.""" + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + # Simulate what agent code would see via os.environ + captured_env.update(os.environ.copy()) + return "" + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-secret"}, clear=False): + executor("import os; os.environ.get('ANTHROPIC_API_KEY', '')") + + assert "ANTHROPIC_API_KEY" not in captured_env + + def test_secret_absent_during_execution_gh_token(self): + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + return "" + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False): + executor("import os; os.environ.get('GH_TOKEN', '')") + + assert "GH_TOKEN" not in captured_env + + def test_secret_absent_during_execution_database_url(self): + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + return "" + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + with patch.dict(os.environ, {"DATABASE_URL": "postgres://secret"}, clear=False): + executor("code") + + assert "DATABASE_URL" not in captured_env + + def test_secret_absent_during_execution_openai_key(self): + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False): + executor("code") + + assert "OPENAI_API_KEY" not in captured_env + + def test_multiple_secrets_all_absent(self): + """All secrets must be stripped simultaneously, not just one.""" + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + secrets = { + "ANTHROPIC_API_KEY": "sk-ant", + "GH_TOKEN": "ghp_", + "OPENAI_API_KEY": "sk-open", + "DATABASE_URL": "postgres://", + "REDIS_URL": "redis://", + "SLACK_BOT_TOKEN": "xoxb-", + "JWT_SECRET": "secret", + "DB_PASSWORD": "pass", + } + + with patch.dict(os.environ, secrets, clear=False): + executor("code") + + for key in secrets: + assert key not in captured_env, f"{key!r} was visible during execution" + + def test_safe_vars_present_during_execution(self): + """Allowlisted variables must remain visible during execution.""" + captured_env: dict = {} + + def _mock_inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + + executor = SafeLocalPythonExecutor(_inner=_mock_inner) + + with patch.dict( + os.environ, + { + "PATH": "/usr/bin:/bin", + "WORKSPACE_ID": "ws-abc", + "PYTHONPATH": "/app", + "ANTHROPIC_API_KEY": "sk-ant-secret", + }, + clear=False, + ): + executor("code") + + assert captured_env.get("PATH") == "/usr/bin:/bin" + assert captured_env.get("WORKSPACE_ID") == "ws-abc" + assert captured_env.get("PYTHONPATH") == "/app" + + def test_env_restored_after_execution(self): + """os.environ must be fully restored after __call__ returns.""" + executor = SafeLocalPythonExecutor(_inner=_MockInner()) + + with patch.dict( + os.environ, + {"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"}, + clear=False, + ): + env_before = dict(os.environ) + executor("code") + env_after = dict(os.environ) + + assert env_before == env_after + + def test_env_restored_after_exception(self): + """os.environ must be restored even if the inner executor raises.""" + + def _raises(code: str, *args, **kwargs): + raise RuntimeError("boom") + + executor = SafeLocalPythonExecutor(_inner=_raises) + + with patch.dict( + os.environ, + {"ANTHROPIC_API_KEY": "sk-ant-secret"}, + clear=False, + ): + env_before = dict(os.environ) + with pytest.raises(RuntimeError, match="boom"): + executor("code") + env_after = dict(os.environ) + + assert env_before == env_after + + def test_returns_inner_result(self): + mock_inner = _MockInner(return_value="hello world") + executor = SafeLocalPythonExecutor(_inner=mock_inner) + result = executor("some code") + assert result == "hello world" + + def test_passes_code_to_inner(self): + mock_inner = _MockInner() + executor = SafeLocalPythonExecutor(_inner=mock_inner) + executor("print('hi')") + assert mock_inner.calls == ["print('hi')"] + + +# --------------------------------------------------------------------------- +# SafeLocalPythonExecutor — import restrictions +# --------------------------------------------------------------------------- + + +class TestSafeLocalPythonExecutorImports: + def test_banned_imports_removed_from_authorized(self): + """Banned imports must not appear in the authorized list regardless of what caller passes.""" + executor = SafeLocalPythonExecutor( + additional_imports=["subprocess", "socket", "math"], + _inner=_MockInner(), + ) + for banned in _BANNED_IMPORTS: + assert banned not in executor._authorized_imports, ( + f"{banned!r} must not be in authorized imports" + ) + + def test_safe_imports_present(self): + executor = SafeLocalPythonExecutor(_inner=_MockInner()) + for safe in ["math", "json", "re", "datetime"]: + assert safe in executor._authorized_imports + + def test_additional_safe_import_added(self): + executor = SafeLocalPythonExecutor( + additional_imports=["numpy"], + _inner=_MockInner(), + ) + assert "numpy" in executor._authorized_imports + + def test_banned_list_coverage(self): + """Verify the built-in banned list covers expected attack vectors.""" + expected_banned = {"subprocess", "socket", "ctypes", "importlib", "importlib.util"} + assert expected_banned.issubset(_BANNED_IMPORTS) + + +# --------------------------------------------------------------------------- +# SafeLocalPythonExecutor — thread safety +# --------------------------------------------------------------------------- + + +class TestSafeLocalPythonExecutorThreadSafety: + def test_concurrent_calls_restore_env_correctly(self): + """Two concurrent executions must not corrupt each other's env view.""" + results: list[bool] = [] + errors: list[Exception] = [] + + def _run(secret_key: str, secret_value: str): + captured_env: dict = {} + + def _inner(code: str, *args, **kwargs): + captured_env.update(os.environ.copy()) + + executor = SafeLocalPythonExecutor(_inner=_inner) + try: + with patch.dict(os.environ, {secret_key: secret_value}, clear=False): + executor("code") + # Secret must not be visible during execution + results.append(secret_key not in captured_env) + except Exception as exc: + errors.append(exc) + + threads = [ + threading.Thread(target=_run, args=(f"SECRET_{i}", f"value_{i}")) + for i in range(10) + ] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors, f"Threads raised: {errors}" + assert all(results), "Some threads saw a secret that should have been stripped" + + +# --------------------------------------------------------------------------- +# Allowlist contents +# --------------------------------------------------------------------------- + + +class TestAllowlistContents: + def test_core_vars_in_allowlist(self): + """Spot-check that expected safe vars are on the allowlist.""" + required = {"PATH", "HOME", "LANG", "PYTHONPATH", "WORKSPACE_ID", "WORKSPACE_NAME", "PLATFORM_URL"} + for var in required: + assert var in _SAFE_ENV_ALLOWLIST, f"{var!r} missing from _SAFE_ENV_ALLOWLIST" + + def test_secrets_not_in_allowlist(self): + """Known secret names must NOT appear on the allowlist.""" + forbidden = { + "ANTHROPIC_API_KEY", + "GH_TOKEN", + "GITHUB_TOKEN", + "OPENAI_API_KEY", + "DATABASE_URL", + "REDIS_URL", + "SLACK_BOT_TOKEN", + "JWT_SECRET", + "DB_PASSWORD", + "AWS_SECRET_ACCESS_KEY", + "AWS_ACCESS_KEY_ID", + } + for var in forbidden: + assert var not in _SAFE_ENV_ALLOWLIST, ( + f"{var!r} must NOT be in _SAFE_ENV_ALLOWLIST — it's a secret" + ) diff --git a/workspace-template/tests/test_safe_env.py b/workspace-template/tests/test_safe_env.py new file mode 100644 index 00000000..c5e9056e --- /dev/null +++ b/workspace-template/tests/test_safe_env.py @@ -0,0 +1,202 @@ +"""Tests for denylist-based env sanitization — safe_env.py (issue #826 / #827). + +Covers: + (a) SMOLAGENTS_ENV_DENYLIST keys are stripped + (b) *_API_KEY suffix keys are stripped + (c) *_TOKEN suffix keys are stripped + (d) Non-secret keys (PATH, HOME, …) are preserved + (e) safe_send_message label, truncation, and HTML escaping +""" + +from __future__ import annotations + +import os +from unittest.mock import MagicMock, patch + +import pytest + +from adapters.smolagents.safe_env import ( + SMOLAGENTS_ENV_DENYLIST, + make_safe_env, +) +from adapters.smolagents.send_message_wrapper import safe_send_message + + +# --------------------------------------------------------------------------- +# make_safe_env — denylist-based +# --------------------------------------------------------------------------- + + +class TestMakeSafeEnvDenylist: + """(a) Explicit denylist keys are removed.""" + + @pytest.mark.parametrize("key", sorted(SMOLAGENTS_ENV_DENYLIST)) + def test_denylist_key_stripped(self, key: str): + with patch.dict(os.environ, {key: "secret-value"}, clear=False): + result = make_safe_env() + assert key not in result, f"Denylist key {key!r} must be stripped" + + def test_all_denylist_keys_stripped_simultaneously(self): + secrets = {k: "secret" for k in SMOLAGENTS_ENV_DENYLIST} + with patch.dict(os.environ, secrets, clear=False): + result = make_safe_env() + for key in SMOLAGENTS_ENV_DENYLIST: + assert key not in result + + +class TestMakeSafeEnvApiKeySuffix: + """(b) Keys ending with _API_KEY are stripped.""" + + def test_openai_api_key(self): + with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False): + assert "OPENAI_API_KEY" not in make_safe_env() + + def test_custom_api_key_suffix(self): + with patch.dict(os.environ, {"MY_CUSTOM_SERVICE_API_KEY": "abc123"}, clear=False): + assert "MY_CUSTOM_SERVICE_API_KEY" not in make_safe_env() + + def test_arbitrary_api_key_suffix(self): + with patch.dict(os.environ, {"FOOBAR_API_KEY": "secret"}, clear=False): + assert "FOOBAR_API_KEY" not in make_safe_env() + + +class TestMakeSafeEnvTokenSuffix: + """(c) Keys ending with _TOKEN are stripped.""" + + def test_gh_token(self): + with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False): + assert "GH_TOKEN" not in make_safe_env() + + def test_github_token(self): + with patch.dict(os.environ, {"GITHUB_TOKEN": "ghp_secret"}, clear=False): + assert "GITHUB_TOKEN" not in make_safe_env() + + def test_custom_token_suffix(self): + with patch.dict(os.environ, {"MY_SERVICE_TOKEN": "tok_abc"}, clear=False): + assert "MY_SERVICE_TOKEN" not in make_safe_env() + + def test_arbitrary_token_suffix(self): + with patch.dict(os.environ, {"INTERNAL_ACCESS_TOKEN": "secret"}, clear=False): + assert "INTERNAL_ACCESS_TOKEN" not in make_safe_env() + + +class TestMakeSafeEnvPreservesNonSecrets: + """(d) Non-secret keys are preserved.""" + + def test_preserves_path(self): + with patch.dict(os.environ, {"PATH": "/usr/bin:/bin"}, clear=False): + result = make_safe_env() + assert result.get("PATH") == "/usr/bin:/bin" + + def test_preserves_home(self): + with patch.dict(os.environ, {"HOME": "/home/agent"}, clear=False): + result = make_safe_env() + assert result.get("HOME") == "/home/agent" + + def test_preserves_workspace_id(self): + with patch.dict(os.environ, {"WORKSPACE_ID": "ws-abc123"}, clear=False): + result = make_safe_env() + assert result.get("WORKSPACE_ID") == "ws-abc123" + + def test_preserves_pythonpath(self): + with patch.dict(os.environ, {"PYTHONPATH": "/app"}, clear=False): + result = make_safe_env() + assert result.get("PYTHONPATH") == "/app" + + def test_preserves_lang(self): + with patch.dict(os.environ, {"LANG": "en_US.UTF-8"}, clear=False): + result = make_safe_env() + assert result.get("LANG") == "en_US.UTF-8" + + def test_does_not_mutate_os_environ(self): + """make_safe_env must never write back to os.environ.""" + with patch.dict( + os.environ, + {"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"}, + clear=False, + ): + before = dict(os.environ) + make_safe_env() + after = dict(os.environ) + assert before == after + + def test_returns_dict(self): + assert isinstance(make_safe_env(), dict) + + +# --------------------------------------------------------------------------- +# safe_send_message — label, truncation, HTML escaping +# --------------------------------------------------------------------------- + + +class TestSafeSendMessage: + def _capture(self): + """Return a mock send_fn and its captured calls.""" + fn = MagicMock() + return fn + + def test_label_prefix_added(self): + fn = self._capture() + safe_send_message("hello", fn) + fn.assert_called_once() + payload = fn.call_args[0][0] + assert payload.startswith("[smolagents]"), f"Missing label: {payload!r}" + + def test_label_prefix_followed_by_content(self): + fn = self._capture() + safe_send_message("world", fn) + payload = fn.call_args[0][0] + assert "world" in payload + + def test_truncates_at_2000_chars(self): + fn = self._capture() + long_text = "a" * 3000 + safe_send_message(long_text, fn) + payload = fn.call_args[0][0] + # The user content portion must be capped; label adds a few chars on top + # Total len = len("[smolagents] ") + 2000 + assert len(payload) <= len("[smolagents] ") + 2000 + + def test_short_message_not_truncated(self): + fn = self._capture() + safe_send_message("short", fn) + payload = fn.call_args[0][0] + assert "short" in payload + + def test_html_entities_escaped(self): + fn = self._capture() + safe_send_message("", fn) + payload = fn.call_args[0][0] + assert "