8019231a16
ci-arm64-advisory / fast-checks (push) Waiting to run
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Successful in 8s
Block internal-flavored paths / Block forbidden paths (push) Successful in 8s
CI / Detect changes (push) Successful in 9s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 9s
E2E Chat / detect-changes (push) Successful in 8s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (local) (push) Successful in 49s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 12s
publish-workspace-server-image / build-and-push (push) Successful in 3m12s
E2E Staging SaaS (full lifecycle) / pr-validate (push) Successful in 39s
Handlers Postgres Integration / detect-changes (push) Successful in 4s
Harness Replays / detect-changes (push) Successful in 5s
Lint curl status-code capture / Scan workflows for curl status-capture pollution (push) Successful in 6s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 4s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 3s
lint-required-workflows-docker-host-pinned / Lint docker-host pin on docker-touching workflows (push) Successful in 3s
lint-continue-on-error-tracking / lint-continue-on-error-tracking (push) Successful in 1m6s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 14s
CI / Canvas (Next.js) (push) Successful in 3s
CI / Shellcheck (E2E scripts) (push) Successful in 2s
Lint workflow YAML (Gitea-1.22.6-hostile shapes) / Lint workflow YAML for Gitea-1.22.6-hostile shapes (push) Successful in 1m25s
E2E Peer Visibility (literal MCP list_peers) / E2E Peer Visibility (push) Successful in 5m19s
E2E Staging External Runtime / E2E Staging External Runtime (push) Successful in 5m30s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 2m23s
E2E Staging SaaS (full lifecycle) / E2E Staging SaaS (push) Successful in 6m5s
E2E Chat / E2E Chat (push) Successful in 4m6s
CI / Platform (Go) (push) Successful in 5m0s
CI / all-required (push) Successful in 9m45s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 2s
publish-workspace-server-image / Production auto-deploy (push) Successful in 8m32s
Harness Replays / Harness Replays (push) Successful in 12s
CI / Canvas Deploy Reminder (push) Successful in 2s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m37s
Sweep stale Cloudflare Tunnels / Sweep CF tunnels (push) Successful in 8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 12s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Successful in 5m9s
main-red-watchdog / watchdog (push) Successful in 32s
gate-check-v3 / gate-check (push) Successful in 25s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Successful in 6m10s
CTO-bypass merge 2026-05-24: #1760 Go module rename to git.moleculesai.app path
240 lines
6.8 KiB
Go
240 lines
6.8 KiB
Go
// memory-plugin-postgres is the built-in implementation of the memory
|
|
// plugin contract (RFC #2728). Operators run it next to workspace-
|
|
// server; workspace-server points MEMORY_PLUGIN_URL at it.
|
|
//
|
|
// Owns its own postgres tables (see migrations/). When an operator
|
|
// swaps in a different plugin, this binary's tables become orphaned
|
|
// — not auto-dropped. Document this in the plugin docs (PR-10).
|
|
package main
|
|
|
|
import (
	"context"
	"database/sql"
	"embed"
	"errors"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"sort"
	"strings"
	"syscall"
	"time"

	_ "github.com/lib/pq"

	"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/memory/pgplugin"
)
|
|
|
|
// migrationsFS bundles the .up.sql files into the binary at build time
|
|
// so the prebuilt image doesn't need the source tree at runtime. The
|
|
// prior `os.ReadDir("cmd/memory-plugin-postgres/migrations")` path
|
|
// only resolved during `go test` from the repo root — in the published
|
|
// image the path didn't exist and boot failed after the 30s health gate
|
|
// (caught on staging redeploy 2026-05-05 after PR #2906).
|
|
//
|
|
//go:embed migrations/*.up.sql
|
|
var migrationsFS embed.FS
|
|
|
|
// Names of the environment variables consulted at boot, followed by the
// fallback listen address.
const (
	envDatabaseURL = "MEMORY_PLUGIN_DATABASE_URL"
	envListenAddr  = "MEMORY_PLUGIN_LISTEN_ADDR"
	envSkipMigrate = "MEMORY_PLUGIN_SKIP_MIGRATE"

	// defaultListenAddr is loopback-only as defense in depth. The
	// platform reaches the plugin at `http://localhost:9100` from inside
	// the same container, so listening on every interface would widen
	// the reachable surface without serving any in-design caller.
	// Operators who run the plugin on a separate host override it with
	// MEMORY_PLUGIN_LISTEN_ADDR=:9100 (or another interface).
	defaultListenAddr = "127.0.0.1:9100"
)
|
|
|
|
func main() {
|
|
if err := run(); err != nil {
|
|
log.Fatalf("memory-plugin-postgres: %v", err)
|
|
}
|
|
}
|
|
|
|
// run is the boot path. Extracted from main() so tests can drive it
|
|
// with synthesized env. Returns nil on graceful shutdown, an error on
|
|
// failure to bring up.
|
|
func run() error {
|
|
cfg, err := loadConfig()
|
|
if err != nil {
|
|
return fmt.Errorf("config: %w", err)
|
|
}
|
|
|
|
db, err := openDB(cfg.DatabaseURL)
|
|
if err != nil {
|
|
return fmt.Errorf("open db: %w", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
if !cfg.SkipMigrate {
|
|
if err := runMigrations(db); err != nil {
|
|
return fmt.Errorf("migrate: %w", err)
|
|
}
|
|
}
|
|
|
|
store := pgplugin.NewStore(db)
|
|
handler := pgplugin.NewHandler(store, func() error {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
defer cancel()
|
|
return db.PingContext(ctx)
|
|
})
|
|
|
|
srv := &http.Server{
|
|
Addr: cfg.ListenAddr,
|
|
Handler: handler,
|
|
ReadHeaderTimeout: 5 * time.Second,
|
|
}
|
|
|
|
// Listen separately so we can log the bound port (handy when
|
|
// :0 is used in tests).
|
|
ln, err := net.Listen("tcp", cfg.ListenAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("listen %s: %w", cfg.ListenAddr, err)
|
|
}
|
|
log.Printf("memory-plugin-postgres listening on %s", ln.Addr())
|
|
|
|
// Run server in a goroutine; main waits on signal.
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
if err := srv.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
errCh <- err
|
|
}
|
|
}()
|
|
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
select {
|
|
case <-sigCh:
|
|
log.Println("shutdown signal received")
|
|
case err := <-errCh:
|
|
return fmt.Errorf("serve: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
return srv.Shutdown(ctx)
|
|
}
|
|
|
|
// config carries the boot-time settings that loadConfig reads from the
// environment.
type config struct {
	// DatabaseURL is the connection string handed to openDB.
	DatabaseURL string
	// ListenAddr is the TCP address the HTTP server listens on.
	ListenAddr string
	// SkipMigrate, when true, makes run bypass runMigrations.
	SkipMigrate bool
}
|
|
|
|
func loadConfig() (*config, error) {
|
|
dbURL := strings.TrimSpace(os.Getenv(envDatabaseURL))
|
|
if dbURL == "" {
|
|
return nil, fmt.Errorf("%s is required", envDatabaseURL)
|
|
}
|
|
addr := strings.TrimSpace(os.Getenv(envListenAddr))
|
|
if addr == "" {
|
|
addr = defaultListenAddr
|
|
}
|
|
return &config{
|
|
DatabaseURL: dbURL,
|
|
ListenAddr: addr,
|
|
SkipMigrate: os.Getenv(envSkipMigrate) == "1",
|
|
}, nil
|
|
}
|
|
|
|
// openDB opens a connection pool for databaseURL using the "postgres"
// driver, applies the pool limits, and verifies connectivity with a
// ping bounded to 5 seconds.
//
// On any error the returned *sql.DB is nil. If the ping fails, the pool
// is closed before returning so a failed boot does not leak it.
func openDB(databaseURL string) (*sql.DB, error) {
	db, err := sql.Open("postgres", databaseURL)
	if err != nil {
		return nil, err
	}
	db.SetMaxOpenConns(25)
	db.SetMaxIdleConns(5)
	db.SetConnMaxLifetime(30 * time.Minute)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		// The caller only receives nil and so cannot close the pool
		// itself. The Close error is deliberately dropped: the ping
		// failure is the actionable one.
		_ = db.Close()
		return nil, fmt.Errorf("ping: %w", err)
	}
	return db, nil
}
|
|
|
|
// runMigrations applies the schema migrations bundled into the binary
|
|
// via go:embed (see migrationsFS at the top of this file). Idempotent
|
|
// on repeat boot — every migration file uses CREATE … IF NOT EXISTS.
|
|
//
|
|
// The down migrations are deliberately NOT applied here — that's a
|
|
// manual operator action. This keeps the binary tiny and avoids
|
|
// dragging in golang-migrate's drivers.
|
|
//
|
|
// MEMORY_PLUGIN_MIGRATIONS_DIR (filesystem path) is honored as an
|
|
// override for operators who need to ship custom migrations alongside
|
|
// the binary without rebuilding. When unset (the common case) we read
|
|
// from the embedded FS.
|
|
func runMigrations(db *sql.DB) error {
|
|
if dir := strings.TrimSpace(os.Getenv("MEMORY_PLUGIN_MIGRATIONS_DIR")); dir != "" {
|
|
return runMigrationsFromDisk(db, dir)
|
|
}
|
|
return runMigrationsFromEmbed(db)
|
|
}
|
|
|
|
// runMigrationsFromEmbed applies the *.up.sql files bundled into the
|
|
// binary at build time. Order is alphabetical (matches the on-disk
|
|
// behavior of os.ReadDir on Linux for the same set of names).
|
|
func runMigrationsFromEmbed(db *sql.DB) error {
|
|
entries, err := migrationsFS.ReadDir("migrations")
|
|
if err != nil {
|
|
return fmt.Errorf("read embedded migrations: %w", err)
|
|
}
|
|
names := make([]string, 0, len(entries))
|
|
for _, e := range entries {
|
|
if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
|
|
continue
|
|
}
|
|
names = append(names, e.Name())
|
|
}
|
|
sort.Strings(names)
|
|
for _, name := range names {
|
|
data, err := migrationsFS.ReadFile("migrations/" + name)
|
|
if err != nil {
|
|
return fmt.Errorf("read embedded %q: %w", name, err)
|
|
}
|
|
if _, err := db.Exec(string(data)); err != nil {
|
|
return fmt.Errorf("apply %q: %w", name, err)
|
|
}
|
|
log.Printf("applied embedded migration %s", name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// runMigrationsFromDisk preserves the legacy filesystem-path mode for
// operator-supplied custom migrations.
//
// It executes every *.up.sql file directly inside dir against db, in
// lexical filename order. Subdirectories and files with any other
// suffix are ignored. Each file is sent to the database as a single
// Exec call.
//
// The first failure (listing dir, reading a file, or executing it)
// stops the run and is returned wrapped with the offending path; files
// applied before it are not rolled back here.
func runMigrationsFromDisk(db *sql.DB, dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("read migrations dir %q: %w", dir, err)
	}

	names := make([]string, 0, len(entries))
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".up.sql") {
			continue
		}
		names = append(names, e.Name())
	}
	sort.Strings(names)

	for _, name := range names {
		// filepath.Join uses the OS separator and collapses a trailing
		// slash on dir, so reported paths never contain "//".
		path := filepath.Join(dir, name)
		data, err := os.ReadFile(path)
		if err != nil {
			return fmt.Errorf("read %q: %w", path, err)
		}
		if _, err := db.Exec(string(data)); err != nil {
			return fmt.Errorf("apply %q: %w", path, err)
		}
		log.Printf("applied disk migration %s (from %s)", name, dir)
	}
	return nil
}
|