forked from molecule-ai/molecule-core
The previous comment said "all share one IP bucket" — accurate before the keyFor refactor, slightly stale after it. The dev-mode rationale (bucket fills fast, blanks the page on a single-user dev box) is unchanged; only the bucket-key flavour text needed updating. Doc-only follow-up from #60's hostile self-review #3. No behavior change. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
172 lines
5.7 KiB
Go
172 lines
5.7 KiB
Go
// Package middleware provides HTTP middleware for the platform API.
|
|
package middleware
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
)
|
|
|
|
// RateLimiter implements a token bucket rate limiter keyed by tenant
// identity (org id, then bearer token, then client IP — see keyFor).
//
// Each distinct caller key owns one bucket holding `rate` tokens that
// refills once per `interval`. The zero value is not usable; construct
// with NewRateLimiter, which also starts the stale-bucket cleanup
// goroutine.
type RateLimiter struct {
	mu sync.Mutex // guards buckets and every bucket's fields
	buckets map[string]*bucket // live buckets keyed by keyFor's result; pruned by the cleanup goroutine
	rate int // tokens per interval
	interval time.Duration // refill window; a bucket's tokens reset to rate once this elapses
}
|
|
|
|
// bucket tracks one caller's remaining request budget within the
// current refill window.
type bucket struct {
	tokens int // requests left in the current window; a request is rejected when this is <= 0
	lastReset time.Time // start of the current window; doubles as the staleness marker for cleanup
}
|
|
|
|
// NewRateLimiter creates a rate limiter with the given rate per interval.
|
|
// Pass a context to stop the cleanup goroutine on shutdown.
|
|
func NewRateLimiter(rate int, interval time.Duration, ctx context.Context) *RateLimiter {
|
|
rl := &RateLimiter{
|
|
buckets: make(map[string]*bucket),
|
|
rate: rate,
|
|
interval: interval,
|
|
}
|
|
go func() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
rl.mu.Lock()
|
|
cutoff := time.Now().Add(-10 * time.Minute)
|
|
for k, b := range rl.buckets {
|
|
if b.lastReset.Before(cutoff) {
|
|
delete(rl.buckets, k)
|
|
}
|
|
}
|
|
rl.mu.Unlock()
|
|
}
|
|
}
|
|
}()
|
|
return rl
|
|
}
|
|
|
|
// keyFor returns the bucket identifier for this request. Priority:
|
|
//
|
|
// 1. X-Molecule-Org-Id header — when present (CP-routed SaaS traffic),
|
|
// isolates tenants from each other regardless of the upstream proxy IP
|
|
// they all share.
|
|
// 2. SHA-256 of Authorization Bearer token — when present (per-workspace
|
|
// bearer, ADMIN_TOKEN, org-scoped API token). On a per-tenant Caddy
|
|
// box where the org-id header isn't attached, this still distinguishes
|
|
// distinct user sessions on the same egress IP.
|
|
// 3. ClientIP() — anonymous probes, /health scrapes, registry boot
|
|
// signals (when SetTrustedProxies(nil) is in effect, this is the
|
|
// direct TCP RemoteAddr — fine for the probe surface, not fine as a
|
|
// primary key behind a proxy, hence the priority order above).
|
|
//
|
|
// Mixing these namespaces is fine because they never collide: org ids
|
|
// are UUIDs ("org:..."), token hashes are 64-char hex ("tok:..."), IPs
|
|
// contain dots/colons ("ip:...").
|
|
//
|
|
// Security note on X-Molecule-Org-Id spoofing: the rate limiter runs
|
|
// BEFORE TenantGuard, so the org-id value here is unvalidated. A caller
|
|
// reaching workspace-server directly could spoof the header to drain
|
|
// another org's bucket. In production this surface is closed by the
|
|
// CP/Caddy front: tenant SGs reject :8080 from the public internet, and
|
|
// CP rewrites the header to the verified org. If a future deployment
|
|
// exposes :8080 directly, validate the org-id (e.g. against
|
|
// MOLECULE_ORG_ID) before keying on it, or move this middleware after
|
|
// TenantGuard. The token-hash and IP fallbacks are unspoofable.
|
|
//
|
|
// Issue #59 — replaces the previous IP-only keying that silently
|
|
// collapsed all canvas traffic into one bucket once #179 disabled
|
|
// proxy-header trust. See the issue for the deployment-shape analysis.
|
|
func (rl *RateLimiter) keyFor(c *gin.Context) string {
|
|
if orgID := strings.TrimSpace(c.GetHeader("X-Molecule-Org-Id")); orgID != "" {
|
|
return "org:" + orgID
|
|
}
|
|
if tok := bearerFromHeader(c.GetHeader("Authorization")); tok != "" {
|
|
return "tok:" + tokenKey(tok)
|
|
}
|
|
return "ip:" + c.ClientIP()
|
|
}
|
|
|
|
// Middleware returns a Gin middleware that rate limits per caller. The
|
|
// caller-key derivation lives in keyFor — see that function's doc for
|
|
// the priority list and rationale.
|
|
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
// Tier-1b dev-mode hatch — same gate as AdminAuth / WorkspaceAuth /
|
|
// discovery. On a local single-user Docker setup the 600-req/min
|
|
// bucket fills fast: a 15-workspace canvas + activity polling +
|
|
// approvals polling + A2A overlay + initial hydration all land in
|
|
// one bucket (whichever keyFor returns — typically the dev user's
|
|
// IP or shared admin token), so a minute of active use can trip
|
|
// 429 and blank the page. Gated by MOLECULE_ENV=development +
|
|
// empty ADMIN_TOKEN so SaaS production keeps the bucket.
|
|
if isDevModeFailOpen() {
|
|
c.Header("X-RateLimit-Limit", "unlimited")
|
|
c.Next()
|
|
return
|
|
}
|
|
|
|
key := rl.keyFor(c)
|
|
|
|
rl.mu.Lock()
|
|
b, exists := rl.buckets[key]
|
|
if !exists {
|
|
b = &bucket{tokens: rl.rate, lastReset: time.Now()}
|
|
rl.buckets[key] = b
|
|
}
|
|
|
|
// Reset tokens if interval has passed
|
|
if time.Since(b.lastReset) >= rl.interval {
|
|
b.tokens = rl.rate
|
|
b.lastReset = time.Now()
|
|
}
|
|
|
|
// Issue #105 — advertise the current bucket state so clients and
|
|
// monitoring tools can back off proactively. Headers are set on every
|
|
// response (both allowed and throttled) so they're observable against
|
|
// any endpoint — /health, /metrics, and every /workspaces/* route.
|
|
//
|
|
// The `reset` value is seconds until the current bucket refills,
|
|
// matching the RFC 6585 Retry-After spec for 429 responses and the
|
|
// de-facto X-RateLimit-Reset convention (GitHub, Stripe, etc.).
|
|
remaining := b.tokens - 1
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
resetSeconds := int(time.Until(b.lastReset.Add(rl.interval)).Seconds())
|
|
if resetSeconds < 0 {
|
|
resetSeconds = 0
|
|
}
|
|
c.Header("X-RateLimit-Limit", strconv.Itoa(rl.rate))
|
|
c.Header("X-RateLimit-Remaining", strconv.Itoa(remaining))
|
|
c.Header("X-RateLimit-Reset", strconv.Itoa(resetSeconds))
|
|
|
|
if b.tokens <= 0 {
|
|
rl.mu.Unlock()
|
|
// Retry-After is the canonical 429 signal per RFC 6585.
|
|
c.Header("Retry-After", strconv.Itoa(resetSeconds))
|
|
c.JSON(http.StatusTooManyRequests, gin.H{
|
|
"error": "rate limit exceeded",
|
|
"retry_after": resetSeconds,
|
|
})
|
|
c.Abort()
|
|
return
|
|
}
|
|
|
|
b.tokens--
|
|
rl.mu.Unlock()
|
|
|
|
c.Next()
|
|
}
|
|
}
|