Compare commits
2 commits: main...fix/cherry

| Author | SHA1 | Date |
|---|---|---|
| | 47c0a8c903 | |
| | 570f456436 | |
.github/workflows/e2e-api.yml (vendored, 130 lines changed)
@@ -12,6 +12,59 @@ name: E2E API Smoke Test
 # spending CI cycles. See the in-job comment on the `e2e-api` job for
 # why this is one job (not two-jobs-sharing-name) and the 2026-04-29
 # PR #2264 incident that drove the consolidation.
+#
+# Parallel-safety (Class B Hongming-owned CICD red sweep, 2026-05-08)
+# -------------------------------------------------------------------
+# Same substrate hazard as PR #98 (handlers-postgres-integration). Our
+# Gitea act_runner runs with `container.network: host` (operator host
+# `/opt/molecule/runners/config.yaml`), which means:
+#
+# * Two concurrent runs both try to bind their `-p 15432:5432` /
+#   `-p 16379:6379` host ports — the second postgres/redis FATALs
+#   with `Address in use` and `docker run` returns exit 125 with
+#   `Conflict. The container name "/molecule-ci-postgres" is already
+#   in use by container ...`. Verified in run a7/2727 on 2026-05-07.
+# * The fixed container names `molecule-ci-postgres` / `-redis` (the
+#   pre-fix shape) collide on name AS WELL AS port. The cleanup-with-
+#   `docker rm -f` at the start of the second job KILLS the first
+#   job's still-running postgres/redis.
+#
+# Fix shape (mirrors PR #98's bridge-net pattern, adapted because
+# platform-server is a Go binary on the host, not a containerised
+# step):
+#
+# 1. Unique container names per run:
+#      pg-e2e-api-${RUN_ID}-${RUN_ATTEMPT}
+#      redis-e2e-api-${RUN_ID}-${RUN_ATTEMPT}
+#    `${RUN_ID}-${RUN_ATTEMPT}` is unique even across reruns of the
+#    same run_id.
+# 2. Ephemeral host port per run (`-p 0:5432`), then read the actual
+#    bound port via `docker port` and export DATABASE_URL/REDIS_URL
+#    pointing at it. No fixed host-port → no port collision.
+# 3. `127.0.0.1` (NOT `localhost`) in URLs — IPv6 first-resolve was
+#    the original flake fixed in #92 and the script's still IPv6-
+#    enabled.
+# 4. `if: always()` cleanup so containers don't leak when test steps
+#    fail.
+#
+# Issue #94 items #2 + #3 (also fixed here):
+# * Pre-pull `alpine:latest` so the platform-server's provisioner
+#   (`internal/handlers/container_files.go`) can stand up its
+#   ephemeral token-write helper without a daemon.io round-trip.
+# * Create `molecule-monorepo-net` bridge network if missing so the
+#   provisioner's container.HostConfig {NetworkMode: ...} attach
+#   succeeds.
+# Item #1 (timeouts) — evidence on recent runs (77/3191, ae/4270,
+# 0e/2318) shows Postgres ready in 3s, Redis in 1s, Platform in 1s
+# when they DO come up. Timeouts are not the bottleneck; not bumped.
+#
+# Item explicitly NOT fixed here: failing test `Status back online`
+# fails because the platform's langgraph workspace template image
+# (ghcr.io/molecule-ai/workspace-template-langgraph:latest) returns
+# 403 Forbidden post-2026-05-06 GitHub org suspension. That is a
+# template-registry resolution issue (ADR-002 / local-build mode) and
+# belongs in a separate change that touches workspace-server, not
+# this workflow file.

 on:
   push:
@@ -78,11 +131,14 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 15
     env:
-      DATABASE_URL: postgres://dev:dev@localhost:15432/molecule?sslmode=disable
-      REDIS_URL: redis://localhost:16379
+      # Unique per-run container names so concurrent runs on the host-
+      # network act_runner don't collide on name OR port.
+      # `${RUN_ID}-${RUN_ATTEMPT}` stays unique across reruns of the
+      # same run_id. PORT is set later (after docker port lookup) since
+      # we let Docker assign an ephemeral host port.
+      PG_CONTAINER: pg-e2e-api-${{ github.run_id }}-${{ github.run_attempt }}
+      REDIS_CONTAINER: redis-e2e-api-${{ github.run_id }}-${{ github.run_attempt }}
       PORT: "8080"
-      PG_CONTAINER: molecule-ci-postgres
-      REDIS_CONTAINER: molecule-ci-redis
     steps:
       - name: No-op pass (paths filter excluded this commit)
         if: needs.detect-changes.outputs.api != 'true'
@@ -97,11 +153,53 @@ jobs:
           go-version: 'stable'
           cache: true
           cache-dependency-path: workspace-server/go.sum
+      - name: Pre-pull alpine + ensure provisioner network (Issue #94 items #2 + #3)
+        if: needs.detect-changes.outputs.api == 'true'
+        run: |
+          # Provisioner uses alpine:latest for ephemeral token-write
+          # containers (workspace-server/internal/handlers/container_files.go).
+          # Pre-pull so the first provision in test_api.sh doesn't race
+          # the daemon's pull cache. Idempotent — `docker pull` is a no-op
+          # when the image is already present.
+          docker pull alpine:latest >/dev/null
+          # Provisioner attaches workspace containers to
+          # molecule-monorepo-net (workspace-server/internal/provisioner/
+          # provisioner.go::DefaultNetwork). The bridge already exists on
+          # the operator host's docker daemon — `network create` is
+          # idempotent via `|| true`.
+          docker network create molecule-monorepo-net >/dev/null 2>&1 || true
+          echo "alpine:latest pre-pulled; molecule-monorepo-net ensured."
       - name: Start Postgres (docker)
         if: needs.detect-changes.outputs.api == 'true'
         run: |
+          # Defensive cleanup — only matches THIS run's container name,
+          # so it cannot kill a sibling run's postgres. (Pre-fix the
+          # name was static and this rm hit other runs' containers.)
           docker rm -f "$PG_CONTAINER" 2>/dev/null || true
-          docker run -d --name "$PG_CONTAINER" -e POSTGRES_USER=dev -e POSTGRES_PASSWORD=dev -e POSTGRES_DB=molecule -p 15432:5432 postgres:16
+          # `-p 0:5432` requests an ephemeral host port; we read it back
+          # below and export DATABASE_URL.
+          docker run -d --name "$PG_CONTAINER" \
+            -e POSTGRES_USER=dev -e POSTGRES_PASSWORD=dev -e POSTGRES_DB=molecule \
+            -p 0:5432 postgres:16 >/dev/null
+          # Resolve the host-side port assignment. `docker port` prints
+          # `0.0.0.0:NNNN` (and on host-net runners may also print an
+          # IPv6 line — take the first IPv4 line).
+          PG_PORT=$(docker port "$PG_CONTAINER" 5432/tcp | awk -F: '/^0\.0\.0\.0:/ {print $2; exit}')
+          if [ -z "$PG_PORT" ]; then
+            # Fallback: any first line. Some Docker versions print only
+            # one line.
+            PG_PORT=$(docker port "$PG_CONTAINER" 5432/tcp | head -1 | awk -F: '{print $NF}')
+          fi
+          if [ -z "$PG_PORT" ]; then
+            echo "::error::Could not resolve host port for $PG_CONTAINER"
+            docker port "$PG_CONTAINER" 5432/tcp || true
+            docker logs "$PG_CONTAINER" || true
+            exit 1
+          fi
+          # 127.0.0.1 (NOT localhost) — IPv6 first-resolve flake (#92).
+          echo "PG_PORT=${PG_PORT}" >> "$GITHUB_ENV"
+          echo "DATABASE_URL=postgres://dev:dev@127.0.0.1:${PG_PORT}/molecule?sslmode=disable" >> "$GITHUB_ENV"
+          echo "Postgres host port: ${PG_PORT}"
           for i in $(seq 1 30); do
             if docker exec "$PG_CONTAINER" pg_isready -U dev >/dev/null 2>&1; then
               echo "Postgres ready after ${i}s"
@@ -116,7 +214,20 @@ jobs:
         if: needs.detect-changes.outputs.api == 'true'
         run: |
           docker rm -f "$REDIS_CONTAINER" 2>/dev/null || true
-          docker run -d --name "$REDIS_CONTAINER" -p 16379:6379 redis:7
+          docker run -d --name "$REDIS_CONTAINER" -p 0:6379 redis:7 >/dev/null
+          REDIS_PORT=$(docker port "$REDIS_CONTAINER" 6379/tcp | awk -F: '/^0\.0\.0\.0:/ {print $2; exit}')
+          if [ -z "$REDIS_PORT" ]; then
+            REDIS_PORT=$(docker port "$REDIS_CONTAINER" 6379/tcp | head -1 | awk -F: '{print $NF}')
+          fi
+          if [ -z "$REDIS_PORT" ]; then
+            echo "::error::Could not resolve host port for $REDIS_CONTAINER"
+            docker port "$REDIS_CONTAINER" 6379/tcp || true
+            docker logs "$REDIS_CONTAINER" || true
+            exit 1
+          fi
+          echo "REDIS_PORT=${REDIS_PORT}" >> "$GITHUB_ENV"
+          echo "REDIS_URL=redis://127.0.0.1:${REDIS_PORT}" >> "$GITHUB_ENV"
+          echo "Redis host port: ${REDIS_PORT}"
           for i in $(seq 1 15); do
             if docker exec "$REDIS_CONTAINER" redis-cli ping 2>/dev/null | grep -q PONG; then
               echo "Redis ready after ${i}s"
@@ -135,13 +246,15 @@ jobs:
         if: needs.detect-changes.outputs.api == 'true'
         working-directory: workspace-server
         run: |
+          # DATABASE_URL + REDIS_URL exported by the start-postgres /
+          # start-redis steps point at this run's per-run host ports.
           ./platform-server > platform.log 2>&1 &
           echo $! > platform.pid
       - name: Wait for /health
         if: needs.detect-changes.outputs.api == 'true'
         run: |
           for i in $(seq 1 30); do
-            if curl -sf http://localhost:8080/health > /dev/null; then
+            if curl -sf http://127.0.0.1:8080/health > /dev/null; then
               echo "Platform up after ${i}s"
               exit 0
             fi
@@ -185,6 +298,9 @@ jobs:
             kill "$(cat workspace-server/platform.pid)" 2>/dev/null || true
           fi
       - name: Stop service containers
+        # always() so containers don't leak when test steps fail. The
+        # cleanup is best-effort: if the container is already gone
+        # (e.g. concurrent rerun race), don't fail the job.
         if: always() && needs.detect-changes.outputs.api == 'true'
         run: |
           docker rm -f "$PG_CONTAINER" 2>/dev/null || true
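The `127.0.0.1`-over-`localhost` rule (item 3 of the parallel-safety notes above, carried over from the #92 fix) comes down to name resolution: on an IPv6-enabled host, `localhost` can resolve to `::1` first, so a client dials the IPv6 loopback while the service only listens on IPv4. A minimal Go sketch for checking this on a given runner; the port 8080 and the printed messages are illustrative, not part of the diff:

```go
// Inspect what "localhost" resolves to on this machine and whether the
// IPv4 loopback is reachable on a stand-in port (8080 here, as the
// platform-server health port is in the workflow above).
package main

import (
	"fmt"
	"net"
	"time"
)

func main() {
	// On an IPv6-enabled host this often returns ["::1", "127.0.0.1"];
	// a client that tries addresses in order hits ::1 first.
	addrs, err := net.LookupHost("localhost")
	fmt.Println("localhost resolves to:", addrs, "err:", err)

	// Dialing the literal IPv4 loopback sidesteps resolution entirely,
	// which is why the workflow URLs use 127.0.0.1.
	conn, err := net.DialTimeout("tcp", "127.0.0.1:8080", 2*time.Second)
	if err != nil {
		fmt.Println("127.0.0.1:8080 not reachable:", err)
		return
	}
	defer conn.Close()
	fmt.Println("127.0.0.1:8080 reachable")
}
```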
.github/workflows/handlers-postgres-integration.yml (vendored, 141 lines changed)
@@ -14,12 +14,42 @@ name: Handlers Postgres Integration
 # self-review caught it took 2 minutes to set up and would have caught
 # the bug at PR-time.
 #
-# This job spins a Postgres service container, applies the migration,
-# and runs `go test -tags=integration` against a live DB. Required
-# check on staging branch protection — backend handler PRs cannot
-# merge without a real-DB regression gate.
+# Why this workflow does NOT use `services: postgres:` (Class B fix)
+# ------------------------------------------------------------------
+# Our act_runner config has `container.network: host` (operator host
+# /opt/molecule/runners/config.yaml), which act_runner applies to BOTH
+# the job container AND every service container. With host-net, two
+# concurrent runs of this workflow both try to bind 0.0.0.0:5432 — the
+# second postgres FATALs with `could not create any TCP/IP sockets:
+# Address in use`, and Docker auto-removes it (act_runner sets
+# AutoRemove:true on service containers). By the time the migrations
+# step runs `psql`, the postgres container is gone, hence
+# `Connection refused` then `failed to remove container: No such
+# container` at cleanup time.
 #
-# Cost: ~30s job (postgres pull from GH cache + go build + 4 tests).
+# Per-job `container.network` override is silently ignored by
+# act_runner — `--network and --net in the options will be ignored.`
+# appears in the runner log. Documented constraint.
+#
+# So we sidestep `services:` entirely. The job container still uses
+# host-net (inherited from runner config; required for cache server
+# discovery on the bridge IP 172.18.0.17:42631). We launch a sibling
+# postgres on the existing `molecule-monorepo-net` bridge with a
+# UNIQUE name per run — `pg-handlers-${RUN_ID}-${RUN_ATTEMPT}` — and
+# read its bridge IP via `docker inspect`. A host-net job container
+# can reach a bridge-net container directly via the bridge IP (verified
+# manually on operator host 2026-05-08).
+#
+# Trade-offs vs. the original `services:` shape:
+# + No host-port collision; N parallel runs share the bridge cleanly
+# + `if: always()` cleanup runs even on test-step failure
+# - One more step in the workflow (+~3 lines)
+# - Requires `molecule-monorepo-net` to exist on the operator host
+#   (it does; declared in docker-compose.yml + docker-compose.infra.yml)
+#
+# Class B Hongming-owned CICD red sweep, 2026-05-08.
+#
+# Cost: ~30s job (postgres pull from cache + go build + 4 tests).

 on:
   push:
@@ -59,20 +89,14 @@ jobs:
     name: Handlers Postgres Integration
     needs: detect-changes
     runs-on: ubuntu-latest
-    services:
-      postgres:
-        image: postgres:15-alpine
-        env:
-          POSTGRES_PASSWORD: test
-          POSTGRES_DB: molecule
-        ports:
-          - 5432:5432
-        # GHA spins this with --health-cmd built in for postgres images.
-        options: >-
-          --health-cmd pg_isready
-          --health-interval 5s
-          --health-timeout 5s
-          --health-retries 10
+    env:
+      # Unique name per run so concurrent jobs don't collide on the
+      # bridge network. ${RUN_ID}-${RUN_ATTEMPT} is unique even across
+      # workflow_dispatch reruns of the same run_id.
+      PG_NAME: pg-handlers-${{ github.run_id }}-${{ github.run_attempt }}
+      # Bridge network already exists on the operator host (declared
+      # in docker-compose.yml + docker-compose.infra.yml).
+      PG_NETWORK: molecule-monorepo-net
     defaults:
       run:
         working-directory: workspace-server
@@ -89,16 +113,57 @@ jobs:
         with:
           go-version: 'stable'
+
+      - if: needs.detect-changes.outputs.handlers == 'true'
+        name: Start sibling Postgres on bridge network
+        working-directory: .
+        run: |
+          # Sanity: the bridge network must exist on the operator host.
+          # Hard-fail loud if it doesn't — easier to spot than a silent
+          # auto-create that diverges from the rest of the stack.
+          if ! docker network inspect "${PG_NETWORK}" >/dev/null 2>&1; then
+            echo "::error::Bridge network '${PG_NETWORK}' missing on operator host. Re-run docker-compose.infra.yml or check ops handbook."
+            exit 1
+          fi
+
+          # If a stale container with the same name exists (rerun on
+          # the same run_id), wipe it first.
+          docker rm -f "${PG_NAME}" >/dev/null 2>&1 || true
+
+          docker run -d \
+            --name "${PG_NAME}" \
+            --network "${PG_NETWORK}" \
+            --health-cmd "pg_isready -U postgres" \
+            --health-interval 5s \
+            --health-timeout 5s \
+            --health-retries 10 \
+            -e POSTGRES_PASSWORD=test \
+            -e POSTGRES_DB=molecule \
+            postgres:15-alpine >/dev/null
+
+          # Read back the bridge IP. Always present immediately after
+          # `docker run -d` for bridge networks.
+          PG_HOST=$(docker inspect "${PG_NAME}" \
+            --format "{{(index .NetworkSettings.Networks \"${PG_NETWORK}\").IPAddress}}")
+          if [ -z "${PG_HOST}" ]; then
+            echo "::error::Could not resolve PG_HOST for ${PG_NAME} on ${PG_NETWORK}"
+            docker logs "${PG_NAME}" || true
+            exit 1
+          fi
+          echo "PG_HOST=${PG_HOST}" >> "$GITHUB_ENV"
+          echo "INTEGRATION_DB_URL=postgres://postgres:test@${PG_HOST}:5432/molecule?sslmode=disable" >> "$GITHUB_ENV"
+          echo "Started ${PG_NAME} at ${PG_HOST}:5432"
+
       - if: needs.detect-changes.outputs.handlers == 'true'
         name: Apply migrations to Postgres service
         env:
           PGPASSWORD: test
         run: |
-          # Wait for postgres to actually accept connections (the
-          # GHA --health-cmd is best-effort but psql can still race).
+          # Wait for postgres to actually accept connections. Docker's
+          # health-cmd handles container-side readiness, but the wire
+          # to the bridge IP is best-tested with pg_isready directly.
           for i in {1..15}; do
-            if pg_isready -h localhost -p 5432 -U postgres -q; then break; fi
-            echo "waiting for postgres..."; sleep 2
+            if pg_isready -h "${PG_HOST}" -p 5432 -U postgres -q; then break; fi
+            echo "waiting for postgres at ${PG_HOST}:5432..."; sleep 2
           done

           # Apply every .up.sql in lexicographic order with
@@ -131,7 +196,7 @@ jobs:
           # not fine once a cross-table atomicity test came in.
           set +e
           for migration in $(ls migrations/*.sql 2>/dev/null | grep -v '\.down\.sql$' | sort); do
-            if psql -h localhost -U postgres -d molecule -v ON_ERROR_STOP=1 \
+            if psql -h "${PG_HOST}" -U postgres -d molecule -v ON_ERROR_STOP=1 \
               -f "$migration" >/dev/null 2>&1; then
               echo "✓ $(basename "$migration")"
             else
@@ -145,7 +210,7 @@ jobs:
           # fail if any didn't land — that would be a real regression we
           # want loud.
           for tbl in delegations workspaces activity_logs pending_uploads; do
-            if ! psql -h localhost -U postgres -d molecule -tA \
+            if ! psql -h "${PG_HOST}" -U postgres -d molecule -tA \
               -c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
               | grep -q 1; then
               echo "::error::$tbl table missing after migration replay — handler integration tests would be meaningless"
@@ -156,16 +221,32 @@ jobs:

       - if: needs.detect-changes.outputs.handlers == 'true'
         name: Run integration tests
-        env:
-          INTEGRATION_DB_URL: postgres://postgres:test@localhost:5432/molecule?sslmode=disable
         run: |
+          # INTEGRATION_DB_URL is exported by the start-postgres step;
+          # points at the per-run bridge IP, not 127.0.0.1, so concurrent
+          # workflow runs don't fight over a host-net 5432 port.
           go test -tags=integration -timeout 5m -v ./internal/handlers/ -run "^TestIntegration_"

-      - if: needs.detect-changes.outputs.handlers == 'true' && failure()
+      - if: failure() && needs.detect-changes.outputs.handlers == 'true'
         name: Diagnostic dump on failure
         env:
           PGPASSWORD: test
         run: |
-          echo "::group::delegations table state"
-          psql -h localhost -U postgres -d molecule -c "SELECT * FROM delegations LIMIT 50;" || true
+          echo "::group::postgres container status"
+          docker ps -a --filter "name=${PG_NAME}" --format '{{.Status}} {{.Names}}' || true
+          docker logs "${PG_NAME}" 2>&1 | tail -50 || true
           echo "::endgroup::"
+          echo "::group::delegations table state"
+          psql -h "${PG_HOST}" -U postgres -d molecule -c "SELECT * FROM delegations LIMIT 50;" || true
+          echo "::endgroup::"
+
+      - if: always() && needs.detect-changes.outputs.handlers == 'true'
+        name: Stop sibling Postgres
+        working-directory: .
+        run: |
+          # always() so containers don't leak when migrations or tests
+          # fail. The cleanup is best-effort: if the container is
+          # already gone (e.g. concurrent rerun race), don't fail the job.
+          docker rm -f "${PG_NAME}" >/dev/null 2>&1 || true
+          echo "Cleaned up ${PG_NAME}"
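For orientation, the `-run "^TestIntegration_"` tests on the other end of this change presumably pick the exported URL up from the environment along these lines. The helper below is a hedged sketch, not code from this PR; the function name, the `lib/pq` driver, and the skip behaviour are assumptions (the real tests live in `./internal/handlers/` behind the `integration` build tag and may use a different driver):

```go
//go:build integration

package handlers

import (
	"database/sql"
	"os"
	"testing"

	_ "github.com/lib/pq" // assumed driver; the real tests may differ
)

// openIntegrationDB is a hypothetical helper showing how a
// TestIntegration_* test could consume INTEGRATION_DB_URL, which the
// "Start sibling Postgres on bridge network" step exports with the
// per-run bridge IP baked in.
func openIntegrationDB(t *testing.T) *sql.DB {
	t.Helper()
	url := os.Getenv("INTEGRATION_DB_URL")
	if url == "" {
		t.Skip("INTEGRATION_DB_URL not set; run via the CI workflow or export it locally")
	}
	db, err := sql.Open("postgres", url)
	if err != nil {
		t.Fatalf("open postgres: %v", err)
	}
	if err := db.Ping(); err != nil {
		t.Fatalf("ping %s: %v", url, err)
	}
	t.Cleanup(func() { db.Close() })
	return db
}
```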
@@ -7,6 +7,32 @@ export default defineConfig({
   test: {
     environment: 'node',
     exclude: ['e2e/**', 'node_modules/**', '**/dist/**'],
+    // CI-conditional test timeout (issue #96).
+    //
+    // Vitest's 5000ms default is too tight for the first test in any
+    // file under our CI shape: `npx vitest run --coverage` on the
+    // self-hosted Gitea Actions Docker runner. The cold-start cost
+    // (v8 coverage instrumentation init + JSDOM bootstrap + module-
+    // graph import for @/components/* and @/lib/* + first React
+    // render) consistently consumes 5-7 seconds for the first
+    // synchronous test in heavyweight component files
+    // (ActivityTab.test.tsx, CreateWorkspaceDialog.test.tsx,
+    // ConfigTab.provider.test.tsx) — even though every subsequent
+    // test in the same file completes in 100-1500ms.
+    //
+    // Empirically the worst observed first-test was 6453ms in a
+    // single file (CreateWorkspaceDialog). 30000ms gives ~5x
+    // headroom over that on CI; we still keep 5000ms locally so
+    // genuine waitFor races / hung promises stay sensitive in dev.
+    //
+    // Same vitest pattern documented at:
+    //   https://vitest.dev/config/testtimeout
+    //   https://vitest.dev/guide/coverage#profiling-test-performance
+    //
+    // Per-test duration is still emitted to the CI log; if a test
+    // ever silently approaches 25-30s under this raised ceiling that
+    // will surface as a duration regression and we revisit.
+    testTimeout: process.env.CI ? 30000 : 5000,
     // Coverage is instrumented but NOT yet a CI gate — first land
     // observability so we can see the baseline, then dial in
     // thresholds + a hard gate in a follow-up PR (#1815). Today's
docs/runbooks/handlers-postgres-integration-port-collision.md (new file, 137 lines)
@@ -0,0 +1,137 @@
# Runbook — Handlers Postgres Integration port-collision substrate

**Status:** Resolved 2026-05-08 (PR for class B Hongming-owned CICD red sweep).

## Symptom

`Handlers Postgres Integration` workflow fails on staging push and PRs.
Step `Apply migrations to Postgres service` shows:

```
psql: error: connection to server at "127.0.0.1", port 5432 failed: Connection refused
```

Job-cleanup step further down logs:

```
Cleaning up services for job Handlers Postgres Integration
failed to remove container: Error response from daemon: No such container: <id>
```

…confirming the postgres service container was already gone before
cleanup ran.

## Root cause

Our Gitea act_runner (operator host `5.78.80.188`,
`/opt/molecule/runners/config.yaml`) sets:

```yaml
container:
  network: host
```

…which act_runner applies to BOTH the job container AND every
`services:` container in a workflow. Multiple workflow instances
running concurrently across the 16 parallel runners each try to bind
postgres on `0.0.0.0:5432`. The first wins; subsequent instances exit
immediately with:

```
LOG:   could not bind IPv4 address "0.0.0.0": Address in use
HINT:  Is another postmaster already running on port 5432?
FATAL: could not create any TCP/IP sockets
```

act_runner sets `AutoRemove:true` on service containers, so Docker
garbage-collects them as soon as they exit. By the time the migrations
step runs `pg_isready` / `psql`, the container is gone and connection
refused.

Reproduction (operator host):

```bash
docker run --rm -d --name pg-A --network host \
  -e POSTGRES_PASSWORD=test postgres:15-alpine
docker run -d --name pg-B --network host \
  -e POSTGRES_PASSWORD=test postgres:15-alpine
docker logs pg-B   # FATAL: could not create any TCP/IP sockets
```

## Why per-job override doesn't work

The natural fix — per-job `container.network` override — is silently
ignored by act_runner. The runner log emits:

```
--network and --net in the options will be ignored.
```

This is a documented act_runner constraint: container network is a
runner-wide setting, not per-job. Source: gitea/act_runner config docs
+ vegardit/docker-gitea-act-runner issue #7.

Flipping the global `container.network` to `bridge` would break every
other workflow in the repo (cache server discovery,
`molecule-monorepo-net` peer access during integration tests, etc.) —
unacceptable blast radius for a per-test bug.

## Fix shape

`handlers-postgres-integration.yml` no longer uses `services: postgres:`.
It launches a sibling postgres container manually on the existing
`molecule-monorepo-net` bridge network with a per-run unique name:

```yaml
env:
  PG_NAME: pg-handlers-${{ github.run_id }}-${{ github.run_attempt }}
  PG_NETWORK: molecule-monorepo-net

steps:
  - name: Start sibling Postgres on bridge network
    run: |
      docker run -d --name "${PG_NAME}" --network "${PG_NETWORK}" \
        ...
        postgres:15-alpine
      PG_HOST=$(docker inspect "${PG_NAME}" \
        --format "{{(index .NetworkSettings.Networks \"${PG_NETWORK}\").IPAddress}}")
      echo "PG_HOST=${PG_HOST}" >> "$GITHUB_ENV"

  # … migrations + tests use ${PG_HOST}, not 127.0.0.1 …

  - if: always() && …
    name: Stop sibling Postgres
    run: docker rm -f "${PG_NAME}" || true
```

The host-net job container can reach a bridge-net container via the
bridge IP directly (verified manually, 2026-05-08). Two parallel runs
use different names + different bridge IPs — no collision.

## Future-proofing

Other workflows that hit the same shape (any `services:` with a
fixed-port image) will exhibit the same failure mode under
host-network runner config. Translate using this same pattern:

1. Drop the `services:` block.
2. Use `${{ github.run_id }}-${{ github.run_attempt }}` for a unique
   container name.
3. Launch on `molecule-monorepo-net` (already trusted bridge in
   `docker-compose.infra.yml`).
4. Read back the bridge IP via `docker inspect` and export as a step env.
5. `if: always()` cleanup step at the end.

If the count of such workflows grows, factor into a composite action
(`./.github/actions/sibling-postgres`) so the substrate logic lives
in one place.

## Related

- Issue #88 (closed by #92): localhost → 127.0.0.1 fix that unmasked
  this collision; the IPv6 fix is correct, port collision is the new
  layer.
- Issue #94 created `molecule-monorepo-net` + `alpine:latest` as
  prereqs.
- Saved memory `feedback_act_runner_github_server_url` documents
  another act_runner-vs-GHA divergence (server URL).
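A side note on the `--format` string in the fix: `NetworkSettings.Networks` is a map keyed by network name, and `molecule-monorepo-net` contains hyphens, so it cannot be addressed with dot syntax in a Go template; that is why the runbook and the workflow use `index`. A minimal Go reproduction with `text/template` against a stand-in struct (the struct shape and the IP are illustrative; the real data comes from `docker inspect`):

```go
package main

import (
	"os"
	"text/template"
)

type network struct{ IPAddress string }

type inspect struct {
	NetworkSettings struct {
		Networks map[string]network
	}
}

func main() {
	var c inspect
	c.NetworkSettings.Networks = map[string]network{
		"molecule-monorepo-net": {IPAddress: "172.18.0.42"}, // illustrative IP
	}
	// Same template expression the workflow passes to `docker inspect
	// --format`: `index` pulls the hyphenated map key, then .IPAddress.
	tmpl := template.Must(template.New("ip").Parse(
		`{{(index .NetworkSettings.Networks "molecule-monorepo-net").IPAddress}}`))
	_ = tmpl.Execute(os.Stdout, c) // prints 172.18.0.42
}
```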
workspace-server/internal/handlers/eic_tunnel_pool.go (new file, 457 lines)
@@ -0,0 +1,457 @@
package handlers

// eic_tunnel_pool.go — refcounted pool for EIC SSH tunnels keyed on
// instanceID. Reuses one tunnel across N file ops, amortising the
// ssh-keygen + SendSSHPublicKey + open-tunnel + waitForPort cost
// (~3-5s) over multiple cats/finds (~50-200ms each).
//
// Origin: core#11 — canvas detail-panel config + filesystem load
// took ~20s. ConfigTab fans out 4 GETs serially; the slowest is
// /files/config.yaml which dispatches to readFileViaEIC. Without a
// pool, every readFileViaEIC + listFilesViaEIC + writeFileViaEIC +
// deleteFileViaEIC pays the full setup cost even when fired
// back-to-back on the same workspace EC2.
//
// The pool keeps one eicSSHSession alive per instanceID for up to
// poolTTL. SendSSHPublicKey grants a 60s key validity, so poolTTL
// must stay strictly below that to avoid serving requests on a
// just-expired key. We default to 50s with a 10s safety margin.
//
// Concurrency model:
//
// - Single mutex guards the entries map.
// - Slow path (tunnel setup) runs OUTSIDE the lock, gated by an
//   "intent" placeholder so concurrent acquires for the same
//   instanceID don't both build a tunnel — the loser drops its
//   setup and uses the winner's.
// - Refcount on each entry; eviction blocked while refcount > 0.
// - Janitor goroutine sweeps every poolJanitorInterval, drops
//   entries where refcount == 0 && expiresAt < now.
//
// Test injection:
//
// - poolSetupTunnel is a package-level var so tests can swap the
//   slow path for a counting stub. Production wires it to
//   realWithEICTunnel-style setup.
// - withEICTunnel (the public, single-shot API) is also a var
//   (already, see template_files_eic.go). It's rebound here to
//   pooledWithEICTunnel which routes through globalEICTunnelPool.
// - Tests that need single-shot behaviour can set poolTTL = 0,
//   which makes pooledWithEICTunnel fall through to the underlying
//   setup directly (no pool entry kept).

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// poolTTL is the maximum age of a pooled tunnel. Must be strictly
// less than the SendSSHPublicKey grant window (60s) so we never
// serve a request through a key that's about to expire mid-op.
//
// Configurable via init-time wiring (see initEICTunnelPool); not a
// const so tests can pin TTL=0 (disable pooling) or TTL=50ms (drive
// eviction tests).
var poolTTL = 50 * time.Second

// poolJanitorInterval is how often the janitor goroutine sweeps for
// expired idle entries. Tighter than poolTTL so eviction is timely;
// loose enough that the goroutine doesn't burn CPU.
var poolJanitorInterval = 10 * time.Second

// poolMaxEntries caps simultaneous instanceIDs the pool tracks.
// Beyond this, new acquires evict the LRU entry. Defends against a
// pathological caller (e.g. a sweep over hundreds of workspace
// EC2s) from leaking unbounded tunnel processes. 32 is a generous
// ceiling for the canvas use case (one human navigates ≤ ~5
// workspaces at a time).
var poolMaxEntries = 32

// poolSetupTunnel is the slow-path tunnel constructor. Wrapped in a
// var so tests can inject a counter stub. Returns a session and a
// cleanup function (closes the open-tunnel subprocess + scrubs the
// ephemeral keydir). nil session + non-nil err means setup failed
// and there is nothing to clean up.
//
// Production wiring lives in eic_tunnel_pool_setup.go (a thin shim
// over the existing realWithEICTunnel logic).
var poolSetupTunnel = func(ctx context.Context, instanceID string) (
	sess eicSSHSession, cleanup func(), err error) {
	return setupRealEICTunnel(ctx, instanceID)
}

// pooledTunnel is one entry in the pool. session is shared by N
// concurrent fn calls; cleanup runs once when refcount returns to
// zero AND the entry is past expiresAt or evicted.
//
// lastUsed tracks the most recent acquire time for LRU bookkeeping
// (overflow eviction). expiresAt is set at construction and not
// extended on use — a tunnel cannot live past poolTTL even if it's
// hot, because the underlying SendSSHPublicKey grant expires.
type pooledTunnel struct {
	session   eicSSHSession
	cleanup   func()
	expiresAt time.Time
	lastUsed  time.Time
	refcount  int
	poisoned  bool // true if a fn returned a tunnel-fatal error; do not reuse
}

// eicTunnelPool is the package-level pool. Single instance lives
// in globalEICTunnelPool; constructor runs lazily on first acquire.
type eicTunnelPool struct {
	mu      sync.Mutex
	entries map[string]*pooledTunnel
	// pendingSetups guards concurrent setup for the same instanceID.
	// First acquirer takes the slot; later ones wait on the channel.
	pendingSetups map[string]chan struct{}
	stopJanitor   chan struct{}
	// janitorInterval is captured at pool construction from the
	// package-level poolJanitorInterval var. Captured (not re-read on
	// every tick) so a test that swaps the package var via t.Cleanup
	// after a global pool's janitor is already running can't race
	// with that goroutine's ticker read. The global pool is created
	// lazily once per process via sync.Once; before this capture
	// landed, every test that touched poolJanitorInterval after the
	// global pool's first-touch raced the janitor (caught by -race
	// on staging tip 249dbc6a — TestPooledWithEICTunnel_PanicPoisonsEntry).
	// Tests still get the new value on a freshPool() because they
	// set the package var BEFORE calling newEICTunnelPool().
	janitorInterval time.Duration
}

var (
	globalEICTunnelPool     *eicTunnelPool
	globalEICTunnelPoolOnce sync.Once
)

// getEICTunnelPool returns the singleton pool, lazy-initialising on
// first call. Idempotent.
func getEICTunnelPool() *eicTunnelPool {
	globalEICTunnelPoolOnce.Do(func() {
		globalEICTunnelPool = newEICTunnelPool()
		go globalEICTunnelPool.janitor()
	})
	return globalEICTunnelPool
}

// newEICTunnelPool constructs an empty pool. Exported so tests can
// build isolated pools without sharing the singleton.
//
// Captures poolJanitorInterval at construction time so the janitor
// goroutine doesn't race with t.Cleanup-driven swaps of the package
// var. See the janitorInterval field comment for the failure mode.
func newEICTunnelPool() *eicTunnelPool {
	return &eicTunnelPool{
		entries:         map[string]*pooledTunnel{},
		pendingSetups:   map[string]chan struct{}{},
		stopJanitor:     make(chan struct{}),
		janitorInterval: poolJanitorInterval,
	}
}
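Because `withEICTunnel` keeps its original signature and is simply rebound to `pooledWithEICTunnel` (defined further down in this file), existing call sites don't change; only the amortisation behaviour does. A hedged sketch of what a caller looks like under the pool; the function name `readWorkspaceFile` and the returned payload are illustrative, not from this diff:

```go
// Hypothetical caller: read one file from a workspace EC2 through the
// pooled tunnel. Repeated calls against the same instanceID inside the
// 50s poolTTL window reuse one eicSSHSession instead of paying the
// ~3-5s ssh-keygen + SendSSHPublicKey + open-tunnel setup each time.
func readWorkspaceFile(ctx context.Context, instanceID, path string) ([]byte, error) {
	var out []byte
	err := withEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
		// The real file ops (readFileViaEIC and friends) run their
		// commands over s.localPort; this stand-in only records which
		// local port the pooled session handed us.
		out = []byte(fmt.Sprintf("would read %s via local port %d", path, s.localPort))
		return nil
	})
	if err != nil {
		// A tunnel-level failure here also poisons the pool entry via
		// fnErrIndicatesTunnelFault, so the next call rebuilds fresh.
		return nil, err
	}
	return out, nil
}
```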
|
|
||||||
|
// acquire returns a usable session for instanceID. If a healthy entry
|
||||||
|
// exists, refcount++ and return it. If a setup is in flight for the
|
||||||
|
// same instanceID, wait for it. Otherwise build one (slow path).
|
||||||
|
//
|
||||||
|
// done() must be called by the caller when the op finishes. It
|
||||||
|
// decrements refcount and triggers cleanup if the entry is past
|
||||||
|
// TTL or poisoned and refcount==0.
|
||||||
|
//
|
||||||
|
// Errors from the slow path propagate; pool state is not modified
|
||||||
|
// for failed setups (no poisoned entry created — that's only for
|
||||||
|
// fn-returned errors on a previously-good session).
|
||||||
|
func (p *eicTunnelPool) acquire(ctx context.Context, instanceID string) (
|
||||||
|
sess eicSSHSession, done func(poisoned bool), err error) {
|
||||||
|
|
||||||
|
if poolTTL <= 0 {
|
||||||
|
// Pool disabled (TTL=0 mode for tests / opt-out). Fall
|
||||||
|
// through to a direct setup with caller-driven cleanup.
|
||||||
|
s, cleanup, err := poolSetupTunnel(ctx, instanceID)
|
||||||
|
if err != nil {
|
||||||
|
return eicSSHSession{}, nil, err
|
||||||
|
}
|
||||||
|
return s, func(_ bool) { cleanup() }, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
p.mu.Lock()
|
||||||
|
if pt, ok := p.entries[instanceID]; ok && !pt.poisoned && pt.expiresAt.After(time.Now()) {
|
||||||
|
pt.refcount++
|
||||||
|
pt.lastUsed = time.Now()
|
||||||
|
p.mu.Unlock()
|
||||||
|
return pt.session, p.releaser(instanceID, pt), nil
|
||||||
|
}
|
||||||
|
// Either no entry, expired entry, or poisoned entry. If a
|
||||||
|
// setup is already in flight, wait and retry.
|
||||||
|
if pending, ok := p.pendingSetups[instanceID]; ok {
|
||||||
|
p.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-pending:
|
||||||
|
continue // re-check the entries map
|
||||||
|
case <-ctx.Done():
|
||||||
|
return eicSSHSession{}, nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Drop expired/poisoned entry now (we'll cleanup outside
|
||||||
|
// the lock — the entry is unreferenced or we'd not be here).
|
||||||
|
var oldCleanup func()
|
||||||
|
if pt, ok := p.entries[instanceID]; ok {
|
||||||
|
if pt.refcount == 0 {
|
||||||
|
oldCleanup = pt.cleanup
|
||||||
|
delete(p.entries, instanceID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Reserve the setup slot.
|
||||||
|
signal := make(chan struct{})
|
||||||
|
p.pendingSetups[instanceID] = signal
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
if oldCleanup != nil {
|
||||||
|
go oldCleanup()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path: build a new tunnel. Anything that goes wrong
|
||||||
|
// here cleans up the pendingSetups slot and propagates to
|
||||||
|
// the caller without leaving the pool in a state where the
|
||||||
|
// next acquire blocks waiting on a signal that never fires.
|
||||||
|
newSess, cleanup, setupErr := poolSetupTunnel(ctx, instanceID)
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
delete(p.pendingSetups, instanceID)
|
||||||
|
close(signal)
|
||||||
|
|
||||||
|
if setupErr != nil {
|
||||||
|
p.mu.Unlock()
|
||||||
|
return eicSSHSession{}, nil, fmt.Errorf("eic tunnel setup: %w", setupErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enforce LRU bound BEFORE inserting so we don't briefly
|
||||||
|
// exceed the cap even by one entry.
|
||||||
|
p.evictLRUIfFullLocked(instanceID)
|
||||||
|
|
||||||
|
pt := &pooledTunnel{
|
||||||
|
session: newSess,
|
||||||
|
cleanup: cleanup,
|
||||||
|
expiresAt: time.Now().Add(poolTTL),
|
||||||
|
lastUsed: time.Now(),
|
||||||
|
refcount: 1,
|
||||||
|
}
|
||||||
|
p.entries[instanceID] = pt
|
||||||
|
p.mu.Unlock()
|
||||||
|
return pt.session, p.releaser(instanceID, pt), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// releaser returns a closure that decrements refcount and triggers
|
||||||
|
// cleanup if (a) the entry is past TTL or (b) the caller signalled
|
||||||
|
// poison. Idempotent against double-release (decrements once via the
|
||||||
|
// captured pt; pool entry may have been replaced by then).
|
||||||
|
func (p *eicTunnelPool) releaser(instanceID string, pt *pooledTunnel) func(poisoned bool) {
|
||||||
|
released := false
|
||||||
|
return func(poisoned bool) {
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
if released {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
released = true
|
||||||
|
pt.refcount--
|
||||||
|
if poisoned {
|
||||||
|
pt.poisoned = true
|
||||||
|
}
|
||||||
|
// Evict immediately if poisoned-and-idle OR expired-and-idle.
|
||||||
|
// Hot entries (refcount > 0) defer eviction to the last release.
|
||||||
|
if pt.refcount == 0 && (pt.poisoned || pt.expiresAt.Before(time.Now())) {
|
||||||
|
// If the entry in the map is still us, remove it.
|
||||||
|
if cur, ok := p.entries[instanceID]; ok && cur == pt {
|
||||||
|
delete(p.entries, instanceID)
|
||||||
|
}
|
||||||
|
go pt.cleanup()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// evictLRUIfFullLocked drops the least-recently-used IDLE entry
|
||||||
|
// when the pool is at capacity. Caller must hold p.mu. The new
|
||||||
|
// instanceID about to be inserted is excluded so we don't evict
|
||||||
|
// ourselves. If no idle entries exist, no eviction happens — the
|
||||||
|
// new entry will push us above the soft cap until something releases.
|
||||||
|
func (p *eicTunnelPool) evictLRUIfFullLocked(skipInstance string) {
|
||||||
|
if len(p.entries) < poolMaxEntries {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var oldestKey string
|
||||||
|
var oldest *pooledTunnel
|
||||||
|
for k, pt := range p.entries {
|
||||||
|
if k == skipInstance {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pt.refcount > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if oldest == nil || pt.lastUsed.Before(oldest.lastUsed) {
|
||||||
|
oldestKey = k
|
||||||
|
oldest = pt
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if oldest == nil {
|
||||||
|
return // every entry is in use; no eviction possible
|
||||||
|
}
|
||||||
|
delete(p.entries, oldestKey)
|
||||||
|
go oldest.cleanup()
|
||||||
|
}
|
||||||
|
|
||||||
|
// janitor periodically scans for entries that are idle AND expired,
|
||||||
|
// closing their tunnels. Runs forever (per pool lifetime); cancelled
|
||||||
|
// by close(p.stopJanitor) for tests that build short-lived pools.
|
||||||
|
//
|
||||||
|
// Reads p.janitorInterval (captured at construction) instead of the
|
||||||
|
// package-level poolJanitorInterval — see janitorInterval field comment.
|
||||||
|
func (p *eicTunnelPool) janitor() {
|
||||||
|
t := time.NewTicker(p.janitorInterval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
p.sweep()
|
||||||
|
case <-p.stopJanitor:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sweep is one janitor pass. Drops idle expired entries.
|
||||||
|
func (p *eicTunnelPool) sweep() {
|
||||||
|
p.mu.Lock()
|
||||||
|
now := time.Now()
|
||||||
|
var toClose []func()
|
||||||
|
for k, pt := range p.entries {
|
||||||
|
if pt.refcount == 0 && pt.expiresAt.Before(now) {
|
||||||
|
toClose = append(toClose, pt.cleanup)
|
||||||
|
delete(p.entries, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
p.mu.Unlock()
|
||||||
|
for _, c := range toClose {
|
||||||
|
go c()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop terminates the janitor and closes all idle entries. Hot
|
||||||
|
// (refcount > 0) entries are NOT force-closed — callers running
|
||||||
|
// against them would see a use-after-free. In practice stop is only
|
||||||
|
// called by tests that have already drained their callers.
|
||||||
|
func (p *eicTunnelPool) stop() {
|
||||||
|
close(p.stopJanitor)
|
||||||
|
p.mu.Lock()
|
||||||
|
defer p.mu.Unlock()
|
||||||
|
for k, pt := range p.entries {
|
||||||
|
if pt.refcount == 0 {
|
||||||
|
go pt.cleanup()
|
||||||
|
delete(p.entries, k)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// pooledWithEICTunnel is the pool-backed replacement for
|
||||||
|
// realWithEICTunnel. The signature matches `var withEICTunnel`
|
||||||
|
// exactly so the rebind (in initEICTunnelPool) is a drop-in.
|
||||||
|
//
|
||||||
|
// Errors from `fn` itself are forwarded to the caller AND mark the
|
||||||
|
// pool entry as poisoned, so the next acquire builds a fresh
|
||||||
|
// tunnel. This catches the case where the workspace EC2 was
|
||||||
|
// restarted out-of-band (tunnel still appears alive locally but
|
||||||
|
// every cat/find errors out).
|
||||||
|
func pooledWithEICTunnel(ctx context.Context, instanceID string,
|
||||||
|
fn func(s eicSSHSession) error) error {
|
||||||
|
pool := getEICTunnelPool()
|
||||||
|
sess, done, err := pool.acquire(ctx, instanceID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// poisoned defaults to true so a panic from fn poisons the
|
||||||
|
// entry on the way through the deferred release. Without the
|
||||||
|
// defer, a panicking fn would leak refcount=1 forever and
|
||||||
|
// permanently block eviction of this entry. The fn-error path
|
||||||
|
// resets poisoned to its real classification before return.
|
||||||
|
poisoned := true
|
||||||
|
defer func() { done(poisoned) }()
|
||||||
|
fnErr := fn(sess)
|
||||||
|
poisoned = fnErrIndicatesTunnelFault(fnErr)
|
||||||
|
return fnErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// fnErrIndicatesTunnelFault returns true for fn errors whose nature
|
||||||
|
// suggests the underlying tunnel is no longer reusable (auth gone,
|
||||||
|
// network gone, ssh process dead). Returning true poisons the pool
|
||||||
|
// entry so the next acquire builds fresh.
|
||||||
|
//
|
||||||
|
// Conservative: only marks tunnel-faulty for clearly tunnel-level
|
||||||
|
// failures (connection refused, broken pipe, ssh exit-status from
|
||||||
|
// fatal-channel signals). A `cat` returning os.ErrNotExist on a
|
||||||
|
// missing file is NOT a tunnel fault — that's the file path being
|
||||||
|
// wrong, the tunnel is fine.
|
||||||
|
func fnErrIndicatesTunnelFault(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
msg := err.Error()
|
||||||
|
// stderr substrings produced by ssh when the tunnel is broken.
|
||||||
|
for _, marker := range []string{
|
||||||
|
"connection refused",
|
||||||
|
"connection closed",
|
||||||
|
"broken pipe",
|
||||||
|
"Connection reset by peer",
|
||||||
|
"kex_exchange_identification",
|
||||||
|
"port forwarding failed",
|
||||||
|
"Permission denied",
|
||||||
|
"Authentication failed",
|
||||||
|
} {
|
||||||
|
if containsCaseInsensitive(msg, marker) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// containsCaseInsensitive avoids importing strings just for this
|
||||||
|
// (the file already needs ssh stderr matching elsewhere — this
|
||||||
|
// keeps the helper local to avoid a cross-file dependency).
|
||||||
|
func containsCaseInsensitive(s, substr string) bool {
|
||||||
|
if len(substr) > len(s) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Manual lowercase compare loop; ssh error markers are ASCII so
|
||||||
|
// no need for unicode-aware folding.
|
||||||
|
low := func(b byte) byte {
|
||||||
|
if b >= 'A' && b <= 'Z' {
|
||||||
|
return b + 32
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
for i := 0; i+len(substr) <= len(s); i++ {
|
||||||
|
match := true
|
||||||
|
for j := 0; j < len(substr); j++ {
|
||||||
|
if low(s[i+j]) != low(substr[j]) {
|
||||||
|
match = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if match {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// initEICTunnelPool rebinds the package-level withEICTunnel var to
|
||||||
|
// the pooled implementation. Called once at package init via the
|
||||||
|
// init() in eic_tunnel_pool_setup.go (split file so the rebind
|
||||||
|
// itself is testable without dragging in the production setup
|
||||||
|
// shim's exec/aws dependencies).
|
||||||
|
func initEICTunnelPool() {
|
||||||
|
withEICTunnel = pooledWithEICTunnel
|
||||||
|
}
|
||||||
workspace-server/internal/handlers/eic_tunnel_pool_test.go (new file, 467 lines)
@ -0,0 +1,467 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
// eic_tunnel_pool_test.go — tests for the refcounted EIC tunnel pool
|
||||||
|
// added in core#11. Stubs poolSetupTunnel with a counter so the
|
||||||
|
// tests don't fork ssh-keygen / aws subprocesses.
|
||||||
|
//
|
||||||
|
// Per memory feedback_assert_exact_not_substring: each test pins
|
||||||
|
// exact expected counts (not "at least N") so a regression that
|
||||||
|
// silently double-sets-up surfaces here.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// withPoolSetupStub swaps poolSetupTunnel for a counting fake that
|
||||||
|
// returns a sentinel session and a cleanup func that records its
|
||||||
|
// invocation. Restores on test cleanup.
|
||||||
|
//
|
||||||
|
// setupSignal blocks each setup until released — for concurrent-
|
||||||
|
// acquire tests where we want to gate setup completion.
|
||||||
|
func withPoolSetupStub(t *testing.T) (
|
||||||
|
setupCount *int64, cleanupCount *int64, restore func(), unblock func()) {
|
||||||
|
t.Helper()
|
||||||
|
prev := poolSetupTunnel
|
||||||
|
prevTTL := poolTTL
|
||||||
|
prevJanitor := poolJanitorInterval
|
||||||
|
|
||||||
|
var sc, cc int64
|
||||||
|
setupCount, cleanupCount = &sc, &cc
|
||||||
|
|
||||||
|
gate := make(chan struct{}, 1)
|
||||||
|
gate <- struct{}{} // allow the first setup through immediately
|
||||||
|
unblock = func() { gate <- struct{}{} }
|
||||||
|
|
||||||
|
poolSetupTunnel = func(ctx context.Context, instanceID string) (
|
||||||
|
eicSSHSession, func(), error) {
|
||||||
|
select {
|
||||||
|
case <-gate:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return eicSSHSession{}, nil, ctx.Err()
|
||||||
|
}
|
||||||
|
atomic.AddInt64(&sc, 1)
|
||||||
|
sess := eicSSHSession{
|
||||||
|
instanceID: instanceID,
|
||||||
|
osUser: "ubuntu",
|
||||||
|
localPort: 10000 + int(atomic.LoadInt64(&sc)),
|
||||||
|
keyPath: "/tmp/molecule-eic-test-" + instanceID,
|
||||||
|
}
|
||||||
|
cleanup := func() { atomic.AddInt64(&cc, 1) }
|
||||||
|
return sess, cleanup, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
restore = func() {
|
||||||
|
poolSetupTunnel = prev
|
||||||
|
poolTTL = prevTTL
|
||||||
|
poolJanitorInterval = prevJanitor
|
||||||
|
}
|
||||||
|
t.Cleanup(restore)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// freshPool returns an isolated pool (NOT the global) so tests run
|
||||||
|
// independently. Stops the janitor on cleanup.
|
||||||
|
func freshPool(t *testing.T) *eicTunnelPool {
|
||||||
|
t.Helper()
|
||||||
|
p := newEICTunnelPool()
|
||||||
|
t.Cleanup(p.stop)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestEICTunnelPool_FourOpsAmortise pins the core invariant: four
|
||||||
|
// sequential acquire/release cycles on the same instanceID share
|
||||||
|
// ONE underlying tunnel setup. Mutation: delete the cache hit branch
|
||||||
|
// in acquire() → setupCount goes 1 → 4 → test fails.
|
||||||
|
func TestEICTunnelPool_FourOpsAmortise(t *testing.T) {
|
||||||
|
setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
|
||||||
|
// Refill gate after each setup so concurrent stubs aren't blocked
|
||||||
|
// (we want every test to be able to set up if it needs to).
|
||||||
|
t.Cleanup(func() { /* no-op; defer is enough */ })
|
||||||
|
poolTTL = 50 * time.Second
|
||||||
|
pool := freshPool(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
sess, done, err := pool.acquire(ctx, "i-test-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("op %d: acquire: %v", i, err)
|
||||||
|
}
|
||||||
|
if sess.instanceID != "i-test-1" {
|
||||||
|
t.Fatalf("op %d: session has wrong instanceID: %q", i, sess.instanceID)
|
||||||
|
}
|
||||||
|
done(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := atomic.LoadInt64(setupCount); got != 1 {
|
||||||
|
t.Errorf("expected exactly 1 tunnel setup across 4 ops, got %d", got)
|
||||||
|
}
|
||||||
|
if got := atomic.LoadInt64(cleanupCount); got != 0 {
|
||||||
|
t.Errorf("expected 0 cleanups while entry is hot (TTL=50s), got %d", got)
|
||||||
|
}
|
||||||
|
}

// TestEICTunnelPool_DifferentInstancesDoNotShare pins that two
// different instanceIDs each get their own tunnel — the pool is
// keyed on instanceID, not a single global slot.
func TestEICTunnelPool_DifferentInstancesDoNotShare(t *testing.T) {
	setupCount, _, _, unblock := withPoolSetupStub(t)
	poolTTL = 50 * time.Second
	pool := freshPool(t)
	ctx := context.Background()

	// First instance setup uses the initial gate slot.
	_, doneA, err := pool.acquire(ctx, "i-a")
	if err != nil {
		t.Fatalf("acquire A: %v", err)
	}
	doneA(false)

	// Second instance needs a new slot through the gate.
	unblock()
	_, doneB, err := pool.acquire(ctx, "i-b")
	if err != nil {
		t.Fatalf("acquire B: %v", err)
	}
	doneB(false)

	if got := atomic.LoadInt64(setupCount); got != 2 {
		t.Errorf("expected 2 setups (one per instance), got %d", got)
	}
}

// TestEICTunnelPool_TTLEviction: a short TTL forces the second op
// to build a fresh tunnel after the first expires.
func TestEICTunnelPool_TTLEviction(t *testing.T) {
	setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
	poolTTL = 50 * time.Millisecond
	poolJanitorInterval = 1 * time.Second // keep janitor away
	pool := freshPool(t)
	ctx := context.Background()

	_, done, err := pool.acquire(ctx, "i-ttl")
	if err != nil {
		t.Fatalf("acquire 1: %v", err)
	}
	done(false)

	time.Sleep(80 * time.Millisecond) // past TTL

	unblock() // allow next setup
	_, done, err = pool.acquire(ctx, "i-ttl")
	if err != nil {
		t.Fatalf("acquire 2: %v", err)
	}
	done(false)

	if got := atomic.LoadInt64(setupCount); got != 2 {
		t.Errorf("expected 2 setups (TTL eviction between), got %d", got)
	}
	// First entry should have been cleaned up when the second
	// acquire evicted it on the slow path. Cleanup runs in a
	// goroutine; poll briefly for it to land.
	deadline := time.Now().Add(500 * time.Millisecond)
	for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
		time.Sleep(5 * time.Millisecond)
	}
	if got := atomic.LoadInt64(cleanupCount); got < 1 {
		t.Errorf("expected ≥1 cleanup (first entry evicted), got %d", got)
	}
}

// TestEICTunnelPool_FailureInvalidates pins the poison-on-fault
// behavior — fn returning a tunnel-fatal error marks the entry
// unusable so the next acquire builds fresh.
func TestEICTunnelPool_FailureInvalidates(t *testing.T) {
	setupCount, _, _, unblock := withPoolSetupStub(t)
	poolTTL = 50 * time.Second
	pool := freshPool(t)
	ctx := context.Background()

	_, done, err := pool.acquire(ctx, "i-fault")
	if err != nil {
		t.Fatalf("acquire 1: %v", err)
	}
	done(true) // signal poison

	unblock() // let the next setup through
	_, done, err = pool.acquire(ctx, "i-fault")
	if err != nil {
		t.Fatalf("acquire 2: %v", err)
	}
	done(false)

	if got := atomic.LoadInt64(setupCount); got != 2 {
		t.Errorf("expected 2 setups (poison forced rebuild), got %d", got)
	}
}

// TestEICTunnelPool_ConcurrentAcquireSingleSetup pins that N
// concurrent acquires for the same instanceID before any release
// only trigger ONE tunnel setup — the rest wait via pendingSetups.
//
// Without this guard each concurrent acquire would spawn its own
// tunnel and the loser-cleanup would still leak refcount. Mutation:
// delete the pendingSetups gate → setupCount goes 1 → N → fails.
func TestEICTunnelPool_ConcurrentAcquireSingleSetup(t *testing.T) {
	setupCount, _, _, _ := withPoolSetupStub(t)
	// Pause setup so all goroutines pile into the pending slot.
	prev := poolSetupTunnel
	gate := make(chan struct{})
	poolSetupTunnel = func(ctx context.Context, instanceID string) (
		eicSSHSession, func(), error) {
		<-gate
		atomic.AddInt64(setupCount, 1)
		return eicSSHSession{instanceID: instanceID}, func() {}, nil
	}
	t.Cleanup(func() { poolSetupTunnel = prev })

	poolTTL = 50 * time.Second
	pool := freshPool(t)
	ctx := context.Background()

	const N = 8
	type result struct {
		done func(bool)
		err  error
	}
	results := make(chan result, N)
	var startWg sync.WaitGroup
	startWg.Add(N)
	for i := 0; i < N; i++ {
		go func() {
			startWg.Done()
			_, done, err := pool.acquire(ctx, "i-concurrent")
			results <- result{done, err}
		}()
	}
	startWg.Wait()
	// give all N goroutines time to enter pool.acquire
	time.Sleep(20 * time.Millisecond)
	close(gate)

	for i := 0; i < N; i++ {
		r := <-results
		if r.err != nil {
			t.Fatalf("acquire %d: %v", i, r.err)
		}
		r.done(false)
	}

	if got := atomic.LoadInt64(setupCount); got != 1 {
		t.Errorf("expected 1 setup across %d concurrent acquires, got %d", N, got)
	}
}

// TestEICTunnelPool_TTLZeroDisablesPooling pins the escape hatch:
// poolTTL=0 means every acquire goes straight through to setup +
// cleanup, no entry kept. Useful for tests / opt-out.
func TestEICTunnelPool_TTLZeroDisablesPooling(t *testing.T) {
	setupCount, cleanupCount, _, unblock := withPoolSetupStub(t)
	poolTTL = 0
	pool := freshPool(t)
	ctx := context.Background()

	_, done, err := pool.acquire(ctx, "i-ttlzero")
	if err != nil {
		t.Fatalf("acquire 1: %v", err)
	}
	done(false)

	unblock()
	_, done, err = pool.acquire(ctx, "i-ttlzero")
	if err != nil {
		t.Fatalf("acquire 2: %v", err)
	}
	done(false)

	if got := atomic.LoadInt64(setupCount); got != 2 {
		t.Errorf("expected 2 setups with TTL=0 (pool disabled), got %d", got)
	}
	if got := atomic.LoadInt64(cleanupCount); got != 2 {
		t.Errorf("expected 2 cleanups with TTL=0 (each release closes), got %d", got)
	}
}

// TestEICTunnelPool_LRUEvictionAtCap pins the LRU defence: when the
// pool reaches poolMaxEntries, a new acquire for an unseen
// instanceID evicts the LRU idle entry instead of growing unbounded.
func TestEICTunnelPool_LRUEvictionAtCap(t *testing.T) {
	setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
	prev := poolMaxEntries
	poolMaxEntries = 2
	t.Cleanup(func() { poolMaxEntries = prev })
	poolTTL = 50 * time.Second

	// Replace stub with one that doesn't gate so we can fill quickly.
	poolSetupTunnel = func(ctx context.Context, instanceID string) (
		eicSSHSession, func(), error) {
		atomic.AddInt64(setupCount, 1)
		return eicSSHSession{instanceID: instanceID}, func() {
			atomic.AddInt64(cleanupCount, 1)
		}, nil
	}

	pool := freshPool(t)
	ctx := context.Background()

	for _, id := range []string{"i-1", "i-2"} {
		_, done, err := pool.acquire(ctx, id)
		if err != nil {
			t.Fatalf("acquire %s: %v", id, err)
		}
		done(false)
	}
	// Both entries idle, pool at cap.
	_, done, err := pool.acquire(ctx, "i-3")
	if err != nil {
		t.Fatalf("acquire i-3: %v", err)
	}
	done(false)

	// Wait for the goroutine'd cleanup of the evicted entry.
	deadline := time.Now().Add(500 * time.Millisecond)
	for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
		time.Sleep(10 * time.Millisecond)
	}

	if got := atomic.LoadInt64(setupCount); got != 3 {
		t.Errorf("expected 3 setups (one per unique instance), got %d", got)
	}
	if got := atomic.LoadInt64(cleanupCount); got < 1 {
		t.Errorf("expected ≥1 cleanup (LRU eviction), got %d", got)
	}
}

// TestEICTunnelPool_PoisonedClassification pins the heuristic that
// distinguishes tunnel-fatal errors (poison the entry) from
// app-level errors (file not found, validation) that should NOT
// invalidate the tunnel.
func TestEICTunnelPool_PoisonedClassification(t *testing.T) {
	cases := []struct {
		name string
		err  error
		want bool
	}{
		{"nil", nil, false},
		{"file not found", errors.New("os: file does not exist"), false},
		{"validation", errors.New("invalid path: must be relative"), false},
		{"connection refused", errors.New("ssh: connect to host: connection refused"), true},
		{"connection refused upper", errors.New("Connection Refused"), true},
		{"broken pipe", errors.New("write tunnel: broken pipe"), true},
		{"permission denied", errors.New("Permission denied (publickey)"), true},
		{"auth failed", errors.New("Authentication failed"), true},
		{"connection reset", errors.New("Connection reset by peer"), true},
		{"port forward", errors.New("port forwarding failed"), true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := fnErrIndicatesTunnelFault(tc.err)
			if got != tc.want {
				t.Errorf("fnErrIndicatesTunnelFault(%v) = %v, want %v",
					tc.err, got, tc.want)
			}
		})
	}
}
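
// For orientation only: a classifier with the behaviour pinned above is
// typically a case-insensitive substring match over transport-level
// phrases. A minimal sketch under that assumption (the real
// fnErrIndicatesTunnelFault may be implemented differently):
//
//	func tunnelFaultSketch(err error) bool {
//		if err == nil {
//			return false
//		}
//		msg := strings.ToLower(err.Error())
//		for _, s := range []string{
//			"connection refused", "broken pipe", "permission denied",
//			"authentication failed", "connection reset", "port forward",
//		} {
//			if strings.Contains(msg, s) {
//				return true
//			}
//		}
//		return false
//	}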

// TestEICTunnelPool_RefcountBlocksEviction pins that an entry past
// TTL is NOT evicted while a caller still holds it — preventing
// use-after-free in the holder.
func TestEICTunnelPool_RefcountBlocksEviction(t *testing.T) {
	setupCount, cleanupCount, _, _ := withPoolSetupStub(t)
	poolTTL = 30 * time.Millisecond
	poolJanitorInterval = 5 * time.Millisecond
	pool := freshPool(t)
	ctx := context.Background()

	_, done, err := pool.acquire(ctx, "i-hold")
	if err != nil {
		t.Fatalf("acquire: %v", err)
	}

	// Sleep past TTL while holding the session. Janitor sweeps
	// every 5ms but must skip our entry because refcount=1.
	time.Sleep(80 * time.Millisecond)

	if got := atomic.LoadInt64(cleanupCount); got != 0 {
		t.Errorf("expected 0 cleanups while holder is active, got %d", got)
	}

	done(false)
	// Now refcount=0 and entry is past TTL; releaser triggers cleanup.
	deadline := time.Now().Add(200 * time.Millisecond)
	for atomic.LoadInt64(cleanupCount) < 1 && time.Now().Before(deadline) {
		time.Sleep(5 * time.Millisecond)
	}
	if got := atomic.LoadInt64(cleanupCount); got != 1 {
		t.Errorf("expected 1 cleanup after release of expired entry, got %d", got)
	}
	if got := atomic.LoadInt64(setupCount); got != 1 {
		t.Errorf("setupCount tracking: got %d, want 1", got)
	}
}

// TestPooledWithEICTunnel_PanicPoisonsEntry pins that a panic
// from fn poisons the pool entry on the way out — refcount goes
// back to zero (no leak) and the entry is marked unusable so the
// next acquire builds fresh. Without the defer-release pattern, a
// panic would leave refcount=1 forever and the entry would never
// evict.
func TestPooledWithEICTunnel_PanicPoisonsEntry(t *testing.T) {
	setupCount, _, _, _ := withPoolSetupStub(t)
	poolTTL = 50 * time.Second
	globalEICTunnelPool = newEICTunnelPool()
	t.Cleanup(globalEICTunnelPool.stop)

	func() {
		defer func() {
			if r := recover(); r == nil {
				t.Errorf("expected panic to bubble up, got nil")
			}
		}()
		_ = pooledWithEICTunnel(context.Background(), "i-panic",
			func(s eicSSHSession) error { panic("boom") })
	}()

	// Swap in an ungated stub so the next setup can run (the single
	// gate token was consumed by the first, pre-panic setup).
	prev := poolSetupTunnel
	poolSetupTunnel = func(ctx context.Context, instanceID string) (
		eicSSHSession, func(), error) {
		atomic.AddInt64(setupCount, 1)
		return eicSSHSession{instanceID: instanceID}, func() {}, nil
	}
	t.Cleanup(func() { poolSetupTunnel = prev })

	// Next acquire must build fresh — entry was poisoned by panic.
	if err := pooledWithEICTunnel(context.Background(), "i-panic",
		func(s eicSSHSession) error { return nil }); err != nil {
		t.Fatalf("post-panic acquire: %v", err)
	}
	if got := atomic.LoadInt64(setupCount); got != 2 {
		t.Errorf("expected 2 setups (panic poisoned, rebuild), got %d", got)
	}
}

// TestPooledWithEICTunnel_PreservesFnErr pins that errors from the
// inner fn pass through to the caller verbatim — pool wrapping
// should not swallow or transform error semantics for app code.
func TestPooledWithEICTunnel_PreservesFnErr(t *testing.T) {
	withPoolSetupStub(t)
	poolTTL = 50 * time.Second

	// Reset the global pool so this test is isolated from any prior
	// test that may have populated it. Stop its janitor on cleanup so
	// the goroutine doesn't outlive the test.
	globalEICTunnelPool = newEICTunnelPool()
	t.Cleanup(globalEICTunnelPool.stop)

	want := errors.New("file does not exist")
	got := pooledWithEICTunnel(context.Background(), "i-fn-err",
		func(s eicSSHSession) error { return want })
	if !errors.Is(got, want) {
		t.Errorf("pooledWithEICTunnel returned %v, want %v", got, want)
	}
}
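
// Typical call-site shape for the pooled wrapper, as exercised above
// (the body shown is illustrative only, not taken from handler code):
//
//	err := pooledWithEICTunnel(ctx, instanceID, func(s eicSSHSession) error {
//		// run the per-operation work over the pooled tunnel, e.g.
//		// against 127.0.0.1:s.localPort
//		return nil
//	})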