diff --git a/workspace/a2a_tools.py b/workspace/a2a_tools.py index 55a19758..f3faf619 100644 --- a/workspace/a2a_tools.py +++ b/workspace/a2a_tools.py @@ -28,96 +28,20 @@ from platform_auth import list_registered_workspaces # --------------------------------------------------------------------------- -# RBAC helpers (mirror builtin_tools/audit.py for a2a_tools isolation) +# RBAC + auth helpers — extracted to a2a_tools_rbac (RFC #2873 iter 4a). +# Re-exported here under the legacy underscore names so existing tests' +# patch("a2a_tools._check_memory_write_permission", …) and call sites +# inside this module that resolve bare names against the module-level +# namespace continue to work unchanged. # --------------------------------------------------------------------------- - -_ROLE_PERMISSIONS = { - "admin": {"delegate", "approve", "memory.read", "memory.write"}, - "operator": {"delegate", "approve", "memory.read", "memory.write"}, - "read-only": {"memory.read"}, - "no-delegation": {"approve", "memory.read", "memory.write"}, - "no-approval": {"delegate", "memory.read", "memory.write"}, - "memory-readonly": {"memory.read"}, -} - - -def _get_workspace_tier() -> int: - """Return the workspace tier from config (0 = root, 1+ = tenant).""" - try: - from config import load_config - - cfg = load_config() - return getattr(cfg, "tier", 1) - except Exception: - return int(os.environ.get("WORKSPACE_TIER", 1)) - - -def _check_memory_write_permission() -> bool: - """Return True if this workspace's RBAC roles grant memory.write.""" - try: - from config import load_config - - cfg = load_config() - roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) - allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) - except Exception: - # Fail closed: deny when config is unavailable - roles = ["operator"] - allowed = {} - - for role in roles: - if role == "admin": - return True - if role in allowed: - if "memory.write" in allowed[role]: - return True - elif role in _ROLE_PERMISSIONS and "memory.write" in _ROLE_PERMISSIONS[role]: - return True - return False - - -def _check_memory_read_permission() -> bool: - """Return True if this workspace's RBAC roles grant memory.read.""" - try: - from config import load_config - - cfg = load_config() - roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) - allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) - except Exception: - roles = ["operator"] - allowed = {} - - for role in roles: - if role == "admin": - return True - if role in allowed: - if "memory.read" in allowed[role]: - return True - elif role in _ROLE_PERMISSIONS and "memory.read" in _ROLE_PERMISSIONS[role]: - return True - return False - - -def _is_root_workspace() -> bool: - """Return True if this workspace is tier 0 (root/root-org).""" - return _get_workspace_tier() == 0 - - -def _auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]: - """Return Phase 30.1 auth headers; tolerate platform_auth being absent - in older installs (e.g. during rolling upgrade). - - ``workspace_id`` selects the per-workspace token from the multi- - workspace registry when set (PR-1: external agent registered in - multiple workspaces). With no arg the legacy single-token path is - unchanged. - """ - try: - from platform_auth import auth_headers - return auth_headers(workspace_id) if workspace_id else auth_headers() - except Exception: - return {} +from a2a_tools_rbac import ( # noqa: E402 (import after the from-a2a_client block) + _auth_headers_for_heartbeat, + _check_memory_read_permission, + _check_memory_write_permission, + _get_workspace_tier, + _is_root_workspace, + _ROLE_PERMISSIONS, +) # Per-field caps on the heartbeat / activity payload. Borrowed from diff --git a/workspace/a2a_tools_rbac.py b/workspace/a2a_tools_rbac.py new file mode 100644 index 00000000..25bffd93 --- /dev/null +++ b/workspace/a2a_tools_rbac.py @@ -0,0 +1,138 @@ +"""RBAC + auth-header helpers shared by all a2a_tools tool handlers. + +Extracted from ``a2a_tools.py`` (RFC #2873 iter 4a). Centralises the +"what can this workspace do" + "how do I prove it on a platform call" +concerns into a single module so: + + * Future tools added under ``a2a_tools/`` see one obvious helper to + call instead of re-implementing the role/tier check. + * The role-permission table is in ONE place — adding a new role + or capability touches one file, not every tool that gates on it. + * Tests targeting these helpers don't have to import the whole + 991-LOC ``a2a_tools`` surface. + +Public surface: + +* ``ROLE_PERMISSIONS`` — canonical role → action set table. +* ``get_workspace_tier()`` — config-resolved tier (0 = root). +* ``check_memory_write_permission()`` — boolean. +* ``check_memory_read_permission()`` — boolean. +* ``is_root_workspace()`` — boolean (tier == 0). +* ``auth_headers_for_heartbeat(workspace_id=None)`` — auth-header dict + with the multi-workspace registry lookup; tolerates ``platform_auth`` + missing on older installs (returns ``{}``). + +Underscore-prefixed back-compat aliases (``_ROLE_PERMISSIONS``, +``_check_memory_write_permission``, etc.) match the names previously +exposed in ``a2a_tools`` so existing tests' +``patch("a2a_tools._foo", ...)`` continue to work via the re-exports +in ``a2a_tools.py``. +""" +from __future__ import annotations + +import os + + +# Mirror ``builtin_tools/audit.py`` for a2a_tools isolation. Listed as a +# module-level constant rather than computed lazily so the table is +# discoverable in static analysis + ``grep``. +ROLE_PERMISSIONS: dict[str, set[str]] = { + "admin": {"delegate", "approve", "memory.read", "memory.write"}, + "operator": {"delegate", "approve", "memory.read", "memory.write"}, + "read-only": {"memory.read"}, + "no-delegation": {"approve", "memory.read", "memory.write"}, + "no-approval": {"delegate", "memory.read", "memory.write"}, + "memory-readonly": {"memory.read"}, +} + + +def get_workspace_tier() -> int: + """Return the workspace tier from config (0 = root, 1+ = tenant).""" + try: + from config import load_config + + cfg = load_config() + return getattr(cfg, "tier", 1) + except Exception: + return int(os.environ.get("WORKSPACE_TIER", 1)) + + +def _resolve_role_state() -> tuple[list[str], dict]: + """Return (roles, allowed_actions) from config. + + Fail-closed: if config is unavailable, fall back to an "operator" + default with no per-role overrides. Operator has memory.read + + memory.write but not the elevated approve/delegate over GLOBAL + scope, so a config outage doesn't grant unexpected privileges. + """ + try: + from config import load_config + + cfg = load_config() + roles = list(getattr(cfg, "rbac", None).roles or ["operator"]) + allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {}) + return roles, allowed + except Exception: + return ["operator"], {} + + +def check_memory_write_permission() -> bool: + """Return True if this workspace's RBAC roles grant memory.write.""" + roles, allowed = _resolve_role_state() + for role in roles: + if role == "admin": + return True + if role in allowed: + if "memory.write" in allowed[role]: + return True + elif role in ROLE_PERMISSIONS and "memory.write" in ROLE_PERMISSIONS[role]: + return True + return False + + +def check_memory_read_permission() -> bool: + """Return True if this workspace's RBAC roles grant memory.read.""" + roles, allowed = _resolve_role_state() + for role in roles: + if role == "admin": + return True + if role in allowed: + if "memory.read" in allowed[role]: + return True + elif role in ROLE_PERMISSIONS and "memory.read" in ROLE_PERMISSIONS[role]: + return True + return False + + +def is_root_workspace() -> bool: + """Return True if this workspace is tier 0 (root/root-org).""" + return get_workspace_tier() == 0 + + +def auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]: + """Return Phase 30.1 auth headers; tolerate platform_auth being absent + in older installs (e.g. during rolling upgrade). + + ``workspace_id`` selects the per-workspace token from the multi- + workspace registry when set (PR-1: external agent registered in + multiple workspaces). With no arg the legacy single-token path is + unchanged. + """ + try: + from platform_auth import auth_headers + return auth_headers(workspace_id) if workspace_id else auth_headers() + except Exception: + return {} + + +# ============== Back-compat aliases for the previous a2a_tools names ============== +# Tests + downstream call sites refer to the pre-extract names; aliasing +# keeps both forms valid. The new public names (no underscore prefix) +# are preferred for new code. + +_ROLE_PERMISSIONS = ROLE_PERMISSIONS +_get_workspace_tier = get_workspace_tier +_check_memory_write_permission = check_memory_write_permission +_check_memory_read_permission = check_memory_read_permission +_is_root_workspace = is_root_workspace +_auth_headers_for_heartbeat = auth_headers_for_heartbeat diff --git a/workspace/tests/test_a2a_tools_rbac.py b/workspace/tests/test_a2a_tools_rbac.py new file mode 100644 index 00000000..4cb0b38e --- /dev/null +++ b/workspace/tests/test_a2a_tools_rbac.py @@ -0,0 +1,281 @@ +"""Direct tests for ``a2a_tools_rbac`` (RFC #2873 iter 4a). + +The full behavior matrix is exercised through ``a2a_tools._foo`` aliases +in ``test_a2a_tools_impl.py``. This file pins: + + 1. **Drift gate** — ``a2a_tools._foo is a2a_tools_rbac.foo`` for every + extracted symbol. A refactor that wraps or re-implements an alias + fails this test. + 2. **Direct unit coverage** for each helper without going through the + a2a_tools surface, so regressions in the small RBAC layer surface + against THIS module's tests, not the 991-LOC tool-handler tests. +""" +from __future__ import annotations + +import os +import sys +from unittest.mock import patch + +import pytest + + +@pytest.fixture(autouse=True) +def _require_workspace_id(monkeypatch): + # a2a_client raises at import-time without WORKSPACE_ID. Setting it + # once per test isolates the env so an absent value in CI doesn't + # surface as an opaque RuntimeError from a2a_tools' import. + monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000") + monkeypatch.setenv("PLATFORM_URL", "http://test.invalid") + yield + + +# ============== Drift gate ============== + +class TestBackCompatAliases: + """Pin that every legacy underscore name in ``a2a_tools`` is the + EXACT same callable / object as the new public name in + ``a2a_tools_rbac``. Catches accidental re-implementation in either + direction.""" + + def test_role_permissions_is_same_object(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._ROLE_PERMISSIONS is a2a_tools_rbac.ROLE_PERMISSIONS + + def test_get_workspace_tier_alias(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._get_workspace_tier is a2a_tools_rbac.get_workspace_tier + + def test_check_memory_write_permission_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._check_memory_write_permission + is a2a_tools_rbac.check_memory_write_permission + ) + + def test_check_memory_read_permission_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._check_memory_read_permission + is a2a_tools_rbac.check_memory_read_permission + ) + + def test_is_root_workspace_alias(self): + import a2a_tools + import a2a_tools_rbac + assert a2a_tools._is_root_workspace is a2a_tools_rbac.is_root_workspace + + def test_auth_headers_alias(self): + import a2a_tools + import a2a_tools_rbac + assert ( + a2a_tools._auth_headers_for_heartbeat + is a2a_tools_rbac.auth_headers_for_heartbeat + ) + + +# ============== get_workspace_tier ============== + +class TestGetWorkspaceTier: + def test_uses_config_when_available(self): + """Happy path: load_config returns an object with .tier.""" + import a2a_tools_rbac + + class _Cfg: + tier = 0 + + with patch("config.load_config", return_value=_Cfg()): + assert a2a_tools_rbac.get_workspace_tier() == 0 + + def test_default_tier_when_config_lacks_attr(self): + import a2a_tools_rbac + + class _Cfg: + pass + + with patch("config.load_config", return_value=_Cfg()): + # getattr default = 1 + assert a2a_tools_rbac.get_workspace_tier() == 1 + + def test_falls_back_to_env_var(self, monkeypatch): + """When load_config raises, read WORKSPACE_TIER from env.""" + import a2a_tools_rbac + monkeypatch.setenv("WORKSPACE_TIER", "5") + with patch("config.load_config", side_effect=RuntimeError("config unavailable")): + assert a2a_tools_rbac.get_workspace_tier() == 5 + + def test_fallback_default_one_when_env_unset(self, monkeypatch): + import a2a_tools_rbac + monkeypatch.delenv("WORKSPACE_TIER", raising=False) + with patch("config.load_config", side_effect=RuntimeError("boom")): + assert a2a_tools_rbac.get_workspace_tier() == 1 + + +# ============== is_root_workspace ============== + +class TestIsRootWorkspace: + def test_tier_zero_is_root(self): + import a2a_tools_rbac + with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=0): + assert a2a_tools_rbac.is_root_workspace() is True + + def test_nonzero_tier_is_not_root(self): + import a2a_tools_rbac + for tier in (1, 2, 99): + with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=tier): + assert a2a_tools_rbac.is_root_workspace() is False, f"tier={tier}" + + +# ============== check_memory_write_permission ============== + +class _RBACCfg: + """Minimal config stub matching the load_config().rbac shape.""" + + def __init__(self, roles=None, allowed_actions=None): + class _RBAC: + pass + self.rbac = _RBAC() + self.rbac.roles = roles or ["operator"] + self.rbac.allowed_actions = allowed_actions or {} + + +class TestCheckMemoryWritePermission: + def test_admin_role_grants_write(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_operator_role_grants_write(self): + """Operator is in the canonical ROLE_PERMISSIONS table with + memory.write — must work without per-role overrides.""" + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["operator"])): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_read_only_role_denies_write(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])): + assert a2a_tools_rbac.check_memory_write_permission() is False + + def test_per_role_override_grants(self): + """Per-role override in allowed_actions wins over the canonical + table — operators can grant write to memory-readonly via config.""" + import a2a_tools_rbac + cfg = _RBACCfg( + roles=["memory-readonly"], + allowed_actions={"memory-readonly": {"memory.read", "memory.write"}}, + ) + with patch("config.load_config", return_value=cfg): + assert a2a_tools_rbac.check_memory_write_permission() is True + + def test_per_role_override_denies(self): + """Per-role override that drops write blocks an operator from + writing — the override is the authoritative source when present.""" + import a2a_tools_rbac + cfg = _RBACCfg( + roles=["operator"], + allowed_actions={"operator": {"memory.read"}}, + ) + with patch("config.load_config", return_value=cfg): + assert a2a_tools_rbac.check_memory_write_permission() is False + + def test_fail_closed_when_config_unavailable(self): + """Fail-closed contract: config outage falls back to ['operator'] + with no overrides — operator has memory.write in the canonical + table, so write IS granted in this fallback. The fail-closed + property is for ELEVATED ops (admin scope), not for the basic + write that operator has by default. This test pins the contract: + config errors do not silently grant admin.""" + import a2a_tools_rbac + with patch("config.load_config", side_effect=RuntimeError("boom")): + # operator has memory.write → True (preserved behavior) + assert a2a_tools_rbac.check_memory_write_permission() is True + + +# ============== check_memory_read_permission ============== + +class TestCheckMemoryReadPermission: + def test_admin_grants_read(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])): + assert a2a_tools_rbac.check_memory_read_permission() is True + + def test_read_only_grants_read(self): + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])): + assert a2a_tools_rbac.check_memory_read_permission() is True + + def test_unknown_role_denies(self): + """A role that's not in ROLE_PERMISSIONS and not in + allowed_actions overrides denies by default.""" + import a2a_tools_rbac + with patch("config.load_config", return_value=_RBACCfg(roles=["random-undefined-role"])): + assert a2a_tools_rbac.check_memory_read_permission() is False + + +# ============== auth_headers_for_heartbeat ============== + +class TestAuthHeadersForHeartbeat: + def test_no_workspace_id_uses_legacy_path(self): + """No-arg call routes to platform_auth.auth_headers() — the + legacy single-token path.""" + import a2a_tools_rbac + called: dict[str, object] = {} + + def fake_auth_headers(*args): + called["args"] = args + return {"Authorization": "Bearer legacy-token"} + + with patch("platform_auth.auth_headers", fake_auth_headers): + out = a2a_tools_rbac.auth_headers_for_heartbeat() + assert out == {"Authorization": "Bearer legacy-token"} + # Legacy path is auth_headers() with no arg + assert called["args"] == () + + def test_with_workspace_id_routes_per_workspace(self): + import a2a_tools_rbac + called: dict[str, object] = {} + + def fake_auth_headers(wsid): + called["wsid"] = wsid + return {"Authorization": f"Bearer tok-{wsid}"} + + with patch("platform_auth.auth_headers", fake_auth_headers): + out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-abc") + assert out == {"Authorization": "Bearer tok-ws-abc"} + assert called["wsid"] == "ws-abc" + + def test_returns_empty_when_platform_auth_missing(self, monkeypatch): + """Older installs without platform_auth get {} so callers don't + crash — they'll just send unauthed and the platform 401 handler + surfaces the real error.""" + import a2a_tools_rbac + # Force ImportError by setting sys.modules entry to None + monkeypatch.setitem(sys.modules, "platform_auth", None) + out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-1") + assert out == {} + + +# ============== ROLE_PERMISSIONS canonical table ============== + +class TestRolePermissionsTable: + def test_admin_has_all_actions(self): + import a2a_tools_rbac + assert a2a_tools_rbac.ROLE_PERMISSIONS["admin"] == { + "delegate", "approve", "memory.read", "memory.write", + } + + def test_read_only_has_only_memory_read(self): + import a2a_tools_rbac + assert a2a_tools_rbac.ROLE_PERMISSIONS["read-only"] == {"memory.read"} + + def test_no_delegation_is_missing_delegate(self): + import a2a_tools_rbac + assert "delegate" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-delegation"] + + def test_no_approval_is_missing_approve(self): + import a2a_tools_rbac + assert "approve" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-approval"]