feat(security): denylist env sanitization + safe messaging for smolagents (#826, #827)

Add safe_env.py (denylist-based make_safe_env), send_message_wrapper.py
(label prefix, 2000-char cap, HTML entity escaping) and 33 pytest tests
covering all four security properties. Update __init__.py to re-export
safe_send_message alongside the existing allowlist-based make_safe_env.

Closes #826, closes #827

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Molecule AI Backend Engineer 2026-04-17 23:57:59 +00:00
parent 210c6b5b2c
commit d226094a98
4 changed files with 358 additions and 3 deletions

View File

@ -1,11 +1,32 @@
"""Smolagents adapter for Molecule AI workspace runtime.
Provides env sanitization and safe executor primitives for use with
HuggingFace's smolagents library.
Provides env sanitization and safe executor/messaging primitives for use
with HuggingFace's smolagents library.
Two env-sanitization strategies are available:
* **Allowlist** (recommended) :mod:`adapters.smolagents.env_sanitize`:
only explicitly-safe variables pass through. Stricter but requires keeping
the allowlist up-to-date as new safe vars are needed.
* **Denylist** (simple) :mod:`adapters.smolagents.safe_env`:
well-known secret names plus ``*_API_KEY`` / ``*_TOKEN`` suffix patterns
are stripped. Easier to start with; less exhaustive.
Quick start::
# Allowlist approach (stricter)
from adapters.smolagents.env_sanitize import make_safe_env, SafeLocalPythonExecutor
# Denylist approach (simpler)
from adapters.smolagents.safe_env import make_safe_env
# Safe messaging
from adapters.smolagents.send_message_wrapper import safe_send_message
"""
__all__ = ["make_safe_env", "SafeLocalPythonExecutor"]
# Re-export the allowlist-based make_safe_env as the default (most secure).
from adapters.smolagents.env_sanitize import SafeLocalPythonExecutor, make_safe_env
from adapters.smolagents.send_message_wrapper import safe_send_message
__all__ = ["make_safe_env", "SafeLocalPythonExecutor", "safe_send_message"]

View File

@ -0,0 +1,61 @@
"""Denylist-based environment sanitization for smolagents (issue #826 — C3 CRITICAL).
This module provides a simple denylist approach: well-known secret variable
names plus ``*_API_KEY`` and ``*_TOKEN`` suffix patterns are stripped before
env is passed to agent-executed code.
For a stricter allowlist-based alternative that only passes explicitly-safe
variables through, see :mod:`adapters.smolagents.env_sanitize`.
Usage::
from adapters.smolagents.safe_env import make_safe_env
executor = LocalPythonExecutor(...)
# Pass only the sanitised env to the subprocess / exec context:
safe = make_safe_env()
"""
import copy
import os
# Named API keys and tokens known to be used by smolagents / LLM clients.
# These are removed regardless of the suffix-pattern below.
SMOLAGENTS_ENV_DENYLIST: frozenset = frozenset(
{
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"GROQ_API_KEY",
"CEREBRAS_API_KEY",
"QIANFAN_API_KEY",
"LANGFUSE_SECRET_KEY",
"LANGFUSE_PUBLIC_KEY",
"HF_TOKEN",
}
)
def make_safe_env() -> dict:
"""Return a sanitised copy of ``os.environ`` with secrets removed.
Removes any key that:
- Is in :data:`SMOLAGENTS_ENV_DENYLIST`, OR
- Ends with ``_API_KEY``, OR
- Ends with ``_TOKEN``
``os.environ`` is **never mutated** a fresh ``dict`` copy is returned.
Returns
-------
dict
A copy of the current environment with secret keys removed.
"""
env = copy.copy(dict(os.environ))
for key in list(env.keys()):
if (
key in SMOLAGENTS_ENV_DENYLIST
or key.endswith("_API_KEY")
or key.endswith("_TOKEN")
):
del env[key]
return env

View File

@ -0,0 +1,71 @@
"""Safe send_message wrapper for smolagents (issue #827 — C1 HIGH).
Prevents social-engineering attacks where agent-generated content could
impersonate platform messages, inject HTML, or flood the user chat.
Guarantees
----------
1. Every message is prefixed with ``[smolagents]`` so recipients can
attribute it to the agent and cannot be mistaken for platform UI.
2. Truncated to 2000 characters to prevent log/UI floods.
3. HTML entities (``<``, ``>``, ``&``, ``"``, ``'``) are escaped so
rendered UIs that interpret HTML cannot be injected into.
Usage::
from adapters.smolagents.send_message_wrapper import safe_send_message
safe_send_message("Hello world", send_fn=platform_client.send)
"""
from __future__ import annotations
import html
import logging
logger = logging.getLogger(__name__)
# Maximum character length for the *user-visible* portion of the message
# (label prefix does not count toward this cap).
_MAX_TEXT_LEN: int = 2000
# Label prepended to every outbound message.
_LABEL: str = "[smolagents]"
def safe_send_message(text: str, send_fn) -> None:
"""Sanitise *text* and deliver it via *send_fn*.
Parameters
----------
text:
The raw message text produced by the agent.
send_fn:
Callable that delivers the message (e.g. ``platform_client.send``
or a WebSocket broadcast function). Called with the final,
sanitised string as its sole positional argument.
Side effects
------------
- Logs a warning when truncation occurs.
- Logs a debug entry with the final payload length.
"""
if not isinstance(text, str):
text = str(text)
# Strip HTML entities to prevent injection into rendered UIs.
sanitised = html.escape(text, quote=True)
# Truncate to cap (before adding label so cap applies to content).
if len(sanitised) > _MAX_TEXT_LEN:
logger.warning(
"safe_send_message: truncating message from %d to %d chars",
len(sanitised),
_MAX_TEXT_LEN,
)
sanitised = sanitised[:_MAX_TEXT_LEN]
payload = f"{_LABEL} {sanitised}"
logger.debug("safe_send_message: delivering %d-char payload", len(payload))
send_fn(payload)

View File

@ -0,0 +1,202 @@
"""Tests for denylist-based env sanitization — safe_env.py (issue #826 / #827).
Covers:
(a) SMOLAGENTS_ENV_DENYLIST keys are stripped
(b) *_API_KEY suffix keys are stripped
(c) *_TOKEN suffix keys are stripped
(d) Non-secret keys (PATH, HOME, ) are preserved
(e) safe_send_message label, truncation, and HTML escaping
"""
from __future__ import annotations
import os
from unittest.mock import MagicMock, patch
import pytest
from adapters.smolagents.safe_env import (
SMOLAGENTS_ENV_DENYLIST,
make_safe_env,
)
from adapters.smolagents.send_message_wrapper import safe_send_message
# ---------------------------------------------------------------------------
# make_safe_env — denylist-based
# ---------------------------------------------------------------------------
class TestMakeSafeEnvDenylist:
"""(a) Explicit denylist keys are removed."""
@pytest.mark.parametrize("key", sorted(SMOLAGENTS_ENV_DENYLIST))
def test_denylist_key_stripped(self, key: str):
with patch.dict(os.environ, {key: "secret-value"}, clear=False):
result = make_safe_env()
assert key not in result, f"Denylist key {key!r} must be stripped"
def test_all_denylist_keys_stripped_simultaneously(self):
secrets = {k: "secret" for k in SMOLAGENTS_ENV_DENYLIST}
with patch.dict(os.environ, secrets, clear=False):
result = make_safe_env()
for key in SMOLAGENTS_ENV_DENYLIST:
assert key not in result
class TestMakeSafeEnvApiKeySuffix:
"""(b) Keys ending with _API_KEY are stripped."""
def test_openai_api_key(self):
with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-openai"}, clear=False):
assert "OPENAI_API_KEY" not in make_safe_env()
def test_custom_api_key_suffix(self):
with patch.dict(os.environ, {"MY_CUSTOM_SERVICE_API_KEY": "abc123"}, clear=False):
assert "MY_CUSTOM_SERVICE_API_KEY" not in make_safe_env()
def test_arbitrary_api_key_suffix(self):
with patch.dict(os.environ, {"FOOBAR_API_KEY": "secret"}, clear=False):
assert "FOOBAR_API_KEY" not in make_safe_env()
class TestMakeSafeEnvTokenSuffix:
"""(c) Keys ending with _TOKEN are stripped."""
def test_gh_token(self):
with patch.dict(os.environ, {"GH_TOKEN": "ghp_secret"}, clear=False):
assert "GH_TOKEN" not in make_safe_env()
def test_github_token(self):
with patch.dict(os.environ, {"GITHUB_TOKEN": "ghp_secret"}, clear=False):
assert "GITHUB_TOKEN" not in make_safe_env()
def test_custom_token_suffix(self):
with patch.dict(os.environ, {"MY_SERVICE_TOKEN": "tok_abc"}, clear=False):
assert "MY_SERVICE_TOKEN" not in make_safe_env()
def test_arbitrary_token_suffix(self):
with patch.dict(os.environ, {"INTERNAL_ACCESS_TOKEN": "secret"}, clear=False):
assert "INTERNAL_ACCESS_TOKEN" not in make_safe_env()
class TestMakeSafeEnvPreservesNonSecrets:
"""(d) Non-secret keys are preserved."""
def test_preserves_path(self):
with patch.dict(os.environ, {"PATH": "/usr/bin:/bin"}, clear=False):
result = make_safe_env()
assert result.get("PATH") == "/usr/bin:/bin"
def test_preserves_home(self):
with patch.dict(os.environ, {"HOME": "/home/agent"}, clear=False):
result = make_safe_env()
assert result.get("HOME") == "/home/agent"
def test_preserves_workspace_id(self):
with patch.dict(os.environ, {"WORKSPACE_ID": "ws-abc123"}, clear=False):
result = make_safe_env()
assert result.get("WORKSPACE_ID") == "ws-abc123"
def test_preserves_pythonpath(self):
with patch.dict(os.environ, {"PYTHONPATH": "/app"}, clear=False):
result = make_safe_env()
assert result.get("PYTHONPATH") == "/app"
def test_preserves_lang(self):
with patch.dict(os.environ, {"LANG": "en_US.UTF-8"}, clear=False):
result = make_safe_env()
assert result.get("LANG") == "en_US.UTF-8"
def test_does_not_mutate_os_environ(self):
"""make_safe_env must never write back to os.environ."""
with patch.dict(
os.environ,
{"ANTHROPIC_API_KEY": "sk-ant-secret", "PATH": "/usr/bin"},
clear=False,
):
before = dict(os.environ)
make_safe_env()
after = dict(os.environ)
assert before == after
def test_returns_dict(self):
assert isinstance(make_safe_env(), dict)
# ---------------------------------------------------------------------------
# safe_send_message — label, truncation, HTML escaping
# ---------------------------------------------------------------------------
class TestSafeSendMessage:
def _capture(self):
"""Return a mock send_fn and its captured calls."""
fn = MagicMock()
return fn
def test_label_prefix_added(self):
fn = self._capture()
safe_send_message("hello", fn)
fn.assert_called_once()
payload = fn.call_args[0][0]
assert payload.startswith("[smolagents]"), f"Missing label: {payload!r}"
def test_label_prefix_followed_by_content(self):
fn = self._capture()
safe_send_message("world", fn)
payload = fn.call_args[0][0]
assert "world" in payload
def test_truncates_at_2000_chars(self):
fn = self._capture()
long_text = "a" * 3000
safe_send_message(long_text, fn)
payload = fn.call_args[0][0]
# The user content portion must be capped; label adds a few chars on top
# Total len = len("[smolagents] ") + 2000
assert len(payload) <= len("[smolagents] ") + 2000
def test_short_message_not_truncated(self):
fn = self._capture()
safe_send_message("short", fn)
payload = fn.call_args[0][0]
assert "short" in payload
def test_html_entities_escaped(self):
fn = self._capture()
safe_send_message("<script>alert('xss')</script>", fn)
payload = fn.call_args[0][0]
assert "<script>" not in payload
assert "&lt;script&gt;" in payload
def test_ampersand_escaped(self):
fn = self._capture()
safe_send_message("a & b", fn)
payload = fn.call_args[0][0]
assert "&amp;" in payload
def test_double_quote_escaped(self):
fn = self._capture()
safe_send_message('say "hello"', fn)
payload = fn.call_args[0][0]
assert "&quot;" in payload
def test_non_str_coerced(self):
"""Non-string input must be coerced to str, not raise."""
fn = self._capture()
safe_send_message(42, fn)
fn.assert_called_once()
payload = fn.call_args[0][0]
assert "42" in payload
def test_send_fn_called_exactly_once(self):
fn = self._capture()
safe_send_message("msg", fn)
assert fn.call_count == 1
def test_empty_string_sends_label_only(self):
fn = self._capture()
safe_send_message("", fn)
payload = fn.call_args[0][0]
assert payload.strip() == "[smolagents]"