molecule-core/workspace-server/cmd/memory-plugin-postgres/main.go
claude-ceo-assistant 3501e6bfd7 fix(post-suspension): vanity import paths go.moleculesai.app/core/{platform,tests/harness/cp-stub} (closes molecule-ai/internal#71 phase 2)
Migrates the two Go modules under molecule-core off the dead
github.com/Molecule-AI/molecule-monorepo/... identity onto the vanity
host go.moleculesai.app. Also fixes the historical naming
inconsistency where the Gitea repo is molecule-core but the Go module
path said molecule-monorepo.

Module changes:
- workspace-server/go.mod:
    github.com/Molecule-AI/molecule-monorepo/platform
    -> go.moleculesai.app/core/platform
- tests/harness/cp-stub/go.mod:
    github.com/Molecule-AI/molecule-monorepo/tests/harness/cp-stub
    -> go.moleculesai.app/core/tests/harness/cp-stub

Surfaces touched
- 174 *.go files (374 import lines) — every import under
  workspace-server/ + tests/harness/cp-stub/
- 2 Dockerfiles (workspace-server/Dockerfile + Dockerfile.tenant) —
  -ldflags strings updated in lockstep with the module rename so
  buildinfo.GitSHA injection still resolves correctly
- README + docs + scripts + comment URLs to git.moleculesai.app form
- NEW workspace-server/internal/lint/import_path_lint_test.go —
  structural lint gate rejecting future github.com/Molecule-AI/ or
  Molecule-AI/molecule-monorepo references. Identical template to the
  other migration PRs (plugin-gh-identity#3, molecule-cli#2,
  molecule-controlplane#32).

Cross-repo dep allowlist (documented in lint gate)
workspace-server requires molecule-ai-plugin-gh-identity, whose own
vanity migration is PR molecule-ai-plugin-gh-identity#3. Until that PR
merges + a tag is cut at go.moleculesai.app/plugin/gh-identity, the
two locations referencing the legacy github.com path
(workspace-server/go.mod require, cmd/server/main.go import) remain
allowlisted. Follow-up PR drops the allowlist + updates both refs in
one shot once gh-identity is fully migrated.

Test plan
- go build ./... clean for both modules
- go test ./... green except two pre-existing failures
  (TestStartSweeper_RecordsMetricsOnSuccess flaky-on-suite,
  TestLocalResolver_BubblesUpCopyFailure relies on read-only fs perms
  but runs as root on operator host) — both reproduce identically on
  baseline main pre-migration; NOT regressions of this PR
- Mutation-tested: lint gate fails on canaries in .go + .md;
  allowlist correctly suppresses cross-repo dep references in go.mod
  while still flagging unrelated additions

Open dependency
- The go.moleculesai.app responder must be deployed before fresh-clone
  external builds can resolve the vanity path. Existing CI / Docker
  builds ride the pinned go.sum and the self-referential module path,
  so the responder is not on the critical path for those.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 22:37:42 +00:00

240 lines
6.8 KiB
Go

// memory-plugin-postgres is the built-in implementation of the memory
// plugin contract (RFC #2728). Operators run it next to workspace-
// server; workspace-server points MEMORY_PLUGIN_URL at it.
//
// Owns its own postgres tables (see migrations/). When an operator
// swaps in a different plugin, this binary's tables become orphaned
// — not auto-dropped. Document this in the plugin docs (PR-10).
package main
import (
	"context"
	"database/sql"
	"embed"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"sort"
	"strings"
	"syscall"
	"time"

	_ "github.com/lib/pq"

	"go.moleculesai.app/core/platform/internal/memory/pgplugin"
)
// migrationsFS bundles the .up.sql files into the binary at build time
// so the prebuilt image doesn't need the source tree at runtime. The
// prior `os.ReadDir("cmd/memory-plugin-postgres/migrations")` path
// only resolved during `go test` from the repo root — in the published
// image the path didn't exist and boot failed after the 30s health gate
// (caught on staging redeploy 2026-05-05 after PR #2906).
//
// The embed pattern matches only *.up.sql, so down migrations are never
// bundled (see runMigrations: applying them is a manual operator action).
//
//go:embed migrations/*.up.sql
var migrationsFS embed.FS
const (
// envDatabaseURL names the required env var holding the Postgres
// connection string (see loadConfig: empty/whitespace is a boot error).
envDatabaseURL = "MEMORY_PLUGIN_DATABASE_URL"
// envListenAddr optionally overrides the listen address; empty means
// defaultListenAddr below.
envListenAddr = "MEMORY_PLUGIN_LISTEN_ADDR"
// envSkipMigrate set to "1" disables the boot-time migration pass.
envSkipMigrate = "MEMORY_PLUGIN_SKIP_MIGRATE"
// Loopback-only by default (defense in depth). The platform talks to
// the plugin over `http://localhost:9100` from the same container, so
// binding to all interfaces would only widen the reachable surface
// without enabling any in-design caller. Operators running the plugin
// on a separate host override via MEMORY_PLUGIN_LISTEN_ADDR=:9100 (or
// some other interface).
defaultListenAddr = "127.0.0.1:9100"
)
// main is a thin shell around run: any boot or serve failure becomes a
// fatal log line carrying the binary name as prefix.
func main() {
	err := run()
	if err != nil {
		log.Fatalf("memory-plugin-postgres: %v", err)
	}
}
// run is the boot path. Extracted from main() so tests can drive it
// with synthesized env. Returns nil on graceful shutdown, an error on
// failure to bring up.
//
// Boot order: config -> db (ping-verified) -> migrations (unless
// skipped) -> HTTP listener. Then it blocks until either the server
// fails or SIGINT/SIGTERM arrives, and drains with a 10s grace period.
func run() error {
cfg, err := loadConfig()
if err != nil {
return fmt.Errorf("config: %w", err)
}
// openDB pings with a timeout, so an unreachable database fails boot
// here rather than on the first request.
db, err := openDB(cfg.DatabaseURL)
if err != nil {
return fmt.Errorf("open db: %w", err)
}
defer db.Close()
if !cfg.SkipMigrate {
if err := runMigrations(db); err != nil {
return fmt.Errorf("migrate: %w", err)
}
}
store := pgplugin.NewStore(db)
// The health callback given to the handler is a bounded (2s) db ping.
handler := pgplugin.NewHandler(store, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return db.PingContext(ctx)
})
srv := &http.Server{
Addr: cfg.ListenAddr,
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}
// Listen separately so we can log the bound port (handy when
// :0 is used in tests).
ln, err := net.Listen("tcp", cfg.ListenAddr)
if err != nil {
return fmt.Errorf("listen %s: %w", cfg.ListenAddr, err)
}
log.Printf("memory-plugin-postgres listening on %s", ln.Addr())
// Run server in a goroutine; main waits on signal. errCh is buffered
// (size 1) so the goroutine can always complete its send and exit even
// when we leave via the signal branch and never read the channel.
errCh := make(chan error, 1)
go func() {
// http.ErrServerClosed is the expected result of Shutdown below,
// not a failure — don't report it.
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigCh:
log.Println("shutdown signal received")
case err := <-errCh:
return fmt.Errorf("serve: %w", err)
}
// Graceful drain: stop accepting, wait up to 10s for in-flight
// requests, and surface whatever Shutdown reports.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
return srv.Shutdown(ctx)
}
// config carries the boot-time settings that loadConfig reads from the
// environment.
type config struct {
// DatabaseURL is the Postgres connection string (required, trimmed).
DatabaseURL string
// ListenAddr is the host:port to bind; defaults to defaultListenAddr.
ListenAddr string
// SkipMigrate disables the boot-time migration pass when the
// MEMORY_PLUGIN_SKIP_MIGRATE env var is "1".
SkipMigrate bool
}
// loadConfig reads the plugin configuration from the environment.
//
// MEMORY_PLUGIN_DATABASE_URL is required; an empty (or all-whitespace)
// value is an error. MEMORY_PLUGIN_LISTEN_ADDR falls back to
// defaultListenAddr when unset. MEMORY_PLUGIN_SKIP_MIGRATE set to "1"
// disables the boot-time migration pass.
func loadConfig() (*config, error) {
	dbURL := strings.TrimSpace(os.Getenv(envDatabaseURL))
	if dbURL == "" {
		return nil, fmt.Errorf("%s is required", envDatabaseURL)
	}
	addr := strings.TrimSpace(os.Getenv(envListenAddr))
	if addr == "" {
		addr = defaultListenAddr
	}
	// Trim the skip flag like the other env vars: previously a stray
	// trailing space (easy to introduce in systemd/compose files) made
	// `MEMORY_PLUGIN_SKIP_MIGRATE="1 "` silently fail the comparison
	// and re-enable migrations.
	return &config{
		DatabaseURL: dbURL,
		ListenAddr:  addr,
		SkipMigrate: strings.TrimSpace(os.Getenv(envSkipMigrate)) == "1",
	}, nil
}
// openDB opens a postgres connection pool for databaseURL, applies the
// plugin's pool limits, and verifies connectivity with a 5-second ping
// before handing the pool back.
func openDB(databaseURL string) (*sql.DB, error) {
	pool, err := sql.Open("postgres", databaseURL)
	if err != nil {
		return nil, err
	}
	pool.SetMaxOpenConns(25)
	pool.SetMaxIdleConns(5)
	pool.SetConnMaxLifetime(30 * time.Minute)

	// Fail boot fast if the database is unreachable instead of on the
	// first request.
	pingCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if pingErr := pool.PingContext(pingCtx); pingErr != nil {
		return nil, fmt.Errorf("ping: %w", pingErr)
	}
	return pool, nil
}
// runMigrations applies the schema migrations bundled into the binary
// via go:embed (see migrationsFS at the top of this file). Idempotent
// on repeat boot — every migration file uses CREATE … IF NOT EXISTS.
//
// The down migrations are deliberately NOT applied here — that's a
// manual operator action. This keeps the binary tiny and avoids
// dragging in golang-migrate's drivers.
//
// MEMORY_PLUGIN_MIGRATIONS_DIR (filesystem path) is honored as an
// override for operators who need to ship custom migrations alongside
// the binary without rebuilding. When unset (the common case) we read
// from the embedded FS.
func runMigrations(db *sql.DB) error {
if dir := strings.TrimSpace(os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")); dir != "" {
return runMigrationsFromDisk(db, dir)
}
return runMigrationsFromEmbed(db)
}
// runMigrationsFromEmbed applies the *.up.sql files bundled into the
// binary at build time, in alphabetical filename order (which matches
// the on-disk behavior of os.ReadDir on Linux for the same names).
func runMigrationsFromEmbed(db *sql.DB) error {
	entries, err := migrationsFS.ReadDir("migrations")
	if err != nil {
		return fmt.Errorf("read embedded migrations: %w", err)
	}
	var files []string
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		if name := entry.Name(); strings.HasSuffix(name, ".up.sql") {
			files = append(files, name)
		}
	}
	sort.Strings(files)
	for _, file := range files {
		sqlText, readErr := migrationsFS.ReadFile("migrations/" + file)
		if readErr != nil {
			return fmt.Errorf("read embedded %q: %w", file, readErr)
		}
		if _, execErr := db.Exec(string(sqlText)); execErr != nil {
			return fmt.Errorf("apply %q: %w", file, execErr)
		}
		log.Printf("applied embedded migration %s", file)
	}
	return nil
}
// runMigrationsFromDisk preserves the legacy filesystem-path mode for
// operator-supplied custom migrations (MEMORY_PLUGIN_MIGRATIONS_DIR).
//
// Only *.up.sql files directly inside dir are applied, in alphabetical
// order; subdirectories and other files are ignored. Like the embedded
// path, idempotence relies on the migration SQL itself (CREATE … IF
// NOT EXISTS), not on a bookkeeping table.
func runMigrationsFromDisk(db *sql.DB, dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("read migrations dir %q: %w", dir, err)
	}
	names := make([]string, 0, len(entries))
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
			continue
		}
		names = append(names, e.Name())
	}
	sort.Strings(names)
	for _, name := range names {
		// filepath.Join instead of manual "/" concatenation: OS-correct
		// separator and no doubled slash when dir carries a trailing one.
		path := filepath.Join(dir, name)
		data, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("read %q: %w", path, err)
		}
		if _, err := db.Exec(string(data)); err != nil {
			return fmt.Errorf("apply %q: %w", path, err)
		}
		log.Printf("applied disk migration %s (from %s)", name, dir)
	}
	return nil
}