forked from molecule-ai/molecule-core
perf(wsauth): in-process cache for platform_inbound_secret reads
Heartbeats fire every 60s per workspace and were the dominant caller of ReadPlatformInboundSecret — one DB SELECT each, purely to redeliver the same value. For an N-workspace fleet that's N SELECTs/minute of pure overhead, growing linearly with the fleet (#189). This adds a sync.Map cache keyed by workspaceID with a 5-minute TTL:

- **Read-through**: cache miss → DB SELECT → populate → return.
- **Write-through**: every IssuePlatformInboundSecret call refreshes the cache with the new value before returning, so the lazy-heal mint path (readOrLazyHealInboundSecret) doesn't see a stale read of the value it just wrote.
- **TTL eviction**: 5 minutes — generous enough that the heartbeat hot path hits cache for ~5 reads in a row before re-validating, short enough that an out-of-band rotation (an operator running `UPDATE workspaces SET platform_inbound_secret=...` directly) propagates within minutes without requiring a redeploy.
- **Absence not cached**: ErrNoInboundSecret skips the cache write so the lazy-heal recovery contract for the column-NULL case (readOrLazyHealInboundSecret in workspace_provision_shared.go) keeps working.

Memory footprint is bounded by the active workspace fleet (~200 bytes per entry); deleted workspaces leave dead entries until process restart, which is acceptable given that workspace deletion is operator-rare.

Why in-process instead of Redis: workspace-server runs as a single Railway service today, so adding Redis for this single column read would be over-engineering. The cache is a self-contained, Redis-free upgrade that keeps the same semantic surface (a read returns the latest secret) while collapsing the heartbeat read storm. If the deployment ever fans out across replicas, an operator-side rotation still propagates per replica within the TTL bound, without needing a shared write log.
Tests: 5 new cases covering a cache hit within the TTL, refresh after the TTL (simulating an operator rotation via SQL), write-through on Issue, absence-not-cached, and Reset clearing all entries. The setupMock helper in wsauth and the setupTestDB helper in handlers both call ResetInboundSecretCacheForTesting() at start and in cleanup, so write-through state from one test doesn't shadow SELECT expectations in the next. SetInboundSecretCacheNowForTesting() exposes a deterministic clock override so the TTL test doesn't sleep.

Task: #189.
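The read-through / write-through TTL shape described above can be sketched as a minimal, self-contained program. Names here (`readThrough`, `entry`, `fetch`, `cacheTTL`) are illustrative stand-ins, not the real wsauth identifiers; `fetch` plays the role of the DB SELECT.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// entry mirrors the per-key cached value: the payload plus its expiry.
type entry struct {
	value     string
	expiresAt time.Time
}

var (
	cache    sync.Map // key: workspace ID, value: *entry
	nowFunc  = time.Now
	cacheTTL = 5 * time.Minute
)

// readThrough returns the cached value while fresh; otherwise it calls
// fetch (standing in for the DB SELECT) and repopulates the cache.
// Errors are NOT cached, mirroring the absence-not-cached rule.
func readThrough(id string, fetch func(string) (string, error)) (string, error) {
	if v, ok := cache.Load(id); ok {
		if e, ok := v.(*entry); ok && nowFunc().Before(e.expiresAt) {
			return e.value, nil
		}
	}
	val, err := fetch(id)
	if err != nil {
		return "", err
	}
	cache.Store(id, &entry{value: val, expiresAt: nowFunc().Add(cacheTTL)})
	return val, nil
}

func main() {
	calls := 0
	fetch := func(id string) (string, error) {
		calls++
		return fmt.Sprintf("secret-for-%s-#%d", id, calls), nil
	}
	a, _ := readThrough("ws-1", fetch)
	b, _ := readThrough("ws-1", fetch) // within TTL: served from cache, fetch not re-run
	fmt.Println(a == b, calls)         // true 1
}
```

The real implementation adds a write-through Store on mint and a test-only clock override; this sketch only shows the hot-path mechanics.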
This commit is contained in:
parent
29261cee3d
commit
b040171fa1
@@ -17,6 +17,7 @@ import (
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
	"github.com/alicebob/miniredis/v2"
	"github.com/gin-gonic/gin"
	"github.com/redis/go-redis/v9"
@@ -44,6 +45,15 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
	restore := setSSRFCheckForTest(false)
	t.Cleanup(restore)

	// The wsauth.platform_inbound_secret cache (#189) is package-level
	// state in another package — without a reset between tests, a
	// write-through Issue from one test (or even a prior Read populating
	// the cache) shadows the SELECT expectation in the next test that
	// uses the same workspace ID. Reset before each test that builds a
	// fresh sqlmock; the no-op cost is one Range over an empty sync.Map.
	wsauth.ResetInboundSecretCacheForTesting()
	t.Cleanup(wsauth.ResetInboundSecretCacheForTesting)

	return mock
}
@@ -21,6 +21,8 @@ import (
	"encoding/base64"
	"errors"
	"fmt"
	"sync"
	"time"
)

// platformInboundSecretBytes is the raw-random length before base64url
@@ -37,6 +39,56 @@ const platformInboundSecretBytes = 32
// silently sending an unauthenticated request to the workspace.
var ErrNoInboundSecret = errors.New("wsauth: workspace has no platform_inbound_secret on file")

// inboundSecretCacheTTL is how long a cached secret survives in the
// process-local cache before the next read forces a fresh DB lookup.
// Picked large enough that the heartbeat hot path (60s/workspace,
// task #189 motivation) hits the cache for ~5 reads in a row before
// re-confirming, but short enough that an out-of-band rotation
// (operator running `UPDATE workspaces SET platform_inbound_secret=...`
// directly) propagates within minutes — not requiring a redeploy.
const inboundSecretCacheTTL = 5 * time.Minute

// inboundSecretCacheEntry is the per-workspace value stored in
// inboundSecretCache. Tracks the secret plus its expiry so the
// reader can decide whether to trust it or refresh.
type inboundSecretCacheEntry struct {
	secret    string
	expiresAt time.Time
}

// inboundSecretCache caches per-workspace platform_inbound_secret values
// to absorb the heartbeat read storm. Heartbeats fire every 60s per
// workspace and were doing one DB SELECT each; for an N-workspace fleet
// that's N reads/minute purely to redeliver the same value. Cache hits
// short-circuit the DB call.
//
// Cache invariants:
//   - Read-through: cache miss → DB SELECT → populate → return.
//   - Write-through: every IssuePlatformInboundSecret call refreshes
//     the cache with the new value before returning, so the in-process
//     mint path never sees a stale read of the value it just wrote.
//   - TTL eviction: stale entries get re-validated against the DB after
//     inboundSecretCacheTTL, so manual / out-of-band rotations propagate
//     within a bounded window.
//   - Memory: bounded by the active workspace fleet. Deleted workspaces
//     leave dead entries until process restart — acceptable given the
//     small per-entry footprint (<200 bytes) and that workspace deletion
//     is operator-rare on the platform.
//
// Single-replica process safety: workspace-server runs as a single
// Railway service today, so the cache is process-local and consistent
// with itself. If the deployment ever fans out across replicas, an
// operator rotation propagates per replica within the TTL — there is
// no shared write log.
//
// Cleared by ResetInboundSecretCacheForTesting() in tests.
var inboundSecretCache sync.Map // key: workspaceID (string), value: *inboundSecretCacheEntry

// inboundSecretCacheNow is the time source used by the cache. Tests
// override it via SetInboundSecretCacheNowForTesting to drive TTL
// expiry deterministically without time.Sleep.
var inboundSecretCacheNow = time.Now

// IssuePlatformInboundSecret generates a fresh per-workspace shared
// secret, persists the plaintext into workspaces.platform_inbound_secret,
// and returns the plaintext so the provisioner can write it into
@@ -65,6 +117,15 @@ func IssuePlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID str
	if err != nil {
		return "", fmt.Errorf("wsauth: persist platform_inbound_secret: %w", err)
	}
	// Write-through cache update so an immediate ReadPlatformInboundSecret
	// from the same process (e.g. registry handler returning the freshly
	// minted secret to the workspace in the heartbeat response) doesn't
	// see a stale or empty value via a parallel cache hit. Same expiry
	// rules as a regular read population.
	inboundSecretCache.Store(workspaceID, &inboundSecretCacheEntry{
		secret:    plaintext,
		expiresAt: inboundSecretCacheNow().Add(inboundSecretCacheTTL),
	})
	return plaintext, nil
}
@@ -80,11 +141,26 @@ func ReadPlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID stri
	if workspaceID == "" {
		return "", fmt.Errorf("wsauth: workspaceID required")
	}
	// Cache fast path. Heartbeats fire every 60s per workspace and were
	// the dominant caller before #189. The TTL keeps cached entries
	// fresh enough that operator-side rotations propagate within
	// minutes; see inboundSecretCacheTTL.
	if v, ok := inboundSecretCache.Load(workspaceID); ok {
		if entry, ok := v.(*inboundSecretCacheEntry); ok {
			if inboundSecretCacheNow().Before(entry.expiresAt) {
				return entry.secret, nil
			}
		}
	}
	var secret sql.NullString
	err := db.QueryRowContext(ctx,
		`SELECT platform_inbound_secret FROM workspaces WHERE id = $1`, workspaceID,
	).Scan(&secret)
	if err == sql.ErrNoRows {
		// Don't cache absence — the row may appear momentarily after
		// provision_workspace's INSERT lands, and the lazy-heal path
		// is the recovery contract for the column-NULL case (see
		// readOrLazyHealInboundSecret in workspace_provision_shared.go).
		return "", ErrNoInboundSecret
	}
	if err != nil {
@@ -93,5 +169,35 @@ func ReadPlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID stri
	if !secret.Valid || secret.String == "" {
		return "", ErrNoInboundSecret
	}
	// Read-through cache population on success.
	inboundSecretCache.Store(workspaceID, &inboundSecretCacheEntry{
		secret:    secret.String,
		expiresAt: inboundSecretCacheNow().Add(inboundSecretCacheTTL),
	})
	return secret.String, nil
}

// ResetInboundSecretCacheForTesting clears the process-local cache.
// Tests that exercise rotation or DB-side mutation of the secret column
// MUST call this between scenarios to keep an earlier entry from
// shadowing a fresh DB read.
//
// Exported (`...ForTesting` suffix) so cross-package tests in the
// handlers/ tree can call it directly without circular imports.
func ResetInboundSecretCacheForTesting() {
	inboundSecretCache.Range(func(k, _ any) bool {
		inboundSecretCache.Delete(k)
		return true
	})
}

// SetInboundSecretCacheNowForTesting overrides the package-level time
// source for cache TTL calculations. Tests use this to advance past
// the TTL deterministically rather than waiting on the wall clock.
// Returns a restore function that the caller MUST defer to avoid
// leaking the override into other tests.
func SetInboundSecretCacheNowForTesting(now func() time.Time) func() {
	prev := inboundSecretCacheNow
	inboundSecretCacheNow = now
	return func() { inboundSecretCacheNow = prev }
}
@@ -5,6 +5,7 @@ import (
	"errors"
	"regexp"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
)
@@ -127,3 +128,170 @@ func TestReadPlatformInboundSecret_RejectsEmptyWorkspaceID(t *testing.T) {
		t.Error("expected error for empty workspaceID, got nil")
	}
}

// ------------------------------------------------------------
// Cache (#189) — heartbeat-storm absorption
// ------------------------------------------------------------

// A second read inside the TTL window MUST hit the cache and NOT
// re-issue a SELECT to the DB. This is the entire point of #189:
// the heartbeat fires every 60s/workspace and was doing one DB read
// each time to redeliver an unchanged value.
func TestReadPlatformInboundSecret_CacheHitWithinTTL(t *testing.T) {
	db, mock := setupMock(t)
	// Exactly ONE expected SELECT — the second read must be served
	// from cache. If the cache doesn't fire, a second SELECT will
	// arrive without a matching expectation and the read will error
	// — so we ALSO assert via the returned value.
	mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
		WithArgs("ws-cached").
		WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("plaintext-1"))

	first, err := ReadPlatformInboundSecret(context.Background(), db, "ws-cached")
	if err != nil {
		t.Fatalf("first read: %v", err)
	}
	second, err := ReadPlatformInboundSecret(context.Background(), db, "ws-cached")
	if err != nil {
		t.Fatalf("second read: %v", err)
	}
	if first != second {
		t.Errorf("cache returned different value: %q vs %q", first, second)
	}
	if second != "plaintext-1" {
		t.Errorf("cache returned %q, want %q", second, "plaintext-1")
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations (cache likely failed to short-circuit DB): %v", err)
	}
}

// After TTL expires the next read MUST hit the DB again so an
// out-of-band rotation propagates within minutes.
func TestReadPlatformInboundSecret_CacheRefreshesAfterTTL(t *testing.T) {
	db, mock := setupMock(t)
	// Two SELECTs expected. The first populates the cache; the second
	// fires after we advance the clock past the TTL. They return
	// DIFFERENT values to simulate an operator rotating the secret
	// directly via SQL.
	mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
		WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v1"))
	mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
		WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v2-rotated"))

	now := time.Date(2026, 5, 3, 12, 0, 0, 0, time.UTC)
	restore := SetInboundSecretCacheNowForTesting(func() time.Time { return now })
	defer restore()

	first, err := ReadPlatformInboundSecret(context.Background(), db, "ws-rotated")
	if err != nil {
		t.Fatalf("first read: %v", err)
	}
	if first != "v1" {
		t.Errorf("first read = %q, want v1", first)
	}

	// Advance past the TTL.
	now = now.Add(inboundSecretCacheTTL).Add(time.Second)

	second, err := ReadPlatformInboundSecret(context.Background(), db, "ws-rotated")
	if err != nil {
		t.Fatalf("second read: %v", err)
	}
	if second != "v2-rotated" {
		t.Errorf("post-TTL read = %q, want v2-rotated (rotation didn't propagate)", second)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// Issue MUST update the cache (write-through) so a subsequent read
// from the same process sees the just-minted value without a DB
// round-trip. This pins the lazy-heal path in
// readOrLazyHealInboundSecret, which mints then immediately wants the
// fresh value.
func TestIssuePlatformInboundSecret_WriteThroughCachesValue(t *testing.T) {
	db, mock := setupMock(t)
	// ONE Exec for the mint. NO SELECT expected — the read should hit
	// cache because Issue populated it.
	mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`).
		WithArgs(sqlmock.AnyArg(), "ws-write-through").
		WillReturnResult(sqlmock.NewResult(1, 1))

	minted, err := IssuePlatformInboundSecret(context.Background(), db, "ws-write-through")
	if err != nil {
		t.Fatalf("Issue: %v", err)
	}
	got, err := ReadPlatformInboundSecret(context.Background(), db, "ws-write-through")
	if err != nil {
		t.Fatalf("Read: %v", err)
	}
	if got != minted {
		t.Errorf("read after Issue = %q, want minted %q", got, minted)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations (read should not have hit DB): %v", err)
	}
}

// ErrNoInboundSecret (NULL/empty column) must NOT be cached — the
// row may legitimately appear later (race between Heartbeat and the
// initial INSERT in provisionWorkspaceCP, or a manual operator
// backfill). Caching absence would defeat the lazy-heal recovery
// contract.
func TestReadPlatformInboundSecret_DoesNotCacheAbsence(t *testing.T) {
	db, mock := setupMock(t)
	// First read returns NULL → ErrNoInboundSecret, NO cache.
	mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
		WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
	// Second read returns the freshly-backfilled value — must hit DB
	// because absence wasn't cached.
	mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
		WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("backfilled"))

	_, err := ReadPlatformInboundSecret(context.Background(), db, "ws-null-then-set")
	if !errors.Is(err, ErrNoInboundSecret) {
		t.Fatalf("expected ErrNoInboundSecret on first read, got %v", err)
	}
	got, err := ReadPlatformInboundSecret(context.Background(), db, "ws-null-then-set")
	if err != nil {
		t.Fatalf("second read: %v", err)
	}
	if got != "backfilled" {
		t.Errorf("second read = %q, want backfilled (absence was cached)", got)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}

// ResetInboundSecretCacheForTesting must clear ALL entries, not just
// the one matching a specific key. The setupMock helper uses this on
// every test to keep entries from leaking across runs.
func TestResetInboundSecretCacheForTesting_ClearsAllEntries(t *testing.T) {
	db, mock := setupMock(t)
	// Populate cache for two workspaces.
	for _, id := range []string{"ws-a", "ws-b"} {
		mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
			WithArgs(id).
			WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v-" + id))
		if _, err := ReadPlatformInboundSecret(context.Background(), db, id); err != nil {
			t.Fatalf("populate %s: %v", id, err)
		}
	}
	ResetInboundSecretCacheForTesting()
	// After reset BOTH must miss the cache and trigger a fresh SELECT.
	for _, id := range []string{"ws-a", "ws-b"} {
		mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
			WithArgs(id).
			WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v-" + id))
		if _, err := ReadPlatformInboundSecret(context.Background(), db, id); err != nil {
			t.Fatalf("post-reset %s: %v", id, err)
		}
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
@@ -16,6 +16,15 @@ func setupMock(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
		t.Fatalf("sqlmock.New: %v", err)
	}
	t.Cleanup(func() { db.Close() })
	// The platform_inbound_secret cache is package-level state shared
	// across every test in this package — without a reset between
	// tests a write-through Issue from one test shadows the SELECT
	// expectation in the next test that touches the same workspaceID
	// (e.g. "ws-abc" reused across PersistsPlaintext + HappyPath).
	// Reset before each test that uses setupMock; the no-op cost on
	// pure-token tests is one Range over an empty sync.Map.
	ResetInboundSecretCacheForTesting()
	t.Cleanup(ResetInboundSecretCacheForTesting)
	return db, mock
}