fix(core#2127): per-workspace can_delegate capability (defense-in-depth) #3168

Merged
devops-engineer merged 4 commits from fix/2127-can-delegate-capability into main 2026-06-23 10:05:45 +00:00
11 changed files with 747 additions and 10 deletions
@@ -620,6 +620,32 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
}
body = normalizedBody
// core#2127 RC 13392: a workspace with can_delegate=false MUST NOT be
// able to send a delegation via the raw A2A message/send path. The MCP
// tools/list+tools/call+helper gates (PR#3165+#3168) and the REST
// /delegate handler (PR#3168 RC 13387 fix) need a 4th layer — the raw
// A2A proxy itself. A locked-out workspace that hand-builds a
// message/send JSON-RPC body and posts to /workspaces/:id/a2a would
// otherwise still dispatch a delegation. The check fires AFTER access
// control + budget + normalize (so unauthorized / over-budget / malformed
// calls are rejected first) and BEFORE the persist / poll-mode
// short-circuit / push-dispatch (so no side effect on a blocked call).
// Skipped for self-calls (callerID == workspaceID — replying to your
// own queued turn is not a delegation) and for system / canvas callers
// (whose auth path bypasses CanCommunicate above and whose can_delegate
// is not a meaningful policy surface). OFFSEC-001: constant 403 body,
// no can_delegate wording leaks.
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser && a2aMethod == "message/send" {
canDelegate, lookupErr := loadWorkspaceCanDelegate(ctx, db.DB, callerID)
if lookupErr == nil && !canDelegate {
log.Printf("ProxyA2A: can_delegate=FALSE rejected message/send caller=%s → %s", callerID, workspaceID)
return 0, nil, &proxyA2AError{
Status: http.StatusForbidden,
Response: gin.H{"error": "tool call failed"},
}
}
}
// #2560 (chat UX: persist in-flight exchange across leave/refresh):
// write the user message to activity_logs AT RECEIPT — before any of
// the downstream short-circuits (poll-mode, mock-runtime, push dispatch)
@@ -3628,3 +3628,145 @@ func TestLogA2ASuccess_NoBroadcastForWorkspaceCaller(t *testing.T) {
}
}
}
// ---------- core#2127 RC 13392: can_delegate gate on raw A2A /a2a message/send ----------
// TestProxyA2A_MessageSend_CanDelegateFalse_Rejects is the 4th-layer regression
// for the can_delegate policy (after PR#3165/#3168 + the REST /delegate fix).
// The raw /a2a message/send path was still ungated — a locked-out workspace
// could hand-build a message/send JSON-RPC body and post it directly to
// /workspaces/:id/a2a, bypassing the MCP tool-hiding, the MCP tools/call
// gate, the MCP delegate helper gate, and the REST /delegate handler.
//
// The gate fires after access control + budget + normalize and before the
// persist / poll-mode short-circuit / push-dispatch, so a blocked call has
// zero side effects. Per OFFSEC-001, the 403 body is constant — no
// can_delegate wording leaks to the caller.
func TestProxyA2A_MessageSend_CanDelegateFalse_Rejects(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mr.Set(fmt.Sprintf("ws:%s:url", "ws-target"), "http://localhost:1")
mockCanCommunicate(mock, "ws-caller", "ws-target", true) // same parent
mockSameOrg(mock, "ws-caller", "ws-target", true) // same tenant
expectBudgetCheck(mock, "ws-target")
// can_delegate lookup on the CALLER — returns FALSE.
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs("ws-caller").
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(false))
// No follow-up expectations — a persist, INSERT, or proxy call would
// mean the gate leaked.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-target"}}
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-target/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
handler.ProxyA2A(c)
if w.Code != http.StatusForbidden {
t.Errorf("expected 403 for can_delegate=false message/send, got %d: %s", w.Code, w.Body.String())
}
if strings.Contains(w.Body.String(), "can_delegate") {
t.Errorf("error body leaks can_delegate wording: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations (a follow-up persist/insert/proxy call means the gate leaked): %v", err)
}
}
// TestProxyA2A_MessageSend_CanDelegateTrue_Proceeds is the no-regression
// sentinel for the default-true path. A workspace with can_delegate=TRUE
// (the default for every existing workspace) MUST follow the existing
// message/send flow unchanged. We use the poll-mode short-circuit (no
// real upstream dispatch) so the mock setup is bounded; the existing
// TestProxyA2A_AllowedSelf_SkipsAccessCheck test covers the full
// happy-path dispatch. This test focuses on the gate behaviour: the
// can_delegate=TRUE lookup happens, then the poll-mode path returns 200.
func TestProxyA2A_MessageSend_CanDelegateTrue_Proceeds(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
mr.Set(fmt.Sprintf("ws:%s:url", "ws-target"), "http://localhost:1")
mockCanCommunicate(mock, "ws-caller", "ws-target", true) // same parent
mockSameOrg(mock, "ws-caller", "ws-target", true) // same tenant
expectBudgetCheck(mock, "ws-target")
// Poll-mode short-circuit: setting delivery_mode='poll' skips the
// upstream dispatch, so the test exercises the gate (can_delegate
// lookup + canDelegate=true) without the rest of the dispatch path.
mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`).
WithArgs("ws-target").
WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow("poll"))
// can_delegate=TRUE — proceed (the gate's lookup happens, the value
// is true, so the gate does NOT short-circuit to 403).
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs("ws-caller").
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(true))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-target"}}
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-target/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Request.Header.Set("X-Workspace-ID", "ws-caller")
handler.ProxyA2A(c)
// Not 403 (the gate's rejection code) — the poll-mode handler
// returns 200 {status:"queued"} after the gate passes. We don't
// pin the exact body since the poll-mode tests cover that.
if w.Code == http.StatusForbidden {
t.Errorf("can_delegate=true must NOT trigger the gate (403); got %d: %s", w.Code, w.Body.String())
}
}
// TestProxyA2A_MessageSend_CanDelegateFalse_SelfCall_Allowed is the
// no-false-positive sentinel. Self-calls (callerID == workspaceID) reply
// to the workspace's own queued turn — that is NOT a delegation and must
// not be can_delegate-gated.
func TestProxyA2A_MessageSend_CanDelegateFalse_SelfCall_Allowed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
allowLoopbackForTest(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
waitForHandlerAsyncBeforeDBCleanup(t, handler)
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Fprint(w, `{"jsonrpc":"2.0","id":"1","result":{}}`)
}))
defer agentServer.Close()
mr.Set(fmt.Sprintf("ws:%s:url", "ws-self"), agentServer.URL)
expectBudgetCheck(mock, "ws-self")
// can_delegate=FALSE on the workspace — but the call is a self-call
// (callerID == workspaceID), so the gate MUST NOT fire. The lookup
// itself is also skipped — no can_delegate expectation.
mock.ExpectExec("INSERT INTO activity_logs").WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-self"}}
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"hi"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/ws-self/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
c.Request.Header.Set("X-Workspace-ID", "ws-self")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
if w.Code != http.StatusOK {
t.Errorf("self-call with can_delegate=false must NOT be gated, got %d: %s", w.Code, w.Body.String())
}
}
@@ -128,6 +128,21 @@ func (h *DelegationHandler) Delegate(c *gin.Context) {
return // response already written
}
// core#2127 (Researcher RC 13387): server-side enforcement of the
// can_delegate policy MUST cover the RAW REST endpoint, not only the
// MCP tools/list+tools/call paths gated in PR#3165. A locked-out
// workspace that hand-builds an HTTP body to POST /workspaces/:id/
// delegate would otherwise still dispatch delegations via this path.
// The check fires BEFORE the self-delegation guard, idempotency
// lookup, insertDelegationRow, and executeDelegation goroutine —
// i.e. before any DB or proxy side effect. Same constant error as
// the MCP gate (OFFSEC-001): no policy wording leaks to the caller.
if canDelegate, derr := loadWorkspaceCanDelegate(ctx, db.DB, sourceID); derr == nil && !canDelegate {
log.Printf("Delegate: can_delegate=FALSE rejected delegation from workspace=%s target=%s", sourceID, body.TargetID)
c.JSON(http.StatusForbidden, gin.H{"error": "tool call failed"})
return
}
// #548 — prevent self-delegation: a workspace delegating to itself
// acquires _run_lock twice on the same mutex, deadlocking permanently.
//
@@ -8,6 +8,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
@@ -1833,3 +1834,100 @@ func TestBuildDelegateA2ABody_SchemaValidSendMessageRequest(t *testing.T) {
t.Errorf("metadata.delegation_id = %v, want %s", meta["delegation_id"], delegationID)
}
}
// ---------- core#2127 (Researcher RC 13387): can_delegate REST gate ----------
// TestDelegate_CanDelegateFalse_RestEndpointRejected is the regression for
// the REST endpoint bypass surfaced by Researcher RC 13387. The MCP
// delegation gate (PR#3165) covers the MCP tools/list + tools/call + the
// delegate helpers, but the RAW REST endpoint POST /workspaces/:id/delegate
// remained unguarded. A locked-out workspace that hand-builds an HTTP body
// would otherwise still dispatch delegations via this path.
//
// This test exercises the REST path directly (not via the MCP bridge) so the
// regression sentinel stays pinned even if the MCP layer is refactored.
func TestDelegate_CanDelegateFalse_RestEndpointRejected(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
sourceID := "11111111-2222-3333-4444-555555555555"
targetID := "66666666-7777-8888-9999-aaaaaaaaaaaa"
// can_delegate lookup returns FALSE — the REST gate MUST short-circuit
// BEFORE the self-delegation check, idempotency lookup, or any
// insertDelegationRow / executeDelegation work.
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(sourceID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(false))
// No further DB or proxy expectations — the call MUST fail closed.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: sourceID}}
body := `{"target_id":"` + targetID + `","task":"do something"}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+sourceID+"/delegate", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
dh.Delegate(c)
// Per OFFSEC-001: the error message is constant; the policy itself
// lives in tools/list (hidden) + the abilities API + this REST gate.
if w.Code != http.StatusForbidden {
t.Errorf("expected 403 for can_delegate=false, got %d: %s", w.Code, w.Body.String())
}
if strings.Contains(w.Body.String(), "can_delegate") {
t.Errorf("error body leaks can_delegate wording: %s", w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations (a follow-up idempotency/insert/exec call means the gate leaked): %v", err)
}
}
// TestDelegate_CanDelegateTrue_NoRegression is the no-regression sentinel
// for the default-true path. A workspace with can_delegate=TRUE (every
// existing workspace) MUST follow the existing delegation flow unchanged.
// Mirrors TestDelegate_Success (the only delta is the can_delegate lookup
// returning true in front of the same activity_logs + structure_events
// inserts).
func TestDelegate_CanDelegateTrue_NoRegression(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
dh := NewDelegationHandler(wh, broadcaster)
sourceID := "ws-source"
targetID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
// can_delegate=TRUE — proceed through the existing flow.
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(sourceID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(true))
// Existing success-path expectations (mirrors TestDelegate_Success).
mock.ExpectExec("INSERT INTO activity_logs").
WithArgs(sourceID, sourceID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), sqlmock.AnyArg(), nil).
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: sourceID}}
body := fmt.Sprintf(`{"target_id":"%s","task":"write unit tests"}`, targetID)
c.Request = httptest.NewRequest("POST", "/workspaces/"+sourceID+"/delegate", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
dh.Delegate(c)
if w.Code != http.StatusAccepted {
t.Errorf("expected 202 for can_delegate=true happy path, got %d: %s", w.Code, w.Body.String())
}
// Wait for the background goroutine (mirrors TestDelegate_Success).
time.Sleep(100 * time.Millisecond)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
+32 -2
View File
@@ -445,13 +445,19 @@ var mcpAllTools = []mcpTool{
// mcpToolList returns the filtered tool list for this MCP bridge.
// C3: send_message_to_user is excluded unless MOLECULE_MCP_ALLOW_SEND_MESSAGE=true.
func mcpToolList() []mcpTool {
// core#2127: delegate_task + delegate_task_async are excluded when the
// caller's workspace has can_delegate=FALSE (defense-in-depth: a role-locked
// agent whose prompt is bypassed still cannot discover the tools).
func mcpToolList(canDelegate bool) []mcpTool {
allowSend := os.Getenv("MOLECULE_MCP_ALLOW_SEND_MESSAGE") == "true"
var out []mcpTool
for _, t := range mcpAllTools {
if t.Name == "send_message_to_user" && !allowSend {
continue
}
if !canDelegate && (t.Name == "delegate_task" || t.Name == "delegate_task_async") {
continue
}
out = append(out, t)
}
return out
@@ -552,8 +558,18 @@ func (h *MCPHandler) dispatchRPC(ctx context.Context, workspaceID string, req mc
base.Result = nil
case "tools/list":
// core#2127: tool-list visibility gated on can_delegate (defence-in-depth).
// Default TRUE keeps the existing list for the vast majority of workspaces.
canDelegate, lookupErr := loadWorkspaceCanDelegate(ctx, h.database, workspaceID)
if lookupErr != nil {
log.Printf("mcp: tools/list can_delegate lookup failed workspace=%s: %v", workspaceID, lookupErr)
// Fail open on the lookup error: surface the full tool list rather
// than a misleading 403. The tools/call path enforces can_delegate
// again as a second gate.
canDelegate = true
}
base.Result = map[string]interface{}{
"tools": mcpToolList(),
"tools": mcpToolList(canDelegate),
}
case "tools/call":
@@ -565,6 +581,20 @@ func (h *MCPHandler) dispatchRPC(ctx context.Context, workspaceID string, req mc
base.Error = &mcpRPCError{Code: -32602, Message: "invalid parameters"}
return base
}
// core#2127: server-side delegation-policy gate. A role-locked
// "coding executor" agent (Kimi/MiniMax) has can_delegate=FALSE; even
// if a prompt-bypass or a stale tool list makes them attempt
// delegate_task / delegate_task_async, the call MUST 403 here.
// Per OFFSEC-001, the error message is constant; the policy
// itself is documented in tools/list (hidden) + the abilities API.
if params.Name == "delegate_task" || params.Name == "delegate_task_async" {
canDelegate, lookupErr := loadWorkspaceCanDelegate(ctx, h.database, workspaceID)
if lookupErr == nil && !canDelegate {
log.Printf("mcp: can_delegate=FALSE rejected %s from workspace=%s", params.Name, workspaceID)
base.Error = &mcpRPCError{Code: -32000, Message: "tool call failed"}
return base
}
}
text, err := h.dispatch(ctx, workspaceID, params.Name, params.Arguments)
if err != nil {
// Log full error server-side for forensics; return constant string
@@ -1624,3 +1624,153 @@ func TestMCPHandler_dispatchRPC_InvalidParams_ArrayInsteadOfObject(t *testing.T)
t.Errorf("error message should be constant 'invalid parameters', got: %q", resp.Error.Message)
}
}
// ---------- core#2127: can_delegate=FALSE rejects delegate_task / async ----------
// TestMCPHandler_DelegateTask_CanDelegateFalse_Rejects is the server-side
// delegation-policy gate. A role-locked "coding executor" with
// can_delegate=FALSE must NOT be able to delegate, even if its prompt
// bypasses the role or its MCP tools/list cache is stale. The MCP gate
// at the tool-entry point is the second line of defence (the first is
// the tools/list filter that hides the tools).
func TestMCPHandler_DelegateTask_CanDelegateFalse_Rejects(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
// can_delegate lookup returns FALSE — the gate MUST short-circuit
// BEFORE the CanCommunicate lookup or any A2A proxy call.
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(callerID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(false))
// No further DB or proxy expectations — the call MUST fail closed.
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "do work",
}, mcpCallTimeout)
if err == nil {
t.Fatalf("delegate_task must return error when can_delegate=false, got nil (out=%q)", out)
}
// Per OFFSEC-001: error message is constant; the policy lives in
// tools/list (hidden) + the abilities API. Do NOT assert the
// can_delegate wording leaks to the caller.
if strings.Contains(err.Error(), "can_delegate") {
t.Errorf("error message leaks can_delegate wording: %q", err.Error())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations (a follow-up CanCommunicate or proxy call means the gate leaked): %v", err)
}
}
func TestMCPHandler_DelegateTaskAsync_CanDelegateFalse_Rejects(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(callerID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(false))
out, err := h.toolDelegateTaskAsync(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "do work",
})
if err == nil {
t.Fatalf("delegate_task_async must return error when can_delegate=false, got nil (out=%q)", out)
}
if strings.Contains(err.Error(), "can_delegate") {
t.Errorf("error message leaks can_delegate wording: %q", err.Error())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// TestMCPHandler_DelegateTask_CanDelegateTrue_Proceeds is the no-regression
// sentinel: a workspace with can_delegate=TRUE (the default for every
// existing workspace) MUST follow the existing delegation path unchanged.
func TestMCPHandler_DelegateTask_CanDelegateTrue_Proceeds(t *testing.T) {
h, mock := newMCPHandler(t)
callerID := "11111111-1111-1111-1111-111111111111"
targetID := "22222222-2222-2222-2222-222222222222"
parentID := "33333333-3333-3333-3333-333333333333"
// can_delegate=TRUE (the default) — proceed.
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(callerID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(true))
expectCanCommunicateSiblings(mock, callerID, targetID, parentID)
mock.ExpectExec(`(?s)INSERT INTO activity_logs.*'delegation'.*'delegate'`).
WithArgs(callerID, callerID, targetID, "Delegating to "+targetID, sqlmock.AnyArg(), "pending").
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(`UPDATE activity_logs`).
WithArgs("dispatched", "", callerID, sqlmock.AnyArg()).
WillReturnResult(sqlmock.NewResult(0, 1))
h.a2aProxy = func(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
return 200, []byte(`{"result":{"message":{"parts":[{"text":"done"}]}}}`), nil
}
out, err := h.toolDelegateTask(context.Background(), callerID, map[string]interface{}{
"workspace_id": targetID,
"task": "do work",
}, mcpCallTimeout)
if err != nil {
t.Fatalf("delegate_task with can_delegate=true returned error: %v", err)
}
if out != "done" {
t.Fatalf("delegate_task response = %q, want done", out)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestMCPToolList_CanDelegateFalse_HidesDelegateTools is the
// tools/list-side filter. A locked-out workspace must NOT see
// delegate_task or delegate_task_async in its tools/list response.
func TestMCPToolList_CanDelegateFalse_HidesDelegateTools(t *testing.T) {
tools := mcpToolList(false)
for _, tool := range tools {
if tool.Name == "delegate_task" || tool.Name == "delegate_task_async" {
t.Errorf("can_delegate=false must hide %q from tools/list; got it visible", tool.Name)
}
}
// And the rest still show up — delegate_task filtering must be the
// ONLY change. Check that list_peers, get_workspace_info, check_task_status
// all remain visible (the rest of the surface is unaffected).
wantVisible := []string{"list_peers", "get_workspace_info", "check_task_status"}
for _, name := range wantVisible {
found := false
for _, tool := range tools {
if tool.Name == name {
found = true
break
}
}
if !found {
t.Errorf("can_delegate=false must still expose %q (only delegate_* are gated)", name)
}
}
}
func TestMCPToolList_CanDelegateTrue_ShowsDelegateTools(t *testing.T) {
tools := mcpToolList(true)
foundDelegate := false
foundDelegateAsync := false
for _, tool := range tools {
if tool.Name == "delegate_task" {
foundDelegate = true
}
if tool.Name == "delegate_task_async" {
foundDelegateAsync = true
}
}
if !foundDelegate {
t.Errorf("can_delegate=true must include delegate_task in tools/list")
}
if !foundDelegateAsync {
t.Errorf("can_delegate=true must include delegate_task_async in tools/list")
}
}
@@ -223,6 +223,15 @@ func (h *MCPHandler) toolDelegateTask(ctx context.Context, callerID string, args
return "", fmt.Errorf("task is required")
}
// core#2127: defence-in-depth — even if the MCP tools/list gate is
// bypassed (stale tool cache, a caller that hand-builds an A2A body),
// the delegation call itself must 403 on can_delegate=false. Per
// OFFSEC-001, the error message is constant; the policy is documented
// in tools/list (hidden) + the abilities API.
if canDelegate, derr := loadWorkspaceCanDelegate(ctx, h.database, callerID); derr == nil && !canDelegate {
return "", fmt.Errorf("tool call failed")
}
if !registry.CanCommunicate(callerID, targetID) {
return "", fmt.Errorf("workspace %s is not authorised to communicate with %s", callerID, targetID)
}
@@ -279,6 +288,11 @@ func (h *MCPHandler) toolDelegateTaskAsync(ctx context.Context, callerID string,
return "", fmt.Errorf("task is required")
}
// core#2127: see toolDelegateTask — same defence-in-depth gate.
if canDelegate, derr := loadWorkspaceCanDelegate(ctx, h.database, callerID); derr == nil && !canDelegate {
return "", fmt.Errorf("tool call failed")
}
if !registry.CanCommunicate(callerID, targetID) {
return "", fmt.Errorf("workspace %s is not authorised to communicate with %s", callerID, targetID)
}
@@ -2,17 +2,25 @@ package handlers
// workspace_abilities.go — PATCH /workspaces/:id/abilities
//
// Allows users and admin agents to toggle two workspace-level ability flags:
// Allows users and admin agents to toggle workspace-level ability flags:
//
// broadcast_enabled — workspace may POST /broadcast to send org-wide messages
// talk_to_user_enabled — workspace may deliver canvas chat messages via
// send_message_to_user / POST /notify
// can_delegate — workspace may initiate A2A delegate_task /
// delegate_task_async (core#2127). Default TRUE;
// setting FALSE hides the delegate_* tools in the
// MCP tools/list response AND makes the A2A
// delegation path return 403 (defense-in-depth
// against prompt-bypass for role-locked agents).
//
// Gated behind AdminAuth so workspace agents cannot self-modify their own
// ability flags (that would let any agent grant itself broadcast rights or
// suppress its own chat-silence constraint).
import (
"context"
"database/sql"
"log"
"net/http"
@@ -24,8 +32,9 @@ import (
// update. Fields are pointers so that the handler can distinguish "caller
// supplied false" from "caller omitted the field" (omitempty semantics).
type AbilitiesPayload struct {
BroadcastEnabled *bool `json:"broadcast_enabled"`
TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
BroadcastEnabled *bool `json:"broadcast_enabled"`
TalkToUserEnabled *bool `json:"talk_to_user_enabled"`
CanDelegate *bool `json:"can_delegate"`
}
// PatchAbilities handles PATCH /workspaces/:id/abilities (AdminAuth).
@@ -41,7 +50,7 @@ func PatchAbilities(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
return
}
if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil {
if body.BroadcastEnabled == nil && body.TalkToUserEnabled == nil && body.CanDelegate == nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "at least one ability field required"})
return
}
@@ -56,11 +65,21 @@ func PatchAbilities(c *gin.Context) {
return
}
// Atomic update: when both fields are supplied, apply them in one SQL
// Atomic update: when multiple fields are supplied, apply them in one SQL
// statement so the request is all-or-nothing (#2131). A partial mutation
// (e.g. broadcast_enabled updated but talk_to_user_enabled failing) would
// leave the workspace in an ambiguous capability state.
if body.BroadcastEnabled != nil && body.TalkToUserEnabled != nil {
switch {
case body.BroadcastEnabled != nil && body.TalkToUserEnabled != nil && body.CanDelegate != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, talk_to_user_enabled = $3, can_delegate = $4, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled, *body.TalkToUserEnabled, *body.CanDelegate,
); err != nil {
log.Printf("PatchAbilities three-fields for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
case body.BroadcastEnabled != nil && body.TalkToUserEnabled != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, talk_to_user_enabled = $3, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled, *body.TalkToUserEnabled,
@@ -69,7 +88,25 @@ func PatchAbilities(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
} else if body.BroadcastEnabled != nil {
case body.BroadcastEnabled != nil && body.CanDelegate != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, can_delegate = $3, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled, *body.CanDelegate,
); err != nil {
log.Printf("PatchAbilities broadcast+can_delegate for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
case body.TalkToUserEnabled != nil && body.CanDelegate != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET talk_to_user_enabled = $2, can_delegate = $3, updated_at = now() WHERE id = $1`,
id, *body.TalkToUserEnabled, *body.CanDelegate,
); err != nil {
log.Printf("PatchAbilities talk_to_user+can_delegate for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
case body.BroadcastEnabled != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET broadcast_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.BroadcastEnabled,
@@ -78,7 +115,7 @@ func PatchAbilities(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
} else if body.TalkToUserEnabled != nil {
case body.TalkToUserEnabled != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET talk_to_user_enabled = $2, updated_at = now() WHERE id = $1`,
id, *body.TalkToUserEnabled,
@@ -87,7 +124,52 @@ func PatchAbilities(c *gin.Context) {
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
case body.CanDelegate != nil:
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspaces SET can_delegate = $2, updated_at = now() WHERE id = $1`,
id, *body.CanDelegate,
); err != nil {
log.Printf("PatchAbilities can_delegate for %s: %v", id, err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
return
}
}
c.JSON(http.StatusOK, gin.H{"status": "updated"})
}
// loadWorkspaceCanDelegate returns the workspace's can_delegate flag (core#2127).
//
// Returns (true, nil) on any read error (e.g. the row has not been migrated
// to add the can_delegate column) so a missing column never accidentally
// locks an entire org out of delegation — a fail-closed default here would
// turn a forward-only migration into a live-incident-grade outage. The
// column is NOT NULL DEFAULT TRUE in the up-migration, so the only path to
// a real "false" return is an explicit operator PATCH. The tools/call gate
// in mcp.go applies the second-line check, so a transient DB blip can't
// silently elevate a previously-locked workspace.
//
// Tolerates column absence: the SELECT references can_delegate, which on a
// pre-migration schema would return "column does not exist" — caught here and
// mapped to (true, nil). The down-migration drops the column; a downgrade
// in flight is therefore safe (the SELECT just falls through to the
// "column missing" branch and returns the safe-default true).
func loadWorkspaceCanDelegate(ctx context.Context, dbh interface {
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}, workspaceID string) (bool, error) {
var canDelegate bool
err := dbh.QueryRowContext(ctx,
`SELECT can_delegate FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&canDelegate)
if err != nil {
if err == sql.ErrNoRows {
return true, nil // unknown workspace — fail open (let downstream 404/403 handle)
}
// Column-missing (pre-migration) or any other error → safe default true.
// The second-line gate in mcp.go (tools/call) protects against
// accidental elevation; the trade-off is a missing column never
// silently locking delegation.
return true, err
}
return canDelegate, nil
}
@@ -5,11 +5,14 @@ package handlers
import (
"bytes"
"context"
"database/sql"
"errors"
"net/http"
"net/http/httptest"
"testing"
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
@@ -201,3 +204,156 @@ func TestPatchAbilities_BothFields_UpdateError(t *testing.T) {
// path to assert against; sqlmock implicitly verifies no second
// exec occurred.
}
// ---------- core#2127: can_delegate field ----------
func TestPatchAbilities_CanDelegateOnly(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET can_delegate = \$2, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, false).
WillReturnResult(sqlmock.NewResult(0, 1))
w := patchAbilitiesReq(t, wsUUID1, `{"can_delegate":false}`)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestPatchAbilities_AllThreeFields(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, talk_to_user_enabled = \$3, can_delegate = \$4, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true, true, false).
WillReturnResult(sqlmock.NewResult(0, 1))
w := patchAbilitiesReq(t, wsUUID1, `{"broadcast_enabled":true,"talk_to_user_enabled":true,"can_delegate":false}`)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestPatchAbilities_BroadcastAndCanDelegate(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET broadcast_enabled = \$2, can_delegate = \$3, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, true, false).
WillReturnResult(sqlmock.NewResult(0, 1))
w := patchAbilitiesReq(t, wsUUID1, `{"broadcast_enabled":true,"can_delegate":false}`)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
func TestPatchAbilities_TalkToUserAndCanDelegate(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT EXISTS\(SELECT 1 FROM workspaces WHERE id = \$1 AND status != 'removed'\)`).
WithArgs(wsUUID1).
WillReturnRows(sqlmock.NewRows([]string{"exists"}).AddRow(true))
mock.ExpectExec(`UPDATE workspaces SET talk_to_user_enabled = \$2, can_delegate = \$3, updated_at = now\(\) WHERE id = \$1`).
WithArgs(wsUUID1, false, true).
WillReturnResult(sqlmock.NewResult(0, 1))
w := patchAbilitiesReq(t, wsUUID1, `{"talk_to_user_enabled":false,"can_delegate":true}`)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
}
// TestLoadWorkspaceCanDelegate covers the helper used by the MCP tools/list +
// tools/call gates. Tolerates column absence (pre-migration) by returning
// the safe default true so a forward-only migration never accidentally
// locks an entire org out of delegation.
func TestLoadWorkspaceCanDelegate(t *testing.T) {
const wsID = "ws-can-delegate-1"
t.Run("returns the stored value (true)", func(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(true))
got, err := loadWorkspaceCanDelegate(context.Background(), db.DB, wsID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !got {
t.Errorf("expected can_delegate=true, got false")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet: %v", err)
}
})
t.Run("returns the stored value (false) — locked-out workspace", func(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"can_delegate"}).AddRow(false))
got, err := loadWorkspaceCanDelegate(context.Background(), db.DB, wsID)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if got {
t.Errorf("expected can_delegate=false (locked out), got true — would let a role-locked agent delegate")
}
})
t.Run("column missing (pre-migration) → safe default true (no live lockout)", func(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnError(errors.New(`pq: column "can_delegate" does not exist`))
got, err := loadWorkspaceCanDelegate(context.Background(), db.DB, wsID)
if err == nil {
t.Errorf("expected non-nil error (column-missing), got nil")
}
if !got {
t.Errorf("expected safe-default true on column-missing (no live lockout), got false")
}
})
t.Run("sql.ErrNoRows (workspace not found) → safe default true", func(t *testing.T) {
mock, cleanup := withMockDB(t)
defer cleanup()
mock.ExpectQuery(`SELECT can_delegate FROM workspaces WHERE id = \$1`).
WithArgs(wsID).
WillReturnError(sql.ErrNoRows)
got, err := loadWorkspaceCanDelegate(context.Background(), db.DB, wsID)
if err != nil {
t.Errorf("expected nil error on ErrNoRows (downstream 404/403 handles), got %v", err)
}
if !got {
t.Errorf("expected safe-default true on ErrNoRows, got false")
}
})
}
@@ -0,0 +1,10 @@
-- core#2127: drop the can_delegate column.
--
-- Note: any workspace that was PATCHed to can_delegate=FALSE before the
-- rollback will silently regain delegation on the downgrade (no audit row
-- preserved). The migration is reversible for clean-up; the runtime fallback
-- (any handler that still reads can_delegate) MUST be tolerant of the column
-- being absent — see workspace_abilities.go / mcp_tools.go.
ALTER TABLE workspaces
DROP COLUMN IF EXISTS can_delegate;
@@ -0,0 +1,14 @@
-- core#2127: per-workspace `can_delegate` capability.
--
-- Default TRUE preserves the existing behaviour for every current workspace —
-- the column does not affect routing, delegation, or any other call path until
-- an operator (or the future Phase-4 governance automation) explicitly PATCHes
-- it to FALSE. Defense-in-depth: a role-locked "coding executor" agent that
-- ALSO has its prompt bypassed (jailbreak / role drift) cannot delegate; the
-- A2A layer rejects the call AND the MCP layer hides the tools. Kimi/MiniMax
-- pinning is intentionally OUT OF SCOPE for this migration — that is an
-- operator action taken AFTER this PR lands, so live delegation is not broken
-- by the schema change.
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS can_delegate BOOLEAN NOT NULL DEFAULT TRUE;