forked from molecule-ai/molecule-core
refactor(workspace): extract RBAC helpers from a2a_tools.py to a2a_tools_rbac.py (RFC #2873 iter 4a)
First slice of the a2a_tools.py (991 LOC) split — single-concern module
for the workspace's RBAC + auth-header layer:
* _ROLE_PERMISSIONS canonical table
* _get_workspace_tier
* _check_memory_write_permission
* _check_memory_read_permission
* _is_root_workspace
* _auth_headers_for_heartbeat
a2a_tools.py shrinks from 991 → 915 LOC. Internal call sites (15
references) work unchanged because the bare names are re-imported at
module-level — Python's local-then-module name resolution still
finds them in a2a_tools's namespace, so existing tests'
patch("a2a_tools._foo", …) keeps working.
The RBAC layer can now evolve independently of the 18 tool handlers.
Adding a new role or capability action touches one file, not the
kitchen-sink module.
Tests:
* 77 existing test_a2a_tools_impl.py pass unchanged.
* test_a2a_tools_rbac.py adds 28 focused tests:
- 6 alias drift-gate tests (`_foo is rbac.foo`)
- 4 get_workspace_tier env+config branches
- 2 is_root_workspace tier branches
- 6 check_memory_write_permission roles + override branches
- 3 check_memory_read_permission scenarios
- 3 auth_headers_for_heartbeat platform_auth branches
- 4 ROLE_PERMISSIONS table invariants
* Direct coverage for the helper module (was previously only
exercised through 991-LOC tool-handler tests).
Refs RFC #2873.
This commit is contained in:
parent
243f9bc2b1
commit
0c461eb9f1
@ -28,96 +28,20 @@ from platform_auth import list_registered_workspaces
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# RBAC helpers (mirror builtin_tools/audit.py for a2a_tools isolation)
|
||||
# RBAC + auth helpers — extracted to a2a_tools_rbac (RFC #2873 iter 4a).
|
||||
# Re-exported here under the legacy underscore names so existing tests'
|
||||
# patch("a2a_tools._check_memory_write_permission", …) and call sites
|
||||
# inside this module that resolve bare names against the module-level
|
||||
# namespace continue to work unchanged.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ROLE_PERMISSIONS = {
|
||||
"admin": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"operator": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"read-only": {"memory.read"},
|
||||
"no-delegation": {"approve", "memory.read", "memory.write"},
|
||||
"no-approval": {"delegate", "memory.read", "memory.write"},
|
||||
"memory-readonly": {"memory.read"},
|
||||
}
|
||||
|
||||
|
||||
def _get_workspace_tier() -> int:
|
||||
"""Return the workspace tier from config (0 = root, 1+ = tenant)."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
return getattr(cfg, "tier", 1)
|
||||
except Exception:
|
||||
return int(os.environ.get("WORKSPACE_TIER", 1))
|
||||
|
||||
|
||||
def _check_memory_write_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.write."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
except Exception:
|
||||
# Fail closed: deny when config is unavailable
|
||||
roles = ["operator"]
|
||||
allowed = {}
|
||||
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.write" in allowed[role]:
|
||||
return True
|
||||
elif role in _ROLE_PERMISSIONS and "memory.write" in _ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _check_memory_read_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.read."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
except Exception:
|
||||
roles = ["operator"]
|
||||
allowed = {}
|
||||
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.read" in allowed[role]:
|
||||
return True
|
||||
elif role in _ROLE_PERMISSIONS and "memory.read" in _ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _is_root_workspace() -> bool:
|
||||
"""Return True if this workspace is tier 0 (root/root-org)."""
|
||||
return _get_workspace_tier() == 0
|
||||
|
||||
|
||||
def _auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]:
|
||||
"""Return Phase 30.1 auth headers; tolerate platform_auth being absent
|
||||
in older installs (e.g. during rolling upgrade).
|
||||
|
||||
``workspace_id`` selects the per-workspace token from the multi-
|
||||
workspace registry when set (PR-1: external agent registered in
|
||||
multiple workspaces). With no arg the legacy single-token path is
|
||||
unchanged.
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers
|
||||
return auth_headers(workspace_id) if workspace_id else auth_headers()
|
||||
except Exception:
|
||||
return {}
|
||||
from a2a_tools_rbac import ( # noqa: E402 (import after the from-a2a_client block)
|
||||
_auth_headers_for_heartbeat,
|
||||
_check_memory_read_permission,
|
||||
_check_memory_write_permission,
|
||||
_get_workspace_tier,
|
||||
_is_root_workspace,
|
||||
_ROLE_PERMISSIONS,
|
||||
)
|
||||
|
||||
|
||||
# Per-field caps on the heartbeat / activity payload. Borrowed from
|
||||
|
||||
138
workspace/a2a_tools_rbac.py
Normal file
138
workspace/a2a_tools_rbac.py
Normal file
@ -0,0 +1,138 @@
|
||||
"""RBAC + auth-header helpers shared by all a2a_tools tool handlers.
|
||||
|
||||
Extracted from ``a2a_tools.py`` (RFC #2873 iter 4a). Centralises the
|
||||
"what can this workspace do" + "how do I prove it on a platform call"
|
||||
concerns into a single module so:
|
||||
|
||||
* Future tools added under ``a2a_tools/`` see one obvious helper to
|
||||
call instead of re-implementing the role/tier check.
|
||||
* The role-permission table is in ONE place — adding a new role
|
||||
or capability touches one file, not every tool that gates on it.
|
||||
* Tests targeting these helpers don't have to import the whole
|
||||
991-LOC ``a2a_tools`` surface.
|
||||
|
||||
Public surface:
|
||||
|
||||
* ``ROLE_PERMISSIONS`` — canonical role → action set table.
|
||||
* ``get_workspace_tier()`` — config-resolved tier (0 = root).
|
||||
* ``check_memory_write_permission()`` — boolean.
|
||||
* ``check_memory_read_permission()`` — boolean.
|
||||
* ``is_root_workspace()`` — boolean (tier == 0).
|
||||
* ``auth_headers_for_heartbeat(workspace_id=None)`` — auth-header dict
|
||||
with the multi-workspace registry lookup; tolerates ``platform_auth``
|
||||
missing on older installs (returns ``{}``).
|
||||
|
||||
Underscore-prefixed back-compat aliases (``_ROLE_PERMISSIONS``,
|
||||
``_check_memory_write_permission``, etc.) match the names previously
|
||||
exposed in ``a2a_tools`` so existing tests'
|
||||
``patch("a2a_tools._foo", ...)`` continue to work via the re-exports
|
||||
in ``a2a_tools.py``.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
|
||||
# Mirror ``builtin_tools/audit.py`` for a2a_tools isolation. Listed as a
|
||||
# module-level constant rather than computed lazily so the table is
|
||||
# discoverable in static analysis + ``grep``.
|
||||
ROLE_PERMISSIONS: dict[str, set[str]] = {
|
||||
"admin": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"operator": {"delegate", "approve", "memory.read", "memory.write"},
|
||||
"read-only": {"memory.read"},
|
||||
"no-delegation": {"approve", "memory.read", "memory.write"},
|
||||
"no-approval": {"delegate", "memory.read", "memory.write"},
|
||||
"memory-readonly": {"memory.read"},
|
||||
}
|
||||
|
||||
|
||||
def get_workspace_tier() -> int:
|
||||
"""Return the workspace tier from config (0 = root, 1+ = tenant)."""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
return getattr(cfg, "tier", 1)
|
||||
except Exception:
|
||||
return int(os.environ.get("WORKSPACE_TIER", 1))
|
||||
|
||||
|
||||
def _resolve_role_state() -> tuple[list[str], dict]:
|
||||
"""Return (roles, allowed_actions) from config.
|
||||
|
||||
Fail-closed: if config is unavailable, fall back to an "operator"
|
||||
default with no per-role overrides. Operator has memory.read +
|
||||
memory.write but not the elevated approve/delegate over GLOBAL
|
||||
scope, so a config outage doesn't grant unexpected privileges.
|
||||
"""
|
||||
try:
|
||||
from config import load_config
|
||||
|
||||
cfg = load_config()
|
||||
roles = list(getattr(cfg, "rbac", None).roles or ["operator"])
|
||||
allowed = dict(getattr(cfg, "rbac", None).allowed_actions or {})
|
||||
return roles, allowed
|
||||
except Exception:
|
||||
return ["operator"], {}
|
||||
|
||||
|
||||
def check_memory_write_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.write."""
|
||||
roles, allowed = _resolve_role_state()
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.write" in allowed[role]:
|
||||
return True
|
||||
elif role in ROLE_PERMISSIONS and "memory.write" in ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def check_memory_read_permission() -> bool:
|
||||
"""Return True if this workspace's RBAC roles grant memory.read."""
|
||||
roles, allowed = _resolve_role_state()
|
||||
for role in roles:
|
||||
if role == "admin":
|
||||
return True
|
||||
if role in allowed:
|
||||
if "memory.read" in allowed[role]:
|
||||
return True
|
||||
elif role in ROLE_PERMISSIONS and "memory.read" in ROLE_PERMISSIONS[role]:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def is_root_workspace() -> bool:
|
||||
"""Return True if this workspace is tier 0 (root/root-org)."""
|
||||
return get_workspace_tier() == 0
|
||||
|
||||
|
||||
def auth_headers_for_heartbeat(workspace_id: str | None = None) -> dict[str, str]:
|
||||
"""Return Phase 30.1 auth headers; tolerate platform_auth being absent
|
||||
in older installs (e.g. during rolling upgrade).
|
||||
|
||||
``workspace_id`` selects the per-workspace token from the multi-
|
||||
workspace registry when set (PR-1: external agent registered in
|
||||
multiple workspaces). With no arg the legacy single-token path is
|
||||
unchanged.
|
||||
"""
|
||||
try:
|
||||
from platform_auth import auth_headers
|
||||
return auth_headers(workspace_id) if workspace_id else auth_headers()
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
# ============== Back-compat aliases for the previous a2a_tools names ==============
|
||||
# Tests + downstream call sites refer to the pre-extract names; aliasing
|
||||
# keeps both forms valid. The new public names (no underscore prefix)
|
||||
# are preferred for new code.
|
||||
|
||||
_ROLE_PERMISSIONS = ROLE_PERMISSIONS
|
||||
_get_workspace_tier = get_workspace_tier
|
||||
_check_memory_write_permission = check_memory_write_permission
|
||||
_check_memory_read_permission = check_memory_read_permission
|
||||
_is_root_workspace = is_root_workspace
|
||||
_auth_headers_for_heartbeat = auth_headers_for_heartbeat
|
||||
281
workspace/tests/test_a2a_tools_rbac.py
Normal file
281
workspace/tests/test_a2a_tools_rbac.py
Normal file
@ -0,0 +1,281 @@
|
||||
"""Direct tests for ``a2a_tools_rbac`` (RFC #2873 iter 4a).
|
||||
|
||||
The full behavior matrix is exercised through ``a2a_tools._foo`` aliases
|
||||
in ``test_a2a_tools_impl.py``. This file pins:
|
||||
|
||||
1. **Drift gate** — ``a2a_tools._foo is a2a_tools_rbac.foo`` for every
|
||||
extracted symbol. A refactor that wraps or re-implements an alias
|
||||
fails this test.
|
||||
2. **Direct unit coverage** for each helper without going through the
|
||||
a2a_tools surface, so regressions in the small RBAC layer surface
|
||||
against THIS module's tests, not the 991-LOC tool-handler tests.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _require_workspace_id(monkeypatch):
|
||||
# a2a_client raises at import-time without WORKSPACE_ID. Setting it
|
||||
# once per test isolates the env so an absent value in CI doesn't
|
||||
# surface as an opaque RuntimeError from a2a_tools' import.
|
||||
monkeypatch.setenv("WORKSPACE_ID", "00000000-0000-0000-0000-000000000000")
|
||||
monkeypatch.setenv("PLATFORM_URL", "http://test.invalid")
|
||||
yield
|
||||
|
||||
|
||||
# ============== Drift gate ==============
|
||||
|
||||
class TestBackCompatAliases:
|
||||
"""Pin that every legacy underscore name in ``a2a_tools`` is the
|
||||
EXACT same callable / object as the new public name in
|
||||
``a2a_tools_rbac``. Catches accidental re-implementation in either
|
||||
direction."""
|
||||
|
||||
def test_role_permissions_is_same_object(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert a2a_tools._ROLE_PERMISSIONS is a2a_tools_rbac.ROLE_PERMISSIONS
|
||||
|
||||
def test_get_workspace_tier_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert a2a_tools._get_workspace_tier is a2a_tools_rbac.get_workspace_tier
|
||||
|
||||
def test_check_memory_write_permission_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert (
|
||||
a2a_tools._check_memory_write_permission
|
||||
is a2a_tools_rbac.check_memory_write_permission
|
||||
)
|
||||
|
||||
def test_check_memory_read_permission_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert (
|
||||
a2a_tools._check_memory_read_permission
|
||||
is a2a_tools_rbac.check_memory_read_permission
|
||||
)
|
||||
|
||||
def test_is_root_workspace_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert a2a_tools._is_root_workspace is a2a_tools_rbac.is_root_workspace
|
||||
|
||||
def test_auth_headers_alias(self):
|
||||
import a2a_tools
|
||||
import a2a_tools_rbac
|
||||
assert (
|
||||
a2a_tools._auth_headers_for_heartbeat
|
||||
is a2a_tools_rbac.auth_headers_for_heartbeat
|
||||
)
|
||||
|
||||
|
||||
# ============== get_workspace_tier ==============
|
||||
|
||||
class TestGetWorkspaceTier:
|
||||
def test_uses_config_when_available(self):
|
||||
"""Happy path: load_config returns an object with .tier."""
|
||||
import a2a_tools_rbac
|
||||
|
||||
class _Cfg:
|
||||
tier = 0
|
||||
|
||||
with patch("config.load_config", return_value=_Cfg()):
|
||||
assert a2a_tools_rbac.get_workspace_tier() == 0
|
||||
|
||||
def test_default_tier_when_config_lacks_attr(self):
|
||||
import a2a_tools_rbac
|
||||
|
||||
class _Cfg:
|
||||
pass
|
||||
|
||||
with patch("config.load_config", return_value=_Cfg()):
|
||||
# getattr default = 1
|
||||
assert a2a_tools_rbac.get_workspace_tier() == 1
|
||||
|
||||
def test_falls_back_to_env_var(self, monkeypatch):
|
||||
"""When load_config raises, read WORKSPACE_TIER from env."""
|
||||
import a2a_tools_rbac
|
||||
monkeypatch.setenv("WORKSPACE_TIER", "5")
|
||||
with patch("config.load_config", side_effect=RuntimeError("config unavailable")):
|
||||
assert a2a_tools_rbac.get_workspace_tier() == 5
|
||||
|
||||
def test_fallback_default_one_when_env_unset(self, monkeypatch):
|
||||
import a2a_tools_rbac
|
||||
monkeypatch.delenv("WORKSPACE_TIER", raising=False)
|
||||
with patch("config.load_config", side_effect=RuntimeError("boom")):
|
||||
assert a2a_tools_rbac.get_workspace_tier() == 1
|
||||
|
||||
|
||||
# ============== is_root_workspace ==============
|
||||
|
||||
class TestIsRootWorkspace:
|
||||
def test_tier_zero_is_root(self):
|
||||
import a2a_tools_rbac
|
||||
with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=0):
|
||||
assert a2a_tools_rbac.is_root_workspace() is True
|
||||
|
||||
def test_nonzero_tier_is_not_root(self):
|
||||
import a2a_tools_rbac
|
||||
for tier in (1, 2, 99):
|
||||
with patch.object(a2a_tools_rbac, "get_workspace_tier", return_value=tier):
|
||||
assert a2a_tools_rbac.is_root_workspace() is False, f"tier={tier}"
|
||||
|
||||
|
||||
# ============== check_memory_write_permission ==============
|
||||
|
||||
class _RBACCfg:
|
||||
"""Minimal config stub matching the load_config().rbac shape."""
|
||||
|
||||
def __init__(self, roles=None, allowed_actions=None):
|
||||
class _RBAC:
|
||||
pass
|
||||
self.rbac = _RBAC()
|
||||
self.rbac.roles = roles or ["operator"]
|
||||
self.rbac.allowed_actions = allowed_actions or {}
|
||||
|
||||
|
||||
class TestCheckMemoryWritePermission:
|
||||
def test_admin_role_grants_write(self):
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])):
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is True
|
||||
|
||||
def test_operator_role_grants_write(self):
|
||||
"""Operator is in the canonical ROLE_PERMISSIONS table with
|
||||
memory.write — must work without per-role overrides."""
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["operator"])):
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is True
|
||||
|
||||
def test_read_only_role_denies_write(self):
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])):
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is False
|
||||
|
||||
def test_per_role_override_grants(self):
|
||||
"""Per-role override in allowed_actions wins over the canonical
|
||||
table — operators can grant write to memory-readonly via config."""
|
||||
import a2a_tools_rbac
|
||||
cfg = _RBACCfg(
|
||||
roles=["memory-readonly"],
|
||||
allowed_actions={"memory-readonly": {"memory.read", "memory.write"}},
|
||||
)
|
||||
with patch("config.load_config", return_value=cfg):
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is True
|
||||
|
||||
def test_per_role_override_denies(self):
|
||||
"""Per-role override that drops write blocks an operator from
|
||||
writing — the override is the authoritative source when present."""
|
||||
import a2a_tools_rbac
|
||||
cfg = _RBACCfg(
|
||||
roles=["operator"],
|
||||
allowed_actions={"operator": {"memory.read"}},
|
||||
)
|
||||
with patch("config.load_config", return_value=cfg):
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is False
|
||||
|
||||
def test_fail_closed_when_config_unavailable(self):
|
||||
"""Fail-closed contract: config outage falls back to ['operator']
|
||||
with no overrides — operator has memory.write in the canonical
|
||||
table, so write IS granted in this fallback. The fail-closed
|
||||
property is for ELEVATED ops (admin scope), not for the basic
|
||||
write that operator has by default. This test pins the contract:
|
||||
config errors do not silently grant admin."""
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", side_effect=RuntimeError("boom")):
|
||||
# operator has memory.write → True (preserved behavior)
|
||||
assert a2a_tools_rbac.check_memory_write_permission() is True
|
||||
|
||||
|
||||
# ============== check_memory_read_permission ==============
|
||||
|
||||
class TestCheckMemoryReadPermission:
|
||||
def test_admin_grants_read(self):
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["admin"])):
|
||||
assert a2a_tools_rbac.check_memory_read_permission() is True
|
||||
|
||||
def test_read_only_grants_read(self):
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["read-only"])):
|
||||
assert a2a_tools_rbac.check_memory_read_permission() is True
|
||||
|
||||
def test_unknown_role_denies(self):
|
||||
"""A role that's not in ROLE_PERMISSIONS and not in
|
||||
allowed_actions overrides denies by default."""
|
||||
import a2a_tools_rbac
|
||||
with patch("config.load_config", return_value=_RBACCfg(roles=["random-undefined-role"])):
|
||||
assert a2a_tools_rbac.check_memory_read_permission() is False
|
||||
|
||||
|
||||
# ============== auth_headers_for_heartbeat ==============
|
||||
|
||||
class TestAuthHeadersForHeartbeat:
|
||||
def test_no_workspace_id_uses_legacy_path(self):
|
||||
"""No-arg call routes to platform_auth.auth_headers() — the
|
||||
legacy single-token path."""
|
||||
import a2a_tools_rbac
|
||||
called: dict[str, object] = {}
|
||||
|
||||
def fake_auth_headers(*args):
|
||||
called["args"] = args
|
||||
return {"Authorization": "Bearer legacy-token"}
|
||||
|
||||
with patch("platform_auth.auth_headers", fake_auth_headers):
|
||||
out = a2a_tools_rbac.auth_headers_for_heartbeat()
|
||||
assert out == {"Authorization": "Bearer legacy-token"}
|
||||
# Legacy path is auth_headers() with no arg
|
||||
assert called["args"] == ()
|
||||
|
||||
def test_with_workspace_id_routes_per_workspace(self):
|
||||
import a2a_tools_rbac
|
||||
called: dict[str, object] = {}
|
||||
|
||||
def fake_auth_headers(wsid):
|
||||
called["wsid"] = wsid
|
||||
return {"Authorization": f"Bearer tok-{wsid}"}
|
||||
|
||||
with patch("platform_auth.auth_headers", fake_auth_headers):
|
||||
out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-abc")
|
||||
assert out == {"Authorization": "Bearer tok-ws-abc"}
|
||||
assert called["wsid"] == "ws-abc"
|
||||
|
||||
def test_returns_empty_when_platform_auth_missing(self, monkeypatch):
|
||||
"""Older installs without platform_auth get {} so callers don't
|
||||
crash — they'll just send unauthed and the platform 401 handler
|
||||
surfaces the real error."""
|
||||
import a2a_tools_rbac
|
||||
# Force ImportError by setting sys.modules entry to None
|
||||
monkeypatch.setitem(sys.modules, "platform_auth", None)
|
||||
out = a2a_tools_rbac.auth_headers_for_heartbeat("ws-1")
|
||||
assert out == {}
|
||||
|
||||
|
||||
# ============== ROLE_PERMISSIONS canonical table ==============
|
||||
|
||||
class TestRolePermissionsTable:
|
||||
def test_admin_has_all_actions(self):
|
||||
import a2a_tools_rbac
|
||||
assert a2a_tools_rbac.ROLE_PERMISSIONS["admin"] == {
|
||||
"delegate", "approve", "memory.read", "memory.write",
|
||||
}
|
||||
|
||||
def test_read_only_has_only_memory_read(self):
|
||||
import a2a_tools_rbac
|
||||
assert a2a_tools_rbac.ROLE_PERMISSIONS["read-only"] == {"memory.read"}
|
||||
|
||||
def test_no_delegation_is_missing_delegate(self):
|
||||
import a2a_tools_rbac
|
||||
assert "delegate" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-delegation"]
|
||||
|
||||
def test_no_approval_is_missing_approve(self):
|
||||
import a2a_tools_rbac
|
||||
assert "approve" not in a2a_tools_rbac.ROLE_PERMISSIONS["no-approval"]
|
||||
Loading…
Reference in New Issue
Block a user