fix(handlers): bypass CanCommunicate for canvas-user identity callers (#1674)
CI / all-required (pull_request) compensating
Block internal-flavored paths / Block forbidden paths (pull_request) Waiting to run
ci-arm64-advisory / fast-checks (pull_request) Waiting to run
CI / Detect changes (pull_request) Waiting to run
CI / Python Lint & Test (pull_request) Waiting to run
E2E API Smoke Test / detect-changes (pull_request) Waiting to run
E2E Chat / detect-changes (pull_request) Waiting to run
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / pr-validate (pull_request) Waiting to run
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (pull_request) Waiting to run
Handlers Postgres Integration / detect-changes (pull_request) Waiting to run
Harness Replays / detect-changes (pull_request) Waiting to run
lint-continue-on-error-tracking / lint-continue-on-error-tracking (pull_request) Waiting to run
Lint curl status-code capture / Scan workflows for curl status-capture pollution (pull_request) Waiting to run
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (pull_request) Waiting to run
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (pull_request) Waiting to run
Lint pre-flip continue-on-error / Verify continue-on-error flips have run-log proof (pull_request) Waiting to run
lint-required-context-exists-in-bp / lint-required-context-exists-in-bp (pull_request) Waiting to run
lint-required-no-paths / lint-required-no-paths (pull_request) Waiting to run
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (pull_request) Waiting to run
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (pull_request) Waiting to run
Secret scan / Scan diff for credential-shaped strings (pull_request) Waiting to run
gate-check-v3 / gate-check (pull_request) Waiting to run
qa-review / approved (pull_request) Waiting to run
security-review / approved (pull_request) Waiting to run
sop-checklist / all-items-acked (pull_request) Waiting to run
sop-checklist / review-refire (pull_request) Waiting to run
sop-tier-check / tier-check (pull_request) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (pull_request) Successful in 9s
audit-force-merge / audit (pull_request) Successful in 7s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Has been cancelled
CI / Platform (Go) (pull_request) Has been cancelled
CI / Canvas (Next.js) (pull_request) Has been cancelled
CI / Shellcheck (E2E scripts) (pull_request) Has been cancelled
CI / Canvas Deploy Reminder (pull_request) Has been cancelled
E2E API Smoke Test / E2E API Smoke Test (pull_request) Has been cancelled
E2E Chat / E2E Chat (pull_request) Has been cancelled
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Has been cancelled
Harness Replays / Harness Replays (pull_request) Has been cancelled

Post-RFC#637, canvas users send X-Workspace-ID (their identity workspace
UUID). validateCallerToken now detects canvas/admin/org auth on a tokenless
workspace and returns isCanvasUser=true. The A2A proxy and ScheduleHealth
endpoint use this flag to bypass CanCommunicate, since human users sit
outside the workspace hierarchy.

Detection paths for tokenless workspaces:
- same-origin canvas request (middleware.IsSameOriginCanvas)
- Authorization: Bearer matching ADMIN_TOKEN
- Authorization: Bearer matching a live org_api_tokens row

Also fixes mcp_tools.go which was calling MCPHandler.proxyA2ARequest
(5 args) with an extra argument.

Fixes #1674

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI Dev Engineer A (Kimi)
2026-05-24 00:49:46 +00:00
parent 600f88b172
commit 226698239f
7 changed files with 154 additions and 25 deletions
@@ -228,7 +228,7 @@ func (e *proxyA2AError) Error() string {
// cron scheduler and other internal callers that need to send A2A messages
// to workspaces programmatically (not from an HTTP handler).
func (h *WorkspaceHandler) ProxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, error) {
status, resp, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity)
status, resp, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, logActivity, false)
if proxyErr != nil {
return status, resp, proxyErr
}
@@ -307,13 +307,21 @@ func (h *WorkspaceHandler) ProxyA2A(c *gin.Context) {
// The bind is strict: the token must match `callerID`, not
// `workspaceID` (the target). A compromised token from workspace A
// must never authenticate calls from A pretending to be B.
if callerID != "" && callerID != workspaceID {
if err := validateCallerToken(ctx, c, callerID); err != nil {
//
// Post-RFC#637: canvas users now send X-Workspace-ID (their identity
// workspace). validateCallerToken detects canvas/admin auth on a
// tokenless workspace and returns isCanvasUser=true so the proxy can
// bypass CanCommunicate (human users sit outside the hierarchy).
isCanvasUser := false
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
var err error
isCanvasUser, err = validateCallerToken(ctx, c, callerID)
if err != nil {
return // response already written with 401
}
}
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true)
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, body, callerID, true, isCanvasUser)
if proxyErr != nil {
for k, v := range proxyErr.Headers {
c.Header(k, v)
@@ -352,11 +360,13 @@ func (h *WorkspaceHandler) checkWorkspaceBudget(ctx context.Context, workspaceID
return nil
}
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool) (int, []byte, *proxyA2AError) {
func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID string, body []byte, callerID string, logActivity bool, isCanvasUser bool) (int, []byte, *proxyA2AError) {
// Access control: workspace-to-workspace requests must pass CanCommunicate check.
// Canvas requests (callerID == "") and system callers (webhook:*, system:*, test:*)
// are trusted. Self-calls (callerID == workspaceID) are always allowed.
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) {
// Post-RFC#637: canvas-user identity workspaces also bypass CanCommunicate
// because human users sit outside the org hierarchy.
if callerID != "" && callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
if !registry.CanCommunicate(callerID, workspaceID) {
log.Printf("ProxyA2A: access denied %s → %s", callerID, workspaceID)
return 0, nil, &proxyA2AError{
@@ -5,17 +5,21 @@ package handlers
import (
"context"
"crypto/subtle"
"database/sql"
"encoding/json"
"errors"
"log"
"net/http"
"os"
"strconv"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/orgtoken"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/gin-gonic/gin"
)
@@ -383,31 +387,53 @@ func nilIfEmpty(s string) *string {
// (their next /registry/register will mint their first token, after
// which this branch never fires again for them).
//
// Post-RFC#637 addition: when the tokenless workspace is accompanied by
// canvas or admin auth (same-origin request, admin bearer, or org-level
// token), the caller is identified as a canvas-user identity rather than
// a legacy peer agent. The returned isCanvasUser flag lets the A2A proxy
// bypass CanCommunicate for human users, who sit outside the workspace
// hierarchy.
//
// On auth failure this writes the 401 via c and returns an error so the
// handler aborts without running the proxy.
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) error {
hasLive, err := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
if err != nil {
func validateCallerToken(ctx context.Context, c *gin.Context, callerID string) (isCanvasUser bool, err error) {
hasLive, dbErr := wsauth.HasAnyLiveToken(ctx, db.DB, callerID)
if dbErr != nil {
// Fail-open here matches the heartbeat path — A2A caller auth is
// defense-in-depth on top of access-control hierarchy, not the
// sole gate on the secret material. A DB hiccup shouldn't take
// the whole A2A path down.
log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, err)
return nil
log.Printf("wsauth: caller HasAnyLiveToken(%s) failed: %v — allowing A2A", callerID, dbErr)
return false, nil
}
if !hasLive {
return nil // legacy / pre-upgrade caller
// Tokenless workspace — could be legacy/pre-upgrade caller or
// canvas-user identity. Distinguish by request auth signals.
if middleware.IsSameOriginCanvas(c) {
return true, nil
}
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok != "" {
adminSecret := os.Getenv("ADMIN_TOKEN")
if adminSecret != "" && subtle.ConstantTimeCompare([]byte(tok), []byte(adminSecret)) == 1 {
return true, nil
}
if _, _, _, err := orgtoken.Validate(ctx, db.DB, tok); err == nil {
return true, nil
}
}
return false, nil // legacy / pre-upgrade caller
}
tok := wsauth.BearerTokenFromHeader(c.GetHeader("Authorization"))
if tok == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing caller auth token"})
return errInvalidCallerToken
return false, errInvalidCallerToken
}
if err := wsauth.ValidateToken(ctx, db.DB, callerID, tok); err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid caller auth token"})
return err
return false, err
}
return nil
return false, nil
}
// errInvalidCallerToken is a sentinel for validateCallerToken's "missing
@@ -1112,9 +1112,13 @@ func TestValidateCallerToken_LegacyCallerGrandfathered(t *testing.T) {
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
if err := validateCallerToken(context.Background(), c, "ws-legacy"); err != nil {
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-legacy")
if err != nil {
t.Errorf("legacy caller should grandfather through; got %v", err)
}
if isCanvasUser {
t.Errorf("legacy caller should NOT be identified as canvas user")
}
if w.Code != 200 {
// gin default before c.JSON is 200; we want no error response written
if w.Body.Len() != 0 {
@@ -1136,10 +1140,13 @@ func TestValidateCallerToken_MissingTokenWhenOnFile(t *testing.T) {
c.Request = httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
// No Authorization header set
err := validateCallerToken(context.Background(), c, "ws-authed")
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
if err == nil {
t.Fatal("expected error for missing token")
}
if isCanvasUser {
t.Errorf("authed workspace with missing token should NOT be canvas user")
}
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401, got %d", w.Code)
}
@@ -1164,9 +1171,13 @@ func TestValidateCallerToken_InvalidToken(t *testing.T) {
req.Header.Set("Authorization", "Bearer wrong")
c.Request = req
if err := validateCallerToken(context.Background(), c, "ws-authed"); err == nil {
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
if err == nil {
t.Fatal("expected error for bad token")
}
if isCanvasUser {
t.Errorf("authed workspace with bad token should NOT be canvas user")
}
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401, got %d", w.Code)
}
@@ -1192,9 +1203,13 @@ func TestValidateCallerToken_ValidToken(t *testing.T) {
req.Header.Set("Authorization", "Bearer goodtok")
c.Request = req
if err := validateCallerToken(context.Background(), c, "ws-authed"); err != nil {
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-authed")
if err != nil {
t.Errorf("valid token should pass; got %v", err)
}
if isCanvasUser {
t.Errorf("authed workspace with valid token should NOT be canvas user")
}
}
func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
@@ -1216,14 +1231,86 @@ func TestValidateCallerToken_WrongWorkspaceBindingRejected(t *testing.T) {
req.Header.Set("Authorization", "Bearer tok-for-A")
c.Request = req
if err := validateCallerToken(context.Background(), c, "ws-b-attacker"); err == nil {
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-b-attacker")
if err == nil {
t.Fatal("token from A must not authenticate caller B")
}
if isCanvasUser {
t.Errorf("cross-workspace token replay should NOT be identified as canvas user")
}
if w.Code != http.StatusUnauthorized {
t.Errorf("expected 401, got %d", w.Code)
}
}
func TestValidateCallerToken_CanvasUser_AdminToken(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
// Tokenless workspace
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs("ws-canvas-admin").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
t.Setenv("ADMIN_TOKEN", "admin-secret-42")
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
req := httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
req.Header.Set("Authorization", "Bearer admin-secret-42")
c.Request = req
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-canvas-admin")
if err != nil {
t.Errorf("admin token should identify canvas user; got error: %v", err)
}
if !isCanvasUser {
t.Errorf("admin token bearer should be identified as canvas user")
}
if w.Code != 200 || w.Body.Len() != 0 {
t.Errorf("admin token path should not write a response body; got %d: %s", w.Code, w.Body.String())
}
}
func TestValidateCallerToken_CanvasUser_OrgToken(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
// Tokenless workspace
mock.ExpectQuery(`SELECT COUNT\(\*\) FROM workspace_auth_tokens`).
WithArgs("ws-canvas-org").
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0))
// orgtoken.Validate lookup
mock.ExpectQuery(`SELECT id, prefix, org_id FROM org_api_tokens WHERE token_hash = .* AND revoked_at IS NULL`).
WithArgs(sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"id", "prefix", "org_id"}).AddRow("orgtok-1", "pref1234", "org-1"))
mock.ExpectExec(`UPDATE org_api_tokens SET last_used_at`).
WithArgs("orgtok-1").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
req := httptest.NewRequest("POST", "/workspaces/x/a2a", bytes.NewBufferString("{}"))
req.Header.Set("Authorization", "Bearer org-token-plaintext-xyz")
c.Request = req
isCanvasUser, err := validateCallerToken(context.Background(), c, "ws-canvas-org")
if err != nil {
t.Errorf("org token should identify canvas user; got error: %v", err)
}
if !isCanvasUser {
t.Errorf("org token bearer should be identified as canvas user")
}
if w.Code != 200 || w.Body.Len() != 0 {
t.Errorf("org token path should not write a response body; got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// --- Direct unit tests for normalizeA2APayload (extracted from proxyA2ARequest) ---
func TestNormalizeA2APayload_InvalidJSON(t *testing.T) {
@@ -333,7 +333,7 @@ func (h *WorkspaceHandler) DrainQueueForWorkspace(ctx context.Context, workspace
}
// logActivity=false: the original EnqueueA2A callsite already logged
// the dispatch attempt; re-logging here would double-count events.
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false)
status, respBody, proxyErr := h.proxyA2ARequest(ctx, workspaceID, item.Body, callerID, false, false)
// 202 Accepted = the dispatch was itself queued again (target still busy).
// That's not a failure — the queued item just stays queued naturally on
@@ -389,7 +389,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
})
log.Printf("Delegation %s: step=proxying_a2a_request", delegationID)
status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true)
status, respBody, proxyErr := h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true, false)
log.Printf("Delegation %s: step=proxy_done status=%d bodyLen=%d err=%v", delegationID, status, len(respBody), proxyErr)
// When proxyA2ARequest returns an error but we have a non-empty response body
@@ -418,7 +418,7 @@ func (h *DelegationHandler) executeDelegation(ctx context.Context, sourceID, tar
case <-ctx.Done():
// outer timeout hit before retry window elapsed
case <-time.After(delegationRetryDelay):
status, respBody, proxyErr = h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true)
status, respBody, proxyErr = h.workspace.proxyA2ARequest(ctx, targetID, a2aBody, sourceID, true, false)
}
}
@@ -470,14 +470,19 @@ func (h *ScheduleHandler) Health(c *gin.Context) {
// Validate the caller's own bearer token (Phase 30.5 contract).
// Skip for system callers and self-calls, same as the A2A proxy.
// Post-RFC#637: canvas users may read schedule health too.
isCanvasUser := false
if !isSystemCaller(callerID) && callerID != workspaceID {
if err := validateCallerToken(ctx, c, callerID); err != nil {
var err error
isCanvasUser, err = validateCallerToken(ctx, c, callerID)
if err != nil {
return // response already written with 401
}
}
// CanCommunicate gate — only peers in the org hierarchy may read health.
if callerID != workspaceID && !isSystemCaller(callerID) {
// Canvas users (human operators) bypass this gate.
if callerID != workspaceID && !isSystemCaller(callerID) && !isCanvasUser {
if !registry.CanCommunicate(callerID, workspaceID) {
log.Printf("ScheduleHealth: access denied %s → %s", callerID, workspaceID)
c.JSON(http.StatusForbidden, gin.H{"error": "access denied"})
@@ -107,6 +107,7 @@ func (h *WebhookHandler) GitHub(c *gin.Context) {
forwardBody,
"webhook:github",
true,
false,
)
if proxyErr != nil {
c.JSON(proxyErr.Status, proxyErr.Response)