fix(eic-tunnel-pool): port race fix b6646910 from staging

Sister-agent investigation of PR #99's Platform (Go) regression identified
that PR #99's head pre-dates b6646910 (fix(eic-tunnel-pool): capture
poolJanitorInterval at pool construction).

Without this commit, TestPooledWithEICTunnel_PanicPoisonsEntry fails under
-race on main when the cherry-pick lands. Including it in this cherry-pick
to keep main Platform (Go) green.

Verified locally by sister agent: 3-of-3 reproducible race FAIL without fix,
3-of-3 PASS with fix. 2-of-2 full handlers go test -race PASS with this
file from staging tip.

Co-authored-by: Claude (orchestrator)
This commit is contained in:
devops-engineer 2026-05-08 02:26:52 +00:00
parent 570f456436
commit 47c0a8c903
2 changed files with 924 additions and 0 deletions

View File

@ -0,0 +1,457 @@
package handlers
// eic_tunnel_pool.go — refcounted pool for EIC SSH tunnels keyed on
// instanceID. Reuses one tunnel across N file ops, amortising the
// ssh-keygen + SendSSHPublicKey + open-tunnel + waitForPort cost
// (~3-5s) over multiple cats/finds (~50-200ms each).
//
// Origin: core#11 — canvas detail-panel config + filesystem load
// took ~20s. ConfigTab fans out 4 GETs serially; the slowest is
// /files/config.yaml which dispatches to readFileViaEIC. Without a
// pool, every readFileViaEIC + listFilesViaEIC + writeFileViaEIC +
// deleteFileViaEIC pays the full setup cost even when fired
// back-to-back on the same workspace EC2.
//
// The pool keeps one eicSSHSession alive per instanceID for up to
// poolTTL. SendSSHPublicKey grants a 60s key validity, so poolTTL
// must stay strictly below that to avoid serving requests on a
// just-expired key. We default to 50s with a 10s safety margin.
//
// Concurrency model:
//
// - Single mutex guards the entries map.
// - Slow path (tunnel setup) runs OUTSIDE the lock, gated by an
// "intent" placeholder so concurrent acquires for the same
// instanceID don't both build a tunnel — the loser drops its
// setup and uses the winner's.
// - Refcount on each entry; eviction blocked while refcount > 0.
// - Janitor goroutine sweeps every poolJanitorInterval, drops
// entries where refcount == 0 && expiresAt < now.
//
// Test injection:
//
// - poolSetupTunnel is a package-level var so tests can swap the
// slow path for a counting stub. Production wires it to
// realWithEICTunnel-style setup.
// - withEICTunnel (the public, single-shot API) is also a var
// (already, see template_files_eic.go). It's rebound here to
// pooledWithEICTunnel which routes through globalEICTunnelPool.
// - Tests that need single-shot behaviour can set poolTTL = 0,
// which makes pooledWithEICTunnel fall through to the underlying
// setup directly (no pool entry kept).
import (
"context"
"fmt"
"sync"
"time"
)
// poolTTL is the maximum age of a pooled tunnel. Must be strictly
// less than the SendSSHPublicKey grant window (60s) so we never
// serve a request through a key that's about to expire mid-op.
//
// Configurable via init-time wiring (see initEICTunnelPool); not a
// const so tests can pin TTL=0 (disable pooling) or TTL=50ms (drive
// eviction tests).
var poolTTL = 50 * time.Second
// poolJanitorInterval is how often the janitor goroutine sweeps for
// expired idle entries. Tighter than poolTTL so eviction is timely;
// loose enough that the goroutine doesn't burn CPU.
var poolJanitorInterval = 10 * time.Second
// poolMaxEntries caps simultaneous instanceIDs the pool tracks.
// Beyond this, new acquires evict the LRU entry. Defends against a
// pathological caller (e.g. a sweep over hundreds of workspace
// EC2s) from leaking unbounded tunnel processes. 32 is a generous
// ceiling for the canvas use case (one human navigates ≤ ~5
// workspaces at a time).
var poolMaxEntries = 32
// poolSetupTunnel is the slow-path tunnel constructor. Wrapped in a
// var so tests can inject a counter stub. Returns a session and a
// cleanup function (closes the open-tunnel subprocess + scrubs the
// ephemeral keydir). nil session + non-nil err means setup failed
// and there is nothing to clean up.
//
// Production wiring lives in eic_tunnel_pool_setup.go (a thin shim
// over the existing realWithEICTunnel logic).
var poolSetupTunnel = func(ctx context.Context, instanceID string) (
sess eicSSHSession, cleanup func(), err error) {
return setupRealEICTunnel(ctx, instanceID)
}
// pooledTunnel is one entry in the pool. session is shared by N
// concurrent fn calls; cleanup runs once when refcount returns to
// zero AND the entry is past expiresAt or evicted.
//
// lastUsed tracks the most recent acquire time for LRU bookkeeping
// (overflow eviction). expiresAt is set at construction and not
// extended on use — a tunnel cannot live past poolTTL even if it's
// hot, because the underlying SendSSHPublicKey grant expires.
type pooledTunnel struct {
session eicSSHSession
cleanup func()
expiresAt time.Time
lastUsed time.Time
refcount int
poisoned bool // true if a fn returned a tunnel-fatal error; do not reuse
}
// eicTunnelPool is the package-level pool. Single instance lives
// in globalEICTunnelPool; constructor runs lazily on first acquire.
type eicTunnelPool struct {
mu sync.Mutex
entries map[string]*pooledTunnel
// pendingSetups guards concurrent setup for the same instanceID.
// First acquirer takes the slot; later ones wait on the channel.
pendingSetups map[string]chan struct{}
stopJanitor chan struct{}
// janitorInterval is captured at pool construction from the
// package-level poolJanitorInterval var. Captured (not re-read on
// every tick) so a test that swaps the package var via t.Cleanup
// after a global pool's janitor is already running can't race
// with that goroutine's ticker read. The global pool is created
// lazily once per process via sync.Once; before this capture
// landed, every test that touched poolJanitorInterval after the
// global pool's first-touch raced the janitor (caught by -race
// on staging tip 249dbc6a — TestPooledWithEICTunnel_PanicPoisonsEntry).
// Tests still get the new value on a freshPool() because they
// set the package var BEFORE calling newEICTunnelPool().
janitorInterval time.Duration
}
var (
globalEICTunnelPool *eicTunnelPool
globalEICTunnelPoolOnce sync.Once
)
// getEICTunnelPool returns the singleton pool, lazy-initialising on
// first call. Idempotent.
func getEICTunnelPool() *eicTunnelPool {
globalEICTunnelPoolOnce.Do(func() {
globalEICTunnelPool = newEICTunnelPool()
go globalEICTunnelPool.janitor()
})
return globalEICTunnelPool
}
// newEICTunnelPool constructs an empty pool. Exported so tests can
// build isolated pools without sharing the singleton.
//
// Captures poolJanitorInterval at construction time so the janitor
// goroutine doesn't race with t.Cleanup-driven swaps of the package
// var. See the janitorInterval field comment for the failure mode.
func newEICTunnelPool() *eicTunnelPool {
return &eicTunnelPool{
entries: map[string]*pooledTunnel{},
pendingSetups: map[string]chan struct{}{},
stopJanitor: make(chan struct{}),
janitorInterval: poolJanitorInterval,
}
}
// acquire returns a usable session for instanceID. If a healthy entry
// exists, refcount++ and return it. If a setup is in flight for the
// same instanceID, wait for it. Otherwise build one (slow path).
//
// done() must be called by the caller when the op finishes. It
// decrements refcount and triggers cleanup if the entry is past
// TTL or poisoned and refcount==0.
//
// Errors from the slow path propagate; pool state is not modified
// for failed setups (no poisoned entry created — that's only for
// fn-returned errors on a previously-good session).
func (p *eicTunnelPool) acquire(ctx context.Context, instanceID string) (
sess eicSSHSession, done func(poisoned bool), err error) {
if poolTTL <= 0 {
// Pool disabled (TTL=0 mode for tests / opt-out). Fall
// through to a direct setup with caller-driven cleanup.
s, cleanup, err := poolSetupTunnel(ctx, instanceID)
if err != nil {
return eicSSHSession{}, nil, err
}
return s, func(_ bool) { cleanup() }, nil
}
for {
p.mu.Lock()
if pt, ok := p.entries[instanceID]; ok && !pt.poisoned && pt.expiresAt.After(time.Now()) {
pt.refcount++
pt.lastUsed = time.Now()
p.mu.Unlock()
return pt.session, p.releaser(instanceID, pt), nil
}
// Either no entry, expired entry, or poisoned entry. If a
// setup is already in flight, wait and retry.
if pending, ok := p.pendingSetups[instanceID]; ok {
p.mu.Unlock()
select {
case <-pending:
continue // re-check the entries map
case <-ctx.Done():
return eicSSHSession{}, nil, ctx.Err()
}
}
// Drop expired/poisoned entry now (we'll cleanup outside
// the lock — the entry is unreferenced or we'd not be here).
var oldCleanup func()
if pt, ok := p.entries[instanceID]; ok {
if pt.refcount == 0 {
oldCleanup = pt.cleanup
delete(p.entries, instanceID)
}
}
// Reserve the setup slot.
signal := make(chan struct{})
p.pendingSetups[instanceID] = signal
p.mu.Unlock()
if oldCleanup != nil {
go oldCleanup()
}
// Slow path: build a new tunnel. Anything that goes wrong
// here cleans up the pendingSetups slot and propagates to
// the caller without leaving the pool in a state where the
// next acquire blocks waiting on a signal that never fires.
newSess, cleanup, setupErr := poolSetupTunnel(ctx, instanceID)
p.mu.Lock()
delete(p.pendingSetups, instanceID)
close(signal)
if setupErr != nil {
p.mu.Unlock()
return eicSSHSession{}, nil, fmt.Errorf("eic tunnel setup: %w", setupErr)
}
// Enforce LRU bound BEFORE inserting so we don't briefly
// exceed the cap even by one entry.
p.evictLRUIfFullLocked(instanceID)
pt := &pooledTunnel{
session: newSess,
cleanup: cleanup,
expiresAt: time.Now().Add(poolTTL),
lastUsed: time.Now(),
refcount: 1,
}
p.entries[instanceID] = pt
p.mu.Unlock()
return pt.session, p.releaser(instanceID, pt), nil
}
}
// releaser returns a closure that decrements refcount and triggers
// cleanup if (a) the entry is past TTL or (b) the caller signalled
// poison. Idempotent against double-release (decrements once via the
// captured pt; pool entry may have been replaced by then).
func (p *eicTunnelPool) releaser(instanceID string, pt *pooledTunnel) func(poisoned bool) {
released := false
return func(poisoned bool) {
p.mu.Lock()
defer p.mu.Unlock()
if released {
return
}
released = true
pt.refcount--
if poisoned {
pt.poisoned = true
}
// Evict immediately if poisoned-and-idle OR expired-and-idle.
// Hot entries (refcount > 0) defer eviction to the last release.
if pt.refcount == 0 && (pt.poisoned || pt.expiresAt.Before(time.Now())) {
// If the entry in the map is still us, remove it.
if cur, ok := p.entries[instanceID]; ok && cur == pt {
delete(p.entries, instanceID)
}
go pt.cleanup()
}
}
}
// evictLRUIfFullLocked drops the least-recently-used IDLE entry
// when the pool is at capacity. Caller must hold p.mu. The new
// instanceID about to be inserted is excluded so we don't evict
// ourselves. If no idle entries exist, no eviction happens — the
// new entry will push us above the soft cap until something releases.
func (p *eicTunnelPool) evictLRUIfFullLocked(skipInstance string) {
if len(p.entries) < poolMaxEntries {
return
}
var oldestKey string
var oldest *pooledTunnel
for k, pt := range p.entries {
if k == skipInstance {
continue
}
if pt.refcount > 0 {
continue
}
if oldest == nil || pt.lastUsed.Before(oldest.lastUsed) {
oldestKey = k
oldest = pt
}
}
if oldest == nil {
return // every entry is in use; no eviction possible
}
delete(p.entries, oldestKey)
go oldest.cleanup()
}
// janitor periodically scans for entries that are idle AND expired,
// closing their tunnels. Runs forever (per pool lifetime); cancelled
// by close(p.stopJanitor) for tests that build short-lived pools.
//
// Reads p.janitorInterval (captured at construction) instead of the
// package-level poolJanitorInterval — see janitorInterval field comment.
func (p *eicTunnelPool) janitor() {
t := time.NewTicker(p.janitorInterval)
defer t.Stop()
for {
select {
case <-t.C:
p.sweep()
case <-p.stopJanitor:
return
}
}
}
// sweep is one janitor pass. Drops idle expired entries.
func (p *eicTunnelPool) sweep() {
p.mu.Lock()
now := time.Now()
var toClose []func()
for k, pt := range p.entries {
if pt.refcount == 0 && pt.expiresAt.Before(now) {
toClose = append(toClose, pt.cleanup)
delete(p.entries, k)
}
}
p.mu.Unlock()
for _, c := range toClose {
go c()
}
}
// stop terminates the janitor and closes all idle entries. Hot
// (refcount > 0) entries are NOT force-closed — callers running
// against them would see a use-after-free. In practice stop is only
// called by tests that have already drained their callers.
func (p *eicTunnelPool) stop() {
close(p.stopJanitor)
p.mu.Lock()
defer p.mu.Unlock()
for k, pt := range p.entries {
if pt.refcount == 0 {
go pt.cleanup()
delete(p.entries, k)
}
}
}
// pooledWithEICTunnel is the pool-backed replacement for
// realWithEICTunnel. The signature matches `var withEICTunnel`
// exactly so the rebind (in initEICTunnelPool) is a drop-in.
//
// Errors from `fn` itself are forwarded to the caller AND mark the
// pool entry as poisoned, so the next acquire builds a fresh
// tunnel. This catches the case where the workspace EC2 was
// restarted out-of-band (tunnel still appears alive locally but
// every cat/find errors out).
func pooledWithEICTunnel(ctx context.Context, instanceID string,
fn func(s eicSSHSession) error) error {
pool := getEICTunnelPool()
sess, done, err := pool.acquire(ctx, instanceID)
if err != nil {
return err
}
// poisoned defaults to true so a panic from fn poisons the
// entry on the way through the deferred release. Without the
// defer, a panicking fn would leak refcount=1 forever and
// permanently block eviction of this entry. The fn-error path
// resets poisoned to its real classification before return.
poisoned := true
defer func() { done(poisoned) }()
fnErr := fn(sess)
poisoned = fnErrIndicatesTunnelFault(fnErr)
return fnErr
}
// fnErrIndicatesTunnelFault returns true for fn errors whose nature
// suggests the underlying tunnel is no longer reusable (auth gone,
// network gone, ssh process dead). Returning true poisons the pool
// entry so the next acquire builds fresh.
//
// Conservative: only marks tunnel-faulty for clearly tunnel-level
// failures (connection refused, broken pipe, ssh exit-status from
// fatal-channel signals). A `cat` returning os.ErrNotExist on a
// missing file is NOT a tunnel fault — that's the file path being
// wrong, the tunnel is fine.
func fnErrIndicatesTunnelFault(err error) bool {
if err == nil {
return false
}
msg := err.Error()
// stderr substrings produced by ssh when the tunnel is broken.
for _, marker := range []string{
"connection refused",
"connection closed",
"broken pipe",
"Connection reset by peer",
"kex_exchange_identification",
"port forwarding failed",
"Permission denied",
"Authentication failed",
} {
if containsCaseInsensitive(msg, marker) {
return true
}
}
return false
}
// containsCaseInsensitive avoids importing strings just for this
// (the file already needs ssh stderr matching elsewhere — this
// keeps the helper local to avoid a cross-file dependency).
func containsCaseInsensitive(s, substr string) bool {
if len(substr) > len(s) {
return false
}
// Manual lowercase compare loop; ssh error markers are ASCII so
// no need for unicode-aware folding.
low := func(b byte) byte {
if b >= 'A' && b <= 'Z' {
return b + 32
}
return b
}
for i := 0; i+len(substr) <= len(s); i++ {
match := true
for j := 0; j < len(substr); j++ {
if low(s[i+j]) != low(substr[j]) {
match = false
break
}
}
if match {
return true
}
}
return false
}
// initEICTunnelPool rebinds the package-level withEICTunnel var to
// the pooled implementation. Called once at package init via the
// init() in eic_tunnel_pool_setup.go (split file so the rebind
// itself is testable without dragging in the production setup
// shim's exec/aws dependencies).
func initEICTunnelPool() {
withEICTunnel = pooledWithEICTunnel
}

View File

@ -0,0 +1,467 @@
package handlers
// eic_tunnel_pool_test.go — tests for the refcounted EIC tunnel pool
// added in core#11. Stubs poolSetupTunnel with a counter so the
// tests don't fork ssh-keygen / aws subprocesses.
//
// Per memory feedback_assert_exact_not_substring: each test pins
// exact expected counts (not "at least N") so a regression that
// silently double-sets-up surfaces here.
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
)
// withPoolSetupStub swaps poolSetupTunnel for a counting fake that
// returns a sentinel session and a cleanup func that records its
// invocation. Restores on test cleanup.
//
// setupSignal blocks each setup until released — for concurrent-
// acquire tests where we want to gate setup completion.
func withPoolSetupStub(t *testing.T) (
setupCount *int64, cleanupCount *int64, restore func(), unblock func()) {
t.Helper()
prev := poolSetupTunnel
prevTTL := poolTTL
prevJanitor := poolJanitorInterval
var sc, cc int64
setupCount, cleanupCount = &sc, &cc
gate := make(chan struct{}, 1)
gate <- struct{}{} // allow the first setup through immediately
unblock = func() { gate <- struct{}{} }
poolSetupTunnel = func(ctx context.Context, instanceID string) (
eicSSHSession, func(), error) {
select {
case <-gate:
case <-ctx.Done():
return eicSSHSession{}, nil, ctx.Err()
}
atomic.AddInt64(&sc, 1)
sess := eicSSHSession{
instanceID: instanceID,
osUser: "ubuntu",
localPort: 10000 + int(atomic.LoadInt64(&sc)),
keyPath: "/tmp/molecule-eic-test-" + instanceID,
}
cleanup := func() { atomic.AddInt64(&cc, 1) }
return sess, cleanup, nil
}
restore = func() {
poolSetupTunnel = prev
poolTTL = prevTTL
poolJanitorInterval = prevJanitor
}
t.Cleanup(restore)
return
}
// freshPool returns an isolated pool (NOT the global) so tests run
// independently. Stops the janitor on cleanup.
func freshPool(t *testing.T) *eicTunnelPool {
t.Helper()
p := newEICTunnelPool()
t.Cleanup(p.stop)
return p
}
// TestEICTunnelPool_FourOpsAmortise pins the core invariant: four
// sequential acquire/release cycles on the same instanceID share
// ONE underlying tunnel setup. Mutation: delete the cache hit branch
// in acquire() → setupCount goes 1 → 4 → test fails.
func TestEICTunnelPool_FourOpsAmortise(t *testing.T) {
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
// Refill gate after each setup so concurrent stubs aren't blocked
// (we want every test to be able to set up if it needs to).
t.Cleanup(func() { /* no-op; defer is enough */ })
poolTTL = 50 * time.Second
pool := freshPool(t)
ctx := context.Background()
for i := 0; i < 4; i++ {
sess, done, err := pool.acquire(ctx, "i-test-1")
if err != nil {
t.Fatalf("op %d: acquire: %v", i, err)
}
if sess.instanceID != "i-test-1" {
t.Fatalf("op %d: session has wrong instanceID: %q", i, sess.instanceID)
}
done(false)
}
if got := atomic.LoadInt64(setupCount); got != 1 {
t.Errorf("expected exactly 1 tunnel setup across 4 ops, got %d", got)
}
if got := atomic.LoadInt64(cleanupCount); got != 0 {
t.Errorf("expected 0 cleanups while entry is hot (TTL=50s), got %d", got)
}
}
// TestEICTunnelPool_DifferentInstancesDoNotShare pins that two
// different instanceIDs each get their own tunnel — the pool is
// keyed on instanceID, not a single global slot.
func TestEICTunnelPool_DifferentInstancesDoNotShare(t *testing.T) {
setupCount, _, _, unblock := withPoolSetupStub(t)
poolTTL = 50 * time.Second
pool := freshPool(t)
ctx := context.Background()
// First instance setup uses the initial gate slot.
_, doneA, err := pool.acquire(ctx, "i-a")
if err != nil {
t.Fatalf("acquire A: %v", err)
}
doneA(false)
// Second instance needs a new slot through the gate.
unblock()
_, doneB, err := pool.acquire(ctx, "i-b")
if err != nil {
t.Fatalf("acquire B: %v", err)
}
doneB(false)
if got := atomic.LoadInt64(setupCount); got != 2 {
t.Errorf("expected 2 setups (one per instance), got %d", got)
}
}
// TestEICTunnelPool_TTLEviction: a short TTL forces the second op
// to build a fresh tunnel after the first expires.
func TestEICTunnelPool_TTLEviction(t *testing.T) {
setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
poolTTL = 50 * time.Millisecond
poolJanitorInterval = 1 * time.Second // keep janitor away
pool := freshPool(t)
ctx := context.Background()
_, done, err := pool.acquire(ctx, "i-ttl")
if err != nil {
t.Fatalf("acquire 1: %v", err)
}
done(false)
time.Sleep(80 * time.Millisecond) // past TTL
unblock() // allow next setup
_, done, err = pool.acquire(ctx, "i-ttl")
if err != nil {
t.Fatalf("acquire 2: %v", err)
}
done(false)
if got := atomic.LoadInt64(setupCount); got != 2 {
t.Errorf("expected 2 setups (TTL eviction between), got %d", got)
}
// First entry should have been cleaned up when the second
// acquire evicted it on the slow path. Cleanup runs in a
// goroutine; poll briefly for it to land.
deadline := time.Now().Add(500 * time.Millisecond)
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}
if got := atomic.LoadInt64(cleanupCount); got < 1 {
t.Errorf("expected ≥1 cleanup (first entry evicted), got %d", got)
}
}
// TestEICTunnelPool_FailureInvalidates pins the poison-on-fault
// behavior — fn returning a tunnel-fatal error marks the entry
// unusable so the next acquire builds fresh.
func TestEICTunnelPool_FailureInvalidates(t *testing.T) {
setupCount, _, _, unblock := withPoolSetupStub(t)
poolTTL = 50 * time.Second
pool := freshPool(t)
ctx := context.Background()
_, done, err := pool.acquire(ctx, "i-fault")
if err != nil {
t.Fatalf("acquire 1: %v", err)
}
done(true) // signal poison
unblock() // let the next setup through
_, done, err = pool.acquire(ctx, "i-fault")
if err != nil {
t.Fatalf("acquire 2: %v", err)
}
done(false)
if got := atomic.LoadInt64(setupCount); got != 2 {
t.Errorf("expected 2 setups (poison forced rebuild), got %d", got)
}
}
// TestEICTunnelPool_ConcurrentAcquireSingleSetup pins that N
// concurrent acquires for the same instanceID before any release
// only trigger ONE tunnel setup — the rest wait via pendingSetups.
//
// Without this guard each concurrent acquire would spawn its own
// tunnel and the loser-cleanup would still leak refcount. Mutation:
// delete the pendingSetups gate → setupCount goes 1 → N → fails.
func TestEICTunnelPool_ConcurrentAcquireSingleSetup(t *testing.T) {
setupCount, _, _, _ := withPoolSetupStub(t)
// Pause setup so all goroutines pile into the pending slot.
prev := poolSetupTunnel
gate := make(chan struct{})
poolSetupTunnel = func(ctx context.Context, instanceID string) (
eicSSHSession, func(), error) {
<-gate
atomic.AddInt64(setupCount, 1)
return eicSSHSession{instanceID: instanceID}, func() {}, nil
}
t.Cleanup(func() { poolSetupTunnel = prev })
poolTTL = 50 * time.Second
pool := freshPool(t)
ctx := context.Background()
const N = 8
type result struct {
done func(bool)
err error
}
results := make(chan result, N)
var startWg sync.WaitGroup
startWg.Add(N)
for i := 0; i < N; i++ {
go func() {
startWg.Done()
_, done, err := pool.acquire(ctx, "i-concurrent")
results <- result{done, err}
}()
}
startWg.Wait()
// give all N goroutines time to enter pool.acquire
time.Sleep(20 * time.Millisecond)
close(gate)
for i := 0; i < N; i++ {
r := <-results
if r.err != nil {
t.Fatalf("acquire %d: %v", i, r.err)
}
r.done(false)
}
if got := atomic.LoadInt64(setupCount); got != 1 {
t.Errorf("expected 1 setup across %d concurrent acquires, got %d", N, got)
}
}
// TestEICTunnelPool_TTLZeroDisablesPooling pins the escape hatch:
// poolTTL=0 means every acquire goes straight through to setup +
// cleanup, no entry kept. Useful for tests / opt-out.
func TestEICTunnelPool_TTLZeroDisablesPooling(t *testing.T) {
setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
poolTTL = 0
pool := freshPool(t)
ctx := context.Background()
_, done, err := pool.acquire(ctx, "i-ttlzero")
if err != nil {
t.Fatalf("acquire 1: %v", err)
}
done(false)
unblock()
_, done, err = pool.acquire(ctx, "i-ttlzero")
if err != nil {
t.Fatalf("acquire 2: %v", err)
}
done(false)
if got := atomic.LoadInt64(setupCount); got != 2 {
t.Errorf("expected 2 setups with TTL=0 (pool disabled), got %d", got)
}
if got := atomic.LoadInt64(cleanupCount); got != 2 {
t.Errorf("expected 2 cleanups with TTL=0 (each release closes), got %d", got)
}
}
// TestEICTunnelPool_LRUEvictionAtCap pins the LRU defence: when the
// pool reaches poolMaxEntries, a new acquire for an unseen
// instanceID evicts the LRU idle entry instead of growing unbounded.
func TestEICTunnelPool_LRUEvictionAtCap(t *testing.T) {
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
prev := poolMaxEntries
poolMaxEntries = 2
t.Cleanup(func() { poolMaxEntries = prev })
poolTTL = 50 * time.Second
// Replace stub with one that doesn't gate so we can fill quickly.
poolSetupTunnel = func(ctx context.Context, instanceID string) (
eicSSHSession, func(), error) {
atomic.AddInt64(setupCount, 1)
return eicSSHSession{instanceID: instanceID}, func() {
atomic.AddInt64(cleanupCount, 1)
}, nil
}
pool := freshPool(t)
ctx := context.Background()
for _, id := range []string{"i-1", "i-2"} {
_, done, err := pool.acquire(ctx, id)
if err != nil {
t.Fatalf("acquire %s: %v", id, err)
}
done(false)
}
// Both entries idle, pool at cap.
_, done, err := pool.acquire(ctx, "i-3")
if err != nil {
t.Fatalf("acquire i-3: %v", err)
}
done(false)
// Wait for the goroutine'd cleanup of the evicted entry.
deadline := time.Now().Add(500 * time.Millisecond)
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
time.Sleep(10 * time.Millisecond)
}
if got := atomic.LoadInt64(setupCount); got != 3 {
t.Errorf("expected 3 setups (one per unique instance), got %d", got)
}
if got := atomic.LoadInt64(cleanupCount); got < 1 {
t.Errorf("expected ≥1 cleanup (LRU eviction), got %d", got)
}
}
// TestEICTunnelPool_PoisonedClassification pins the heuristic that
// distinguishes tunnel-fatal errors (poison the entry) from
// app-level errors (file not found, validation) that should NOT
// invalidate the tunnel.
func TestEICTunnelPool_PoisonedClassification(t *testing.T) {
cases := []struct {
name string
err error
want bool
}{
{"nil", nil, false},
{"file not found", errors.New("os: file does not exist"), false},
{"validation", errors.New("invalid path: must be relative"), false},
{"connection refused", errors.New("ssh: connect to host: connection refused"), true},
{"connection refused upper", errors.New("Connection Refused"), true},
{"broken pipe", errors.New("write tunnel: broken pipe"), true},
{"permission denied", errors.New("Permission denied (publickey)"), true},
{"auth failed", errors.New("Authentication failed"), true},
{"connection reset", errors.New("Connection reset by peer"), true},
{"port forward", errors.New("port forwarding failed"), true},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := fnErrIndicatesTunnelFault(tc.err)
if got != tc.want {
t.Errorf("fnErrIndicatesTunnelFault(%v) = %v, want %v",
tc.err, got, tc.want)
}
})
}
}
// TestEICTunnelPool_RefcountBlocksEviction pins that an entry past
// TTL is NOT evicted while a caller still holds it — preventing
// use-after-free in the holder.
func TestEICTunnelPool_RefcountBlocksEviction(t *testing.T) {
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
poolTTL = 30 * time.Millisecond
poolJanitorInterval = 5 * time.Millisecond
pool := freshPool(t)
ctx := context.Background()
_, done, err := pool.acquire(ctx, "i-hold")
if err != nil {
t.Fatalf("acquire: %v", err)
}
// Sleep past TTL while holding the session. Janitor sweeps
// every 5ms but must skip our entry because refcount=1.
time.Sleep(80 * time.Millisecond)
if got := atomic.LoadInt64(cleanupCount); got != 0 {
t.Errorf("expected 0 cleanups while holder is active, got %d", got)
}
done(false)
// Now refcount=0 and entry is past TTL; releaser triggers cleanup.
deadline := time.Now().Add(200 * time.Millisecond)
for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
time.Sleep(5 * time.Millisecond)
}
if got := atomic.LoadInt64(cleanupCount); got != 1 {
t.Errorf("expected 1 cleanup after release of expired entry, got %d", got)
}
if got := atomic.LoadInt64(setupCount); got != 1 {
t.Errorf("setupCount tracking: got %d, want 1", got)
}
}
// TestPooledWithEICTunnel_PanicPoisonsEntry pins that a panic
// from fn poisons the pool entry on the way out — refcount goes
// back to zero (no leak) and the entry is marked unusable so the
// next acquire builds fresh. Without the defer-release pattern, a
// panic would leave refcount=1 forever and the entry would never
// evict.
func TestPooledWithEICTunnel_PanicPoisonsEntry(t *testing.T) {
setupCount, _, _, _ := withPoolSetupStub(t)
poolTTL = 50 * time.Second
globalEICTunnelPool = newEICTunnelPool()
t.Cleanup(globalEICTunnelPool.stop)
func() {
defer func() {
if r := recover(); r == nil {
t.Errorf("expected panic to bubble up, got nil")
}
}()
_ = pooledWithEICTunnel(context.Background(), "i-panic",
func(s eicSSHSession) error { panic("boom") })
}()
// Replenish the gate so the next setup can run.
prev := poolSetupTunnel
poolSetupTunnel = func(ctx context.Context, instanceID string) (
eicSSHSession, func(), error) {
atomic.AddInt64(setupCount, 1)
return eicSSHSession{instanceID: instanceID}, func() {}, nil
}
t.Cleanup(func() { poolSetupTunnel = prev })
// Next acquire must build fresh — entry was poisoned by panic.
if err := pooledWithEICTunnel(context.Background(), "i-panic",
func(s eicSSHSession) error { return nil }); err != nil {
t.Fatalf("post-panic acquire: %v", err)
}
if got := atomic.LoadInt64(setupCount); got != 2 {
t.Errorf("expected 2 setups (panic poisoned, rebuild), got %d", got)
}
}
// TestPooledWithEICTunnel_PreservesFnErr pins that errors from the
// inner fn pass through to the caller verbatim — pool wrapping
// should not swallow or transform error semantics for app code.
func TestPooledWithEICTunnel_PreservesFnErr(t *testing.T) {
withPoolSetupStub(t)
poolTTL = 50 * time.Second
// Reset the global pool so this test is isolated from any prior
// test that may have populated it.
globalEICTunnelPool = newEICTunnelPool()
want := errors.New("file does not exist")
got := pooledWithEICTunnel(context.Background(), "i-fn-err",
func(s eicSSHSession) error { return want })
if !errors.Is(got, want) {
t.Errorf("pooledWithEICTunnel returned %v, want %v", got, want)
}
}