Merge pull request #2747 from Molecule-AI/fix/memory-v2-c2-backfill-verify

Memory v2 fixup C2: backfill -verify mode (parity check)
This commit is contained in:
Hongming Wang 2026-05-04 16:08:27 +00:00 committed by GitHub
commit 6fc328ef44
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 644 additions and 2 deletions

View File

@ -50,13 +50,25 @@ func run(argv []string, stdout, stderr *os.File) error {
fs.SetOutput(stderr)
dryRun := fs.Bool("dry-run", false, "count + diff only, no writes")
apply := fs.Bool("apply", false, "actually copy rows to the plugin")
verify := fs.Bool("verify", false, "post-apply parity check: random-sample N workspaces, diff agent_memories vs plugin search")
verifySample := fs.Int("verify-sample", 50, "number of workspaces to sample in -verify mode")
workspace := fs.String("workspace", "", "limit to a single workspace UUID (empty = all)")
limit := fs.Int("limit", defaultLimit, "max rows to process this run")
if err := fs.Parse(argv); err != nil {
return err
}
if *dryRun == *apply {
return errors.New("specify exactly one of -dry-run or -apply")
modesPicked := 0
if *dryRun {
modesPicked++
}
if *apply {
modesPicked++
}
if *verify {
modesPicked++
}
if modesPicked != 1 {
return errors.New("specify exactly one of -dry-run, -apply, or -verify")
}
dbURL := os.Getenv("DATABASE_URL")
@ -82,6 +94,26 @@ func run(argv []string, stdout, stderr *os.File) error {
plugin := mclient.New(mclient.Config{BaseURL: pluginURL})
resolver := namespace.New(db)
if *verify {
vcfg := verifyConfig{
DB: db,
Plugin: plugin,
Resolver: namespaceResolverAdapter{resolver},
SampleSize: *verifySample,
WorkspaceID: *workspace,
}
report, err := verifyParity(context.Background(), vcfg, stdout)
if err != nil {
return err
}
fmt.Fprintf(stdout, "\nVerify complete: workspaces_sampled=%d matches=%d mismatches=%d errors=%d\n",
report.WorkspacesSampled, report.Matches, report.Mismatches, report.Errors)
if report.Mismatches > 0 || report.Errors > 0 {
return fmt.Errorf("verify found %d mismatches and %d errors", report.Mismatches, report.Errors)
}
return nil
}
cfg := backfillConfig{
DB: db,
Plugin: plugin,
@ -251,3 +283,23 @@ func namespaceKindFromString(scope string) contract.NamespaceKind {
return contract.NamespaceKindWorkspace
}
}
// namespaceResolverAdapter bridges *namespace.Resolver (which returns
// []namespace.Namespace) to verify.go's verifyResolver interface
// (which wants []ResolvedNamespace). Keeps verify.go independent of
// the namespace-package dependency so its tests can stub easily.
type namespaceResolverAdapter struct {
r *namespace.Resolver
}
func (a namespaceResolverAdapter) ReadableNamespaces(ctx context.Context, workspaceID string) ([]ResolvedNamespace, error) {
src, err := a.r.ReadableNamespaces(ctx, workspaceID)
if err != nil {
return nil, err
}
out := make([]ResolvedNamespace, len(src))
for i, ns := range src {
out[i] = ResolvedNamespace{Name: ns.Name}
}
return out, nil
}

View File

@ -0,0 +1,200 @@
package main
// verify.go — post-apply parity check.
//
// After a backfill -apply, run with -verify to confirm the migration
// actually produced equivalent data. Picks `SampleSize` random
// workspaces, queries agent_memories direct + plugin search via the
// caller's namespaces, and diffs the result sets by content.
//
// The diff is best-effort: pg's recent-first ordering and the plugin's
// internal ordering may differ, so we compare as sets, not lists.
// We do require strict 1:1 multiset equality (every legacy row maps
// to exactly one plugin row, ignoring id since the backfill preserves
// it via the C1 idempotency key).
import (
"context"
"database/sql"
"fmt"
"math/rand"
"os"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract"
)
// verifyConfig is the typed dependency bundle for verifyParity.
type verifyConfig struct {
DB *sql.DB
Plugin verifyPlugin
Resolver verifyResolver
SampleSize int
WorkspaceID string // optional: limit to one workspace
Rand *rand.Rand
}
// verifyPlugin is the slice of memory-plugin client we call.
type verifyPlugin interface {
Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error)
}
// verifyResolver mirrors namespace.Resolver. Same shape as
// backfillResolver but kept distinct so verify isn't tied to
// backfill's interface.
type verifyResolver interface {
ReadableNamespaces(ctx context.Context, workspaceID string) ([]ResolvedNamespace, error)
}
// ResolvedNamespace is the minimum we need from the resolver — kept
// separate so the verify code doesn't depend on the namespace package
// (the live tests inject stubs, the binary uses an adapter).
type ResolvedNamespace struct {
Name string
}
// verifyReport accumulates the per-workspace results.
type verifyReport struct {
WorkspacesSampled int
Matches int
Mismatches int
Errors int
}
// verifyParity is the workhorse. Returns a report; the CLI converts
// any non-zero mismatches/errors into a non-zero exit so CI can gate
// the cutover.
func verifyParity(ctx context.Context, cfg verifyConfig, stdout *os.File) (*verifyReport, error) {
report := &verifyReport{}
rng := cfg.Rand
if rng == nil {
rng = rand.New(rand.NewSource(42)) //nolint:gosec // determinism > unpredictability for ops
}
wsIDs, err := pickWorkspaceSample(ctx, cfg.DB, cfg.WorkspaceID, cfg.SampleSize, rng)
if err != nil {
return report, fmt.Errorf("pick sample: %w", err)
}
for _, wsID := range wsIDs {
report.WorkspacesSampled++
legacy, err := queryLegacyMemories(ctx, cfg.DB, wsID)
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s legacy query: %v\n", wsID, err)
report.Errors++
continue
}
readable, err := cfg.Resolver.ReadableNamespaces(ctx, wsID)
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s resolve: %v\n", wsID, err)
report.Errors++
continue
}
nsList := make([]string, len(readable))
for i, ns := range readable {
nsList[i] = ns.Name
}
if len(nsList) == 0 {
// No readable namespaces — empty plugin result expected.
if len(legacy) == 0 {
report.Matches++
} else {
fmt.Fprintf(stdout, "[mismatch] workspace=%s legacy=%d plugin=0 (no readable namespaces)\n", wsID, len(legacy))
report.Mismatches++
}
continue
}
resp, err := cfg.Plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s plugin search: %v\n", wsID, err)
report.Errors++
continue
}
pluginContents := make(map[string]int, len(resp.Memories))
for _, m := range resp.Memories {
pluginContents[m.Content]++
}
// Compare as multisets: each legacy content appears at least
// once in plugin output. We deliberately tolerate plugin
// having MORE rows (the namespace might include team-shared
// memories from sibling workspaces that aren't in this
// workspace's agent_memories rows).
matched := true
for _, c := range legacy {
if pluginContents[c] == 0 {
fmt.Fprintf(stdout, "[mismatch] workspace=%s missing-from-plugin content=%q\n", wsID, truncate(c, 80))
matched = false
break
}
pluginContents[c]--
}
if matched {
report.Matches++
} else {
report.Mismatches++
}
}
return report, nil
}
// pickWorkspaceSample returns up to N workspace UUIDs. If
// WorkspaceID is set, returns only that one. Otherwise selects N
// random workspaces from the workspaces table (TABLESAMPLE would be
// nicer but SYSTEM/BERNOULLI sampling has surprising distribution
// properties for small populations; we just ORDER BY random() LIMIT).
func pickWorkspaceSample(ctx context.Context, db *sql.DB, workspaceID string, n int, _ *rand.Rand) ([]string, error) {
if workspaceID != "" {
return []string{workspaceID}, nil
}
rows, err := db.QueryContext(ctx, `
SELECT id::text
FROM workspaces
WHERE status != 'removed'
ORDER BY random()
LIMIT $1
`, n)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]string, 0, n)
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
out = append(out, id)
}
return out, rows.Err()
}
// queryLegacyMemories pulls all agent_memories rows for a workspace
// (LOCAL + TEAM scopes — what the plugin search would return through
// the resolver's readable list, mapped via PR-6 shim semantics).
func queryLegacyMemories(ctx context.Context, db *sql.DB, workspaceID string) ([]string, error) {
rows, err := db.QueryContext(ctx, `
SELECT content
FROM agent_memories
WHERE workspace_id = $1
ORDER BY created_at DESC
`, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []string{}
for rows.Next() {
var c string
if err := rows.Scan(&c); err != nil {
return nil, err
}
out = append(out, c)
}
return out, rows.Err()
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "…"
}

View File

@ -0,0 +1,390 @@
package main
import (
"context"
"errors"
"os"
"strings"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract"
)
// stubVerifyPlugin records search calls and returns canned results.
type stubVerifyPlugin struct {
searchFn func(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error)
}
func (s *stubVerifyPlugin) Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
if s.searchFn != nil {
return s.searchFn(ctx, body)
}
return &contract.SearchResponse{}, nil
}
// stubVerifyResolver returns a canned readable namespace list.
type stubVerifyResolver struct {
namespaces []ResolvedNamespace
err error
}
func (s *stubVerifyResolver) ReadableNamespaces(_ context.Context, _ string) ([]ResolvedNamespace, error) {
return s.namespaces, s.err
}
// --- pickWorkspaceSample ---
func TestPickWorkspaceSample_SingleWorkspaceShortCircuit(t *testing.T) {
db, _, _ := sqlmock.New()
defer db.Close()
got, err := pickWorkspaceSample(context.Background(), db, "specific-ws", 50, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(got) != 1 || got[0] != "specific-ws" {
t.Errorf("got %v, want [specific-ws]", got)
}
}
func TestPickWorkspaceSample_RandomSample(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WithArgs(50).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow("ws-1").
AddRow("ws-2").
AddRow("ws-3"))
got, err := pickWorkspaceSample(context.Background(), db, "", 50, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if len(got) != 3 {
t.Errorf("got len %d, want 3", len(got))
}
}
func TestPickWorkspaceSample_QueryError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnError(errors.New("dead"))
_, err := pickWorkspaceSample(context.Background(), db, "", 50, nil)
if err == nil {
t.Error("expected error")
}
}
func TestPickWorkspaceSample_ScanError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "extra"}). // wrong shape
AddRow("ws-1", "extra"))
_, err := pickWorkspaceSample(context.Background(), db, "", 50, nil)
if err == nil {
t.Error("expected scan error")
}
}
// --- queryLegacyMemories ---
func TestQueryLegacyMemories_HappyPath(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT content FROM agent_memories").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"content"}).
AddRow("fact 1").
AddRow("fact 2"))
got, err := queryLegacyMemories(context.Background(), db, "ws-1")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(got) != 2 || got[0] != "fact 1" {
t.Errorf("got %v", got)
}
}
func TestQueryLegacyMemories_QueryError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnError(errors.New("dead"))
_, err := queryLegacyMemories(context.Background(), db, "ws-1")
if err == nil {
t.Error("expected error")
}
}
// --- verifyParity (the workhorse) ---
func TestVerifyParity_AllMatch(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WithArgs("ws-1").
WillReturnRows(sqlmock.NewRows([]string{"content"}).
AddRow("fact A").
AddRow("fact B"))
plugin := &stubVerifyPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
return &contract.SearchResponse{Memories: []contract.Memory{
{ID: "id-A", Content: "fact A"},
{ID: "id-B", Content: "fact B"},
}}, nil
},
}
resolver := &stubVerifyResolver{
namespaces: []ResolvedNamespace{{Name: "workspace:ws-1"}},
}
cfg := verifyConfig{DB: db, Plugin: plugin, Resolver: resolver, SampleSize: 50}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, err := verifyParity(context.Background(), cfg, devnull)
if err != nil {
t.Fatalf("err: %v", err)
}
if report.Matches != 1 || report.Mismatches != 0 || report.Errors != 0 {
t.Errorf("report = %+v, want 1 match", report)
}
}
func TestVerifyParity_MismatchDetectsMissingFromPlugin(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"}).
AddRow("fact A").
AddRow("fact-missing-from-plugin"))
plugin := &stubVerifyPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
return &contract.SearchResponse{Memories: []contract.Memory{
{ID: "id-A", Content: "fact A"},
}}, nil
},
}
resolver := &stubVerifyResolver{
namespaces: []ResolvedNamespace{{Name: "workspace:ws-1"}},
}
cfg := verifyConfig{DB: db, Plugin: plugin, Resolver: resolver, SampleSize: 50}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, err := verifyParity(context.Background(), cfg, devnull)
if err != nil {
t.Fatalf("err: %v", err)
}
if report.Mismatches != 1 {
t.Errorf("report = %+v, want 1 mismatch", report)
}
}
func TestVerifyParity_PluginExtraRowsTolerated(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"}).
AddRow("fact A"))
// Plugin returns more rows (e.g., team-shared from a sibling).
// Verify treats this as a match — legacy is a subset of plugin.
plugin := &stubVerifyPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
return &contract.SearchResponse{Memories: []contract.Memory{
{ID: "id-A", Content: "fact A"},
{ID: "id-team-1", Content: "team-shared content from sibling"},
}}, nil
},
}
resolver := &stubVerifyResolver{
namespaces: []ResolvedNamespace{{Name: "workspace:ws-1"}, {Name: "team:root"}},
}
cfg := verifyConfig{DB: db, Plugin: plugin, Resolver: resolver, SampleSize: 50}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, err := verifyParity(context.Background(), cfg, devnull)
if err != nil {
t.Fatalf("err: %v", err)
}
if report.Matches != 1 || report.Mismatches != 0 {
t.Errorf("report = %+v, want 1 match (plugin-extra is OK)", report)
}
}
func TestVerifyParity_LegacyQueryError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnError(errors.New("dead"))
cfg := verifyConfig{
DB: db,
Plugin: &stubVerifyPlugin{},
Resolver: &stubVerifyResolver{namespaces: []ResolvedNamespace{{Name: "workspace:ws-1"}}},
}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, err := verifyParity(context.Background(), cfg, devnull)
if err != nil {
t.Fatalf("err: %v", err)
}
if report.Errors != 1 {
t.Errorf("report = %+v, want 1 error", report)
}
}
func TestVerifyParity_ResolverError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"}).AddRow("x"))
cfg := verifyConfig{
DB: db,
Plugin: &stubVerifyPlugin{},
Resolver: &stubVerifyResolver{err: errors.New("dead")},
}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, _ := verifyParity(context.Background(), cfg, devnull)
if report.Errors != 1 {
t.Errorf("report = %+v, want 1 error", report)
}
}
func TestVerifyParity_PluginSearchError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"}).AddRow("x"))
cfg := verifyConfig{
DB: db,
Plugin: &stubVerifyPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
return nil, errors.New("plugin dead")
},
},
Resolver: &stubVerifyResolver{namespaces: []ResolvedNamespace{{Name: "workspace:ws-1"}}},
}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, _ := verifyParity(context.Background(), cfg, devnull)
if report.Errors != 1 {
t.Errorf("report = %+v, want 1 error", report)
}
}
func TestVerifyParity_NoReadableNamespacesEmptyLegacy(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"})) // empty
cfg := verifyConfig{
DB: db,
Plugin: &stubVerifyPlugin{},
Resolver: &stubVerifyResolver{namespaces: []ResolvedNamespace{}}, // empty
}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, _ := verifyParity(context.Background(), cfg, devnull)
// Empty legacy + empty namespaces → match.
if report.Matches != 1 {
t.Errorf("report = %+v, want 1 match (both empty)", report)
}
}
func TestVerifyParity_NoReadableNamespacesNonEmptyLegacy(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("ws-1"))
mock.ExpectQuery("SELECT content FROM agent_memories").
WillReturnRows(sqlmock.NewRows([]string{"content"}).AddRow("orphan-fact"))
cfg := verifyConfig{
DB: db,
Plugin: &stubVerifyPlugin{},
Resolver: &stubVerifyResolver{namespaces: []ResolvedNamespace{}},
}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
report, _ := verifyParity(context.Background(), cfg, devnull)
// Legacy has rows but plugin can't see any → mismatch.
if report.Mismatches != 1 {
t.Errorf("report = %+v, want 1 mismatch", report)
}
}
func TestVerifyParity_PickSampleError(t *testing.T) {
db, mock, _ := sqlmock.New()
defer db.Close()
mock.ExpectQuery("SELECT id::text FROM workspaces").
WillReturnError(errors.New("dead"))
cfg := verifyConfig{DB: db, Plugin: &stubVerifyPlugin{}, Resolver: &stubVerifyResolver{}}
devnull, _ := os.Open(os.DevNull)
defer devnull.Close()
_, err := verifyParity(context.Background(), cfg, devnull)
if err == nil || !strings.Contains(err.Error(), "pick sample") {
t.Errorf("err = %v", err)
}
}
// --- Truncate ---
func TestVerifyTruncate(t *testing.T) {
if got := truncate("short", 10); got != "short" {
t.Errorf("got %q", got)
}
if got := truncate(strings.Repeat("a", 200), 10); !strings.HasSuffix(got, "…") {
t.Errorf("expected ellipsis: %q", got)
}
}
// --- CLI: -verify mode ---
func TestRun_VerifyVsApplyMutuallyExclusive(t *testing.T) {
stderr, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
defer stderr.Close()
stdout, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
defer stdout.Close()
err := run([]string{"-verify", "-apply"}, stdout, stderr)
if err == nil || !strings.Contains(err.Error(), "exactly one") {
t.Errorf("err = %v", err)
}
}
func TestRun_VerifyAloneIsValid(t *testing.T) {
t.Setenv("DATABASE_URL", "")
t.Setenv("MEMORY_PLUGIN_URL", "http://x")
stderr, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
defer stderr.Close()
stdout, _ := os.OpenFile(os.DevNull, os.O_WRONLY, 0)
defer stdout.Close()
err := run([]string{"-verify"}, stdout, stderr)
// Will fail later on missing DATABASE_URL, NOT on the
// mutually-exclusive-modes check. Asserts that -verify is
// recognized as a valid mode.
if err == nil || !strings.Contains(err.Error(), "DATABASE_URL") {
t.Errorf("err = %v, want DATABASE_URL error (-verify alone is a valid mode)", err)
}
}