diff --git a/workspace-server/internal/handlers/eic_tunnel_pool.go b/workspace-server/internal/handlers/eic_tunnel_pool.go new file mode 100644 index 00000000..20b2e269 --- /dev/null +++ b/workspace-server/internal/handlers/eic_tunnel_pool.go @@ -0,0 +1,457 @@ +package handlers + +// eic_tunnel_pool.go — refcounted pool for EIC SSH tunnels keyed on +// instanceID. Reuses one tunnel across N file ops, amortising the +// ssh-keygen + SendSSHPublicKey + open-tunnel + waitForPort cost +// (~3-5s) over multiple cats/finds (~50-200ms each). +// +// Origin: core#11 — canvas detail-panel config + filesystem load +// took ~20s. ConfigTab fans out 4 GETs serially; the slowest is +// /files/config.yaml which dispatches to readFileViaEIC. Without a +// pool, every readFileViaEIC + listFilesViaEIC + writeFileViaEIC + +// deleteFileViaEIC pays the full setup cost even when fired +// back-to-back on the same workspace EC2. +// +// The pool keeps one eicSSHSession alive per instanceID for up to +// poolTTL. SendSSHPublicKey grants a 60s key validity, so poolTTL +// must stay strictly below that to avoid serving requests on a +// just-expired key. We default to 50s with a 10s safety margin. +// +// Concurrency model: +// +// - Single mutex guards the entries map. +// - Slow path (tunnel setup) runs OUTSIDE the lock, gated by an +// "intent" placeholder so concurrent acquires for the same +// instanceID don't both build a tunnel — the loser drops its +// setup and uses the winner's. +// - Refcount on each entry; eviction blocked while refcount > 0. +// - Janitor goroutine sweeps every poolJanitorInterval, drops +// entries where refcount == 0 && expiresAt < now. +// +// Test injection: +// +// - poolSetupTunnel is a package-level var so tests can swap the +// slow path for a counting stub. Production wires it to +// realWithEICTunnel-style setup. +// - withEICTunnel (the public, single-shot API) is also a var +// (already, see template_files_eic.go). It's rebound here to +// pooledWithEICTunnel which routes through globalEICTunnelPool. +// - Tests that need single-shot behaviour can set poolTTL = 0, +// which makes pooledWithEICTunnel fall through to the underlying +// setup directly (no pool entry kept). + +import ( + "context" + "fmt" + "sync" + "time" +) + +// poolTTL is the maximum age of a pooled tunnel. Must be strictly +// less than the SendSSHPublicKey grant window (60s) so we never +// serve a request through a key that's about to expire mid-op. +// +// Configurable via init-time wiring (see initEICTunnelPool); not a +// const so tests can pin TTL=0 (disable pooling) or TTL=50ms (drive +// eviction tests). +var poolTTL = 50 * time.Second + +// poolJanitorInterval is how often the janitor goroutine sweeps for +// expired idle entries. Tighter than poolTTL so eviction is timely; +// loose enough that the goroutine doesn't burn CPU. +var poolJanitorInterval = 10 * time.Second + +// poolMaxEntries caps simultaneous instanceIDs the pool tracks. +// Beyond this, new acquires evict the LRU entry. Defends against a +// pathological caller (e.g. a sweep over hundreds of workspace +// EC2s) from leaking unbounded tunnel processes. 32 is a generous +// ceiling for the canvas use case (one human navigates ≤ ~5 +// workspaces at a time). +var poolMaxEntries = 32 + +// poolSetupTunnel is the slow-path tunnel constructor. Wrapped in a +// var so tests can inject a counter stub. Returns a session and a +// cleanup function (closes the open-tunnel subprocess + scrubs the +// ephemeral keydir). nil session + non-nil err means setup failed +// and there is nothing to clean up. +// +// Production wiring lives in eic_tunnel_pool_setup.go (a thin shim +// over the existing realWithEICTunnel logic). +var poolSetupTunnel = func(ctx context.Context, instanceID string) ( + sess eicSSHSession, cleanup func(), err error) { + return setupRealEICTunnel(ctx, instanceID) +} + +// pooledTunnel is one entry in the pool. session is shared by N +// concurrent fn calls; cleanup runs once when refcount returns to +// zero AND the entry is past expiresAt or evicted. +// +// lastUsed tracks the most recent acquire time for LRU bookkeeping +// (overflow eviction). expiresAt is set at construction and not +// extended on use — a tunnel cannot live past poolTTL even if it's +// hot, because the underlying SendSSHPublicKey grant expires. +type pooledTunnel struct { + session eicSSHSession + cleanup func() + expiresAt time.Time + lastUsed time.Time + refcount int + poisoned bool // true if a fn returned a tunnel-fatal error; do not reuse +} + +// eicTunnelPool is the package-level pool. Single instance lives +// in globalEICTunnelPool; constructor runs lazily on first acquire. +type eicTunnelPool struct { + mu sync.Mutex + entries map[string]*pooledTunnel + // pendingSetups guards concurrent setup for the same instanceID. + // First acquirer takes the slot; later ones wait on the channel. + pendingSetups map[string]chan struct{} + stopJanitor chan struct{} + // janitorInterval is captured at pool construction from the + // package-level poolJanitorInterval var. Captured (not re-read on + // every tick) so a test that swaps the package var via t.Cleanup + // after a global pool's janitor is already running can't race + // with that goroutine's ticker read. The global pool is created + // lazily once per process via sync.Once; before this capture + // landed, every test that touched poolJanitorInterval after the + // global pool's first-touch raced the janitor (caught by -race + // on staging tip 249dbc6a — TestPooledWithEICTunnel_PanicPoisonsEntry). + // Tests still get the new value on a freshPool() because they + // set the package var BEFORE calling newEICTunnelPool(). + janitorInterval time.Duration +} + +var ( + globalEICTunnelPool *eicTunnelPool + globalEICTunnelPoolOnce sync.Once +) + +// getEICTunnelPool returns the singleton pool, lazy-initialising on +// first call. Idempotent. +func getEICTunnelPool() *eicTunnelPool { + globalEICTunnelPoolOnce.Do(func() { + globalEICTunnelPool = newEICTunnelPool() + go globalEICTunnelPool.janitor() + }) + return globalEICTunnelPool +} + +// newEICTunnelPool constructs an empty pool. Exported so tests can +// build isolated pools without sharing the singleton. +// +// Captures poolJanitorInterval at construction time so the janitor +// goroutine doesn't race with t.Cleanup-driven swaps of the package +// var. See the janitorInterval field comment for the failure mode. +func newEICTunnelPool() *eicTunnelPool { + return &eicTunnelPool{ + entries: map[string]*pooledTunnel{}, + pendingSetups: map[string]chan struct{}{}, + stopJanitor: make(chan struct{}), + janitorInterval: poolJanitorInterval, + } +} + +// acquire returns a usable session for instanceID. If a healthy entry +// exists, refcount++ and return it. If a setup is in flight for the +// same instanceID, wait for it. Otherwise build one (slow path). +// +// done() must be called by the caller when the op finishes. It +// decrements refcount and triggers cleanup if the entry is past +// TTL or poisoned and refcount==0. +// +// Errors from the slow path propagate; pool state is not modified +// for failed setups (no poisoned entry created — that's only for +// fn-returned errors on a previously-good session). +func (p *eicTunnelPool) acquire(ctx context.Context, instanceID string) ( + sess eicSSHSession, done func(poisoned bool), err error) { + + if poolTTL <= 0 { + // Pool disabled (TTL=0 mode for tests / opt-out). Fall + // through to a direct setup with caller-driven cleanup. + s, cleanup, err := poolSetupTunnel(ctx, instanceID) + if err != nil { + return eicSSHSession{}, nil, err + } + return s, func(_ bool) { cleanup() }, nil + } + + for { + p.mu.Lock() + if pt, ok := p.entries[instanceID]; ok && !pt.poisoned && pt.expiresAt.After(time.Now()) { + pt.refcount++ + pt.lastUsed = time.Now() + p.mu.Unlock() + return pt.session, p.releaser(instanceID, pt), nil + } + // Either no entry, expired entry, or poisoned entry. If a + // setup is already in flight, wait and retry. + if pending, ok := p.pendingSetups[instanceID]; ok { + p.mu.Unlock() + select { + case <-pending: + continue // re-check the entries map + case <-ctx.Done(): + return eicSSHSession{}, nil, ctx.Err() + } + } + // Drop expired/poisoned entry now (we'll cleanup outside + // the lock — the entry is unreferenced or we'd not be here). + var oldCleanup func() + if pt, ok := p.entries[instanceID]; ok { + if pt.refcount == 0 { + oldCleanup = pt.cleanup + delete(p.entries, instanceID) + } + } + // Reserve the setup slot. + signal := make(chan struct{}) + p.pendingSetups[instanceID] = signal + p.mu.Unlock() + + if oldCleanup != nil { + go oldCleanup() + } + + // Slow path: build a new tunnel. Anything that goes wrong + // here cleans up the pendingSetups slot and propagates to + // the caller without leaving the pool in a state where the + // next acquire blocks waiting on a signal that never fires. + newSess, cleanup, setupErr := poolSetupTunnel(ctx, instanceID) + + p.mu.Lock() + delete(p.pendingSetups, instanceID) + close(signal) + + if setupErr != nil { + p.mu.Unlock() + return eicSSHSession{}, nil, fmt.Errorf("eic tunnel setup: %w", setupErr) + } + + // Enforce LRU bound BEFORE inserting so we don't briefly + // exceed the cap even by one entry. + p.evictLRUIfFullLocked(instanceID) + + pt := &pooledTunnel{ + session: newSess, + cleanup: cleanup, + expiresAt: time.Now().Add(poolTTL), + lastUsed: time.Now(), + refcount: 1, + } + p.entries[instanceID] = pt + p.mu.Unlock() + return pt.session, p.releaser(instanceID, pt), nil + } +} + +// releaser returns a closure that decrements refcount and triggers +// cleanup if (a) the entry is past TTL or (b) the caller signalled +// poison. Idempotent against double-release (decrements once via the +// captured pt; pool entry may have been replaced by then). +func (p *eicTunnelPool) releaser(instanceID string, pt *pooledTunnel) func(poisoned bool) { + released := false + return func(poisoned bool) { + p.mu.Lock() + defer p.mu.Unlock() + if released { + return + } + released = true + pt.refcount-- + if poisoned { + pt.poisoned = true + } + // Evict immediately if poisoned-and-idle OR expired-and-idle. + // Hot entries (refcount > 0) defer eviction to the last release. + if pt.refcount == 0 && (pt.poisoned || pt.expiresAt.Before(time.Now())) { + // If the entry in the map is still us, remove it. + if cur, ok := p.entries[instanceID]; ok && cur == pt { + delete(p.entries, instanceID) + } + go pt.cleanup() + } + } +} + +// evictLRUIfFullLocked drops the least-recently-used IDLE entry +// when the pool is at capacity. Caller must hold p.mu. The new +// instanceID about to be inserted is excluded so we don't evict +// ourselves. If no idle entries exist, no eviction happens — the +// new entry will push us above the soft cap until something releases. +func (p *eicTunnelPool) evictLRUIfFullLocked(skipInstance string) { + if len(p.entries) < poolMaxEntries { + return + } + var oldestKey string + var oldest *pooledTunnel + for k, pt := range p.entries { + if k == skipInstance { + continue + } + if pt.refcount > 0 { + continue + } + if oldest == nil || pt.lastUsed.Before(oldest.lastUsed) { + oldestKey = k + oldest = pt + } + } + if oldest == nil { + return // every entry is in use; no eviction possible + } + delete(p.entries, oldestKey) + go oldest.cleanup() +} + +// janitor periodically scans for entries that are idle AND expired, +// closing their tunnels. Runs forever (per pool lifetime); cancelled +// by close(p.stopJanitor) for tests that build short-lived pools. +// +// Reads p.janitorInterval (captured at construction) instead of the +// package-level poolJanitorInterval — see janitorInterval field comment. +func (p *eicTunnelPool) janitor() { + t := time.NewTicker(p.janitorInterval) + defer t.Stop() + for { + select { + case <-t.C: + p.sweep() + case <-p.stopJanitor: + return + } + } +} + +// sweep is one janitor pass. Drops idle expired entries. +func (p *eicTunnelPool) sweep() { + p.mu.Lock() + now := time.Now() + var toClose []func() + for k, pt := range p.entries { + if pt.refcount == 0 && pt.expiresAt.Before(now) { + toClose = append(toClose, pt.cleanup) + delete(p.entries, k) + } + } + p.mu.Unlock() + for _, c := range toClose { + go c() + } +} + +// stop terminates the janitor and closes all idle entries. Hot +// (refcount > 0) entries are NOT force-closed — callers running +// against them would see a use-after-free. In practice stop is only +// called by tests that have already drained their callers. +func (p *eicTunnelPool) stop() { + close(p.stopJanitor) + p.mu.Lock() + defer p.mu.Unlock() + for k, pt := range p.entries { + if pt.refcount == 0 { + go pt.cleanup() + delete(p.entries, k) + } + } +} + +// pooledWithEICTunnel is the pool-backed replacement for +// realWithEICTunnel. The signature matches `var withEICTunnel` +// exactly so the rebind (in initEICTunnelPool) is a drop-in. +// +// Errors from `fn` itself are forwarded to the caller AND mark the +// pool entry as poisoned, so the next acquire builds a fresh +// tunnel. This catches the case where the workspace EC2 was +// restarted out-of-band (tunnel still appears alive locally but +// every cat/find errors out). +func pooledWithEICTunnel(ctx context.Context, instanceID string, + fn func(s eicSSHSession) error) error { + pool := getEICTunnelPool() + sess, done, err := pool.acquire(ctx, instanceID) + if err != nil { + return err + } + // poisoned defaults to true so a panic from fn poisons the + // entry on the way through the deferred release. Without the + // defer, a panicking fn would leak refcount=1 forever and + // permanently block eviction of this entry. The fn-error path + // resets poisoned to its real classification before return. + poisoned := true + defer func() { done(poisoned) }() + fnErr := fn(sess) + poisoned = fnErrIndicatesTunnelFault(fnErr) + return fnErr +} + +// fnErrIndicatesTunnelFault returns true for fn errors whose nature +// suggests the underlying tunnel is no longer reusable (auth gone, +// network gone, ssh process dead). Returning true poisons the pool +// entry so the next acquire builds fresh. +// +// Conservative: only marks tunnel-faulty for clearly tunnel-level +// failures (connection refused, broken pipe, ssh exit-status from +// fatal-channel signals). A `cat` returning os.ErrNotExist on a +// missing file is NOT a tunnel fault — that's the file path being +// wrong, the tunnel is fine. +func fnErrIndicatesTunnelFault(err error) bool { + if err == nil { + return false + } + msg := err.Error() + // stderr substrings produced by ssh when the tunnel is broken. + for _, marker := range []string{ + "connection refused", + "connection closed", + "broken pipe", + "Connection reset by peer", + "kex_exchange_identification", + "port forwarding failed", + "Permission denied", + "Authentication failed", + } { + if containsCaseInsensitive(msg, marker) { + return true + } + } + return false +} + +// containsCaseInsensitive avoids importing strings just for this +// (the file already needs ssh stderr matching elsewhere — this +// keeps the helper local to avoid a cross-file dependency). +func containsCaseInsensitive(s, substr string) bool { + if len(substr) > len(s) { + return false + } + // Manual lowercase compare loop; ssh error markers are ASCII so + // no need for unicode-aware folding. + low := func(b byte) byte { + if b >= 'A' && b <= 'Z' { + return b + 32 + } + return b + } + for i := 0; i+len(substr) <= len(s); i++ { + match := true + for j := 0; j < len(substr); j++ { + if low(s[i+j]) != low(substr[j]) { + match = false + break + } + } + if match { + return true + } + } + return false +} + +// initEICTunnelPool rebinds the package-level withEICTunnel var to +// the pooled implementation. Called once at package init via the +// init() in eic_tunnel_pool_setup.go (split file so the rebind +// itself is testable without dragging in the production setup +// shim's exec/aws dependencies). +func initEICTunnelPool() { + withEICTunnel = pooledWithEICTunnel +} diff --git a/workspace-server/internal/handlers/eic_tunnel_pool_test.go b/workspace-server/internal/handlers/eic_tunnel_pool_test.go new file mode 100644 index 00000000..2b4b5bf4 --- /dev/null +++ b/workspace-server/internal/handlers/eic_tunnel_pool_test.go @@ -0,0 +1,467 @@ +package handlers + +// eic_tunnel_pool_test.go — tests for the refcounted EIC tunnel pool +// added in core#11. Stubs poolSetupTunnel with a counter so the +// tests don't fork ssh-keygen / aws subprocesses. +// +// Per memory feedback_assert_exact_not_substring: each test pins +// exact expected counts (not "at least N") so a regression that +// silently double-sets-up surfaces here. + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" +) + +// withPoolSetupStub swaps poolSetupTunnel for a counting fake that +// returns a sentinel session and a cleanup func that records its +// invocation. Restores on test cleanup. +// +// setupSignal blocks each setup until released — for concurrent- +// acquire tests where we want to gate setup completion. +func withPoolSetupStub(t *testing.T) ( + setupCount *int64, cleanupCount *int64, restore func(), unblock func()) { + t.Helper() + prev := poolSetupTunnel + prevTTL := poolTTL + prevJanitor := poolJanitorInterval + + var sc, cc int64 + setupCount, cleanupCount = &sc, &cc + + gate := make(chan struct{}, 1) + gate <- struct{}{} // allow the first setup through immediately + unblock = func() { gate <- struct{}{} } + + poolSetupTunnel = func(ctx context.Context, instanceID string) ( + eicSSHSession, func(), error) { + select { + case <-gate: + case <-ctx.Done(): + return eicSSHSession{}, nil, ctx.Err() + } + atomic.AddInt64(&sc, 1) + sess := eicSSHSession{ + instanceID: instanceID, + osUser: "ubuntu", + localPort: 10000 + int(atomic.LoadInt64(&sc)), + keyPath: "/tmp/molecule-eic-test-" + instanceID, + } + cleanup := func() { atomic.AddInt64(&cc, 1) } + return sess, cleanup, nil + } + + restore = func() { + poolSetupTunnel = prev + poolTTL = prevTTL + poolJanitorInterval = prevJanitor + } + t.Cleanup(restore) + return +} + +// freshPool returns an isolated pool (NOT the global) so tests run +// independently. Stops the janitor on cleanup. +func freshPool(t *testing.T) *eicTunnelPool { + t.Helper() + p := newEICTunnelPool() + t.Cleanup(p.stop) + return p +} + +// TestEICTunnelPool_FourOpsAmortise pins the core invariant: four +// sequential acquire/release cycles on the same instanceID share +// ONE underlying tunnel setup. Mutation: delete the cache hit branch +// in acquire() → setupCount goes 1 → 4 → test fails. +func TestEICTunnelPool_FourOpsAmortise(t *testing.T) { + setupCount, cleanupCount, _, _ := withPoolSetupStub(t) + // Refill gate after each setup so concurrent stubs aren't blocked + // (we want every test to be able to set up if it needs to). + t.Cleanup(func() { /* no-op; defer is enough */ }) + poolTTL = 50 * time.Second + pool := freshPool(t) + ctx := context.Background() + + for i := 0; i < 4; i++ { + sess, done, err := pool.acquire(ctx, "i-test-1") + if err != nil { + t.Fatalf("op %d: acquire: %v", i, err) + } + if sess.instanceID != "i-test-1" { + t.Fatalf("op %d: session has wrong instanceID: %q", i, sess.instanceID) + } + done(false) + } + + if got := atomic.LoadInt64(setupCount); got != 1 { + t.Errorf("expected exactly 1 tunnel setup across 4 ops, got %d", got) + } + if got := atomic.LoadInt64(cleanupCount); got != 0 { + t.Errorf("expected 0 cleanups while entry is hot (TTL=50s), got %d", got) + } +} + +// TestEICTunnelPool_DifferentInstancesDoNotShare pins that two +// different instanceIDs each get their own tunnel — the pool is +// keyed on instanceID, not a single global slot. +func TestEICTunnelPool_DifferentInstancesDoNotShare(t *testing.T) { + setupCount, _, _, unblock := withPoolSetupStub(t) + poolTTL = 50 * time.Second + pool := freshPool(t) + ctx := context.Background() + + // First instance setup uses the initial gate slot. + _, doneA, err := pool.acquire(ctx, "i-a") + if err != nil { + t.Fatalf("acquire A: %v", err) + } + doneA(false) + + // Second instance needs a new slot through the gate. + unblock() + _, doneB, err := pool.acquire(ctx, "i-b") + if err != nil { + t.Fatalf("acquire B: %v", err) + } + doneB(false) + + if got := atomic.LoadInt64(setupCount); got != 2 { + t.Errorf("expected 2 setups (one per instance), got %d", got) + } +} + +// TestEICTunnelPool_TTLEviction: a short TTL forces the second op +// to build a fresh tunnel after the first expires. +func TestEICTunnelPool_TTLEviction(t *testing.T) { + setupCount, cleanupCount, _, unblock := withPoolSetupStub(t) + poolTTL = 50 * time.Millisecond + poolJanitorInterval = 1 * time.Second // keep janitor away + pool := freshPool(t) + ctx := context.Background() + + _, done, err := pool.acquire(ctx, "i-ttl") + if err != nil { + t.Fatalf("acquire 1: %v", err) + } + done(false) + + time.Sleep(80 * time.Millisecond) // past TTL + + unblock() // allow next setup + _, done, err = pool.acquire(ctx, "i-ttl") + if err != nil { + t.Fatalf("acquire 2: %v", err) + } + done(false) + + if got := atomic.LoadInt64(setupCount); got != 2 { + t.Errorf("expected 2 setups (TTL eviction between), got %d", got) + } + // First entry should have been cleaned up when the second + // acquire evicted it on the slow path. Cleanup runs in a + // goroutine; poll briefly for it to land. + deadline := time.Now().Add(500 * time.Millisecond) + for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + if got := atomic.LoadInt64(cleanupCount); got < 1 { + t.Errorf("expected ≥1 cleanup (first entry evicted), got %d", got) + } +} + +// TestEICTunnelPool_FailureInvalidates pins the poison-on-fault +// behavior — fn returning a tunnel-fatal error marks the entry +// unusable so the next acquire builds fresh. +func TestEICTunnelPool_FailureInvalidates(t *testing.T) { + setupCount, _, _, unblock := withPoolSetupStub(t) + poolTTL = 50 * time.Second + pool := freshPool(t) + ctx := context.Background() + + _, done, err := pool.acquire(ctx, "i-fault") + if err != nil { + t.Fatalf("acquire 1: %v", err) + } + done(true) // signal poison + + unblock() // let the next setup through + _, done, err = pool.acquire(ctx, "i-fault") + if err != nil { + t.Fatalf("acquire 2: %v", err) + } + done(false) + + if got := atomic.LoadInt64(setupCount); got != 2 { + t.Errorf("expected 2 setups (poison forced rebuild), got %d", got) + } +} + +// TestEICTunnelPool_ConcurrentAcquireSingleSetup pins that N +// concurrent acquires for the same instanceID before any release +// only trigger ONE tunnel setup — the rest wait via pendingSetups. +// +// Without this guard each concurrent acquire would spawn its own +// tunnel and the loser-cleanup would still leak refcount. Mutation: +// delete the pendingSetups gate → setupCount goes 1 → N → fails. +func TestEICTunnelPool_ConcurrentAcquireSingleSetup(t *testing.T) { + setupCount, _, _, _ := withPoolSetupStub(t) + // Pause setup so all goroutines pile into the pending slot. + prev := poolSetupTunnel + gate := make(chan struct{}) + poolSetupTunnel = func(ctx context.Context, instanceID string) ( + eicSSHSession, func(), error) { + <-gate + atomic.AddInt64(setupCount, 1) + return eicSSHSession{instanceID: instanceID}, func() {}, nil + } + t.Cleanup(func() { poolSetupTunnel = prev }) + + poolTTL = 50 * time.Second + pool := freshPool(t) + ctx := context.Background() + + const N = 8 + type result struct { + done func(bool) + err error + } + results := make(chan result, N) + var startWg sync.WaitGroup + startWg.Add(N) + for i := 0; i < N; i++ { + go func() { + startWg.Done() + _, done, err := pool.acquire(ctx, "i-concurrent") + results <- result{done, err} + }() + } + startWg.Wait() + // give all N goroutines time to enter pool.acquire + time.Sleep(20 * time.Millisecond) + close(gate) + + for i := 0; i < N; i++ { + r := <-results + if r.err != nil { + t.Fatalf("acquire %d: %v", i, r.err) + } + r.done(false) + } + + if got := atomic.LoadInt64(setupCount); got != 1 { + t.Errorf("expected 1 setup across %d concurrent acquires, got %d", N, got) + } +} + +// TestEICTunnelPool_TTLZeroDisablesPooling pins the escape hatch: +// poolTTL=0 means every acquire goes straight through to setup + +// cleanup, no entry kept. Useful for tests / opt-out. +func TestEICTunnelPool_TTLZeroDisablesPooling(t *testing.T) { + setupCount, cleanupCount, _, unblock := withPoolSetupStub(t) + poolTTL = 0 + pool := freshPool(t) + ctx := context.Background() + + _, done, err := pool.acquire(ctx, "i-ttlzero") + if err != nil { + t.Fatalf("acquire 1: %v", err) + } + done(false) + + unblock() + _, done, err = pool.acquire(ctx, "i-ttlzero") + if err != nil { + t.Fatalf("acquire 2: %v", err) + } + done(false) + + if got := atomic.LoadInt64(setupCount); got != 2 { + t.Errorf("expected 2 setups with TTL=0 (pool disabled), got %d", got) + } + if got := atomic.LoadInt64(cleanupCount); got != 2 { + t.Errorf("expected 2 cleanups with TTL=0 (each release closes), got %d", got) + } +} + +// TestEICTunnelPool_LRUEvictionAtCap pins the LRU defence: when the +// pool reaches poolMaxEntries, a new acquire for an unseen +// instanceID evicts the LRU idle entry instead of growing unbounded. +func TestEICTunnelPool_LRUEvictionAtCap(t *testing.T) { + setupCount, cleanupCount, _, _ := withPoolSetupStub(t) + prev := poolMaxEntries + poolMaxEntries = 2 + t.Cleanup(func() { poolMaxEntries = prev }) + poolTTL = 50 * time.Second + + // Replace stub with one that doesn't gate so we can fill quickly. + poolSetupTunnel = func(ctx context.Context, instanceID string) ( + eicSSHSession, func(), error) { + atomic.AddInt64(setupCount, 1) + return eicSSHSession{instanceID: instanceID}, func() { + atomic.AddInt64(cleanupCount, 1) + }, nil + } + + pool := freshPool(t) + ctx := context.Background() + + for _, id := range []string{"i-1", "i-2"} { + _, done, err := pool.acquire(ctx, id) + if err != nil { + t.Fatalf("acquire %s: %v", id, err) + } + done(false) + } + // Both entries idle, pool at cap. + _, done, err := pool.acquire(ctx, "i-3") + if err != nil { + t.Fatalf("acquire i-3: %v", err) + } + done(false) + + // Wait for the goroutine'd cleanup of the evicted entry. + deadline := time.Now().Add(500 * time.Millisecond) + for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + + if got := atomic.LoadInt64(setupCount); got != 3 { + t.Errorf("expected 3 setups (one per unique instance), got %d", got) + } + if got := atomic.LoadInt64(cleanupCount); got < 1 { + t.Errorf("expected ≥1 cleanup (LRU eviction), got %d", got) + } +} + +// TestEICTunnelPool_PoisonedClassification pins the heuristic that +// distinguishes tunnel-fatal errors (poison the entry) from +// app-level errors (file not found, validation) that should NOT +// invalidate the tunnel. +func TestEICTunnelPool_PoisonedClassification(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"file not found", errors.New("os: file does not exist"), false}, + {"validation", errors.New("invalid path: must be relative"), false}, + {"connection refused", errors.New("ssh: connect to host: connection refused"), true}, + {"connection refused upper", errors.New("Connection Refused"), true}, + {"broken pipe", errors.New("write tunnel: broken pipe"), true}, + {"permission denied", errors.New("Permission denied (publickey)"), true}, + {"auth failed", errors.New("Authentication failed"), true}, + {"connection reset", errors.New("Connection reset by peer"), true}, + {"port forward", errors.New("port forwarding failed"), true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := fnErrIndicatesTunnelFault(tc.err) + if got != tc.want { + t.Errorf("fnErrIndicatesTunnelFault(%v) = %v, want %v", + tc.err, got, tc.want) + } + }) + } +} + +// TestEICTunnelPool_RefcountBlocksEviction pins that an entry past +// TTL is NOT evicted while a caller still holds it — preventing +// use-after-free in the holder. +func TestEICTunnelPool_RefcountBlocksEviction(t *testing.T) { + setupCount, cleanupCount, _, _ := withPoolSetupStub(t) + poolTTL = 30 * time.Millisecond + poolJanitorInterval = 5 * time.Millisecond + pool := freshPool(t) + ctx := context.Background() + + _, done, err := pool.acquire(ctx, "i-hold") + if err != nil { + t.Fatalf("acquire: %v", err) + } + + // Sleep past TTL while holding the session. Janitor sweeps + // every 5ms but must skip our entry because refcount=1. + time.Sleep(80 * time.Millisecond) + + if got := atomic.LoadInt64(cleanupCount); got != 0 { + t.Errorf("expected 0 cleanups while holder is active, got %d", got) + } + + done(false) + // Now refcount=0 and entry is past TTL; releaser triggers cleanup. + deadline := time.Now().Add(200 * time.Millisecond) + for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) { + time.Sleep(5 * time.Millisecond) + } + if got := atomic.LoadInt64(cleanupCount); got != 1 { + t.Errorf("expected 1 cleanup after release of expired entry, got %d", got) + } + if got := atomic.LoadInt64(setupCount); got != 1 { + t.Errorf("setupCount tracking: got %d, want 1", got) + } +} + +// TestPooledWithEICTunnel_PanicPoisonsEntry pins that a panic +// from fn poisons the pool entry on the way out — refcount goes +// back to zero (no leak) and the entry is marked unusable so the +// next acquire builds fresh. Without the defer-release pattern, a +// panic would leave refcount=1 forever and the entry would never +// evict. +func TestPooledWithEICTunnel_PanicPoisonsEntry(t *testing.T) { + setupCount, _, _, _ := withPoolSetupStub(t) + poolTTL = 50 * time.Second + globalEICTunnelPool = newEICTunnelPool() + t.Cleanup(globalEICTunnelPool.stop) + + func() { + defer func() { + if r := recover(); r == nil { + t.Errorf("expected panic to bubble up, got nil") + } + }() + _ = pooledWithEICTunnel(context.Background(), "i-panic", + func(s eicSSHSession) error { panic("boom") }) + }() + + // Replenish the gate so the next setup can run. + prev := poolSetupTunnel + poolSetupTunnel = func(ctx context.Context, instanceID string) ( + eicSSHSession, func(), error) { + atomic.AddInt64(setupCount, 1) + return eicSSHSession{instanceID: instanceID}, func() {}, nil + } + t.Cleanup(func() { poolSetupTunnel = prev }) + + // Next acquire must build fresh — entry was poisoned by panic. + if err := pooledWithEICTunnel(context.Background(), "i-panic", + func(s eicSSHSession) error { return nil }); err != nil { + t.Fatalf("post-panic acquire: %v", err) + } + if got := atomic.LoadInt64(setupCount); got != 2 { + t.Errorf("expected 2 setups (panic poisoned, rebuild), got %d", got) + } +} + +// TestPooledWithEICTunnel_PreservesFnErr pins that errors from the +// inner fn pass through to the caller verbatim — pool wrapping +// should not swallow or transform error semantics for app code. +func TestPooledWithEICTunnel_PreservesFnErr(t *testing.T) { + withPoolSetupStub(t) + poolTTL = 50 * time.Second + + // Reset the global pool so this test is isolated from any prior + // test that may have populated it. + globalEICTunnelPool = newEICTunnelPool() + + want := errors.New("file does not exist") + got := pooledWithEICTunnel(context.Background(), "i-fn-err", + func(s eicSSHSession) error { return want }) + if !errors.Is(got, want) { + t.Errorf("pooledWithEICTunnel returned %v, want %v", got, want) + } +}