Merge pull request #696 from Molecule-AI/fix/issue-682-684-683-auth-token-fixes

fix(security): metrics auth, token revocation hardening, A2A false-negative (#682 #683 #689)
Hongming Wang 2026-04-17 05:47:08 -07:00 committed by GitHub
commit 87f2b9abb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 328 additions and 28 deletions

View File

@@ -0,0 +1,108 @@
# ADR-001: Admin endpoints accept any workspace bearer token
**Status:** Accepted — known risk, Phase-H remediation planned
**Date:** 2026-04-17
**Issue:** #684
**Tracking:** Phase-H — #710
## Context
The `AdminAuth` middleware validates callers by calling `ValidateAnyToken`, which
accepts any live workspace bearer token regardless of which workspace issued it.
There is no separation between workspace-scoped tokens (issued to individual
agents) and admin-scoped tokens (intended for platform operators).
This means any workspace agent that has been issued a token can reach every
admin-gated route on the platform.
## Decision
Proper token-tier separation (workspace vs. admin scope) is deferred to Phase-H.
The known risk is explicitly accepted. Mitigation controls are documented below.
## Blast radius — affected admin endpoints
A compromised workspace token grants admin-equivalent access to all of the
following:
| Endpoint | Impact |
|----------|--------|
| `GET /admin/workspaces/:id/test-token` | Mint a fresh bearer token for any workspace |
| `DELETE /workspaces/:id` | Delete any workspace and auto-revoke its tokens |
| `PUT /settings/secrets` / `POST /admin/secrets` | Overwrite any global secret (env-poisons every agent on restart) |
| `DELETE /settings/secrets/:key` / `DELETE /admin/secrets/:key` | Delete any global secret; same fan-out restart |
| `GET /settings/secrets` / `GET /admin/secrets` | Read all global secret keys (values masked, but key enumeration enables targeted attacks) |
| `GET /workspaces/:id/budget` + `PATCH /workspaces/:id/budget` | Read or clear any workspace's token budget |
| `GET /events` / `GET /events/:workspaceId` | Read the full structural event log across all workspaces |
| `POST /bundles/import` | Import an arbitrary workspace bundle — creates workspaces, injects secrets, overwrites configs |
| `GET /bundles/export/:id` | Exfiltrate a full workspace bundle including config, secret references, and files |
| `POST /org/import` | Instantiate an entire org template — creates multiple workspaces with arbitrary roles and secrets |
| `GET /org/templates` | Enumerate all org template names and their configured roles/system prompts |
| `POST /templates/import` | Write arbitrary files into `configsDir` (workspace template injection) |
| `GET /templates` | Enumerate all template names and metadata |
| `GET /admin/liveness` | Read platform subsystem health (ops intel) |
| `GET /admin/schedules/health` | Read cron scheduler health across all workspaces |
## Risk statement
**A single compromised workspace agent can achieve full platform takeover via
admin endpoints.**
Attack chain example:
1. Agent A's token is exfiltrated (e.g. via prompt injection in a delegated task).
2. Attacker calls `PUT /settings/secrets` to overwrite `CLAUDE_API_KEY` with a
controlled value.
3. Every non-paused workspace restarts and loads the poisoned key.
4. Attacker now controls the LLM backend for the entire platform.
Alternatively: call `POST /bundles/import` with a crafted bundle to inject a
malicious workspace with a pre-configured `initial_prompt` and elevated secrets.
## Current mitigations
- **Workspace isolation** — `CanCommunicate()` in the A2A proxy limits which
workspaces can send tasks to which, reducing the blast radius of a single
compromised agent during normal operation.
- **Audit logging** — PR #651 writes all admin-route calls to `structure_events`.
Forensic recovery is possible after the fact.
- **`ValidateAnyToken` removed-workspace JOIN** — tokens belonging to deleted
workspaces are filtered at the DB layer (PR #682 defense-in-depth) so
post-deletion token replay is blocked.
- **`MOLECULE_ENV=production` gate** — hides the `/admin/workspaces/:id/test-token`
endpoint in production deployments unless `MOLECULE_ENABLE_TEST_TOKENS=1`.
## Phase-H remediation plan
Tracked in GitHub issue **#710**.
### Schema change
Add a `token_type` column to `workspace_auth_tokens`:
```sql
ALTER TABLE workspace_auth_tokens
ADD COLUMN IF NOT EXISTS token_type TEXT NOT NULL DEFAULT 'workspace'
CHECK (token_type IN ('workspace', 'admin'));
```
Admin tokens are minted only via a dedicated privileged endpoint that itself
requires an existing admin token or a one-time bootstrap secret.
### Middleware update
- `WorkspaceAuth` — continue accepting `token_type = 'workspace'` only.
- `AdminAuth` — require `token_type = 'admin'`. Workspace tokens rejected.
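A minimal sketch of the planned tier check. The `Token` struct, field names, and `requireTier` helper are illustrative assumptions, not the shipped middleware; only the `token_type` values come from the schema above:

```go
package main

import (
	"errors"
	"fmt"
)

// Token is a simplified view of a workspace_auth_tokens row.
// Field names here are illustrative, not the real schema struct.
type Token struct {
	ID        string
	TokenType string // 'workspace' or 'admin'
	Revoked   bool
}

var ErrForbidden = errors.New("token tier not permitted for this route")

// requireTier rejects live tokens whose tier does not match the route's
// requirement: WorkspaceAuth passes "workspace", AdminAuth passes "admin".
func requireTier(t Token, want string) error {
	if t.Revoked {
		return errors.New("token revoked")
	}
	if t.TokenType != want {
		return ErrForbidden // workspace tokens no longer reach admin routes
	}
	return nil
}

func main() {
	ws := Token{ID: "t1", TokenType: "workspace"}
	admin := Token{ID: "t2", TokenType: "admin"}
	fmt.Println(requireTier(ws, "admin"))    // workspace token on admin route: rejected
	fmt.Println(requireTier(admin, "admin")) // admin token on admin route: nil
}
```

With this split in place, `ValidateAnyToken` stops being an admin gate and admin routes check the tier explicitly.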
### Bootstrap flow
On first boot (no tokens exist), a single-use bootstrap secret is printed to
the server log. The operator uses it to mint the first admin token. Subsequent
admin tokens are minted by existing admin token holders. The fail-open path in
`HasAnyLiveTokenGlobal` is retired once Phase-H ships.
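The single-use bootstrap secret described above can be sketched as follows. Function names and the package-level state are assumptions for illustration; the constant-time compare and invalidate-on-success behavior are the properties the flow needs:

```go
package main

import (
	"crypto/rand"
	"crypto/subtle"
	"encoding/hex"
	"fmt"
)

// bootstrapSecret is the single-use secret printed on first boot.
// Package-level state is for illustration only.
var bootstrapSecret string

// generateBootstrapSecret mints the one-time secret when no tokens exist.
// In the real flow this would be printed to the server log for the operator.
func generateBootstrapSecret() (string, error) {
	buf := make([]byte, 32)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	bootstrapSecret = hex.EncodeToString(buf)
	return bootstrapSecret, nil
}

// consumeBootstrapSecret checks the candidate in constant time and, on
// success, invalidates the secret so it can never be replayed.
func consumeBootstrapSecret(candidate string) bool {
	if bootstrapSecret == "" {
		return false // already consumed or never generated
	}
	if subtle.ConstantTimeCompare([]byte(candidate), []byte(bootstrapSecret)) != 1 {
		return false
	}
	bootstrapSecret = "" // single use: a second attempt fails
	return true
}

func main() {
	s, _ := generateBootstrapSecret()
	fmt.Println(consumeBootstrapSecret(s)) // true — first use mints the admin token
	fmt.Println(consumeBootstrapSecret(s)) // false — secret already consumed
}
```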
### Migration path
Phase-H is a breaking change for any automation that currently uses workspace
tokens against admin endpoints. A migration guide and a `MOLECULE_PHASE_H=1`
feature flag will be provided so operators can opt in before the strict
enforcement date.

View File

@@ -275,11 +275,27 @@ func (h *WorkspaceHandler) proxyA2ARequest(ctx context.Context, workspaceID stri
defer resp.Body.Close()
// Read agent response (capped at 10MB)
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
if err != nil {
respBody, readErr := io.ReadAll(io.LimitReader(resp.Body, maxProxyResponseBody))
if readErr != nil {
// Do() succeeded, which means the target received the request and sent
// back response headers — delivery is confirmed. The body couldn't be
// fully read (connection drop, timeout mid-stream). Surface
// delivery_confirmed so callers can distinguish "not delivered" from
// "delivered, but response body lost" (#689). When delivery is confirmed,
// log the activity as successful (delivery happened) rather than leaving
// a false "failed" entry in the audit trail.
deliveryConfirmed := resp.StatusCode >= 200 && resp.StatusCode < 400
log.Printf("ProxyA2A: body read failed for %s (status=%d delivery_confirmed=%v bytes_read=%d): %v",
workspaceID, resp.StatusCode, deliveryConfirmed, len(respBody), readErr)
if logActivity && deliveryConfirmed {
h.logA2ASuccess(ctx, workspaceID, callerID, body, respBody, a2aMethod, resp.StatusCode, durationMs)
}
return 0, nil, &proxyA2AError{
Status: http.StatusBadGateway,
Response: gin.H{"error": "failed to read agent response"},
Status: http.StatusBadGateway,
Response: gin.H{
"error": "failed to read agent response",
"delivery_confirmed": deliveryConfirmed,
},
}
}

View File

@@ -603,6 +603,83 @@ func TestProxyA2AError_BusyShape(t *testing.T) {
}
}
// ==================== ProxyA2A — body-read failure (delivery_confirmed) #689 ====================
//
// When Do() succeeds (target sent 2xx headers — delivery confirmed) but reading
// the response body fails (connection drop, mid-stream timeout), the proxy must:
// 1. Return 502 (caller can't get the response content)
// 2. Include "delivery_confirmed": true in the error body so callers can
// distinguish "not delivered" from "delivered, response body lost".
func TestProxyA2A_BodyReadFailure_DeliveryConfirmed(t *testing.T) {
mock := setupTestDB(t)
mr := setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
// Agent server: sends 200 OK headers + partial body, then closes the
// connection abruptly to simulate a mid-stream read failure.
agentServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Flush 200 headers immediately so Do() returns (resp, nil).
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
// Write partial JSON — just enough to prove the body was started,
// then hijack and close the connection so ReadAll fails.
if flusher, ok := w.(http.Flusher); ok {
io.WriteString(w, `{"result": "partial`) //nolint:errcheck
flusher.Flush()
}
// Hijack the underlying TCP connection and close it to simulate
// a mid-stream drop that causes io.ReadAll to return an error.
if hj, ok := w.(http.Hijacker); ok {
conn, _, _ := hj.Hijack()
if conn != nil {
conn.Close()
}
}
}))
defer agentServer.Close()
wsID := "ws-bodyreadfail"
mr.Set(fmt.Sprintf("ws:%s:url", wsID), agentServer.URL)
// Expect async activity log INSERT (logA2ASuccess is called because
// delivery_confirmed is true and the handler detected a 2xx status).
mock.ExpectExec("INSERT INTO activity_logs").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
body := `{"method":"message/send","params":{"message":{"role":"user","parts":[{"text":"ping"}]}}}`
c.Request = httptest.NewRequest("POST", "/workspaces/"+wsID+"/a2a", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")
handler.ProxyA2A(c)
time.Sleep(50 * time.Millisecond)
// Expect 502 (couldn't deliver the response content to the caller)
if w.Code != http.StatusBadGateway {
t.Errorf("expected 502, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("body not JSON: %v", err)
}
// delivery_confirmed must be true — Do() returned 2xx headers.
if v, _ := resp["delivery_confirmed"].(bool); !v {
t.Errorf(`expected "delivery_confirmed": true in response, got: %v`, resp)
}
if _, hasErr := resp["error"]; !hasErr {
t.Errorf(`expected "error" field in response body`)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
// ==================== validateCallerToken — Phase 30.5 ====================
// The A2A proxy validates the *caller's* token (not the target's) when the

View File

@@ -486,22 +486,34 @@ func (h *DelegationHandler) ListDelegations(c *gin.Context) {
// --- helpers ---
// isTransientProxyError returns true when the proxy error looks like a
// restart-race condition worth retrying (connection refused, EOF, stale
// URL pointing at a dead ephemeral port, container-restart-triggered
// 503). Static 4xx errors (bad request, access denied, not found) are
// NOT retried — retrying them wastes the 8-second delay for no benefit.
// isTransientProxyError returns true when the proxy error is a restart-race
// condition worth retrying (connection refused, stale ephemeral-port URL after
// a container restart). Static 4xx and generic 5xx errors are NOT retried.
//
// 503 requires careful splitting (#689): the proxy emits two distinct 503 shapes
// that must be handled differently:
// - "restarting: true" — container was dead; restart triggered. The POST body
// was never delivered (dead container can't accept TCP). Safe to retry.
// - "busy: true" — agent is alive, mid-synthesis on a previous request. The
// POST body WAS likely delivered. Retrying double-delivers the message.
// Do NOT retry; surface the 503 to the caller instead.
func isTransientProxyError(err *proxyA2AError) bool {
if err == nil {
return false
}
// 503 is the explicit "container unreachable / restart triggered"
// response from a2a_proxy.go after its reactive health check.
// 502 is "failed to reach workspace agent" — the pre-reactive-check
// error for plain connection failures.
if err.Status == http.StatusServiceUnavailable || err.Status == http.StatusBadGateway {
// 502 = "failed to reach workspace agent" (connection refused / DNS failure).
// The message was NOT delivered. Safe to retry after reactive URL refresh (#74).
if err.Status == http.StatusBadGateway {
return true
}
// 503 with restarting:true = container died → message not delivered → retry.
// 503 with busy:true (or no flag) = agent alive → message may be delivered → no retry.
if err.Status == http.StatusServiceUnavailable {
if restart, ok := err.Response["restarting"].(bool); ok && restart {
return true
}
return false
}
return false
}

View File

@@ -344,9 +344,19 @@ func TestIsTransientProxyError_RetriesOnRestartRaceStatuses(t *testing.T) {
expect bool
}{
{"nil", nil, false},
{"503 service unavailable (container restart triggered)",
&proxyA2AError{Status: http.StatusServiceUnavailable}, true},
{"502 bad gateway (connection refused)",
// 503 with restarting:true — container was dead; restart triggered.
// Message was NOT delivered (dead container). Safe to retry (#74).
{"503 container restart triggered — retry",
&proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"restarting": true}}, true},
// 503 with busy:true — agent is alive, mid-synthesis on the delivered
// message. Retrying would double-deliver (#689). Must NOT retry.
{"503 agent busy (double-delivery risk) — no retry",
&proxyA2AError{Status: http.StatusServiceUnavailable, Response: gin.H{"busy": true, "retry_after": 30}}, false},
// 503 with no qualifying flag — conservative: don't retry.
{"503 plain (no restarting flag) — no retry",
&proxyA2AError{Status: http.StatusServiceUnavailable}, false},
// 502 = connection refused = message not delivered → safe to retry.
{"502 bad gateway (connection refused) — retry",
&proxyA2AError{Status: http.StatusBadGateway}, true},
{"404 workspace not found",
&proxyA2AError{Status: http.StatusNotFound}, false},

View File

@@ -26,7 +26,8 @@ const hasAnyLiveTokenGlobalQuery = "SELECT COUNT.*FROM workspace_auth_tokens"
const validateTokenSelectQuery = "SELECT id, workspace_id.*FROM workspace_auth_tokens.*token_hash"
// validateAnyTokenQuery is matched for ValidateAnyToken (SELECT).
const validateAnyTokenSelectQuery = "SELECT id.*FROM workspace_auth_tokens.*token_hash"
// The JOIN on workspaces filters removed-workspace tokens (#682 defense-in-depth).
const validateAnyTokenSelectQuery = "SELECT t\\.id.*FROM workspace_auth_tokens t.*JOIN workspaces"
// validateTokenUpdateQuery is matched for the best-effort last_used_at UPDATE.
const validateTokenUpdateQuery = "UPDATE workspace_auth_tokens SET last_used_at"
@@ -736,6 +737,54 @@ func TestCanvasOrBearer_TokensExist_CanvasOrigin_Passes(t *testing.T) {
}
}
// ────────────────────────────────────────────────────────────────────────────
// #682 defense-in-depth — ValidateAnyToken JOIN on workspaces
//
// Tokens belonging to 'removed' workspaces must be rejected by AdminAuth even
// if the token row itself is not yet revoked. The JOIN in ValidateAnyToken
// filters them at the DB layer before revoked_at is checked.
// ────────────────────────────────────────────────────────────────────────────
// TestAdminAuth_RemovedWorkspaceToken_Returns401 — a bearer token whose
// issuing workspace has status='removed' must not grant admin access.
// The JOIN in ValidateAnyToken filters the row out, resulting in ErrNoRows.
func TestAdminAuth_RemovedWorkspaceToken_Returns401(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock.New: %v", err)
}
defer mockDB.Close()
removedToken := "token-from-removed-workspace"
removedHash := sha256.Sum256([]byte(removedToken))
// HasAnyLiveTokenGlobal: tokens exist (other workspaces are live).
mock.ExpectQuery(hasAnyLiveTokenGlobalQuery).
WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(1))
// ValidateAnyToken SELECT with JOIN — removed workspace filtered out → empty result.
mock.ExpectQuery(validateAnyTokenSelectQuery).
WithArgs(removedHash[:]).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty: w.status='removed'
r := gin.New()
r.GET("/admin/secrets", AdminAuth(mockDB), func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"ok": true})
})
w := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, "/admin/secrets", nil)
req.Header.Set("Authorization", "Bearer "+removedToken)
r.ServeHTTP(w, req)
if w.Code != http.StatusUnauthorized {
t.Errorf("#682 removed-workspace token: expected 401, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestCanvasOrBearer_TokensExist_WrongOrigin_Returns401(t *testing.T) {
mockDB, mock, err := sqlmock.New()
if err != nil {

View File

@@ -100,11 +100,14 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
c.JSON(200, gin.H{"subsystems": out})
})
// Prometheus metrics — exempt from rate limiter via separate registration
// (registered before Use(limiter) takes effect on this specific route — the
// middleware.Middleware() still records it for observability).
// Scrape with: curl http://localhost:8080/metrics
r.GET("/metrics", metrics.Handler())
// Prometheus metrics — gated behind AdminAuth (#683).
// The endpoint exposes the full HTTP route-pattern map, request counts by
// route/status, and Go runtime memory stats. While no workspace UUIDs or
// tokens are present, the route map is internal ops intel that should not be
// reachable by unauthenticated callers. Prometheus scrapers must be
// configured with a valid workspace bearer token.
// Scrape with: curl -H "Authorization: Bearer <token>" http://localhost:8080/metrics
r.GET("/metrics", middleware.AdminAuth(db.DB), metrics.Handler())
// Single-workspace read — open so canvas nodes can fetch their own state
// without a token (used by WorkspaceNode polling and health checks).

View File

@@ -184,6 +184,10 @@ func HasAnyLiveTokenGlobal(ctx context.Context, db *sql.DB) (bool, error) {
// token (not scoped to a specific workspace). Used for admin/global routes
// where workspace-scoped auth is not applicable — any authenticated agent may
// access platform-wide settings.
//
// Defense-in-depth (#682): the JOIN on workspaces filters out tokens that
// belong to removed workspaces so that a deleted workspace's tokens cannot
// be replayed against admin endpoints.
func ValidateAnyToken(ctx context.Context, db *sql.DB, plaintext string) error {
if plaintext == "" {
return ErrInvalidToken
@@ -192,8 +196,12 @@ func ValidateAnyToken(ctx context.Context, db *sql.DB, plaintext string) error {
var tokenID string
err := db.QueryRowContext(ctx, `
SELECT id FROM workspace_auth_tokens
WHERE token_hash = $1 AND revoked_at IS NULL
SELECT t.id
FROM workspace_auth_tokens t
JOIN workspaces w ON w.id = t.workspace_id
WHERE t.token_hash = $1
AND t.revoked_at IS NULL
AND w.status != 'removed'
`, hash[:]).Scan(&tokenID)
if err != nil {
return ErrInvalidToken

View File

@@ -266,8 +266,8 @@ func TestValidateAnyToken_HappyPath(t *testing.T) {
t.Fatalf("IssueToken: %v", err)
}
// ValidateAnyToken: lookup by hash only (no workspace binding).
mock.ExpectQuery(`SELECT id FROM workspace_auth_tokens`).
// ValidateAnyToken: lookup by hash with removed-workspace JOIN.
mock.ExpectQuery(`SELECT t\.id.*FROM workspace_auth_tokens t.*JOIN workspaces`).
WithArgs(sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("tok-id-global"))
// Best-effort last_used_at update.
@@ -285,7 +285,7 @@
func TestValidateAnyToken_UnknownTokenRejected(t *testing.T) {
db, mock := setupMock(t)
mock.ExpectQuery(`SELECT id FROM workspace_auth_tokens`).
mock.ExpectQuery(`SELECT t\.id.*FROM workspace_auth_tokens t.*JOIN workspaces`).
WillReturnError(sql.ErrNoRows)
if err := ValidateAnyToken(context.Background(), db, "not-a-real-token"); err != ErrInvalidToken {
@@ -293,6 +293,23 @@ func TestValidateAnyToken_UnknownTokenRejected(t *testing.T) {
}
}
// TestValidateAnyToken_RemovedWorkspaceRejected — defense-in-depth (#682):
// a token belonging to a workspace with status='removed' must be rejected.
// The JOIN on workspaces filters it out before the revoked_at check, so the
// query returns no rows even though the token row itself is still live.
func TestValidateAnyToken_RemovedWorkspaceRejected(t *testing.T) {
db, mock := setupMock(t)
// JOIN with w.status != 'removed' causes no rows — same as ErrNoRows.
mock.ExpectQuery(`SELECT t\.id.*FROM workspace_auth_tokens t.*JOIN workspaces`).
WithArgs(sqlmock.AnyArg()).
WillReturnRows(sqlmock.NewRows([]string{"id"})) // empty: workspace is removed
err := ValidateAnyToken(context.Background(), db, "token-for-removed-workspace")
if err != ErrInvalidToken {
t.Errorf("removed workspace token: expected ErrInvalidToken, got %v", err)
}
}
func TestValidateAnyToken_EmptyTokenRejected(t *testing.T) {
db, _ := setupMock(t)
if err := ValidateAnyToken(context.Background(), db, ""); err != ErrInvalidToken {