Merge pull request #891 from Molecule-AI/fix/issue-826-smol-executor-env-sanitization
feat(security): denylist env sanitization + safe messaging for smolagents
This commit is contained in:
commit
c65150edf6
32
workspace-template/adapters/smolagents/__init__.py
Normal file
32
workspace-template/adapters/smolagents/__init__.py
Normal file
@ -0,0 +1,32 @@
|
||||
"""Smolagents adapter for Molecule AI workspace runtime.
|
||||
|
||||
Provides env sanitization and safe executor/messaging primitives for use
|
||||
with HuggingFace's smolagents library.
|
||||
|
||||
Two env-sanitization strategies are available:
|
||||
|
||||
* **Allowlist** (recommended) — :mod:`adapters.smolagents.env_sanitize`:
|
||||
only explicitly-safe variables pass through. Stricter but requires keeping
|
||||
the allowlist up-to-date as new safe vars are needed.
|
||||
|
||||
* **Denylist** (simple) — :mod:`adapters.smolagents.safe_env`:
|
||||
well-known secret names plus ``*_API_KEY`` / ``*_TOKEN`` suffix patterns
|
||||
are stripped. Easier to start with; less exhaustive.
|
||||
|
||||
Quick start::
|
||||
|
||||
# Allowlist approach (stricter)
|
||||
from adapters.smolagents.env_sanitize import make_safe_env, SafeLocalPythonExecutor
|
||||
|
||||
# Denylist approach (simpler)
|
||||
from adapters.smolagents.safe_env import make_safe_env
|
||||
|
||||
# Safe messaging
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
"""
|
||||
|
||||
# Re-export the allowlist-based make_safe_env as the default (most secure).
|
||||
from adapters.smolagents.env_sanitize import SafeLocalPythonExecutor, make_safe_env
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
|
||||
__all__ = ["make_safe_env", "SafeLocalPythonExecutor", "safe_send_message"]
|
||||
226
workspace-template/adapters/smolagents/env_sanitize.py
Normal file
226
workspace-template/adapters/smolagents/env_sanitize.py
Normal file
@ -0,0 +1,226 @@
|
||||
"""Allowlist-based environment sanitization for smolagents (#826 — C3 CRITICAL).
|
||||
|
||||
Security model
|
||||
--------------
|
||||
We use an **allowlist** (not a denylist) — only variables explicitly
|
||||
enumerated as safe are passed through to agent-executed code. Any key not
|
||||
on the list is silently dropped.
|
||||
|
||||
This is intentionally strict: adding a new safe variable is a deliberate
|
||||
engineering act that surfaces in code review, rather than hoping a regex
|
||||
denylist catches every new secret name.
|
||||
|
||||
Thread safety
|
||||
-------------
|
||||
``SafeLocalPythonExecutor.__call__`` mutates ``os.environ`` temporarily.
|
||||
``_ENV_PATCH_LOCK`` serialises concurrent calls so simultaneous executions
|
||||
do not see each other's env patches.
|
||||
|
||||
Extending the allowlist
|
||||
-----------------------
|
||||
Set ``SMOLAGENTS_ENV_EXTRA_ALLOWLIST`` to a comma-separated list of
|
||||
additional uppercase env var names that should be passed through. This is
|
||||
intended for workspace-specific non-secret variables (e.g. ``WORKSPACE_ID``
|
||||
that you know are safe):
|
||||
|
||||
SMOLAGENTS_ENV_EXTRA_ALLOWLIST="MY_COMPANY_ENV,REGION"
|
||||
|
||||
Never add secret names here — use workspace secrets injection instead.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Allowlist configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Core safe env variables — non-secret system and runtime variables that
|
||||
# agent code may legitimately need (e.g. PATH for subprocess-free tools,
|
||||
# PYTHONPATH for module resolution, TZ for datetime ops).
|
||||
_SAFE_ENV_ALLOWLIST: frozenset = frozenset(
|
||||
[
|
||||
# Shell / system fundamentals
|
||||
"PATH",
|
||||
"HOME",
|
||||
"USER",
|
||||
"LOGNAME",
|
||||
"SHELL",
|
||||
"TERM",
|
||||
"TZ",
|
||||
"TMPDIR",
|
||||
"TEMP",
|
||||
"TMP",
|
||||
# Language / locale
|
||||
"LANG",
|
||||
"LANGUAGE",
|
||||
"LC_ALL",
|
||||
"LC_CTYPE",
|
||||
"LC_MESSAGES",
|
||||
"LC_NUMERIC",
|
||||
"LC_TIME",
|
||||
# Python runtime
|
||||
"PYTHONPATH",
|
||||
"PYTHONHOME",
|
||||
"PYTHONDONTWRITEBYTECODE",
|
||||
"PYTHONUNBUFFERED",
|
||||
"PYTHONIOENCODING",
|
||||
# Molecule workspace non-secret identity vars
|
||||
"WORKSPACE_ID",
|
||||
"WORKSPACE_NAME",
|
||||
"PLATFORM_URL",
|
||||
]
|
||||
)
|
||||
|
||||
# Imports permanently excluded from the executor's authorized list.
|
||||
# These are well-known sandbox-escape vectors.
|
||||
_BANNED_IMPORTS: frozenset = frozenset(
|
||||
["subprocess", "socket", "ctypes", "importlib", "importlib.util"]
|
||||
)
|
||||
|
||||
# Baseline imports every SafeLocalPythonExecutor allows — pure-computation
|
||||
# modules with no I/O escape surface.
|
||||
_BASELINE_SAFE_IMPORTS: List[str] = [
|
||||
"math",
|
||||
"json",
|
||||
"re",
|
||||
"datetime",
|
||||
"collections",
|
||||
"itertools",
|
||||
"functools",
|
||||
"typing",
|
||||
"string",
|
||||
"textwrap",
|
||||
"decimal",
|
||||
"fractions",
|
||||
"statistics",
|
||||
"random",
|
||||
"hashlib",
|
||||
"base64",
|
||||
"urllib.parse",
|
||||
"copy",
|
||||
"dataclasses",
|
||||
"enum",
|
||||
"abc",
|
||||
"io",
|
||||
]
|
||||
|
||||
# Thread lock for env patching
|
||||
_ENV_PATCH_LOCK = threading.Lock()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def make_safe_env(
|
||||
extra_allowed: Optional[List[str]] = None,
|
||||
) -> Dict[str, str]:
|
||||
"""Return a *copy* of the environment containing only allowlisted keys.
|
||||
|
||||
``os.environ`` is **never mutated** by this function.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
extra_allowed:
|
||||
Additional variable names to include beyond the built-in allowlist.
|
||||
Also merged with the ``SMOLAGENTS_ENV_EXTRA_ALLOWLIST`` env var.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A copy of ``os.environ`` filtered to allowlisted keys only.
|
||||
Keys not on the list are silently dropped.
|
||||
"""
|
||||
allowed = set(_SAFE_ENV_ALLOWLIST)
|
||||
|
||||
# Merge caller-provided extras
|
||||
if extra_allowed:
|
||||
allowed.update(k.upper() for k in extra_allowed)
|
||||
|
||||
# Merge env-var-configured extras
|
||||
env_extra = os.environ.get("SMOLAGENTS_ENV_EXTRA_ALLOWLIST", "")
|
||||
if env_extra:
|
||||
for key in env_extra.split(","):
|
||||
key = key.strip().upper()
|
||||
if key:
|
||||
allowed.add(key)
|
||||
|
||||
return {k: v for k, v in os.environ.items() if k in allowed}
|
||||
|
||||
|
||||
class SafeLocalPythonExecutor:
|
||||
"""Allowlist-gated wrapper around smolagents ``LocalPythonExecutor``.
|
||||
|
||||
Guarantees that agent-generated code cannot read secret environment
|
||||
variables (``ANTHROPIC_API_KEY``, ``GH_TOKEN``, ``DATABASE_URL``, etc.)
|
||||
because they are absent from ``os.environ`` during execution.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
additional_imports:
|
||||
Extra module names to allow beyond ``_BASELINE_SAFE_IMPORTS``.
|
||||
``_BANNED_IMPORTS`` takes precedence — listed names are silently
|
||||
removed.
|
||||
extra_allowed_env:
|
||||
Extra variable names to pass through beyond the core allowlist.
|
||||
_inner:
|
||||
Inject a mock ``LocalPythonExecutor`` for tests. When ``None``,
|
||||
the real smolagents executor is constructed lazily.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
additional_imports: Optional[List[str]] = None,
|
||||
extra_allowed_env: Optional[List[str]] = None,
|
||||
*,
|
||||
_inner: Any = None,
|
||||
) -> None:
|
||||
# Compute final import list (baseline + extras − banned)
|
||||
combined = list(_BASELINE_SAFE_IMPORTS)
|
||||
if additional_imports:
|
||||
for imp in additional_imports:
|
||||
if imp not in _BANNED_IMPORTS:
|
||||
combined.append(imp)
|
||||
|
||||
self._authorized_imports: List[str] = combined
|
||||
self._extra_allowed_env: Optional[List[str]] = extra_allowed_env
|
||||
self._inner = _inner # may be None until first call
|
||||
|
||||
def _get_inner(self) -> Any:
|
||||
"""Lazy-construct the real executor on first use (avoids import errors in tests)."""
|
||||
if self._inner is None:
|
||||
from smolagents import LocalPythonExecutor # type: ignore[import]
|
||||
|
||||
self._inner = LocalPythonExecutor(
|
||||
additional_authorized_imports=self._authorized_imports
|
||||
)
|
||||
return self._inner
|
||||
|
||||
def __call__(self, code: str, *args: Any, **kwargs: Any) -> Any:
|
||||
"""Execute ``code`` with only allowlisted env vars visible.
|
||||
|
||||
All keys not on the allowlist are removed from ``os.environ`` for
|
||||
the duration of execution and restored afterward, even on exception.
|
||||
The lock ensures thread safety across concurrent calls.
|
||||
"""
|
||||
safe_env = make_safe_env(self._extra_allowed_env)
|
||||
inner = self._get_inner()
|
||||
|
||||
with _ENV_PATCH_LOCK:
|
||||
# Snapshot full current env
|
||||
original_env = dict(os.environ)
|
||||
# Remove everything not in the safe set
|
||||
keys_to_remove = [k for k in os.environ if k not in safe_env]
|
||||
for k in keys_to_remove:
|
||||
del os.environ[k]
|
||||
try:
|
||||
return inner(code, *args, **kwargs)
|
||||
finally:
|
||||
# Always restore
|
||||
os.environ.clear()
|
||||
os.environ.update(original_env)
|
||||
61
workspace-template/adapters/smolagents/safe_env.py
Normal file
61
workspace-template/adapters/smolagents/safe_env.py
Normal file
@ -0,0 +1,61 @@
|
||||
"""Denylist-based environment sanitization for smolagents (issue #826 — C3 CRITICAL).
|
||||
|
||||
This module provides a simple denylist approach: well-known secret variable
|
||||
names plus ``*_API_KEY`` and ``*_TOKEN`` suffix patterns are stripped before
|
||||
env is passed to agent-executed code.
|
||||
|
||||
For a stricter allowlist-based alternative that only passes explicitly-safe
|
||||
variables through, see :mod:`adapters.smolagents.env_sanitize`.
|
||||
|
||||
Usage::
|
||||
|
||||
from adapters.smolagents.safe_env import make_safe_env
|
||||
|
||||
executor = LocalPythonExecutor(...)
|
||||
# Pass only the sanitised env to the subprocess / exec context:
|
||||
safe = make_safe_env()
|
||||
"""
|
||||
|
||||
import copy
|
||||
import os
|
||||
|
||||
# Named API keys and tokens known to be used by smolagents / LLM clients.
|
||||
# These are removed regardless of the suffix-pattern below.
|
||||
SMOLAGENTS_ENV_DENYLIST: frozenset = frozenset(
|
||||
{
|
||||
"OPENAI_API_KEY",
|
||||
"ANTHROPIC_API_KEY",
|
||||
"GROQ_API_KEY",
|
||||
"CEREBRAS_API_KEY",
|
||||
"QIANFAN_API_KEY",
|
||||
"LANGFUSE_SECRET_KEY",
|
||||
"LANGFUSE_PUBLIC_KEY",
|
||||
"HF_TOKEN",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def make_safe_env() -> dict:
|
||||
"""Return a sanitised copy of ``os.environ`` with secrets removed.
|
||||
|
||||
Removes any key that:
|
||||
- Is in :data:`SMOLAGENTS_ENV_DENYLIST`, OR
|
||||
- Ends with ``_API_KEY``, OR
|
||||
- Ends with ``_TOKEN``
|
||||
|
||||
``os.environ`` is **never mutated** — a fresh ``dict`` copy is returned.
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict
|
||||
A copy of the current environment with secret keys removed.
|
||||
"""
|
||||
env = copy.copy(dict(os.environ))
|
||||
for key in list(env.keys()):
|
||||
if (
|
||||
key in SMOLAGENTS_ENV_DENYLIST
|
||||
or key.endswith("_API_KEY")
|
||||
or key.endswith("_TOKEN")
|
||||
):
|
||||
del env[key]
|
||||
return env
|
||||
@ -0,0 +1,71 @@
|
||||
"""Safe send_message wrapper for smolagents (issue #827 — C1 HIGH).
|
||||
|
||||
Prevents social-engineering attacks where agent-generated content could
|
||||
impersonate platform messages, inject HTML, or flood the user chat.
|
||||
|
||||
Guarantees
|
||||
----------
|
||||
1. Every message is prefixed with ``[smolagents]`` so recipients can
|
||||
attribute it to the agent and cannot be mistaken for platform UI.
|
||||
2. Truncated to 2000 characters to prevent log/UI floods.
|
||||
3. HTML entities (``<``, ``>``, ``&``, ``"``, ``'``) are escaped so
|
||||
rendered UIs that interpret HTML cannot be injected into.
|
||||
|
||||
Usage::
|
||||
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
|
||||
safe_send_message("Hello world", send_fn=platform_client.send)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import html
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Maximum character length for the *user-visible* portion of the message
|
||||
# (label prefix does not count toward this cap).
|
||||
_MAX_TEXT_LEN: int = 2000
|
||||
|
||||
# Label prepended to every outbound message.
|
||||
_LABEL: str = "[smolagents]"
|
||||
|
||||
|
||||
def safe_send_message(text: str, send_fn) -> None:
|
||||
"""Sanitise *text* and deliver it via *send_fn*.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
text:
|
||||
The raw message text produced by the agent.
|
||||
send_fn:
|
||||
Callable that delivers the message (e.g. ``platform_client.send``
|
||||
or a WebSocket broadcast function). Called with the final,
|
||||
sanitised string as its sole positional argument.
|
||||
|
||||
Side effects
|
||||
------------
|
||||
- Logs a warning when truncation occurs.
|
||||
- Logs a debug entry with the final payload length.
|
||||
"""
|
||||
if not isinstance(text, str):
|
||||
text = str(text)
|
||||
|
||||
# Strip HTML entities to prevent injection into rendered UIs.
|
||||
sanitised = html.escape(text, quote=True)
|
||||
|
||||
# Truncate to cap (before adding label so cap applies to content).
|
||||
if len(sanitised) > _MAX_TEXT_LEN:
|
||||
logger.warning(
|
||||
"safe_send_message: truncating message from %d to %d chars",
|
||||
len(sanitised),
|
||||
_MAX_TEXT_LEN,
|
||||
)
|
||||
sanitised = sanitised[:_MAX_TEXT_LEN]
|
||||
|
||||
payload = f"{_LABEL} {sanitised}"
|
||||
|
||||
logger.debug("safe_send_message: delivering %d-char payload", len(payload))
|
||||
send_fn(payload)
|
||||
0
workspace-template/tests/adapters/__init__.py
Normal file
0
workspace-template/tests/adapters/__init__.py
Normal file
@ -0,0 +1,446 @@
|
||||
"""Tests for allowlist-based env sanitization (issue #826 — C3 CRITICAL).
|
||||
|
||||
All tests patch os.environ directly — the module under test must never
|
||||
mutate the real process env outside of SafeLocalPythonExecutor.__call__,
|
||||
and even there it must restore the original env on exit.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Import directly from submodule to avoid any sys.modules stub side-effects
|
||||
from adapters.smolagents.env_sanitize import (
|
||||
SafeLocalPythonExecutor,
|
||||
_BANNED_IMPORTS,
|
||||
_BASELINE_SAFE_IMPORTS,
|
||||
_SAFE_ENV_ALLOWLIST,
|
||||
make_safe_env,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _MockInner:
|
||||
"""Captures the code string passed to it; returns a configurable result."""
|
||||
|
||||
def __init__(self, return_value: Any = None):
|
||||
self.calls: list[str] = []
|
||||
self.return_value = return_value
|
||||
|
||||
def __call__(self, code: str, *args: Any, **kwargs: Any) -> Any:
|
||||
self.calls.append(code)
|
||||
return self.return_value
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# make_safe_env() — pure function tests (os.environ never mutated)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMakeSafeEnv:
|
||||
def test_strips_anthropic_api_key(self):
|
||||
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-secret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "ANTHROPIC_API_KEY" not in result
|
||||
|
||||
def test_strips_gh_token(self):
|
||||
with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "GH_TOKEN" not in result
|
||||
|
||||
def test_strips_openai_api_key(self):
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "OPENAI_API_KEY" not in result
|
||||
|
||||
def test_strips_database_url(self):
|
||||
with patch.dict(os.environ, {"DATABASE_URL": "postgres://secret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "DATABASE_URL" not in result
|
||||
|
||||
def test_strips_redis_url(self):
|
||||
with patch.dict(os.environ, {"REDIS_URL": "redis://secret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "REDIS_URL" not in result
|
||||
|
||||
def test_strips_aws_access_key(self):
|
||||
with patch.dict(os.environ, {"AWS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "AWS_ACCESS_KEY_ID" not in result
|
||||
|
||||
def test_strips_slack_token(self):
|
||||
with patch.dict(os.environ, {"SLACK_BOT_TOKEN": "xoxb-secret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "SLACK_BOT_TOKEN" not in result
|
||||
|
||||
def test_strips_generic_password(self):
|
||||
with patch.dict(os.environ, {"DB_PASSWORD": "hunter2"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "DB_PASSWORD" not in result
|
||||
|
||||
def test_strips_generic_secret(self):
|
||||
with patch.dict(os.environ, {"JWT_SECRET": "supersecret"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert "JWT_SECRET" not in result
|
||||
|
||||
def test_passes_path(self):
|
||||
with patch.dict(os.environ, {"PATH": "/usr/bin:/bin"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("PATH") == "/usr/bin:/bin"
|
||||
|
||||
def test_passes_home(self):
|
||||
with patch.dict(os.environ, {"HOME": "/root"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("HOME") == "/root"
|
||||
|
||||
def test_passes_lang(self):
|
||||
with patch.dict(os.environ, {"LANG": "en_US.UTF-8"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("LANG") == "en_US.UTF-8"
|
||||
|
||||
def test_passes_pythonpath(self):
|
||||
with patch.dict(os.environ, {"PYTHONPATH": "/app"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("PYTHONPATH") == "/app"
|
||||
|
||||
def test_passes_workspace_id(self):
|
||||
with patch.dict(os.environ, {"WORKSPACE_ID": "ws-123"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("WORKSPACE_ID") == "ws-123"
|
||||
|
||||
def test_passes_workspace_name(self):
|
||||
with patch.dict(os.environ, {"WORKSPACE_NAME": "my-agent"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("WORKSPACE_NAME") == "my-agent"
|
||||
|
||||
def test_passes_platform_url(self):
|
||||
with patch.dict(os.environ, {"PLATFORM_URL": "http://platform:8080"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("PLATFORM_URL") == "http://platform:8080"
|
||||
|
||||
def test_does_not_mutate_os_environ(self):
|
||||
"""make_safe_env() must be a pure read — os.environ unchanged after call."""
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"},
|
||||
clear=False,
|
||||
):
|
||||
before = dict(os.environ)
|
||||
make_safe_env()
|
||||
after = dict(os.environ)
|
||||
assert before == after
|
||||
|
||||
def test_returns_dict(self):
|
||||
result = make_safe_env()
|
||||
assert isinstance(result, dict)
|
||||
|
||||
def test_extra_allowed_via_parameter(self):
|
||||
with patch.dict(os.environ, {"MY_SAFE_VAR": "value"}, clear=False):
|
||||
result = make_safe_env(extra_allowed=["MY_SAFE_VAR"])
|
||||
assert result.get("MY_SAFE_VAR") == "value"
|
||||
|
||||
def test_extra_allowed_via_env_var(self):
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"SMOLAGENTS_ENV_EXTRA_ALLOWLIST": "REGION,CLUSTER_NAME",
|
||||
"REGION": "us-east-1",
|
||||
"CLUSTER_NAME": "prod",
|
||||
"ANTHROPIC_API_KEY": "sk-ant-secret",
|
||||
},
|
||||
clear=False,
|
||||
):
|
||||
result = make_safe_env()
|
||||
assert result.get("REGION") == "us-east-1"
|
||||
assert result.get("CLUSTER_NAME") == "prod"
|
||||
assert "ANTHROPIC_API_KEY" not in result
|
||||
|
||||
def test_extra_allowed_env_var_is_case_normalized(self):
|
||||
"""Names in SMOLAGENTS_ENV_EXTRA_ALLOWLIST are uppercased automatically."""
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"SMOLAGENTS_ENV_EXTRA_ALLOWLIST": "my_safe_var", "MY_SAFE_VAR": "hello"},
|
||||
clear=False,
|
||||
):
|
||||
result = make_safe_env()
|
||||
assert result.get("MY_SAFE_VAR") == "hello"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SafeLocalPythonExecutor — allowlist enforcement during execution
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSafeLocalPythonExecutorAllowlist:
|
||||
"""Core security guarantee: secrets absent from os.environ during execution."""
|
||||
|
||||
def test_secret_absent_during_execution_anthropic(self):
|
||||
"""Injected ANTHROPIC_API_KEY must not be visible to executed code."""
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
# Simulate what agent code would see via os.environ
|
||||
captured_env.update(os.environ.copy())
|
||||
return ""
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "sk-ant-secret"}, clear=False):
|
||||
executor("import os; os.environ.get('ANTHROPIC_API_KEY', '')")
|
||||
|
||||
assert "ANTHROPIC_API_KEY" not in captured_env
|
||||
|
||||
def test_secret_absent_during_execution_gh_token(self):
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
return ""
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False):
|
||||
executor("import os; os.environ.get('GH_TOKEN', '')")
|
||||
|
||||
assert "GH_TOKEN" not in captured_env
|
||||
|
||||
def test_secret_absent_during_execution_database_url(self):
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
return ""
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
with patch.dict(os.environ, {"DATABASE_URL": "postgres://secret"}, clear=False):
|
||||
executor("code")
|
||||
|
||||
assert "DATABASE_URL" not in captured_env
|
||||
|
||||
def test_secret_absent_during_execution_openai_key(self):
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False):
|
||||
executor("code")
|
||||
|
||||
assert "OPENAI_API_KEY" not in captured_env
|
||||
|
||||
def test_multiple_secrets_all_absent(self):
|
||||
"""All secrets must be stripped simultaneously, not just one."""
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
secrets = {
|
||||
"ANTHROPIC_API_KEY": "sk-ant",
|
||||
"GH_TOKEN": "ghp_",
|
||||
"OPENAI_API_KEY": "sk-open",
|
||||
"DATABASE_URL": "postgres://",
|
||||
"REDIS_URL": "redis://",
|
||||
"SLACK_BOT_TOKEN": "xoxb-",
|
||||
"JWT_SECRET": "secret",
|
||||
"DB_PASSWORD": "pass",
|
||||
}
|
||||
|
||||
with patch.dict(os.environ, secrets, clear=False):
|
||||
executor("code")
|
||||
|
||||
for key in secrets:
|
||||
assert key not in captured_env, f"{key!r} was visible during execution"
|
||||
|
||||
def test_safe_vars_present_during_execution(self):
|
||||
"""Allowlisted variables must remain visible during execution."""
|
||||
captured_env: dict = {}
|
||||
|
||||
def _mock_inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_mock_inner)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{
|
||||
"PATH": "/usr/bin:/bin",
|
||||
"WORKSPACE_ID": "ws-abc",
|
||||
"PYTHONPATH": "/app",
|
||||
"ANTHROPIC_API_KEY": "sk-ant-secret",
|
||||
},
|
||||
clear=False,
|
||||
):
|
||||
executor("code")
|
||||
|
||||
assert captured_env.get("PATH") == "/usr/bin:/bin"
|
||||
assert captured_env.get("WORKSPACE_ID") == "ws-abc"
|
||||
assert captured_env.get("PYTHONPATH") == "/app"
|
||||
|
||||
def test_env_restored_after_execution(self):
|
||||
"""os.environ must be fully restored after __call__ returns."""
|
||||
executor = SafeLocalPythonExecutor(_inner=_MockInner())
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"},
|
||||
clear=False,
|
||||
):
|
||||
env_before = dict(os.environ)
|
||||
executor("code")
|
||||
env_after = dict(os.environ)
|
||||
|
||||
assert env_before == env_after
|
||||
|
||||
def test_env_restored_after_exception(self):
|
||||
"""os.environ must be restored even if the inner executor raises."""
|
||||
|
||||
def _raises(code: str, *args, **kwargs):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_raises)
|
||||
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ANTHROPIC_API_KEY": "sk-ant-secret"},
|
||||
clear=False,
|
||||
):
|
||||
env_before = dict(os.environ)
|
||||
with pytest.raises(RuntimeError, match="boom"):
|
||||
executor("code")
|
||||
env_after = dict(os.environ)
|
||||
|
||||
assert env_before == env_after
|
||||
|
||||
def test_returns_inner_result(self):
|
||||
mock_inner = _MockInner(return_value="hello world")
|
||||
executor = SafeLocalPythonExecutor(_inner=mock_inner)
|
||||
result = executor("some code")
|
||||
assert result == "hello world"
|
||||
|
||||
def test_passes_code_to_inner(self):
|
||||
mock_inner = _MockInner()
|
||||
executor = SafeLocalPythonExecutor(_inner=mock_inner)
|
||||
executor("print('hi')")
|
||||
assert mock_inner.calls == ["print('hi')"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SafeLocalPythonExecutor — import restrictions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSafeLocalPythonExecutorImports:
|
||||
def test_banned_imports_removed_from_authorized(self):
|
||||
"""Banned imports must not appear in the authorized list regardless of what caller passes."""
|
||||
executor = SafeLocalPythonExecutor(
|
||||
additional_imports=["subprocess", "socket", "math"],
|
||||
_inner=_MockInner(),
|
||||
)
|
||||
for banned in _BANNED_IMPORTS:
|
||||
assert banned not in executor._authorized_imports, (
|
||||
f"{banned!r} must not be in authorized imports"
|
||||
)
|
||||
|
||||
def test_safe_imports_present(self):
|
||||
executor = SafeLocalPythonExecutor(_inner=_MockInner())
|
||||
for safe in ["math", "json", "re", "datetime"]:
|
||||
assert safe in executor._authorized_imports
|
||||
|
||||
def test_additional_safe_import_added(self):
|
||||
executor = SafeLocalPythonExecutor(
|
||||
additional_imports=["numpy"],
|
||||
_inner=_MockInner(),
|
||||
)
|
||||
assert "numpy" in executor._authorized_imports
|
||||
|
||||
def test_banned_list_coverage(self):
|
||||
"""Verify the built-in banned list covers expected attack vectors."""
|
||||
expected_banned = {"subprocess", "socket", "ctypes", "importlib", "importlib.util"}
|
||||
assert expected_banned.issubset(_BANNED_IMPORTS)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# SafeLocalPythonExecutor — thread safety
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSafeLocalPythonExecutorThreadSafety:
|
||||
def test_concurrent_calls_restore_env_correctly(self):
|
||||
"""Two concurrent executions must not corrupt each other's env view."""
|
||||
results: list[bool] = []
|
||||
errors: list[Exception] = []
|
||||
|
||||
def _run(secret_key: str, secret_value: str):
|
||||
captured_env: dict = {}
|
||||
|
||||
def _inner(code: str, *args, **kwargs):
|
||||
captured_env.update(os.environ.copy())
|
||||
|
||||
executor = SafeLocalPythonExecutor(_inner=_inner)
|
||||
try:
|
||||
with patch.dict(os.environ, {secret_key: secret_value}, clear=False):
|
||||
executor("code")
|
||||
# Secret must not be visible during execution
|
||||
results.append(secret_key not in captured_env)
|
||||
except Exception as exc:
|
||||
errors.append(exc)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=_run, args=(f"SECRET_{i}", f"value_{i}"))
|
||||
for i in range(10)
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
assert not errors, f"Threads raised: {errors}"
|
||||
assert all(results), "Some threads saw a secret that should have been stripped"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Allowlist contents
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAllowlistContents:
|
||||
def test_core_vars_in_allowlist(self):
|
||||
"""Spot-check that expected safe vars are on the allowlist."""
|
||||
required = {"PATH", "HOME", "LANG", "PYTHONPATH", "WORKSPACE_ID", "WORKSPACE_NAME", "PLATFORM_URL"}
|
||||
for var in required:
|
||||
assert var in _SAFE_ENV_ALLOWLIST, f"{var!r} missing from _SAFE_ENV_ALLOWLIST"
|
||||
|
||||
def test_secrets_not_in_allowlist(self):
|
||||
"""Known secret names must NOT appear on the allowlist."""
|
||||
forbidden = {
|
||||
"ANTHROPIC_API_KEY",
|
||||
"GH_TOKEN",
|
||||
"GITHUB_TOKEN",
|
||||
"OPENAI_API_KEY",
|
||||
"DATABASE_URL",
|
||||
"REDIS_URL",
|
||||
"SLACK_BOT_TOKEN",
|
||||
"JWT_SECRET",
|
||||
"DB_PASSWORD",
|
||||
"AWS_SECRET_ACCESS_KEY",
|
||||
"AWS_ACCESS_KEY_ID",
|
||||
}
|
||||
for var in forbidden:
|
||||
assert var not in _SAFE_ENV_ALLOWLIST, (
|
||||
f"{var!r} must NOT be in _SAFE_ENV_ALLOWLIST — it's a secret"
|
||||
)
|
||||
202
workspace-template/tests/test_safe_env.py
Normal file
202
workspace-template/tests/test_safe_env.py
Normal file
@ -0,0 +1,202 @@
|
||||
"""Tests for denylist-based env sanitization — safe_env.py (issue #826 / #827).
|
||||
|
||||
Covers:
|
||||
(a) SMOLAGENTS_ENV_DENYLIST keys are stripped
|
||||
(b) *_API_KEY suffix keys are stripped
|
||||
(c) *_TOKEN suffix keys are stripped
|
||||
(d) Non-secret keys (PATH, HOME, …) are preserved
|
||||
(e) safe_send_message label, truncation, and HTML escaping
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from adapters.smolagents.safe_env import (
|
||||
SMOLAGENTS_ENV_DENYLIST,
|
||||
make_safe_env,
|
||||
)
|
||||
from adapters.smolagents.send_message_wrapper import safe_send_message
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# make_safe_env — denylist-based
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMakeSafeEnvDenylist:
|
||||
"""(a) Explicit denylist keys are removed."""
|
||||
|
||||
@pytest.mark.parametrize("key", sorted(SMOLAGENTS_ENV_DENYLIST))
|
||||
def test_denylist_key_stripped(self, key: str):
|
||||
with patch.dict(os.environ, {key: "secret-value"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert key not in result, f"Denylist key {key!r} must be stripped"
|
||||
|
||||
def test_all_denylist_keys_stripped_simultaneously(self):
|
||||
secrets = {k: "secret" for k in SMOLAGENTS_ENV_DENYLIST}
|
||||
with patch.dict(os.environ, secrets, clear=False):
|
||||
result = make_safe_env()
|
||||
for key in SMOLAGENTS_ENV_DENYLIST:
|
||||
assert key not in result
|
||||
|
||||
|
||||
class TestMakeSafeEnvApiKeySuffix:
|
||||
"""(b) Keys ending with _API_KEY are stripped."""
|
||||
|
||||
def test_openai_api_key(self):
|
||||
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False):
|
||||
assert "OPENAI_API_KEY" not in make_safe_env()
|
||||
|
||||
def test_custom_api_key_suffix(self):
|
||||
with patch.dict(os.environ, {"MY_CUSTOM_SERVICE_API_KEY": "abc123"}, clear=False):
|
||||
assert "MY_CUSTOM_SERVICE_API_KEY" not in make_safe_env()
|
||||
|
||||
def test_arbitrary_api_key_suffix(self):
|
||||
with patch.dict(os.environ, {"FOOBAR_API_KEY": "secret"}, clear=False):
|
||||
assert "FOOBAR_API_KEY" not in make_safe_env()
|
||||
|
||||
|
||||
class TestMakeSafeEnvTokenSuffix:
|
||||
"""(c) Keys ending with _TOKEN are stripped."""
|
||||
|
||||
def test_gh_token(self):
|
||||
with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False):
|
||||
assert "GH_TOKEN" not in make_safe_env()
|
||||
|
||||
def test_github_token(self):
|
||||
with patch.dict(os.environ, {"GITHUB_TOKEN": "ghp_secret"}, clear=False):
|
||||
assert "GITHUB_TOKEN" not in make_safe_env()
|
||||
|
||||
def test_custom_token_suffix(self):
|
||||
with patch.dict(os.environ, {"MY_SERVICE_TOKEN": "tok_abc"}, clear=False):
|
||||
assert "MY_SERVICE_TOKEN" not in make_safe_env()
|
||||
|
||||
def test_arbitrary_token_suffix(self):
|
||||
with patch.dict(os.environ, {"INTERNAL_ACCESS_TOKEN": "secret"}, clear=False):
|
||||
assert "INTERNAL_ACCESS_TOKEN" not in make_safe_env()
|
||||
|
||||
|
||||
class TestMakeSafeEnvPreservesNonSecrets:
|
||||
"""(d) Non-secret keys are preserved."""
|
||||
|
||||
def test_preserves_path(self):
|
||||
with patch.dict(os.environ, {"PATH": "/usr/bin:/bin"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("PATH") == "/usr/bin:/bin"
|
||||
|
||||
def test_preserves_home(self):
|
||||
with patch.dict(os.environ, {"HOME": "/home/agent"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("HOME") == "/home/agent"
|
||||
|
||||
def test_preserves_workspace_id(self):
|
||||
with patch.dict(os.environ, {"WORKSPACE_ID": "ws-abc123"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("WORKSPACE_ID") == "ws-abc123"
|
||||
|
||||
def test_preserves_pythonpath(self):
|
||||
with patch.dict(os.environ, {"PYTHONPATH": "/app"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("PYTHONPATH") == "/app"
|
||||
|
||||
def test_preserves_lang(self):
|
||||
with patch.dict(os.environ, {"LANG": "en_US.UTF-8"}, clear=False):
|
||||
result = make_safe_env()
|
||||
assert result.get("LANG") == "en_US.UTF-8"
|
||||
|
||||
def test_does_not_mutate_os_environ(self):
|
||||
"""make_safe_env must never write back to os.environ."""
|
||||
with patch.dict(
|
||||
os.environ,
|
||||
{"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"},
|
||||
clear=False,
|
||||
):
|
||||
before = dict(os.environ)
|
||||
make_safe_env()
|
||||
after = dict(os.environ)
|
||||
assert before == after
|
||||
|
||||
def test_returns_dict(self):
|
||||
assert isinstance(make_safe_env(), dict)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# safe_send_message — label, truncation, HTML escaping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSafeSendMessage:
|
||||
def _capture(self):
|
||||
"""Return a mock send_fn and its captured calls."""
|
||||
fn = MagicMock()
|
||||
return fn
|
||||
|
||||
def test_label_prefix_added(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("hello", fn)
|
||||
fn.assert_called_once()
|
||||
payload = fn.call_args[0][0]
|
||||
assert payload.startswith("[smolagents]"), f"Missing label: {payload!r}"
|
||||
|
||||
def test_label_prefix_followed_by_content(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("world", fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert "world" in payload
|
||||
|
||||
def test_truncates_at_2000_chars(self):
|
||||
fn = self._capture()
|
||||
long_text = "a" * 3000
|
||||
safe_send_message(long_text, fn)
|
||||
payload = fn.call_args[0][0]
|
||||
# The user content portion must be capped; label adds a few chars on top
|
||||
# Total len = len("[smolagents] ") + 2000
|
||||
assert len(payload) <= len("[smolagents] ") + 2000
|
||||
|
||||
def test_short_message_not_truncated(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("short", fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert "short" in payload
|
||||
|
||||
def test_html_entities_escaped(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("<script>alert('xss')</script>", fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert "<script>" not in payload
|
||||
assert "<script>" in payload
|
||||
|
||||
def test_ampersand_escaped(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("a & b", fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert "&" in payload
|
||||
|
||||
def test_double_quote_escaped(self):
|
||||
fn = self._capture()
|
||||
safe_send_message('say "hello"', fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert """ in payload
|
||||
|
||||
def test_non_str_coerced(self):
|
||||
"""Non-string input must be coerced to str, not raise."""
|
||||
fn = self._capture()
|
||||
safe_send_message(42, fn)
|
||||
fn.assert_called_once()
|
||||
payload = fn.call_args[0][0]
|
||||
assert "42" in payload
|
||||
|
||||
def test_send_fn_called_exactly_once(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("msg", fn)
|
||||
assert fn.call_count == 1
|
||||
|
||||
def test_empty_string_sends_label_only(self):
|
||||
fn = self._capture()
|
||||
safe_send_message("", fn)
|
||||
payload = fn.call_args[0][0]
|
||||
assert payload.strip() == "[smolagents]"
|
||||
Loading…
Reference in New Issue
Block a user