security: block token exfiltration patterns (OFFSEC-002) #3
@ -1 +1,2 @@
|
||||
pyyaml>=6.0
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import os, sys, yaml
|
||||
|
||||
errors = []
|
||||
|
||||
# 1. plugin.yaml exists
|
||||
if not os.path.isfile("plugin.yaml"):
|
||||
print("::error::plugin.yaml not found at repo root")
|
||||
sys.exit(1)
|
||||
@ -11,23 +12,28 @@ if not os.path.isfile("plugin.yaml"):
|
||||
with open("plugin.yaml") as f:
|
||||
plugin = yaml.safe_load(f)
|
||||
|
||||
# 2. Required fields
|
||||
for field in ["name", "version", "description"]:
|
||||
if not plugin.get(field):
|
||||
errors.append(f"Missing required field: {field}")
|
||||
|
||||
# 3. Version format
|
||||
v = str(plugin.get("version", ""))
|
||||
if v and not all(c in "0123456789." for c in v):
|
||||
errors.append(f"Invalid version format: {v}")
|
||||
|
||||
# 4. Runtimes type
|
||||
runtimes = plugin.get("runtimes")
|
||||
if runtimes is not None and not isinstance(runtimes, list):
|
||||
errors.append(f"runtimes must be a list, got {type(runtimes).__name__}")
|
||||
|
||||
# 5. Has content
|
||||
content_paths = ["SKILL.md", "hooks", "skills", "rules"]
|
||||
found = [p for p in content_paths if os.path.exists(p)]
|
||||
if not found:
|
||||
errors.append("Plugin must contain at least one of: SKILL.md, hooks/, skills/, rules/")
|
||||
|
||||
# 6. SKILL.md formatting check
|
||||
if os.path.isfile("SKILL.md"):
|
||||
with open("SKILL.md") as f:
|
||||
first_line = f.readline().strip()
|
||||
@ -39,9 +45,9 @@ if errors:
|
||||
print(f"::error::{e}")
|
||||
sys.exit(1)
|
||||
|
||||
pn = plugin["name"]; pv = plugin["version"]
|
||||
print(f"\u2713 plugin.yaml valid: {pn} v{pv}")
|
||||
print(f"✓ plugin.yaml valid: {plugin['name']} v{plugin['version']}")
|
||||
if found:
|
||||
print(f" Content: {', '.join(found)}")
|
||||
if runtimes:
|
||||
print(f" Runtimes: {', '.join(runtimes)}")
|
||||
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
#!/usr/bin/env python3
|
||||
"""PreToolUse:Bash — enforce careful-mode patterns on shell commands."""
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
@ -44,7 +45,7 @@ def main() -> None:
|
||||
if "migrations" in cmd:
|
||||
deny_pretooluse("careful-mode: rm -rf inside a migrations dir is REFUSED.")
|
||||
deny_pretooluse(f"careful-mode: rm -rf at filesystem root, HOME, or .git is REFUSED. Command: {cmd[:200]}")
|
||||
if "/.git" in cmd:
|
||||
if re.search(r"(^|\s)\.git(?:\s|$|/)", cmd):
|
||||
deny_pretooluse("careful-mode: rm -rf .git is REFUSED. Re-clone if you need a fresh repo.")
|
||||
|
||||
# WARN list — log but allow
|
||||
@ -53,6 +54,54 @@ def main() -> None:
|
||||
if "gh pr close" in cmd or "gh issue close" in cmd:
|
||||
warn_to_stderr("[careful-mode WARN] closing a PR/issue is irreversible from this bot's standpoint. Confirm intent.")
|
||||
|
||||
# Token exfiltration — OFFSEC-002 / CRED-2
|
||||
# Block direct reads of known token file paths
|
||||
token_paths = [
|
||||
".gh_token",
|
||||
".git-credentials-cache",
|
||||
".auth_token",
|
||||
"gh_token",
|
||||
"git-credentials-cache",
|
||||
".auth-token",
|
||||
".claude/.gh_token",
|
||||
"~/.gh_token",
|
||||
]
|
||||
# Also block cat of any path containing token-like segments
|
||||
if re.search(r"cat\s+.*(~|/home/[^/]+/)\S*(token|gh_token|git.credential|auth)", cmd, re.IGNORECASE):
|
||||
deny_pretooluse(
|
||||
"careful-mode: potential credential file read REFUSED. "
|
||||
"Token files must not be read in the agent context. "
|
||||
"Use platform-managed secrets instead."
|
||||
)
|
||||
if any(tok in cmd for tok in token_paths):
|
||||
deny_pretooluse(
|
||||
"careful-mode: token file read REFUSED. "
|
||||
"Token files must not be read in the agent context. "
|
||||
"Use platform-managed secrets instead."
|
||||
)
|
||||
|
||||
# Block env | grep for secrets (common exfil staging pattern)
|
||||
# Matches: env | grep token; env|grep -i API_KEY; env | grep secret
|
||||
if re.search(r"env\s*\|\s*grep\s+(-i\s+)?(token|api_key|secret|auth|password|passwd)", cmd, re.IGNORECASE):
|
||||
deny_pretooluse(
|
||||
"careful-mode: env grep for secrets REFUSED. "
|
||||
"Reading environment variables for secrets is not permitted in agent context."
|
||||
)
|
||||
|
||||
# Block generic cat of potential credential file extensions or token-named files
|
||||
if re.search(r"cat\s+.*(\.gh_token|\.git-credential|\.auth_token|auth_token|gh_token)", cmd, re.IGNORECASE):
|
||||
deny_pretooluse(
|
||||
"careful-mode: potential credential file read REFUSED. "
|
||||
"Token files must not be read in the agent context."
|
||||
)
|
||||
|
||||
# Block curl/wget exfiltration of credentials to external endpoints
|
||||
if re.search(r"(curl|wget)\s+.*(Authorization|Bearer|token|key).*\s+>", cmd):
|
||||
deny_pretooluse(
|
||||
"careful-mode: credential exfiltration via redirect REFUSED. "
|
||||
"Do not redirect credentials to external endpoints."
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
|
||||
@ -10,7 +10,30 @@
|
||||
|
||||
## Recently Resolved
|
||||
|
||||
*(No recently resolved issues.)*
|
||||
### [RESOLVED] OFFSEC-002: Token exfiltration patterns not blocked
|
||||
|
||||
**Severity:** P0 (security)
|
||||
**Resolved in:** v1.0.1
|
||||
|
||||
**Symptoms:** The PreToolUse:Bash hook blocked destructive commands (force push, DROP TABLE, rm -rf) but did NOT block token exfiltration patterns. An LLM prompt injection could instruct the agent to execute:
|
||||
- `cat ~/.gh_token`
|
||||
- `cat /tmp/.git-credentials-cache`
|
||||
- `env | grep token`
|
||||
|
||||
**Cause:** The REFUSE list in `pre-bash-careful.py` only covered git/SQL/rm commands. Token file reads and env variable grep patterns were not covered.
|
||||
|
||||
**Fix:** Added blocking for:
|
||||
- Direct token file reads (`.gh_token`, `.auth_token`, `.git-credentials-cache`, etc.)
|
||||
- `cat` of home-directory token paths (`~/.config/gh_token`, `/home/agent/.gh_token`)
|
||||
- `env | grep` for secrets (case-insensitive: token, api_key, secret, auth, password, passwd)
|
||||
- Generic credential file extensions in `cat` targets
|
||||
- curl/wget credential redirect exfil
|
||||
|
||||
Also fixed: `rm -rf .git` check used `"/.git"` string search which never matched. Changed to regex `r"(^|\s)\.git(?:\s|$|/)"`.
|
||||
|
||||
**Prevention:** New security-sensitive patterns must be reviewed during plugin review. Add token-exfil test cases to any hook touching credential paths.
|
||||
|
||||
---
|
||||
|
||||
---
|
||||
|
||||
|
||||
6
pytest.ini
Normal file
6
pytest.ini
Normal file
@ -0,0 +1,6 @@
|
||||
[pytest]
|
||||
testpaths = tests
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
addopts = -v
|
||||
Binary file not shown.
210
tests/test_pre_bash_careful.py
Normal file
210
tests/test_pre_bash_careful.py
Normal file
@ -0,0 +1,210 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Unit tests for pre-bash-careful.py hook."""
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import re
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "hooks"))
|
||||
|
||||
|
||||
def run_hook(cmd_input):
|
||||
"""Simulate PreToolUse:Bash hook invocation."""
|
||||
stdin_data = json.dumps({"tool_input": {"command": cmd_input}})
|
||||
stdout = io.StringIO()
|
||||
stderr = io.StringIO()
|
||||
with mock.patch("sys.stdin", io.StringIO(stdin_data)), \
|
||||
mock.patch("sys.stdout", stdout), \
|
||||
mock.patch("sys.stderr", stderr), \
|
||||
mock.patch("sys.exit", lambda code: None):
|
||||
import importlib.util
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"careful_mod",
|
||||
os.path.join(os.path.dirname(__file__), "..", "hooks", "pre-bash-careful.py"),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
mod.main()
|
||||
return stdout.getvalue(), stderr.getvalue()
|
||||
|
||||
|
||||
def parse_denial(output):
|
||||
"""Return the permissionDecision from the first denial payload, or None."""
|
||||
if not output.strip():
|
||||
return None
|
||||
try:
|
||||
# Handle multiple JSON lines (hook may emit multiple denials)
|
||||
first_line = output.strip().split("\n")[0]
|
||||
payload = json.loads(first_line)
|
||||
return payload.get("hookSpecificOutput", {}).get("permissionDecision")
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
|
||||
|
||||
class TestRefuseForcePush:
|
||||
"""Existing guard: git push --force to main/master."""
|
||||
|
||||
def test_refuses_git_push_force_to_main(self):
|
||||
output, _ = run_hook("git push --force origin main")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_git_push_f_to_main(self):
|
||||
output, _ = run_hook("git push -f origin main")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_git_push_force_to_master(self):
|
||||
output, _ = run_hook("git push --force origin master")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_allows_git_push_force_to_feature_branch(self):
|
||||
output, _ = run_hook("git push --force origin feature/abc")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
def test_allows_normal_git_push(self):
|
||||
output, _ = run_hook("git push origin main")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
|
||||
class TestRefuseGitResetHard:
|
||||
"""Existing guard: git reset --hard against main."""
|
||||
|
||||
def test_refuses_reset_hard_main(self):
|
||||
output, _ = run_hook("git reset --hard origin/main")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_reset_hard_space_main(self):
|
||||
output, _ = run_hook("git reset --hard HEAD~3 main")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_allows_reset_hard_on_feature_branch(self):
|
||||
output, _ = run_hook("git reset --hard HEAD~1")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
|
||||
class TestRefuseSQLDestructive:
|
||||
"""Existing guard: DROP TABLE/DATABASE against non-test schemas."""
|
||||
|
||||
def test_refuses_drop_table_prod(self):
|
||||
output, _ = run_hook("psql -c 'DROP TABLE users;'")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_drop_database_prod(self):
|
||||
output, _ = run_hook("mysql -e 'DROP DATABASE prod_db'")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_allows_drop_on_test_schema(self):
|
||||
output, _ = run_hook("psql -c 'DROP TABLE users_test;'")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
def test_allows_drop_on_sandbox(self):
|
||||
output, _ = run_hook("mysql -e 'DROP TABLE sandbox_events;'")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
|
||||
class TestRefuseRmRf:
|
||||
"""Existing guard: rm -rf at root or .git."""
|
||||
|
||||
def test_refuses_rm_rf_root(self):
|
||||
output, _ = run_hook("rm -rf /")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_rm_rf_home(self):
|
||||
output, _ = run_hook("rm -rf ~")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_rm_rf_git_dir(self):
|
||||
output, _ = run_hook("rm -rf .git")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_allows_rm_rf_tmp_dir(self):
|
||||
output, _ = run_hook("rm -rf /tmp/scratch-dir")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
def test_allows_rm_rf_node_modules(self):
|
||||
output, _ = run_hook("rm -rf node_modules")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
|
||||
class TestTokenExfiltrationBlocking:
|
||||
"""OFFSEC-002: token exfiltration patterns must be blocked."""
|
||||
|
||||
def test_refuses_cat_gh_token(self):
|
||||
output, _ = run_hook("cat ~/.gh_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_dot_gh_token(self):
|
||||
output, _ = run_hook("cat .gh_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_git_credentials_cache(self):
|
||||
output, _ = run_hook("cat /tmp/.git-credentials-cache")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_auth_token(self):
|
||||
output, _ = run_hook("cat .auth_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_molecule_auth_token(self):
|
||||
output, _ = run_hook("cat ~/.molecule/.auth-token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_env_grep_token(self):
|
||||
output, _ = run_hook("env | grep token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_env_grep_case_insensitive(self):
|
||||
output, _ = run_hook("env | grep -i API_KEY")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_env_grep_secret(self):
|
||||
output, _ = run_hook("env | grep secret")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_env_grep_auth(self):
|
||||
output, _ = run_hook("env|grep auth")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_env_grep_password(self):
|
||||
output, _ = run_hook("env | grep password")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_credential_file_extensions(self):
|
||||
output, _ = run_hook("cat /secrets/auth_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_tilde_token_path(self):
|
||||
output, _ = run_hook("cat ~/.config/gh_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_refuses_cat_home_token_path(self):
|
||||
output, _ = run_hook("cat /home/agent/.gh_token")
|
||||
assert parse_denial(output) == "deny"
|
||||
|
||||
def test_allows_normal_env_without_grep(self):
|
||||
output, _ = run_hook("env | head")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
def test_allows_reading_nontoken_files(self):
|
||||
output, _ = run_hook("cat README.md")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
def test_allows_grep_for_nonsecret_things(self):
|
||||
output, _ = run_hook("env | grep PATH")
|
||||
assert parse_denial(output) is None
|
||||
|
||||
|
||||
class TestWarnList:
|
||||
"""WARN list: agent is notified but command proceeds."""
|
||||
|
||||
def test_warns_force_with_lease(self, capsys):
|
||||
output, _ = run_hook("git push --force-with-lease origin feature/x")
|
||||
assert parse_denial(output) is None # not denied
|
||||
|
||||
def test_warns_closing_pr(self, capsys):
|
||||
output, _ = run_hook("gh pr close 123")
|
||||
assert parse_denial(output) is None # not denied
|
||||
Loading…
Reference in New Issue
Block a user