From 7b0bd329575060761e2f3654ce8495dd1ac755c3 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Mon, 4 May 2026 08:15:10 -0700 Subject: [PATCH] =?UTF-8?q?Memory=20v2=20PR-8:=20cutover=20=E2=80=94=20adm?= =?UTF-8?q?in=20export/import=20via=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on merged PR-1..7. Adds the operator-controlled cutover flag that flips admin export/import from the legacy direct-DB path to the v2 plugin path. Activation: MEMORY_V2_CUTOVER=true AND the v2 plugin is wired via WithMemoryV2. Both must be true to take the new path; either being false falls through to the existing legacy SQL code unchanged. What ships: * AdminMemoriesHandler gains plugin + resolver fields, wired via WithMemoryV2 (production) / withMemoryV2APIs (tests) * Export: enumerates workspaces, asks resolver for each one's readable namespaces, searches each via plugin, deduplicates by memory id, applies SAFE-T1201 redaction on emitted content (F1084 parity). Returns the legacy memoryExportEntry shape so existing tooling keeps working. * Import: scope→namespace translation mirrors PR-6 shim. Uses UpsertNamespace + CommitMemory; runs SAFE-T1201 redaction BEFORE the plugin sees the content (F1085 parity). * Helpers: legacyScopeFromNamespace + namespaceKindFromLegacyScope (lifted out so admin_memories doesn't depend on MCP handler helpers). skipImport typed error. Operational rollout (cutover sequencing): 1. Today: MEMORY_V2_CUTOVER unset → legacy DB path. 2. After PR-7 backfill applied + smoke verified: operator sets MEMORY_V2_CUTOVER=true. 3. From that point, admin export/import operate on plugin storage; legacy agent_memories table is read-only for the ~60-day grace window before PR-9 drops it. Coverage on new paths: * cutoverActive: 100% * WithMemoryV2 / withMemoryV2APIs: 100% * importViaPlugin: 100% * exportViaPlugin: 97.2% (one defensive scan-error branch in the workspace-list loop) * scopeToWritableNamespaceForImport: 76.9% (resolver-error and no-matching-kind branches exercised end-to-end via Import) * legacyScopeFromNamespace + namespaceKindFromLegacyScope: 100% Edge cases pinned: * Cutover flag matrix (env unset/true/false × wired/unwired) * Export deduplicates memories shared across team (one row per id) * Export tolerates per-workspace failures (resolver / plugin) and keeps going on the rest * Export returns 500 only when the top-level workspace query fails * Empty readable namespaces → empty export (no panic) * Export redacts secrets in plugin path * Import: unknown workspace skipped, unknown scope skipped, plugin upsert/commit errors counted as errors * Import redacts secrets BEFORE plugin sees content * Legacy export/import path unchanged when cutover flag unset --- .../internal/handlers/admin_memories.go | 267 +++++++- .../handlers/admin_memories_cutover_test.go | 604 ++++++++++++++++++ 2 files changed, 870 insertions(+), 1 deletion(-) create mode 100644 workspace-server/internal/handlers/admin_memories_cutover_test.go diff --git a/workspace-server/internal/handlers/admin_memories.go b/workspace-server/internal/handlers/admin_memories.go index 0f564414..460eab15 100644 --- a/workspace-server/internal/handlers/admin_memories.go +++ b/workspace-server/internal/handlers/admin_memories.go @@ -1,23 +1,82 @@ package handlers import ( + "context" "log" "net/http" + "os" + "strings" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + mclient "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/client" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace" "github.com/gin-gonic/gin" ) +// envMemoryV2Cutover gates whether admin export/import routes through +// the v2 plugin (PR-8 / RFC #2728). When unset, the legacy direct-DB +// path runs unchanged so operators who haven't enabled the plugin +// keep working. +const envMemoryV2Cutover = "MEMORY_V2_CUTOVER" + // AdminMemoriesHandler provides bulk export/import of agent memories for // backup and restore across Docker rebuilds (issue #1051). -type AdminMemoriesHandler struct{} +// +// PR-8 (RFC #2728): when wired with the v2 plugin via WithMemoryV2 AND +// MEMORY_V2_CUTOVER is true, export reads from the plugin's namespaces +// and import writes through the plugin. Both paths preserve the +// SAFE-T1201 redaction shipped in F1084 + F1085. +type AdminMemoriesHandler struct { + plugin adminMemoriesPlugin + resolver adminMemoriesResolver +} + +// adminMemoriesPlugin is the slice of the memory plugin client we +// call from this handler. +type adminMemoriesPlugin interface { + CommitMemory(ctx context.Context, namespace string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) + Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) + UpsertNamespace(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error) +} + +// adminMemoriesResolver mirrors the namespace resolver methods this +// handler calls. +type adminMemoriesResolver interface { + WritableNamespaces(ctx context.Context, workspaceID string) ([]namespace.Namespace, error) + ReadableNamespaces(ctx context.Context, workspaceID string) ([]namespace.Namespace, error) +} // NewAdminMemoriesHandler constructs the handler. func NewAdminMemoriesHandler() *AdminMemoriesHandler { return &AdminMemoriesHandler{} } +// WithMemoryV2 attaches the v2 plugin + resolver. Production wiring +// path; main.go calls this after Boot()-ing the plugin client. +func (h *AdminMemoriesHandler) WithMemoryV2(plugin *mclient.Client, resolver *namespace.Resolver) *AdminMemoriesHandler { + h.plugin = plugin + h.resolver = resolver + return h +} + +// withMemoryV2APIs is the test-only wiring that takes interfaces. +func (h *AdminMemoriesHandler) withMemoryV2APIs(plugin adminMemoriesPlugin, resolver adminMemoriesResolver) *AdminMemoriesHandler { + h.plugin = plugin + h.resolver = resolver + return h +} + +// cutoverActive reports whether the export/import path should route +// through the v2 plugin. +func (h *AdminMemoriesHandler) cutoverActive() bool { + if os.Getenv(envMemoryV2Cutover) != "true" { + return false + } + return h.plugin != nil && h.resolver != nil +} + // memoryExportEntry is the JSON shape for a single exported memory. type memoryExportEntry struct { ID string `json:"id"` @@ -36,9 +95,17 @@ type memoryExportEntry struct { // SECURITY (F1084 / #1131): applies redactSecrets to each content field // before returning so that any credentials stored before SAFE-T1201 (#838) // was applied do not leak out via the admin export endpoint. +// +// CUTOVER (PR-8 / RFC #2728): when MEMORY_V2_CUTOVER=true and the v2 +// plugin is wired, reads from the plugin instead of agent_memories. func (h *AdminMemoriesHandler) Export(c *gin.Context) { ctx := c.Request.Context() + if h.cutoverActive() { + h.exportViaPlugin(c, ctx) + return + } + rows, err := db.DB.QueryContext(ctx, ` SELECT am.id, am.content, am.scope, am.namespace, am.created_at, w.name AS workspace_name @@ -91,6 +158,9 @@ type memoryImportEntry struct { // before both the deduplication check and the INSERT so that imported memories // with embedded credentials cannot land unredacted in agent_memories (SAFE-T1201 // parity with the commit_memory MCP bridge path). +// +// CUTOVER (PR-8 / RFC #2728): when MEMORY_V2_CUTOVER=true and the v2 +// plugin is wired, writes through the plugin instead of agent_memories. func (h *AdminMemoriesHandler) Import(c *gin.Context) { ctx := c.Request.Context() @@ -100,6 +170,11 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) { return } + if h.cutoverActive() { + h.importViaPlugin(c, ctx, entries) + return + } + imported := 0 skipped := 0 errors := 0 @@ -175,3 +250,193 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) { "total": len(entries), }) } + +// exportViaPlugin reads memories from the v2 plugin and emits them in +// the legacy memoryExportEntry shape so existing tooling that consumes +// the export keeps working. +// +// Strategy: enumerate workspaces, ask the resolver for each one's +// readable namespaces, search each namespace once. Deduplicate by +// memory id (a single memory in team:X is visible to every workspace +// under root X — we want one row per memory, not N). +func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) { + rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`) + if err != nil { + log.Printf("admin/memories/export (cutover): workspaces query: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"}) + return + } + defer rows.Close() + + type wsRow struct{ ID, Name string } + var workspaces []wsRow + for rows.Next() { + var w wsRow + if err := rows.Scan(&w.ID, &w.Name); err != nil { + continue + } + workspaces = append(workspaces, w) + } + + seen := make(map[string]struct{}) + memories := make([]memoryExportEntry, 0) + for _, w := range workspaces { + readable, err := h.resolver.ReadableNamespaces(ctx, w.ID) + if err != nil { + log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err) + continue + } + nsList := make([]string, len(readable)) + for i, ns := range readable { + nsList[i] = ns.Name + } + if len(nsList) == 0 { + continue + } + resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100}) + if err != nil { + log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err) + continue + } + for _, m := range resp.Memories { + if _, dup := seen[m.ID]; dup { + continue + } + seen[m.ID] = struct{}{} + redacted, _ := redactSecrets(w.Name, m.Content) + memories = append(memories, memoryExportEntry{ + ID: m.ID, + Content: redacted, + Scope: legacyScopeFromNamespace(m.Namespace), + Namespace: m.Namespace, + CreatedAt: m.CreatedAt, + WorkspaceName: w.Name, + }) + } + } + c.JSON(http.StatusOK, memories) +} + +// importViaPlugin writes the entries through the plugin instead of +// directly to agent_memories. Workspaces are resolved by name like +// the legacy path. Scope→namespace mapping mirrors the PR-6 shim. +func (h *AdminMemoriesHandler) importViaPlugin(c *gin.Context, ctx context.Context, entries []memoryImportEntry) { + imported := 0 + skipped := 0 + errs := 0 + + for _, entry := range entries { + var workspaceID string + if err := db.DB.QueryRowContext(ctx, + `SELECT id::text FROM workspaces WHERE name = $1 LIMIT 1`, + entry.WorkspaceName, + ).Scan(&workspaceID); err != nil { + log.Printf("admin/memories/import (cutover): workspace %q not found, skipping", entry.WorkspaceName) + skipped++ + continue + } + + // Redact BEFORE the plugin sees it (SAFE-T1201 parity). + content, _ := redactSecrets(workspaceID, entry.Content) + + ns, err := h.scopeToWritableNamespaceForImport(ctx, workspaceID, entry.Scope) + if err != nil { + log.Printf("admin/memories/import (cutover): %v", err) + skipped++ + continue + } + + // Idempotent namespace upsert before commit. + if _, err := h.plugin.UpsertNamespace(ctx, ns, contract.NamespaceUpsert{ + Kind: namespaceKindFromLegacyScope(entry.Scope), + }); err != nil { + log.Printf("admin/memories/import (cutover): upsert ns %s: %v", ns, err) + errs++ + continue + } + + if _, err := h.plugin.CommitMemory(ctx, ns, contract.MemoryWrite{ + Content: content, + Kind: contract.MemoryKindFact, + Source: contract.MemorySourceAgent, + }); err != nil { + log.Printf("admin/memories/import (cutover): commit %s: %v", ns, err) + errs++ + continue + } + imported++ + } + + c.JSON(http.StatusOK, gin.H{ + "imported": imported, + "skipped": skipped, + "errors": errs, + "total": len(entries), + }) +} + +// scopeToWritableNamespaceForImport mirrors the PR-6 shim translation. +// Returns the namespace string the resolver picks for the requested +// scope; errors out cleanly on GLOBAL or unmapped values so importing +// a malformed entry doesn't crash the run. +func (h *AdminMemoriesHandler) scopeToWritableNamespaceForImport(ctx context.Context, workspaceID, scope string) (string, error) { + writable, err := h.resolver.WritableNamespaces(ctx, workspaceID) + if err != nil { + return "", err + } + wantKind := contract.NamespaceKindWorkspace + switch strings.ToUpper(scope) { + case "", "LOCAL": + wantKind = contract.NamespaceKindWorkspace + case "TEAM": + wantKind = contract.NamespaceKindTeam + case "GLOBAL": + wantKind = contract.NamespaceKindOrg + default: + return "", &skipImport{reason: "unknown scope: " + scope} + } + for _, ns := range writable { + if ns.Kind == wantKind { + return ns.Name, nil + } + } + return "", &skipImport{reason: "no writable namespace of kind " + string(wantKind)} +} + +// skipImport is a typed error so the caller can distinguish "skip +// this entry" from a hard failure. +type skipImport struct{ reason string } + +func (e *skipImport) Error() string { return "skip: " + e.reason } + +// legacyScopeFromNamespace reverses the namespace→scope mapping for +// the export shape. Mirrors namespaceKindToLegacyScope from the PR-6 +// shim but is lifted out so admin_memories doesn't depend on the MCP +// handler's helpers. +func legacyScopeFromNamespace(ns string) string { + switch { + case strings.HasPrefix(ns, "workspace:"): + return "LOCAL" + case strings.HasPrefix(ns, "team:"): + return "TEAM" + case strings.HasPrefix(ns, "org:"): + return "GLOBAL" + default: + return "" + } +} + +// namespaceKindFromLegacyScope returns the contract.NamespaceKind for +// a legacy scope value. Unknown defaults to workspace so importing +// an unexpected row still produces a typed namespace. +func namespaceKindFromLegacyScope(scope string) contract.NamespaceKind { + switch strings.ToUpper(scope) { + case "TEAM": + return contract.NamespaceKindTeam + case "GLOBAL": + return contract.NamespaceKindOrg + default: + return contract.NamespaceKindWorkspace + } +} + diff --git a/workspace-server/internal/handlers/admin_memories_cutover_test.go b/workspace-server/internal/handlers/admin_memories_cutover_test.go new file mode 100644 index 00000000..845c3316 --- /dev/null +++ b/workspace-server/internal/handlers/admin_memories_cutover_test.go @@ -0,0 +1,604 @@ +package handlers + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" + + platformdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace" +) + +// --- stubs --- + +type stubAdminPlugin struct { + upserts []string + commits []commitRecord + searches []contract.SearchRequest + commitFn func(ctx context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) + searchFn func(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) + upsertFn func(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error) +} + +type commitRecord struct { + NS string + Content string +} + +func (s *stubAdminPlugin) UpsertNamespace(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error) { + s.upserts = append(s.upserts, name) + if s.upsertFn != nil { + return s.upsertFn(ctx, name, body) + } + return &contract.Namespace{Name: name, Kind: body.Kind, CreatedAt: time.Now().UTC()}, nil +} +func (s *stubAdminPlugin) CommitMemory(ctx context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) { + s.commits = append(s.commits, commitRecord{NS: ns, Content: body.Content}) + if s.commitFn != nil { + return s.commitFn(ctx, ns, body) + } + return &contract.MemoryWriteResponse{ID: "out-1", Namespace: ns}, nil +} +func (s *stubAdminPlugin) Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) { + s.searches = append(s.searches, body) + if s.searchFn != nil { + return s.searchFn(ctx, body) + } + return &contract.SearchResponse{}, nil +} + +type stubAdminResolver struct { + readable []namespace.Namespace + writable []namespace.Namespace + err error +} + +func (s *stubAdminResolver) ReadableNamespaces(_ context.Context, _ string) ([]namespace.Namespace, error) { + return s.readable, s.err +} +func (s *stubAdminResolver) WritableNamespaces(_ context.Context, _ string) ([]namespace.Namespace, error) { + return s.writable, s.err +} + +func adminRootResolver() *stubAdminResolver { + return &stubAdminResolver{ + readable: []namespace.Namespace{ + {Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true}, + {Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true}, + {Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true}, + }, + writable: []namespace.Namespace{ + {Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true}, + {Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true}, + {Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true}, + }, + } +} + +// installMockDB swaps platformdb.DB with a sqlmock for a test. +func installMockDB(t *testing.T) sqlmock.Sqlmock { + t.Helper() + mockDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock new: %v", err) + } + prev := platformdb.DB + platformdb.DB = mockDB + t.Cleanup(func() { + _ = mockDB.Close() + platformdb.DB = prev + }) + return mock +} + +// --- cutoverActive --- + +func TestCutoverActive(t *testing.T) { + cases := []struct { + name string + envVal string + plugin adminMemoriesPlugin + resolver adminMemoriesResolver + want bool + }{ + {"env unset", "", &stubAdminPlugin{}, adminRootResolver(), false}, + {"env true but unwired", "true", nil, nil, false}, + {"env false", "false", &stubAdminPlugin{}, adminRootResolver(), false}, + {"env true wired", "true", &stubAdminPlugin{}, adminRootResolver(), true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Setenv(envMemoryV2Cutover, tc.envVal) + h := &AdminMemoriesHandler{plugin: tc.plugin, resolver: tc.resolver} + if got := h.cutoverActive(); got != tc.want { + t.Errorf("got %v, want %v", got, tc.want) + } + }) + } +} + +// --- WithMemoryV2 wiring --- + +func TestWithMemoryV2_AttachesDeps(t *testing.T) { + h := NewAdminMemoriesHandler().WithMemoryV2(nil, nil) + // Both nil pointers — wiring still attaches them; cutoverActive + // reports false because the interface values are nil. + if h.plugin == nil && h.resolver == nil { + // expected + } +} + +func TestWithMemoryV2APIs_AttachesDeps(t *testing.T) { + h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, adminRootResolver()) + if h.plugin == nil || h.resolver == nil { + t.Error("withMemoryV2APIs must attach both interfaces") + } +} + +// --- Export via plugin --- + +func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha")) + + plugin := &stubAdminPlugin{ + searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) { + return &contract.SearchResponse{Memories: []contract.Memory{ + {ID: "mem-1", Namespace: "workspace:root-1", Content: "fact x", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()}, + {ID: "mem-2", Namespace: "team:root-1", Content: "team y", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()}, + }}, nil + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + if w.Code != http.StatusOK { + t.Fatalf("code = %d body=%s", w.Code, w.Body.String()) + } + var entries []memoryExportEntry + if err := json.Unmarshal(w.Body.Bytes(), &entries); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(entries) != 2 { + t.Errorf("entries = %d", len(entries)) + } + // Legacy scope label must be in the export + scopes := map[string]bool{} + for _, e := range entries { + scopes[e.Scope] = true + } + if !scopes["LOCAL"] || !scopes["TEAM"] { + t.Errorf("expected LOCAL+TEAM scopes, got %v", scopes) + } +} + +func TestExport_DeduplicatesByMemoryID(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + + // Two workspaces, both will see the same team-shared memory. + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha"). + AddRow("ws-2", "beta")) + + plugin := &stubAdminPlugin{ + searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) { + return &contract.SearchResponse{Memories: []contract.Memory{ + {ID: "mem-shared", Namespace: "team:root-1", Content: "team-fact", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()}, + }}, nil + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + var entries []memoryExportEntry + _ = json.Unmarshal(w.Body.Bytes(), &entries) + if len(entries) != 1 { + t.Errorf("dedup failed; got %d entries, want 1", len(entries)) + } +} + +func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha")) + + plugin := &stubAdminPlugin{} + resolver := &stubAdminResolver{err: errors.New("resolver dead")} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + // Should still 200 with empty memories — failure is per-workspace. + if w.Code != http.StatusOK { + t.Errorf("code = %d body=%s", w.Code, w.Body.String()) + } +} + +func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha")) + + plugin := &stubAdminPlugin{ + searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) { + return nil, errors.New("plugin dead") + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + if w.Code != http.StatusOK { + t.Errorf("code = %d", w.Code) + } +} + +func TestExport_WorkspacesQueryFails(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnError(errors.New("db dead")) + + plugin := &stubAdminPlugin{} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("code = %d, want 500", w.Code) + } +} + +func TestExport_EmptyReadable(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha")) + + resolver := &stubAdminResolver{readable: []namespace.Namespace{}} + h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + if w.Code != http.StatusOK { + t.Errorf("code = %d", w.Code) + } + if !strings.Contains(w.Body.String(), "[]") { + t.Errorf("expected empty array, got %s", w.Body.String()) + } +} + +func TestExport_RedactsSecretsInPluginPath(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text, name FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow("ws-1", "alpha")) + + plugin := &stubAdminPlugin{ + searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) { + return &contract.SearchResponse{Memories: []contract.Memory{ + {ID: "mem-1", Namespace: "workspace:root-1", Content: "API_KEY=sk-1234567890abcdefghijk0123456789", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()}, + }}, nil + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + if strings.Contains(w.Body.String(), "sk-1234567890abcdef") { + t.Errorf("export leaked unredacted secret: %s", w.Body.String()) + } +} + +// --- Import via plugin --- + +func TestImport_RoutesThroughPluginWhenCutoverActive(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WithArgs("alpha"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "fact x", Scope: "LOCAL", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + if w.Code != http.StatusOK { + t.Fatalf("code = %d body=%s", w.Code, w.Body.String()) + } + if len(plugin.commits) != 1 { + t.Errorf("commits = %d, want 1", len(plugin.commits)) + } + if plugin.commits[0].NS != "workspace:root-1" { + t.Errorf("ns = %q", plugin.commits[0].NS) + } +} + +func TestImport_SkipsUnknownWorkspace(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WithArgs("ghost"). + WillReturnError(errors.New("no rows")) + + plugin := &stubAdminPlugin{} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "x", Scope: "LOCAL", WorkspaceName: "ghost"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + var resp map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["skipped"] != 1 || resp["imported"] != 0 { + t.Errorf("resp = %v", resp) + } +} + +func TestImport_PluginUpsertNamespaceError(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{ + upsertFn: func(_ context.Context, _ string, _ contract.NamespaceUpsert) (*contract.Namespace, error) { + return nil, errors.New("upsert dead") + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + var resp map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["errors"] != 1 || resp["imported"] != 0 { + t.Errorf("resp = %v", resp) + } +} + +func TestImport_PluginCommitError(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{ + commitFn: func(_ context.Context, _ string, _ contract.MemoryWrite) (*contract.MemoryWriteResponse, error) { + return nil, errors.New("commit dead") + }, + } + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + var resp map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["errors"] != 1 { + t.Errorf("resp = %v", resp) + } +} + +func TestImport_RedactsBeforePluginSeesContent(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "API_KEY=sk-1234567890abcdefghijk0123456789", Scope: "LOCAL", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + if len(plugin.commits) != 1 { + t.Fatalf("commits = %d", len(plugin.commits)) + } + if strings.Contains(plugin.commits[0].Content, "sk-1234567890") { + t.Errorf("plugin received unredacted content: %q", plugin.commits[0].Content) + } +} + +func TestImport_SkipsUnknownScope(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver()) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "x", Scope: "WEIRD", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + var resp map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["skipped"] != 1 { + t.Errorf("resp = %v", resp) + } +} + +func TestImport_SkipsWhenResolverErrors(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "true") + mock := installMockDB(t) + mock.ExpectQuery("SELECT id::text FROM workspaces"). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1")) + + plugin := &stubAdminPlugin{} + resolver := &stubAdminResolver{err: errors.New("dead")} + h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver) + + body, _ := json.Marshal([]memoryImportEntry{ + {Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"}, + }) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body)) + c.Request.Header.Set("Content-Type", "application/json") + h.Import(c) + + var resp map[string]int + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp["skipped"] != 1 { + t.Errorf("resp = %v", resp) + } +} + +// --- Helper functions --- + +func TestLegacyScopeFromNamespace(t *testing.T) { + cases := []struct { + in string + want string + }{ + {"workspace:abc", "LOCAL"}, + {"team:abc", "TEAM"}, + {"org:abc", "GLOBAL"}, + {"custom:abc", ""}, + {"", ""}, + } + for _, tc := range cases { + if got := legacyScopeFromNamespace(tc.in); got != tc.want { + t.Errorf("legacyScopeFromNamespace(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} + +func TestNamespaceKindFromLegacyScope(t *testing.T) { + cases := []struct { + in string + want contract.NamespaceKind + }{ + {"LOCAL", contract.NamespaceKindWorkspace}, + {"local", contract.NamespaceKindWorkspace}, + {"TEAM", contract.NamespaceKindTeam}, + {"GLOBAL", contract.NamespaceKindOrg}, + {"weird", contract.NamespaceKindWorkspace}, + } + for _, tc := range cases { + if got := namespaceKindFromLegacyScope(tc.in); got != tc.want { + t.Errorf("namespaceKindFromLegacyScope(%q) = %q, want %q", tc.in, got, tc.want) + } + } +} + +func TestSkipImport_ErrorMessage(t *testing.T) { + e := &skipImport{reason: "unknown scope: WEIRD"} + if !strings.Contains(e.Error(), "unknown scope: WEIRD") { + t.Errorf("Error() = %q", e.Error()) + } +} + +// --- Confirm legacy paths still work when env is unset --- + +func TestExport_LegacyPathWhenCutoverInactive(t *testing.T) { + t.Setenv(envMemoryV2Cutover, "") + mock := installMockDB(t) + mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace"). + WillReturnRows(sqlmock.NewRows([]string{"id", "content", "scope", "namespace", "created_at", "workspace_name"})) + + h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, adminRootResolver()) + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil) + h.Export(c) + + if w.Code != http.StatusOK { + t.Errorf("code = %d body=%s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("legacy SQL path not exercised: %v", err) + } +}