test(db): real-PG migration replay-from-scratch + InitPostgres ping + redis online-status key/TTL coverage (#2150) #2452
@@ -30,6 +30,11 @@ PROFILES: dict[str, dict[str, str]] = {
|
||||
# workflow (they reuse its migrated Postgres), so changes to the
|
||||
# scheduler package must trigger the job too.
|
||||
r"|^workspace-server/internal/scheduler/"
|
||||
# #2150: the db package's real-PG migration-replay-from-scratch
|
||||
# + InitPostgres ping tests also run in this same workflow (they
|
||||
# reuse its sibling Postgres, against a separate `molecule_replay`
|
||||
# database). Changes to db must trigger the job too.
|
||||
r"|^workspace-server/internal/db/"
|
||||
r"|^workspace-server/migrations/"
|
||||
r"|^\.gitea/workflows/handlers-postgres-integration\.yml$"
|
||||
),
|
||||
|
||||
@@ -290,6 +290,33 @@ jobs:
|
||||
# / workspaces all landed by the migration replay step above).
|
||||
go test -tags=integration -timeout 5m -v ./internal/scheduler/ -run "^TestIntegration_"
|
||||
|
||||
- if: needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Migration replay-from-scratch gate (#2150)
|
||||
env:
|
||||
PGPASSWORD: test
|
||||
run: |
|
||||
# Issue #2150 (SOP internal#765): prove the FULL forward migration
|
||||
# chain (.up + legacy .sql) replays from a blank schema via the
|
||||
# PRODUCTION db.RunMigrations entrypoint — hard-fail on any error.
|
||||
#
|
||||
# This is the gap the psql apply loop above does NOT cover: that
|
||||
# loop deliberately SKIPS failing migrations (`⊘ skipped`), so it
|
||||
# stays green even if the chain stops replaying. The Go test below
|
||||
# uses the real boot-time runner with hard-fail semantics, catching
|
||||
# the #211 .down-wipe class and the 045 non-idempotent crash-loop
|
||||
# class (it runs the chain twice).
|
||||
#
|
||||
# Run against a SEPARATE database so the destructive
|
||||
# `DROP SCHEMA public CASCADE` inside the test never touches the
|
||||
# `molecule` DB the handlers integration tests above migrated. No
|
||||
# ordering coupling with the handlers step.
|
||||
createdb -h "${PG_HOST}" -U postgres molecule_replay 2>/dev/null || \
|
||||
psql -h "${PG_HOST}" -U postgres -d molecule \
|
||||
-c "CREATE DATABASE molecule_replay" >/dev/null 2>&1 || true
|
||||
INTEGRATION_DB_URL="postgres://postgres:test@${PG_HOST}:5432/molecule_replay?sslmode=disable" \
|
||||
go test -tags=integration -timeout 5m -v ./internal/db/ \
|
||||
-run '^TestIntegration_Migration|^TestIntegration_InitPostgres'
|
||||
|
||||
- if: failure() && needs.detect-changes.outputs.handlers == 'true'
|
||||
name: Diagnostic dump on failure
|
||||
env:
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// postgres_replay_integration_test.go — REAL Postgres integration tests for
|
||||
// the boot-time migration runner (db.RunMigrations) and the connection
|
||||
// bootstrap (db.InitPostgres).
|
||||
//
|
||||
// Issue #2150 (SOP rule internal#765 regression-coverage). test_layer:
|
||||
// real-postgres.
|
||||
//
|
||||
// Run locally with:
|
||||
//
|
||||
// docker run --rm -d --name pg-replay \
|
||||
// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \
|
||||
// -p 55432:5432 postgres:15-alpine
|
||||
// sleep 4
|
||||
// cd workspace-server
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/db/ -run '^TestIntegration_Migration|^TestIntegration_InitPostgres'
|
||||
//
|
||||
// In CI these run on .gitea/workflows/handlers-postgres-integration.yml,
|
||||
// which already provisions a real Postgres on the operator-host bridge and
|
||||
// triggers on workspace-server/migrations/** changes — the exact blast
|
||||
// radius this gate must cover.
|
||||
//
|
||||
// WHY A REAL DATABASE — and why the existing coverage is NOT enough
|
||||
// -----------------------------------------------------------------
|
||||
// postgres_migrate_test.go and postgres_schema_migrations_test.go are
|
||||
// sqlmock-only: they pin which SQL *statements* fire, but a mock cannot
|
||||
// execute SQL, so it cannot prove the 118-file (.up + legacy .sql) chain
|
||||
// actually REPLAYS FROM SCRATCH against a real Postgres. The CI psql loop
|
||||
// in handlers-postgres-integration.yml deliberately *skips* failing
|
||||
// migrations (`⊘ skipped`), so it would stay green even if the chain
|
||||
// stopped replaying — it is not a replay gate.
|
||||
//
|
||||
// This file closes that gap. It boots a Postgres, resets the public schema
|
||||
// to a blank slate, and runs the PRODUCTION db.RunMigrations entrypoint —
|
||||
// the same function platform boot calls — with hard-fail semantics. It
|
||||
// would FAIL (watch-fail intent) against:
|
||||
//
|
||||
// - Issue #211: if RunMigrations regresses to globbing `*.sql` and
|
||||
// sorting `.down.sql` before `.up.sql`, the rollback runs before the
|
||||
// forward for any pair (020_workspace_auth_tokens was the canary),
|
||||
// either erroring on the DROP or wiping the just-created table.
|
||||
//
|
||||
// - The 045 crash-loop class (cp#429 / project_cp_migration_045_*): the
|
||||
// runner re-applies every recorded-absent file every boot, so a
|
||||
// non-idempotent migration (bare CREATE / INSERT without IF NOT EXISTS
|
||||
// / ON CONFLICT) replays cleanly the first time and FAILS the second.
|
||||
// TestIntegration_MigrationReplay_IsIdempotent_DoubleApply runs the
|
||||
// full chain twice against the same DB to catch that at PR time.
|
||||
//
|
||||
// - A new migration that depends on a table a later migration drops, or
|
||||
// is mis-ordered in the lexicographic chain — it simply will not apply
|
||||
// from scratch and the replay errors.
|
||||
//
|
||||
// All assertions key off the OBSERVABLE database state after the real run,
|
||||
// not a proxy for "a statement fired".
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
// migrationsDir is the on-disk path to the forward+legacy migration chain
|
||||
// relative to this test file (workspace-server/internal/db → ../../migrations).
|
||||
const migrationsDir = "../../migrations"
|
||||
|
||||
// freshIntegrationDB opens $INTEGRATION_DB_URL (skipping the test if unset),
|
||||
// resets the `public` schema to an empty slate so the run is a true
|
||||
// replay-from-scratch regardless of what an earlier CI step applied, and
|
||||
// registers a Cleanup that closes the connection.
|
||||
//
|
||||
// It also points the package-global db.DB at this connection, because
|
||||
// RunMigrations operates on db.DB. NOT SAFE for t.Parallel() — it owns the
|
||||
// schema for the duration of the test.
|
||||
func freshIntegrationDB(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
url := os.Getenv("INTEGRATION_DB_URL")
|
||||
if url == "" {
|
||||
t.Skip("INTEGRATION_DB_URL not set; skipping real-PG replay test (local devs: see file header)")
|
||||
}
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
if err := conn.Ping(); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
// True from-scratch: blow away any schema a prior CI step (e.g. the
|
||||
// handlers psql apply-all loop) left behind, then start clean. This is
|
||||
// what makes the test a *replay-from-scratch* gate rather than a
|
||||
// re-apply-onto-existing test.
|
||||
if _, err := conn.Exec(`DROP SCHEMA public CASCADE; CREATE SCHEMA public`); err != nil {
|
||||
t.Fatalf("reset public schema: %v", err)
|
||||
}
|
||||
// gen_random_uuid() (used by 001_workspaces.sql et al.) lives in
|
||||
// pgcrypto on PG < 13 and core on PG 13+. postgres:15-alpine has it in
|
||||
// core, but create the extension defensively so the test does not pin a
|
||||
// specific PG minor.
|
||||
if _, err := conn.Exec(`CREATE EXTENSION IF NOT EXISTS pgcrypto`); err != nil {
|
||||
t.Fatalf("create pgcrypto: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
return conn
|
||||
}
|
||||
|
||||
// forwardMigrationCount counts the files RunMigrations is expected to apply:
|
||||
// every *.sql that is NOT a *.down.sql. This is derived from the real
|
||||
// directory so the gate auto-tracks new migrations without an edit here.
|
||||
func forwardMigrationCount(t *testing.T) int {
|
||||
t.Helper()
|
||||
all, err := filepath.Glob(filepath.Join(migrationsDir, "*.sql"))
|
||||
if err != nil {
|
||||
t.Fatalf("glob migrations: %v", err)
|
||||
}
|
||||
n := 0
|
||||
for _, f := range all {
|
||||
if len(f) >= len(".down.sql") && f[len(f)-len(".down.sql"):] == ".down.sql" {
|
||||
continue
|
||||
}
|
||||
n++
|
||||
}
|
||||
if n == 0 {
|
||||
t.Fatalf("found zero forward migrations under %s — wrong path?", migrationsDir)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// TestIntegration_InitPostgres_PingSucceeds proves the production connection
|
||||
// bootstrap actually establishes a usable pool against a real server. A
|
||||
// sqlmock test can never exercise the real DB.Ping() inside InitPostgres,
|
||||
// which is the line that turns a bad DSN / unreachable host into a boot
|
||||
// failure instead of a silently-broken pool.
|
||||
func TestIntegration_InitPostgres_PingSucceeds(t *testing.T) {
|
||||
url := os.Getenv("INTEGRATION_DB_URL")
|
||||
if url == "" {
|
||||
t.Skip("INTEGRATION_DB_URL not set; skipping")
|
||||
}
|
||||
if err := InitPostgres(url); err != nil {
|
||||
t.Fatalf("InitPostgres against real PG failed: %v", err)
|
||||
}
|
||||
if DB == nil {
|
||||
t.Fatal("InitPostgres returned nil error but db.DB is nil")
|
||||
}
|
||||
// The pool must be live, not just opened.
|
||||
if err := DB.Ping(); err != nil {
|
||||
t.Fatalf("db.DB.Ping after InitPostgres: %v", err)
|
||||
}
|
||||
// Round-trip a trivial query to prove the connection actually serves.
|
||||
var one int
|
||||
if err := DB.QueryRow("SELECT 1").Scan(&one); err != nil {
|
||||
t.Fatalf("SELECT 1 round-trip: %v", err)
|
||||
}
|
||||
if one != 1 {
|
||||
t.Fatalf("SELECT 1 returned %d", one)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_InitPostgres_BadDSNFails proves InitPostgres surfaces an
|
||||
// unreachable/garbage DSN as an error (the ping path), rather than handing
|
||||
// back a half-open pool. Watch-fail: if someone drops the DB.Ping() check
|
||||
// from InitPostgres, this stops returning an error and fails.
|
||||
func TestIntegration_InitPostgres_BadDSNFails(t *testing.T) {
|
||||
if os.Getenv("INTEGRATION_DB_URL") == "" {
|
||||
t.Skip("INTEGRATION_DB_URL not set; skipping")
|
||||
}
|
||||
// Valid DSN shape, but nothing is listening on this port.
|
||||
err := InitPostgres("postgres://postgres:test@127.0.0.1:1/does_not_exist?sslmode=disable&connect_timeout=2")
|
||||
if err == nil {
|
||||
t.Fatal("expected InitPostgres to fail against an unreachable DSN, got nil (DB.Ping check removed?)")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_MigrationReplay_FromScratch is the core gate: run the
|
||||
// PRODUCTION RunMigrations over a blank public schema and assert the full
|
||||
// forward chain applies cleanly with zero skips.
|
||||
//
|
||||
// Watch-fail intent:
|
||||
// - #211 .down-wipe: a `.down.sql` leaking into the forward set would
|
||||
// run a DROP before its CREATE → error here (hard fail), or wipe a
|
||||
// table → the schema_migrations / table-presence assertions catch it.
|
||||
// - mis-ordered / dangling-dependency migration → RunMigrations returns
|
||||
// a non-nil error and this test fails.
|
||||
func TestIntegration_MigrationReplay_FromScratch(t *testing.T) {
|
||||
conn := freshIntegrationDB(t)
|
||||
DB = conn // RunMigrations operates on the package-global DB.
|
||||
|
||||
if err := RunMigrations(migrationsDir); err != nil {
|
||||
t.Fatalf("full-chain replay-from-scratch failed: %v", err)
|
||||
}
|
||||
|
||||
// Every forward migration must be recorded as applied — proves none was
|
||||
// silently skipped (the failure mode the CI psql loop tolerates).
|
||||
want := forwardMigrationCount(t)
|
||||
var got int
|
||||
if err := DB.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&got); err != nil {
|
||||
t.Fatalf("count schema_migrations: %v", err)
|
||||
}
|
||||
if got != want {
|
||||
t.Errorf("schema_migrations recorded %d migrations, expected %d (the full forward chain)", got, want)
|
||||
}
|
||||
|
||||
// No `.down.sql` may ever be recorded — that is the #211 signature.
|
||||
var downRecorded int
|
||||
if err := DB.QueryRow(
|
||||
"SELECT COUNT(*) FROM schema_migrations WHERE filename LIKE '%.down.sql'",
|
||||
).Scan(&downRecorded); err != nil {
|
||||
t.Fatalf("count down migrations: %v", err)
|
||||
}
|
||||
if downRecorded != 0 {
|
||||
t.Errorf("a .down.sql migration was applied (#211 regression): %d recorded", downRecorded)
|
||||
}
|
||||
|
||||
// Spot-check load-bearing tables that survive to HEAD of the chain.
|
||||
// workspaces is the root table; workspace_auth_tokens was the #211
|
||||
// canary (its data wipe regressed AdminAuth to fail-open).
|
||||
for _, tbl := range []string{"workspaces", "workspace_auth_tokens", "delegations", "activity_logs"} {
|
||||
var exists bool
|
||||
if err := DB.QueryRow(
|
||||
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema='public' AND table_name=$1)",
|
||||
tbl,
|
||||
).Scan(&exists); err != nil {
|
||||
t.Fatalf("check table %s: %v", tbl, err)
|
||||
}
|
||||
if !exists {
|
||||
t.Errorf("table %q missing after full replay — chain did not land it", tbl)
|
||||
}
|
||||
}
|
||||
|
||||
// agent_memories is CREATEd at 008 and DROPped at the end of the chain
|
||||
// (20260524110000_drop_agent_memories). Its absence proves the late
|
||||
// drop migration actually ran AFTER the early create — i.e. ordering
|
||||
// held. If the chain ever runs a drop before its create, this flips.
|
||||
var legacyExists bool
|
||||
if err := DB.QueryRow(
|
||||
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema='public' AND table_name='agent_memories')",
|
||||
).Scan(&legacyExists); err != nil {
|
||||
t.Fatalf("check agent_memories: %v", err)
|
||||
}
|
||||
if legacyExists {
|
||||
t.Error("agent_memories still present at HEAD — the late drop migration did not replay in order")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_MigrationReplay_IsIdempotent_DoubleApply guards the 045
|
||||
// crash-loop class (cp#429 / project_cp_migration_045_crashloop_idempotency_guard):
|
||||
// the runner re-checks every file on every boot, so a non-idempotent
|
||||
// migration replays fine once and FAILS on the second pass. Here we run the
|
||||
// full chain twice. The second pass must apply ZERO new files (all recorded)
|
||||
// and must not error.
|
||||
//
|
||||
// NOTE: this runs against the SAME populated schema, so it also exercises
|
||||
// the "skip already-applied" tracking path end-to-end against real PG, which
|
||||
// the sqlmock tests only simulate.
|
||||
func TestIntegration_MigrationReplay_IsIdempotent_DoubleApply(t *testing.T) {
|
||||
conn := freshIntegrationDB(t)
|
||||
DB = conn
|
||||
|
||||
if err := RunMigrations(migrationsDir); err != nil {
|
||||
t.Fatalf("first replay failed: %v", err)
|
||||
}
|
||||
var afterFirst int
|
||||
if err := DB.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&afterFirst); err != nil {
|
||||
t.Fatalf("count after first: %v", err)
|
||||
}
|
||||
|
||||
// Second boot: nothing new should apply, and it must not error even
|
||||
// though the runner re-evaluates every file (the 045 failure mode).
|
||||
if err := RunMigrations(migrationsDir); err != nil {
|
||||
t.Fatalf("second replay failed (non-idempotent migration / 045 crash-loop class): %v", err)
|
||||
}
|
||||
var afterSecond int
|
||||
if err := DB.QueryRow("SELECT COUNT(*) FROM schema_migrations").Scan(&afterSecond); err != nil {
|
||||
t.Fatalf("count after second: %v", err)
|
||||
}
|
||||
if afterSecond != afterFirst {
|
||||
t.Errorf("second boot changed schema_migrations from %d to %d — re-application is not a clean no-op", afterFirst, afterSecond)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,291 @@
|
||||
package db
|
||||
|
||||
// redis_test.go — regression coverage for the workspace online-status and
|
||||
// URL-resolution Redis layer (redis.go), which previously had NO test.
|
||||
//
|
||||
// Issue #2150 (SOP rule internal#765). redis.go drives two fleet-wide
|
||||
// behaviours that break silently if a key name or TTL drifts:
|
||||
//
|
||||
// - online detection: SetOnline / RefreshTTL / IsOnline on `ws:<id>`.
|
||||
// A wrong key prefix or a TTL shorter than the heartbeat interval makes
|
||||
// live workspaces flap to "unreachable — restart" (the exact failure
|
||||
// LivenessTTL=180s was tuned to avoid). A TTL too long hides real
|
||||
// crashes.
|
||||
// - proxy URL resolution: CacheURL / GetCachedURL / CacheInternalURL /
|
||||
// GetCachedInternalURL on `ws:<id>:url` and `ws:<id>:internal_url`.
|
||||
// A2A forwarding resolves the target workspace through these keys; a
|
||||
// prefix collision (e.g. the liveness key overlapping the URL key)
|
||||
// would serve the wrong URL or a literal "online" string as a URL.
|
||||
//
|
||||
// These tests run against miniredis — an in-process Redis that speaks the
|
||||
// real RESP protocol and enforces real TTL/expiry semantics — so they
|
||||
// exercise the actual go-redis client calls and key/TTL behaviour, not a
|
||||
// mock that rubber-stamps them. miniredis is already a module dependency.
|
||||
//
|
||||
// Watch-fail intent: change any `ws:%s...` format string in redis.go, or
|
||||
// regress LivenessTTL below the heartbeat window, and a test here fails.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// withMiniRedis spins up an in-process Redis, points the package-global RDB
|
||||
// at it, and registers Cleanup. Returns the server handle so tests can drive
|
||||
// the clock (FastForward) to exercise TTL expiry deterministically.
|
||||
func withMiniRedis(t *testing.T) *miniredis.Miniredis {
|
||||
t.Helper()
|
||||
mr, err := miniredis.Run()
|
||||
if err != nil {
|
||||
t.Fatalf("miniredis.Run: %v", err)
|
||||
}
|
||||
RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||
t.Cleanup(func() {
|
||||
RDB.Close()
|
||||
mr.Close()
|
||||
})
|
||||
return mr
|
||||
}
|
||||
|
||||
// TestLivenessTTL_ExceedsHeartbeatWindow pins the tuned TTL. The heartbeat
|
||||
// loop fires every 30s; LivenessTTL must allow several missed beats (the
|
||||
// comment in redis.go targets ~5) so a busy leader starved for 60-120s is
|
||||
// not falsely declared dead. 180s = 6×30s. Regressing this toward the old
|
||||
// 60s value reintroduces the false-positive restart cycle.
|
||||
func TestLivenessTTL_ExceedsHeartbeatWindow(t *testing.T) {
|
||||
const heartbeatInterval = 30 * time.Second
|
||||
const minMissedBeats = 5
|
||||
if LivenessTTL < heartbeatInterval*minMissedBeats {
|
||||
t.Errorf("LivenessTTL=%s is too short: must tolerate >=%d missed %s heartbeats (>= %s) to avoid false-positive restarts",
|
||||
LivenessTTL, minMissedBeats, heartbeatInterval, heartbeatInterval*minMissedBeats)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSetOnline_KeyAndTTL verifies SetOnline writes the canonical `ws:<id>`
|
||||
// key with the value "online" and the LivenessTTL — the exact contract
|
||||
// IsOnline and the a2a_proxy reactive check rely on.
|
||||
func TestSetOnline_KeyAndTTL(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-abc-123"
|
||||
|
||||
if err := SetOnline(ctx, ws); err != nil {
|
||||
t.Fatalf("SetOnline: %v", err)
|
||||
}
|
||||
|
||||
// Key name must be exactly ws:<id> — not, say, ws:<id>:online.
|
||||
if !mr.Exists("ws:" + ws) {
|
||||
t.Fatalf("expected key %q to exist; keys present: %v", "ws:"+ws, mr.Keys())
|
||||
}
|
||||
got, err := mr.Get("ws:" + ws)
|
||||
if err != nil {
|
||||
t.Fatalf("mr.Get: %v", err)
|
||||
}
|
||||
if got != "online" {
|
||||
t.Errorf("liveness value = %q, want %q", got, "online")
|
||||
}
|
||||
|
||||
// TTL must be the tuned LivenessTTL (allow miniredis's whole-second
|
||||
// granularity).
|
||||
ttl := mr.TTL("ws:" + ws)
|
||||
if ttl != LivenessTTL {
|
||||
t.Errorf("TTL = %s, want %s", ttl, LivenessTTL)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsOnline_TrueThenExpires drives the real TTL clock: a freshly-set
|
||||
// workspace is online; after the TTL elapses it is offline. This is the
|
||||
// behaviour online-detection depends on — proven against real expiry, not
|
||||
// asserted from a mock.
|
||||
func TestIsOnline_TrueThenExpires(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-expiry"
|
||||
|
||||
if err := SetOnline(ctx, ws); err != nil {
|
||||
t.Fatalf("SetOnline: %v", err)
|
||||
}
|
||||
online, err := IsOnline(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IsOnline: %v", err)
|
||||
}
|
||||
if !online {
|
||||
t.Fatal("expected workspace online immediately after SetOnline")
|
||||
}
|
||||
|
||||
// Fast-forward just past the TTL; the liveness key must expire.
|
||||
mr.FastForward(LivenessTTL + time.Second)
|
||||
|
||||
online, err = IsOnline(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IsOnline after expiry: %v", err)
|
||||
}
|
||||
if online {
|
||||
t.Error("expected workspace offline after TTL elapsed")
|
||||
}
|
||||
}
|
||||
|
||||
// TestRefreshTTL_ExtendsLiveness proves a heartbeat (RefreshTTL) keeps a
|
||||
// workspace alive across what would otherwise be an expiry. Without the
|
||||
// refresh the key expires; with it, IsOnline stays true. Watch-fail: if
|
||||
// RefreshTTL targets the wrong key, the refresh is a no-op and this fails.
|
||||
func TestRefreshTTL_ExtendsLiveness(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-refresh"
|
||||
|
||||
if err := SetOnline(ctx, ws); err != nil {
|
||||
t.Fatalf("SetOnline: %v", err)
|
||||
}
|
||||
// Advance most of the way to expiry, then heartbeat.
|
||||
mr.FastForward(LivenessTTL - 5*time.Second)
|
||||
if err := RefreshTTL(ctx, ws); err != nil {
|
||||
t.Fatalf("RefreshTTL: %v", err)
|
||||
}
|
||||
// Advance past where the ORIGINAL TTL would have expired. Still online.
|
||||
mr.FastForward(10 * time.Second)
|
||||
online, err := IsOnline(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IsOnline: %v", err)
|
||||
}
|
||||
if !online {
|
||||
t.Error("expected workspace still online after RefreshTTL heartbeat")
|
||||
}
|
||||
}
|
||||
|
||||
// TestIsOnline_UnknownWorkspace returns false (and no error) for a workspace
|
||||
// that was never set — the default for a never-registered / long-dead agent.
|
||||
func TestIsOnline_UnknownWorkspace(t *testing.T) {
|
||||
withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
online, err := IsOnline(ctx, "never-seen")
|
||||
if err != nil {
|
||||
t.Fatalf("IsOnline: %v", err)
|
||||
}
|
||||
if online {
|
||||
t.Error("expected unknown workspace to be offline")
|
||||
}
|
||||
}
|
||||
|
||||
// TestURLCache_RoundTrip pins the `ws:<id>:url` key and its 5-minute TTL,
|
||||
// and proves the value round-trips. A2A push resolves the target through
|
||||
// this key.
|
||||
func TestURLCache_RoundTrip(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-url"
|
||||
const url = "https://ws-url.workspaces.moleculesai.app"
|
||||
|
||||
if err := CacheURL(ctx, ws, url); err != nil {
|
||||
t.Fatalf("CacheURL: %v", err)
|
||||
}
|
||||
got, err := GetCachedURL(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("GetCachedURL: %v", err)
|
||||
}
|
||||
if got != url {
|
||||
t.Errorf("GetCachedURL = %q, want %q", got, url)
|
||||
}
|
||||
if !mr.Exists("ws:" + ws + ":url") {
|
||||
t.Errorf("expected key %q; present: %v", "ws:"+ws+":url", mr.Keys())
|
||||
}
|
||||
if ttl := mr.TTL("ws:" + ws + ":url"); ttl != 5*time.Minute {
|
||||
t.Errorf("url cache TTL = %s, want 5m", ttl)
|
||||
}
|
||||
}
|
||||
|
||||
// TestInternalURLCache_RoundTrip pins the `ws:<id>:internal_url` key (the
|
||||
// Docker-internal address used for workspace-to-workspace discovery) and its
|
||||
// 5-minute TTL.
|
||||
func TestInternalURLCache_RoundTrip(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-int"
|
||||
const url = "http://ws-int:8080"
|
||||
|
||||
if err := CacheInternalURL(ctx, ws, url); err != nil {
|
||||
t.Fatalf("CacheInternalURL: %v", err)
|
||||
}
|
||||
got, err := GetCachedInternalURL(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("GetCachedInternalURL: %v", err)
|
||||
}
|
||||
if got != url {
|
||||
t.Errorf("GetCachedInternalURL = %q, want %q", got, url)
|
||||
}
|
||||
if ttl := mr.TTL("ws:" + ws + ":internal_url"); ttl != 5*time.Minute {
|
||||
t.Errorf("internal url cache TTL = %s, want 5m", ttl)
|
||||
}
|
||||
}
|
||||
|
||||
// TestKeyNamespacesDoNotCollide is the prefix-collision regression: the
|
||||
// liveness key (ws:<id>), the URL key (ws:<id>:url), and the internal-URL
|
||||
// key (ws:<id>:internal_url) must be three DISTINCT keys for the same
|
||||
// workspace. If a future edit collapses the format strings, IsOnline would
|
||||
// read a URL as liveness (or vice versa) and online-detection / proxy
|
||||
// resolution would corrupt each other fleet-wide.
|
||||
func TestKeyNamespacesDoNotCollide(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-collide"
|
||||
|
||||
if err := SetOnline(ctx, ws); err != nil {
|
||||
t.Fatalf("SetOnline: %v", err)
|
||||
}
|
||||
if err := CacheURL(ctx, ws, "https://public"); err != nil {
|
||||
t.Fatalf("CacheURL: %v", err)
|
||||
}
|
||||
if err := CacheInternalURL(ctx, ws, "http://internal:8080"); err != nil {
|
||||
t.Fatalf("CacheInternalURL: %v", err)
|
||||
}
|
||||
|
||||
// Liveness value must still be "online", NOT a URL.
|
||||
if v, _ := mr.Get("ws:" + ws); v != "online" {
|
||||
t.Errorf("liveness key clobbered by a URL write: got %q", v)
|
||||
}
|
||||
if v, _ := mr.Get("ws:" + ws + ":url"); v != "https://public" {
|
||||
t.Errorf("url key = %q, want https://public", v)
|
||||
}
|
||||
if v, _ := mr.Get("ws:" + ws + ":internal_url"); v != "http://internal:8080" {
|
||||
t.Errorf("internal_url key = %q, want http://internal:8080", v)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClearWorkspaceKeys_RemovesAllThree proves teardown removes the
|
||||
// liveness, URL, and internal-URL keys together — a leaked liveness key
|
||||
// after deletion would keep a dead workspace looking online; a leaked URL
|
||||
// key would let the proxy forward to a recycled address.
|
||||
func TestClearWorkspaceKeys_RemovesAllThree(t *testing.T) {
|
||||
mr := withMiniRedis(t)
|
||||
ctx := context.Background()
|
||||
const ws = "ws-clear"
|
||||
|
||||
if err := SetOnline(ctx, ws); err != nil {
|
||||
t.Fatalf("SetOnline: %v", err)
|
||||
}
|
||||
if err := CacheURL(ctx, ws, "https://x"); err != nil {
|
||||
t.Fatalf("CacheURL: %v", err)
|
||||
}
|
||||
if err := CacheInternalURL(ctx, ws, "http://x:8080"); err != nil {
|
||||
t.Fatalf("CacheInternalURL: %v", err)
|
||||
}
|
||||
|
||||
ClearWorkspaceKeys(ctx, ws)
|
||||
|
||||
for _, k := range []string{"ws:" + ws, "ws:" + ws + ":url", "ws:" + ws + ":internal_url"} {
|
||||
if mr.Exists(k) {
|
||||
t.Errorf("key %q survived ClearWorkspaceKeys", k)
|
||||
}
|
||||
}
|
||||
online, err := IsOnline(ctx, ws)
|
||||
if err != nil {
|
||||
t.Fatalf("IsOnline: %v", err)
|
||||
}
|
||||
if online {
|
||||
t.Error("workspace still online after ClearWorkspaceKeys")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user