fix(ratelimit): tenant-aware bucket keying — close canvas 429 storm
Some checks failed
E2E API Smoke Test / E2E API Smoke Test (pull_request) Successful in 1m13s
CI / Platform (Go) (pull_request) Successful in 2m8s
CodeQL / Analyze (${{ matrix.language }}) (go) (pull_request) Successful in 0s
CodeQL / Analyze (${{ matrix.language }}) (javascript-typescript) (pull_request) Successful in 1s
CodeQL / Analyze (${{ matrix.language }}) (python) (pull_request) Successful in 1s
Retarget main PRs to staging / Retarget to staging (pull_request) Has been skipped
Block internal-flavored paths / Block forbidden paths (pull_request) Successful in 4s
CI / Detect changes (pull_request) Successful in 7s
E2E API Smoke Test / detect-changes (pull_request) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (pull_request) Successful in 6s
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
Handlers Postgres Integration / detect-changes (pull_request) Successful in 6s
Runtime PR-Built Compatibility / detect-changes (pull_request) Successful in 7s
Harness Replays / detect-changes (pull_request) Successful in 7s
CI / Shellcheck (E2E scripts) (pull_request) Successful in 2s
CI / Python Lint & Test (pull_request) Successful in 3s
Handlers Postgres Integration / Handlers Postgres Integration (pull_request) Successful in 4s
Runtime PR-Built Compatibility / PR-built wheel + import smoke (pull_request) Successful in 3s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (pull_request) Successful in 5s
CI / Canvas (Next.js) (pull_request) Successful in 15s
CI / Canvas Deploy Reminder (pull_request) Has been skipped
Harness Replays / Harness Replays (pull_request) Failing after 39s

Closes #59.

Symptom: /workspaces/:id/activity returns 429 with rate-limit-exceeded
on hongming.moleculesai.app whenever multiple workspaces are visible
in the canvas. Single-tab, single-user, well within the documented
600 req/min budget — but every request collapsed into one bucket.

Root cause: workspace-server's RateLimiter keyed buckets on
c.ClientIP(). After issue #179 turned off proxy-header trust
(SetTrustedProxies(nil), correctly closing the XFF spoofing hole),
c.ClientIP() returns the TCP RemoteAddr — which in production is the
upstream proxy (Caddy on per-tenant EC2; CP/Vercel on the SaaS plane).
Every browser tab + every canvas consumer + every poll loop for every
tenant collapsed into one bucket.

Fix: bucket key derivation moves into a single keyFor helper that
mirrors the SSOT pattern of:
  - molecule-controlplane/internal/middleware/ratelimit.go (org > user > IP)
  - this package's own MCPRateLimiter (token-hash via tokenKey)

Priority: X-Molecule-Org-Id header → SHA-256(Authorization Bearer)
→ ClientIP. Token values are kept hashed in the bucket map so the
in-memory state can't become a token dump.

Tests:
  - TestKeyFor_OrgIdHeaderTrumpsBearerAndIP — priority order
  - TestKeyFor_BearerTokenWhenNoOrgId — middle tier + raw-token leak pin
  - TestKeyFor_IPFallbackWhenNoOrgIdNoBearer — anon probe path
  - TestRateLimit_TwoOrgsSameIP_IndependentBuckets — load-bearing
    regression (issue #59) — two tenants behind same upstream proxy
    must not share a bucket
  - TestRateLimit_TwoTokensSameIP_IndependentBuckets — same shape
    for the per-tenant Caddy box
  - TestRateLimit_SameOrgDifferentTokens_SharedBucket — counter-pin:
    rotating tokens within one org must NOT bypass the org's quota
  - TestRateLimit_Middleware_RoutesThroughKeyFor — AST gate, mirrors
    the SSOT gates established in #36/#10/#12

Mutation-tested:
  - strip org-id branch in keyFor → 3 tests fail
  - strip bearer-token branch → 2 tests fail
  - reintroduce direct c.ClientIP() in Middleware → 3 tests fail
    (including the AST gate)

Existing tests pass unchanged: dev-mode fail-open, X-RateLimit-*
headers (#105), Retry-After on 429 (#105), XFF anti-spoofing (#179).

No schema/API change. 429 response body and X-RateLimit-* headers
unchanged. RATE_LIMIT env var semantics unchanged.

Hostile self-review (three weakest spots) is in the issue body:
  1. one-shot Docker-inspect cost is now bucket-key derivation cost
     (string compare + SHA-256 of bearer); single-digit microseconds.
  2. X-Molecule-Org-Id is unvalidated at the rate-limiter layer —
     spoofing is closed by tenant SG + CP front; documented in
     keyFor's docstring with the conditions under which to revisit.
  3. cpProv-style SaaS surface is out of scope; CP's own limiter
     handles that hop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
security-auditor 2026-05-07 14:51:08 -07:00
parent 1e1f4d635b
commit 9dda84d671
2 changed files with 358 additions and 10 deletions

View File

@ -5,17 +5,19 @@ import (
"context"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
)
// RateLimiter implements a simple token bucket rate limiter per IP.
// RateLimiter implements a token bucket rate limiter keyed by tenant
// identity (org id, then bearer token, then client IP — see keyFor).
type RateLimiter struct {
mu sync.Mutex
buckets map[string]*bucket
rate int // tokens per interval
mu sync.Mutex
buckets map[string]*bucket
rate int // tokens per interval
interval time.Duration
}
@ -42,9 +44,9 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
case <-ticker.C:
rl.mu.Lock()
cutoff := time.Now().Add(-10 * time.Minute)
for ip, b := range rl.buckets {
for k, b := range rl.buckets {
if b.lastReset.Before(cutoff) {
delete(rl.buckets, ip)
delete(rl.buckets, k)
}
}
rl.mu.Unlock()
@ -54,7 +56,50 @@ func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *Rate
return rl
}
// Middleware returns a Gin middleware that rate limits by client IP.
// keyFor returns the bucket identifier for this request. Priority:
//
// 1. X-Molecule-Org-Id header — when present (CP-routed SaaS traffic),
// isolates tenants from each other regardless of the upstream proxy IP
// they all share.
// 2. SHA-256 of Authorization Bearer token — when present (per-workspace
// bearer, ADMIN_TOKEN, org-scoped API token). On a per-tenant Caddy
// box where the org-id header isn't attached, this still distinguishes
// distinct user sessions on the same egress IP.
// 3. ClientIP() — anonymous probes, /health scrapes, registry boot
// signals (when SetTrustedProxies(nil) is in effect, this is the
// direct TCP RemoteAddr — fine for the probe surface, not fine as a
// primary key behind a proxy, hence the priority order above).
//
// Mixing these namespaces is fine because they never collide: org ids
// are UUIDs ("org:..."), token hashes are 64-char hex ("tok:..."), IPs
// contain dots/colons ("ip:...").
//
// Security note on X-Molecule-Org-Id spoofing: the rate limiter runs
// BEFORE TenantGuard, so the org-id value here is unvalidated. A caller
// reaching workspace-server directly could spoof the header to drain
// another org's bucket. In production this surface is closed by the
// CP/Caddy front: tenant SGs reject :8080 from the public internet, and
// CP rewrites the header to the verified org. If a future deployment
// exposes :8080 directly, validate the org-id (e.g. against
// MOLECULE_ORG_ID) before keying on it, or move this middleware after
// TenantGuard. The token-hash and IP fallbacks are unspoofable.
//
// Issue #59 — replaces the previous IP-only keying that silently
// collapsed all canvas traffic into one bucket once #179 disabled
// proxy-header trust. See the issue for the deployment-shape analysis.
func (rl *RateLimiter) keyFor(c *gin.Context) string {
if orgID := strings.TrimSpace(c.GetHeader("X-Molecule-Org-Id")); orgID != "" {
return "org:" + orgID
}
if tok := bearerFromHeader(c.GetHeader("Authorization")); tok != "" {
return "tok:" + tokenKey(tok)
}
return "ip:" + c.ClientIP()
}
// Middleware returns a Gin middleware that rate limits per caller. The
// caller-key derivation lives in keyFor — see that function's doc for
// the priority list and rationale.
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
return func(c *gin.Context) {
// Tier-1b dev-mode hatch — same gate as AdminAuth / WorkspaceAuth /
@ -70,13 +115,13 @@ func (rl *RateLimiter) Middleware() gin.HandlerFunc {
return
}
ip := c.ClientIP()
key := rl.keyFor(c)
rl.mu.Lock()
b, exists := rl.buckets[ip]
b, exists := rl.buckets[key]
if !exists {
b = &bucket{tokens: rl.rate, lastReset: time.Now()}
rl.buckets[ip] = b
rl.buckets[key] = b
}
// Reset tokens if interval has passed

View File

@ -0,0 +1,303 @@
package middleware
import (
"context"
"crypto/sha256"
"fmt"
"go/ast"
"go/parser"
"go/token"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
)
// newTestLimiterForKeyFor — same shape as newTestLimiter in ratelimit_test.go
// but exposes the *gin.Engine and lets the caller inject headers per-request.
func newTestLimiterForKeyFor(t *testing.T, rate int) *gin.Engine {
t.Helper()
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(rate, 5*time.Second, ctx)
r := gin.New()
if err := r.SetTrustedProxies(nil); err != nil {
t.Fatalf("SetTrustedProxies: %v", err)
}
r.Use(rl.Middleware())
r.GET("/x", func(c *gin.Context) { c.String(http.StatusOK, "ok") })
return r
}
// TestKeyFor_OrgIdHeaderTrumpsBearerAndIP — when X-Molecule-Org-Id is set
// the bucket is keyed on it regardless of bearer token or IP. This is the
// load-bearing case for the production SaaS plane: every tenant routed
// through the same upstream proxy IP gets its own bucket because the
// CP attaches the org-id header.
func TestKeyFor_OrgIdHeaderTrumpsBearerAndIP(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "10.0.0.1:1234"
c.Request.Header.Set("X-Molecule-Org-Id", "org-aaa")
c.Request.Header.Set("Authorization", "Bearer ignored-token-value")
got := rl.keyFor(c)
if got != "org:org-aaa" {
t.Errorf("keyFor with org-id header: got %q, want %q", got, "org:org-aaa")
}
}
// TestKeyFor_BearerTokenWhenNoOrgId — the per-tenant Caddy box path:
// no org-id header (canvas same-origin), but Authorization Bearer is
// always set by WorkspaceAuth-protected routes. Bucket keyed on the
// SHA-256 hex of the token so distinct sessions on the same egress IP
// get distinct buckets — and so the in-memory map can never become a
// token dump if the process is inspected.
func TestKeyFor_BearerTokenWhenNoOrgId(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "10.0.0.1:1234"
c.Request.Header.Set("Authorization", "Bearer secret-token-abc")
got := rl.keyFor(c)
expectedHash := fmt.Sprintf("%x", sha256.Sum256([]byte("secret-token-abc")))
if got != "tok:"+expectedHash {
t.Errorf("keyFor with bearer-only: got %q, want %q", got, "tok:"+expectedHash)
}
// Critical security pin: raw token must never appear in the key.
if strings.Contains(got, "secret-token-abc") {
t.Errorf("keyFor leaked raw bearer token in bucket key: %q", got)
}
}
// TestKeyFor_IPFallbackWhenNoOrgIdNoBearer — anonymous probes (no auth,
// no tenant header) fall through to ClientIP keying. This is the only
// path that depended on the pre-#179 trust-XFF behaviour and is fine
// to keep IP-keyed because the surface is just /health, /buildinfo,
// and the registry-boot endpoints.
func TestKeyFor_IPFallbackWhenNoOrgIdNoBearer(t *testing.T) {
gin.SetMode(gin.TestMode)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
rl := NewRateLimiter(2, 5*time.Second, ctx)
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodGet, "/x", nil)
c.Request.RemoteAddr = "203.0.113.1:1234"
got := rl.keyFor(c)
// gin.ClientIP() strips the port — we just need to confirm the prefix
// and that the IP appears.
if !strings.HasPrefix(got, "ip:") {
t.Errorf("keyFor without auth/org headers: got %q, want prefix %q", got, "ip:")
}
if !strings.Contains(got, "203.0.113.1") {
t.Errorf("keyFor IP fallback: got %q, want to contain %q", got, "203.0.113.1")
}
}
// TestRateLimit_TwoOrgsSameIP_IndependentBuckets — the load-bearing
// regression test for issue #59. Two tenants behind the same upstream
// proxy must NOT share a bucket; the production SaaS-plane outage was
// every tenant collapsing to the proxy IP and saturating one bucket.
//
// Mutation invariant: removing the org-id branch from keyFor — say,
// returning "ip:" + c.ClientIP() unconditionally — collapses both
// tenants back into one bucket and this test fails on the 3rd
// request because it would 429 instead of 200.
func TestRateLimit_TwoOrgsSameIP_IndependentBuckets(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
exhaust := func(orgID string) {
t.Helper()
for i := 0; i < 2; i++ {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234" // SAME upstream proxy IP
req.Header.Set("X-Molecule-Org-Id", orgID)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup orgID=%s req %d: want 200, got %d", orgID, i+1, w.Code)
}
}
}
exhaust("org-aaa")
// org-aaa is now at 0 tokens. org-bbb's bucket must be FRESH.
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-bbb")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("org-bbb on same IP must have its own bucket: got %d, want 200 (issue #59 regression)", w.Code)
}
// Confirm org-aaa is still throttled — proves we're not just opening
// the gate to everyone.
req = httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-aaa")
w = httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusTooManyRequests {
t.Errorf("org-aaa exhausted bucket: want 429, got %d", w.Code)
}
}
// TestRateLimit_TwoTokensSameIP_IndependentBuckets — analog of the
// org-id case for the per-tenant Caddy box: two distinct user
// sessions on the same egress IP, distinguished only by their bearer
// tokens, must get independent buckets. This was the path Hongming
// hit on hongming.moleculesai.app — a single user with multiple
// browser tabs against one workspace-server box.
func TestRateLimit_TwoTokensSameIP_IndependentBuckets(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
exhaust := func(token string) {
t.Helper()
for i := 0; i < 2; i++ {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "127.0.0.1:1234" // local Caddy proxy — same for both
req.Header.Set("Authorization", "Bearer "+token)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup token=%s req %d: want 200, got %d", token, i+1, w.Code)
}
}
}
exhaust("user-a-token")
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "127.0.0.1:1234"
req.Header.Set("Authorization", "Bearer user-b-token")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("user-b token on same proxy IP must have its own bucket: got %d, want 200", w.Code)
}
}
// TestRateLimit_SameOrgDifferentTokens_SharedBucket — counter-pin:
// ensure org-id keying really does collapse all tokens within one
// org into one bucket. This is the desired behaviour: a tenant that
// mints multiple tokens shouldn't be able to circumvent its quota
// by rotating tokens between requests. (The same-IP-different-org
// test above proves we don't collapse ACROSS orgs; this one proves
// we DO collapse WITHIN one org.)
func TestRateLimit_SameOrgDifferentTokens_SharedBucket(t *testing.T) {
r := newTestLimiterForKeyFor(t, 2)
for _, tok := range []string{"token-1", "token-2"} {
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-shared")
req.Header.Set("Authorization", "Bearer "+tok)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Fatalf("setup tok=%s: want 200, got %d", tok, w.Code)
}
}
// Bucket should be exhausted now — third request, even with a fresh
// token, must 429 because the org-id is keying it.
req := httptest.NewRequest(http.MethodGet, "/x", nil)
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("X-Molecule-Org-Id", "org-shared")
req.Header.Set("Authorization", "Bearer token-3")
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusTooManyRequests {
t.Errorf("rotating tokens within one org should NOT bypass the quota: got %d, want 429", w.Code)
}
}
// TestRateLimit_Middleware_RoutesThroughKeyFor is the AST gate (mirror
// of #36/#10/#12's gates). Pins the SSOT routing invariant:
// (*RateLimiter).Middleware MUST call rl.keyFor and MUST NOT carry a
// direct c.ClientIP() call (= the parallel-impl drift this PR fixes).
//
// Mutation invariant: a future PR that re-introduces direct IP keying
// in Middleware (`ip := c.ClientIP()`) makes this test fail. That's
// the signal to either (a) extend keyFor's contract to cover the new
// case OR (b) update this gate with an explicit reason. Either way the
// drift gets a reviewer's attention before shipping.
func TestRateLimit_Middleware_RoutesThroughKeyFor(t *testing.T) {
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, "ratelimit.go", nil, parser.ParseComments)
if err != nil {
t.Fatalf("parse ratelimit.go: %v", err)
}
var fn *ast.FuncDecl
ast.Inspect(file, func(n ast.Node) bool {
f, ok := n.(*ast.FuncDecl)
if !ok {
return true
}
// Match `func (rl *RateLimiter) Middleware() ...`
if f.Name.Name != "Middleware" {
return true
}
if f.Recv == nil || len(f.Recv.List) != 1 {
return true
}
star, ok := f.Recv.List[0].Type.(*ast.StarExpr)
if !ok {
return true
}
if id, ok := star.X.(*ast.Ident); !ok || id.Name != "RateLimiter" {
return true
}
fn = f
return false
})
if fn == nil {
t.Fatal("(*RateLimiter).Middleware not found — was it renamed? update this gate or the SSOT routing assumption")
}
var (
callsKeyFor bool
callsClientIP bool
)
ast.Inspect(fn.Body, func(n ast.Node) bool {
call, ok := n.(*ast.CallExpr)
if !ok {
return true
}
sel, ok := call.Fun.(*ast.SelectorExpr)
if !ok {
return true
}
switch sel.Sel.Name {
case "keyFor":
callsKeyFor = true
case "ClientIP":
callsClientIP = true
}
return true
})
if !callsKeyFor {
t.Error("(*RateLimiter).Middleware must call rl.keyFor for SSOT bucket-key derivation — see issue #59. Found no keyFor call.")
}
if callsClientIP {
t.Error("(*RateLimiter).Middleware carries a direct c.ClientIP() call. This is the parallel-impl drift issue #59 fixed. " +
"Either route through rl.keyFor OR — if a new use case truly needs direct IP — extend keyFor's contract first and update this gate to allow the specific delta.")
}
}