Merge pull request #2547 from Molecule-AI/perf/cache-platform-inbound-secret

perf(wsauth): in-process cache for platform_inbound_secret reads
This commit is contained in:
Hongming Wang 2026-05-03 07:11:38 +00:00 committed by GitHub
commit 350495f032
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 293 additions and 0 deletions

View File

@ -17,6 +17,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/models"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/wsauth"
"github.com/alicebob/miniredis/v2"
"github.com/gin-gonic/gin"
"github.com/redis/go-redis/v9"
@ -44,6 +45,15 @@ func setupTestDB(t *testing.T) sqlmock.Sqlmock {
restore := setSSRFCheckForTest(false)
t.Cleanup(restore)
// The wsauth.platform_inbound_secret cache (#189) is package-level
// state in another package — without a reset between tests, a
// write-through Issue from one test (or even a prior Read populating
// the cache) shadows the SELECT expectation in the next test that
// uses the same workspace ID. Reset before each test that builds a
// fresh sqlmock; the no-op cost is one Range over an empty sync.Map.
wsauth.ResetInboundSecretCacheForTesting()
t.Cleanup(wsauth.ResetInboundSecretCacheForTesting)
return mock
}

View File

@ -21,6 +21,8 @@ import (
"encoding/base64"
"errors"
"fmt"
"sync"
"time"
)
// platformInboundSecretBytes is the raw-random length before base64url
@ -37,6 +39,56 @@ const platformInboundSecretBytes = 32
// silently sending an unauthenticated request to the workspace.
var ErrNoInboundSecret = errors.New("wsauth: workspace has no platform_inbound_secret on file")
// inboundSecretCacheTTL is how long a cached secret survives in the
// process-local cache before the next read forces a fresh DB lookup.
// Picked large enough that the heartbeat hot path (60s/workspace,
// task #189 motivation) hits the cache for ~5 reads in a row before
// re-confirming, but short enough that an out-of-band rotation
// (operator running `UPDATE workspaces SET platform_inbound_secret=...`
// directly) propagates within minutes — not requiring a redeploy.
const inboundSecretCacheTTL = 5 * time.Minute
// inboundSecretCacheEntry is the per-workspace value stored in
// inboundSecretCache. Tracks the secret + when it was loaded so the
// reader can decide whether to trust it or refresh.
type inboundSecretCacheEntry struct {
secret string
expiresAt time.Time
}
// inboundSecretCache caches per-workspace platform_inbound_secret values
// to absorb the heartbeat read storm. Heartbeats fire every 60s per
// workspace and were doing one DB SELECT each; for an N-workspace fleet
// that's N reads/minute purely to redeliver the same value. Cache hits
// short-circuit the DB call.
//
// Cache invariants:
// - Read-through: cache miss → DB SELECT → populate → return.
// - Write-through: every IssuePlatformInboundSecret call refreshes
// the cache with the new value before returning, so the in-process
// mint path never sees a stale read of the value it just wrote.
// - TTL eviction: stale entries get re-validated against the DB after
// inboundSecretCacheTTL so manual / out-of-band rotations propagate
// bounded-quickly.
// - Memory: bounded by the active workspace fleet. Deleted workspaces
// leave dead entries until process restart — acceptable given the
// small per-entry footprint (<200 bytes) and that workspace deletion
// is operator-rare on the platform.
//
// Single-replica process safety: workspace-server runs as a single
// Railway service today, so the cache is process-local and consistent
// with itself. If the deployment ever fans out across replicas, an
// operator-rotation propagates per-replica TTL-bounded — there is no
// shared write log.
//
// Cleared by ResetInboundSecretCacheForTesting() in tests.
var inboundSecretCache sync.Map // key: workspaceID (string), value: *inboundSecretCacheEntry
// inboundSecretCacheNow is the time source used by the cache. Tests
// override it via SetInboundSecretCacheNowForTesting to drive TTL
// expiry deterministically without time.Sleep.
var inboundSecretCacheNow = time.Now
// IssuePlatformInboundSecret generates a fresh per-workspace shared
// secret, persists the plaintext into workspaces.platform_inbound_secret,
// and returns the plaintext so the provisioner can write it into
@ -65,6 +117,15 @@ func IssuePlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID str
if err != nil {
return "", fmt.Errorf("wsauth: persist platform_inbound_secret: %w", err)
}
// Write-through cache update so an immediate ReadPlatformInboundSecret
// from the same process (e.g. registry handler returning the freshly
// minted secret to the workspace in the heartbeat response) doesn't
// see a stale or empty value via a parallel cache hit. Same expiry
// rules as a regular read population.
inboundSecretCache.Store(workspaceID, &inboundSecretCacheEntry{
secret: plaintext,
expiresAt: inboundSecretCacheNow().Add(inboundSecretCacheTTL),
})
return plaintext, nil
}
@ -80,11 +141,26 @@ func ReadPlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID stri
if workspaceID == "" {
return "", fmt.Errorf("wsauth: workspaceID required")
}
// Cache fast path. Heartbeats fire every 60s per workspace and were
// the dominant caller before #189. The TTL keeps cached entries
// fresh enough that operator-side rotations propagate within
// minutes; see inboundSecretCacheTTL.
if v, ok := inboundSecretCache.Load(workspaceID); ok {
if entry, ok := v.(*inboundSecretCacheEntry); ok {
if inboundSecretCacheNow().Before(entry.expiresAt) {
return entry.secret, nil
}
}
}
var secret sql.NullString
err := db.QueryRowContext(ctx,
`SELECT platform_inbound_secret FROM workspaces WHERE id = $1`, workspaceID,
).Scan(&secret)
if err == sql.ErrNoRows {
// Don't cache absence — the row may appear momentarily after
// provision_workspace's INSERT lands, and the lazy-heal path
// is the recovery contract for the column-NULL case (see
// readOrLazyHealInboundSecret in workspace_provision_shared.go).
return "", ErrNoInboundSecret
}
if err != nil {
@ -93,5 +169,35 @@ func ReadPlatformInboundSecret(ctx context.Context, db *sql.DB, workspaceID stri
if !secret.Valid || secret.String == "" {
return "", ErrNoInboundSecret
}
// Read-through cache population on success.
inboundSecretCache.Store(workspaceID, &inboundSecretCacheEntry{
secret: secret.String,
expiresAt: inboundSecretCacheNow().Add(inboundSecretCacheTTL),
})
return secret.String, nil
}
// ResetInboundSecretCacheForTesting clears the process-local cache.
// Tests that exercise rotation or DB-side mutation of the secret column
// MUST call this between scenarios to keep an earlier entry from
// shadowing a fresh DB read.
//
// Exported (`...ForTesting` suffix) so cross-package tests in the
// handlers/ tree can call it directly without circular imports.
func ResetInboundSecretCacheForTesting() {
inboundSecretCache.Range(func(k, _ any) bool {
inboundSecretCache.Delete(k)
return true
})
}
// SetInboundSecretCacheNowForTesting overrides the package-level time
// source for cache TTL calculations. Tests use this to advance past
// the TTL deterministically rather than waiting on the wall clock.
// Returns a restore function that the caller MUST defer to avoid
// leaking the override into other tests.
func SetInboundSecretCacheNowForTesting(now func() time.Time) func() {
prev := inboundSecretCacheNow
inboundSecretCacheNow = now
return func() { inboundSecretCacheNow = prev }
}

View File

@ -5,6 +5,7 @@ import (
"errors"
"regexp"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
)
@ -127,3 +128,170 @@ func TestReadPlatformInboundSecret_RejectsEmptyWorkspaceID(t *testing.T) {
t.Error("expected error for empty workspaceID, got nil")
}
}
// ------------------------------------------------------------
// Cache (#189) — heartbeat-storm absorption
// ------------------------------------------------------------
// A second read inside the TTL window MUST hit the cache and NOT
// re-issue a SELECT to the DB. This is the entire point of #189:
// the heartbeat fires every 60s/workspace and was doing one DB read
// each time to redeliver an unchanged value.
func TestReadPlatformInboundSecret_CacheHitWithinTTL(t *testing.T) {
db, mock := setupMock(t)
// Exactly ONE expected SELECT — the second read must be served
// from cache. If the cache doesn't fire, a second SELECT will
// arrive without a matching expectation and ExpectationsWereMet
// will pass while the call panics — so we ALSO assert via the
// returned value.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs("ws-cached").
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("plaintext-1"))
first, err := ReadPlatformInboundSecret(context.Background(), db, "ws-cached")
if err != nil {
t.Fatalf("first read: %v", err)
}
second, err := ReadPlatformInboundSecret(context.Background(), db, "ws-cached")
if err != nil {
t.Fatalf("second read: %v", err)
}
if first != second {
t.Errorf("cache returned different value: %q vs %q", first, second)
}
if second != "plaintext-1" {
t.Errorf("cache returned %q, want %q", second, "plaintext-1")
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations (cache likely failed to short-circuit DB): %v", err)
}
}
// After TTL expires the next read MUST hit the DB again so an
// out-of-band rotation propagates within minutes.
func TestReadPlatformInboundSecret_CacheRefreshesAfterTTL(t *testing.T) {
db, mock := setupMock(t)
// Two SELECTs expected. The first populates the cache; the second
// fires after we advance the clock past the TTL. They return
// DIFFERENT values to simulate an operator rotating the secret
// directly via SQL.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v1"))
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v2-rotated"))
now := time.Date(2026, 5, 3, 12, 0, 0, 0, time.UTC)
restore := SetInboundSecretCacheNowForTesting(func() time.Time { return now })
defer restore()
first, err := ReadPlatformInboundSecret(context.Background(), db, "ws-rotated")
if err != nil {
t.Fatalf("first read: %v", err)
}
if first != "v1" {
t.Errorf("first read = %q, want v1", first)
}
// Advance past the TTL.
now = now.Add(inboundSecretCacheTTL).Add(time.Second)
second, err := ReadPlatformInboundSecret(context.Background(), db, "ws-rotated")
if err != nil {
t.Fatalf("second read: %v", err)
}
if second != "v2-rotated" {
t.Errorf("post-TTL read = %q, want v2-rotated (rotation didn't propagate)", second)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// Issue MUST update the cache (write-through) so a subsequent read
// from the same process sees the just-minted value without a DB
// round-trip. This pins the lazy-heal path in
// readOrLazyHealInboundSecret, which mints then immediately wants the
// fresh value.
func TestIssuePlatformInboundSecret_WriteThroughCachesValue(t *testing.T) {
db, mock := setupMock(t)
// ONE Exec for the mint. NO SELECT expected — the read should hit
// cache because Issue populated it.
mock.ExpectExec(`UPDATE workspaces SET platform_inbound_secret = \$1 WHERE id = \$2`).
WithArgs(sqlmock.AnyArg(), "ws-write-through").
WillReturnResult(sqlmock.NewResult(1, 1))
minted, err := IssuePlatformInboundSecret(context.Background(), db, "ws-write-through")
if err != nil {
t.Fatalf("Issue: %v", err)
}
got, err := ReadPlatformInboundSecret(context.Background(), db, "ws-write-through")
if err != nil {
t.Fatalf("Read: %v", err)
}
if got != minted {
t.Errorf("read after Issue = %q, want minted %q", got, minted)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations (read should not have hit DB): %v", err)
}
}
// ErrNoInboundSecret (NULL/empty column) must NOT be cached — the
// row may legitimately appear later (race between Heartbeat and the
// initial INSERT in provisionWorkspaceCP, or a manual operator
// backfill). Caching absence would defeat the lazy-heal recovery
// contract.
func TestReadPlatformInboundSecret_DoesNotCacheAbsence(t *testing.T) {
db, mock := setupMock(t)
// First read returns NULL → ErrNoInboundSecret, NO cache.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow(nil))
// Second read returns the freshly-backfilled value — must hit DB
// because absence wasn't cached.
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("backfilled"))
_, err := ReadPlatformInboundSecret(context.Background(), db, "ws-null-then-set")
if !errors.Is(err, ErrNoInboundSecret) {
t.Fatalf("expected ErrNoInboundSecret on first read, got %v", err)
}
got, err := ReadPlatformInboundSecret(context.Background(), db, "ws-null-then-set")
if err != nil {
t.Fatalf("second read: %v", err)
}
if got != "backfilled" {
t.Errorf("second read = %q, want backfilled (absence was cached)", got)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}
// ResetInboundSecretCacheForTesting must clear ALL entries, not just
// the one matching a specific key. The setupMock helper uses this on
// every test to keep entries from leaking across runs.
func TestResetInboundSecretCacheForTesting_ClearsAllEntries(t *testing.T) {
db, mock := setupMock(t)
// Populate cache for two workspaces.
for _, id := range []string{"ws-a", "ws-b"} {
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v-" + id))
if _, err := ReadPlatformInboundSecret(context.Background(), db, id); err != nil {
t.Fatalf("populate %s: %v", id, err)
}
}
ResetInboundSecretCacheForTesting()
// After reset BOTH must miss the cache and trigger a fresh SELECT.
for _, id := range []string{"ws-a", "ws-b"} {
mock.ExpectQuery(`SELECT platform_inbound_secret FROM workspaces WHERE id = \$1`).
WithArgs(id).
WillReturnRows(sqlmock.NewRows([]string{"platform_inbound_secret"}).AddRow("v-" + id))
if _, err := ReadPlatformInboundSecret(context.Background(), db, id); err != nil {
t.Fatalf("post-reset %s: %v", id, err)
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet expectations: %v", err)
}
}

View File

@ -16,6 +16,15 @@ func setupMock(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
t.Fatalf("sqlmock.New: %v", err)
}
t.Cleanup(func() { db.Close() })
// The platform_inbound_secret cache is package-level state shared
// across every test in this package — without a reset between
// tests a write-through Issue from one test shadows the SELECT
// expectation in the next test that touches the same workspaceID
// (e.g. "ws-abc" reused across PersistsPlaintext + HappyPath).
// Reset before each test that uses setupMock; the no-op cost on
// pure-token tests is one Range over an empty sync.Map.
ResetInboundSecretCacheForTesting()
t.Cleanup(ResetInboundSecretCacheForTesting)
return db, mock
}