Merge pull request #2749 from Molecule-AI/fix/memory-v2-i3-export-on

Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
This commit is contained in:
Hongming Wang 2026-05-04 16:49:43 +00:00 committed by GitHub
commit e13dcab5e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 377 additions and 63 deletions

View File

@ -2,6 +2,7 @@ package handlers
import (
"context"
"database/sql"
"log"
"net/http"
"os"
@ -255,68 +256,185 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
// the legacy memoryExportEntry shape so existing tooling that consumes
// the export keeps working.
//
// Strategy: enumerate workspaces, ask the resolver for each one's
// readable namespaces, search each namespace once. Deduplicate by
// memory id (a single memory in team:X is visible to every workspace
// under root X — we want one row per memory, not N).
// Optimization (#289 fix): the previous implementation was O(workspaces)
// in BOTH resolver CTE walks AND plugin search calls. For a 1000-tenant
// org, that's 1000 × resolver + 1000 × HTTP, where most are redundant
// because workspaces sharing a team/org root see identical namespaces.
//
// New strategy:
// 1. Single SQL pass walks parent_id chains, returning each
// workspace's root_id alongside its name.
// 2. Group workspaces by root → unique tree count is typically <<
// workspace count.
// 3. Resolve namespaces ONCE per root (any workspace under that
// root produces the same readable list).
// 4. Build a UNION of namespaces across all roots; single plugin
// search call.
// 5. Map each memory back to a workspace_name via a namespace→ws
// lookup table built up from step 3.
//
// Net cost: 1 SQL + N_roots resolver calls + 1 plugin call (vs
// N_workspaces resolver + N_workspaces plugin in the old code).
func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) {
rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`)
// 1. One SQL pass: every workspace + its root id.
wsRows, err := loadWorkspacesWithRoots(ctx, db.DB)
if err != nil {
log.Printf("admin/memories/export (cutover): workspaces query: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"})
return
}
defer rows.Close()
type wsRow struct{ ID, Name string }
var workspaces []wsRow
for rows.Next() {
var w wsRow
if err := rows.Scan(&w.ID, &w.Name); err != nil {
continue
}
workspaces = append(workspaces, w)
// 2. Group by root → list of workspaces.
rootToWorkspaces := make(map[string][]workspaceRow, len(wsRows))
for _, w := range wsRows {
rootToWorkspaces[w.RootID] = append(rootToWorkspaces[w.RootID], w)
}
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0)
for _, w := range workspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, w.ID)
// 3. Resolve team/org namespaces once per root, then add each
// member's private workspace:<id> namespace explicitly.
//
// IMPORTANT: ReadableNamespaces(rootID) returns
// {workspace:rootID, team:rootID, org:rootID}. Calling it once
// per root is enough for team:/org:/custom: (those are shared by
// every member of the root group), but the workspace: namespace
// it returns is rootID's only — child members' private
// workspace:<childID> namespaces would be silently dropped from
// the export. Inject each member's workspace:<id> below to keep
// coverage parity with the legacy per-workspace iteration.
nsToOwner := make(map[string]string) // namespace → workspace_name (first matching wins)
allNamespaces := make(map[string]struct{}) // union for plugin search
for rootID, members := range rootToWorkspaces {
readable, err := h.resolver.ReadableNamespaces(ctx, rootID)
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err)
log.Printf("admin/memories/export (cutover) root=%s: resolve: %v", rootID, err)
continue
}
nsList := make([]string, len(readable))
for i, ns := range readable {
nsList[i] = ns.Name
}
if len(nsList) == 0 {
continue
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err)
continue
}
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
// Collect non-workspace namespaces (team:/org:/custom:/...) from
// the root view; these are identical across every member.
for _, ns := range readable {
if strings.HasPrefix(ns.Name, "workspace:") {
continue
}
seen[m.ID] = struct{}{}
redacted, _ := redactSecrets(w.Name, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: w.Name,
})
allNamespaces[ns.Name] = struct{}{}
if _, alreadyMapped := nsToOwner[ns.Name]; alreadyMapped {
continue
}
if owner := pickOwnerForNamespace(ns.Name, members); owner != "" {
nsToOwner[ns.Name] = owner
}
}
// Inject each member's private workspace:<id> namespace + its
// owner. Children's private memories live in workspace:<childID>
// which the root-only resolve doesn't surface.
for _, m := range members {
ns := "workspace:" + m.ID
allNamespaces[ns] = struct{}{}
nsToOwner[ns] = m.Name
}
}
if len(allNamespaces) == 0 {
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
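// 4. Single plugin search across the union.
// NOTE(review): Limit: 100 now caps the whole export in one call; the
// previous code applied it to each workspace's search separately —
// confirm the export is not meant to return more than 100 memories.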
nsList := make([]string, 0, len(allNamespaces))
for ns := range allNamespaces {
nsList = append(nsList, ns)
}
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
log.Printf("admin/memories/export (cutover): plugin search: %v", err)
c.JSON(http.StatusOK, []memoryExportEntry{})
return
}
// 5. Map each memory to a workspace_name, redact, emit.
seen := make(map[string]struct{})
memories := make([]memoryExportEntry, 0, len(resp.Memories))
for _, m := range resp.Memories {
if _, dup := seen[m.ID]; dup {
continue
}
seen[m.ID] = struct{}{}
owner := nsToOwner[m.Namespace]
redacted, _ := redactSecrets(owner, m.Content)
memories = append(memories, memoryExportEntry{
ID: m.ID,
Content: redacted,
Scope: legacyScopeFromNamespace(m.Namespace),
Namespace: m.Namespace,
CreatedAt: m.CreatedAt,
WorkspaceName: owner,
})
}
c.JSON(http.StatusOK, memories)
}
// workspaceRow is one workspace as produced by loadWorkspacesWithRoots.
// ID and Name identify the workspace itself; RootID is the id of the
// parent-less workspace at the top of its parent_id chain (a root's
// RootID is its own ID). The optimized export groups rows by RootID.
type workspaceRow struct {
	ID, Name, RootID string
}
// loadWorkspacesWithRoots runs a single recursive-CTE query that starts
// at every workspace with a NULL parent_id and follows parent_id links
// downward, tagging each visited workspace with the id of the root it
// was reached from. Rows come back ordered by name.
//
// The recursion only expands rows whose depth is below 50, so anything
// nested deeper than that is not returned; likewise a workspace that is
// not reachable from a NULL-parent root does not appear in the result.
//
// A query or scan error yields a nil slice. An iteration error reported
// by rows.Err() is returned together with the rows collected so far.
func loadWorkspacesWithRoots(ctx context.Context, conn *sql.DB) ([]workspaceRow, error) {
	const query = `
WITH RECURSIVE chain AS (
SELECT id, parent_id, name, id AS root_id, 0 AS depth
FROM workspaces
WHERE parent_id IS NULL
UNION ALL
SELECT w.id, w.parent_id, w.name, c.root_id, c.depth + 1
FROM workspaces w
JOIN chain c ON w.parent_id = c.id
WHERE c.depth < 50
)
SELECT id::text, name, root_id::text FROM chain ORDER BY name
`
	rows, err := conn.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []workspaceRow{}
	for rows.Next() {
		var row workspaceRow
		if scanErr := rows.Scan(&row.ID, &row.Name, &row.RootID); scanErr != nil {
			return nil, scanErr
		}
		result = append(result, row)
	}
	return result, rows.Err()
}
// pickOwnerForNamespace chooses the workspace_name an exported memory
// in namespace ns is attributed to, given the members of one root
// group.
//
//   - No members: returns "".
//   - ns is "workspace:<id>" and <id> matches a member: that member's
//     name.
//   - Anything else (team:*, org:*, custom:*, or a workspace:<id> that
//     matches no member): the first member's name.
//
// The result depends only on the order of members as passed in.
func pickOwnerForNamespace(ns string, members []workspaceRow) string {
	if len(members) == 0 {
		return ""
	}
	const prefix = "workspace:"
	if strings.HasPrefix(ns, prefix) {
		id := ns[len(prefix):]
		for i := range members {
			if members[i].ID == id {
				return members[i].Name
			}
		}
	}
	// Shared namespace, or a workspace id nobody in this group owns:
	// attribute to the group's first member.
	return members[0].Name
}
// importViaPlugin writes the entries through the plugin instead of
// directly to agent_memories. Workspaces are resolved by name like
// the legacy path. Scope→namespace mapping mirrors the PR-6 shim.

View File

@ -151,9 +151,9 @@ func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@ -196,10 +196,10 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
mock := installMockDB(t)
// Two workspaces, both will see the same team-shared memory.
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha").
AddRow("ws-2", "beta"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1").
AddRow("ws-2", "beta", "ws-2"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
@ -225,9 +225,9 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{}
resolver := &stubAdminResolver{err: errors.New("resolver dead")}
@ -247,9 +247,9 @@ func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@ -271,7 +271,7 @@ func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
func TestExport_WorkspacesQueryFails(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnError(errors.New("db dead"))
plugin := &stubAdminPlugin{}
@ -290,9 +290,9 @@ func TestExport_WorkspacesQueryFails(t *testing.T) {
func TestExport_EmptyReadable(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
resolver := &stubAdminResolver{readable: []namespace.Namespace{}}
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver)
@ -312,9 +312,9 @@ func TestExport_EmptyReadable(t *testing.T) {
func TestExport_RedactsSecretsInPluginPath(t *testing.T) {
t.Setenv(envMemoryV2Cutover, "true")
mock := installMockDB(t)
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow("ws-1", "alpha"))
mock.ExpectQuery("WITH RECURSIVE chain").
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
AddRow("ws-1", "alpha", "ws-1"))
plugin := &stubAdminPlugin{
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
@ -535,6 +535,202 @@ func TestImport_SkipsWhenResolverErrors(t *testing.T) {
}
}
// TestExport_BatchesPluginCallsByRoot pins the I3 fix. The old export
// ran one resolver call and one plugin search per workspace; the new
// one resolves namespaces per root group and issues a single plugin
// search over the union of all namespaces.
//
// Setup: 3 workspaces under 1 root. The test asserts the plugin is
// searched exactly once (the old code would have searched 3 times) and
// that the search receives 5 namespaces: each member's workspace:<id>
// plus team:root-1 and org:root-1. Children's workspace:<id>
// namespaces must be included or the admin export silently drops
// their private memories.
func TestExport_BatchesPluginCallsByRoot(t *testing.T) {
	t.Setenv(envMemoryV2Cutover, "true")
	mock := installMockDB(t)
	wsRows := sqlmock.NewRows([]string{"id", "name", "root_id"}).
		AddRow("root-1", "alpha", "root-1").
		AddRow("child-1", "alpha-child", "root-1").
		AddRow("child-2", "alpha-grandchild", "root-1")
	mock.ExpectQuery("WITH RECURSIVE chain").WillReturnRows(wsRows)

	searches := 0
	plugin := &stubAdminPlugin{
		searchFn: func(_ context.Context, req contract.SearchRequest) (*contract.SearchResponse, error) {
			searches++
			if got := len(req.Namespaces); got != 5 {
				t.Errorf("plugin search call %d: namespaces len = %d, want 5 (3 workspace + team + org); got %v", searches, got, req.Namespaces)
			}
			return &contract.SearchResponse{}, nil
		},
	}
	h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())

	gin.SetMode(gin.TestMode)
	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
	h.Export(c)

	if rec.Code != http.StatusOK {
		t.Errorf("code = %d body=%s", rec.Code, rec.Body.String())
	}
	if searches != 1 {
		t.Errorf("plugin search called %d times, want 1 (was 3 with the old N+1 code)", searches)
	}
}
// perWorkspaceResolver is a test resolver keyed by workspace id. Unlike
// the legacy stubAdminResolver, which ignores its workspace-id argument
// and therefore hides the I3 silent-drop bug, it returns the SPECIFIC
// workspace's view (workspace:<that ID> + team:<root> + org:<root>) as
// configured in the map.
type perWorkspaceResolver map[string][]namespace.Namespace

// ReadableNamespaces returns the namespaces configured for ws, or an
// error when ws has no entry in the map. The context is not consulted.
func (r perWorkspaceResolver) ReadableNamespaces(_ context.Context, ws string) ([]namespace.Namespace, error) {
	v, ok := r[ws]
	if !ok {
		return nil, errors.New("perWorkspaceResolver: unknown ws " + ws)
	}
	return v, nil
}

// WritableNamespaces returns the same list as ReadableNamespaces. The
// caller's context is forwarded rather than passing a nil Context,
// which the context package forbids.
func (r perWorkspaceResolver) WritableNamespaces(ctx context.Context, ws string) ([]namespace.Namespace, error) {
	return r.ReadableNamespaces(ctx, ws)
}
// TestExport_IncludesEveryMembersPrivateNamespace pins the I3 follow-up
// fix: for a root group with several members, the export must search
// every member's workspace:<id> namespace, not only the root's.
// Resolving namespaces for the root id alone yields workspace:rootID +
// team:rootID + org:rootID, so without the fix each child workspace's
// private memories were silently dropped from the admin export.
func TestExport_IncludesEveryMembersPrivateNamespace(t *testing.T) {
	t.Setenv(envMemoryV2Cutover, "true")
	mock := installMockDB(t)
	mock.ExpectQuery("WITH RECURSIVE chain").
		WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
			AddRow("root-1", "alpha", "root-1").
			AddRow("child-1", "alpha-child", "root-1").
			AddRow("child-2", "alpha-grandchild", "root-1"))

	// viewOf is the readable list for one member of the root-1 group:
	// its own private namespace plus the shared team/org namespaces.
	viewOf := func(wsID string) []namespace.Namespace {
		return []namespace.Namespace{
			{Name: "workspace:" + wsID, Kind: contract.NamespaceKindWorkspace, Writable: true},
			{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
			{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
		}
	}
	resolver := perWorkspaceResolver{
		"root-1":  viewOf("root-1"),
		"child-1": viewOf("child-1"),
		"child-2": viewOf("child-2"),
	}

	// fact builds one agent-sourced fact memory stamped with the current time.
	fact := func(id, ns, content string) contract.Memory {
		return contract.Memory{
			ID:        id,
			Namespace: ns,
			Content:   content,
			Kind:      contract.MemoryKindFact,
			Source:    contract.MemorySourceAgent,
			CreatedAt: time.Now().UTC(),
		}
	}

	var searched []string
	plugin := &stubAdminPlugin{
		searchFn: func(_ context.Context, req contract.SearchRequest) (*contract.SearchResponse, error) {
			searched = append(searched, req.Namespaces...)
			return &contract.SearchResponse{Memories: []contract.Memory{
				fact("m-root", "workspace:root-1", "root private"),
				fact("m-child1", "workspace:child-1", "child-1 private"),
				fact("m-child2", "workspace:child-2", "child-2 private"),
				fact("m-team", "team:root-1", "shared team"),
			}}, nil
		},
	}
	h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver)

	gin.SetMode(gin.TestMode)
	rec := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(rec)
	c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
	h.Export(c)
	if rec.Code != http.StatusOK {
		t.Fatalf("code = %d body=%s", rec.Code, rec.Body.String())
	}

	// Every member's private namespace must reach the plugin search.
	searchedSet := make(map[string]bool, len(searched))
	for _, ns := range searched {
		searchedSet[ns] = true
	}
	for _, ns := range []string{"workspace:root-1", "workspace:child-1", "workspace:child-2", "team:root-1", "org:root-1"} {
		if !searchedSet[ns] {
			t.Errorf("plugin search missing namespace %q (got %v)", ns, searched)
		}
	}
	if len(searched) != 5 {
		t.Errorf("plugin search namespace count = %d, want 5 (3 workspace + team + org)", len(searched))
	}

	// Children's private memories must appear in the export, attributed
	// to the right workspace_name.
	var entries []memoryExportEntry
	if err := json.Unmarshal(rec.Body.Bytes(), &entries); err != nil {
		t.Fatalf("unmarshal: %v", err)
	}
	exported := make(map[string]memoryExportEntry)
	for _, entry := range entries {
		exported[entry.ID] = entry
	}
	expectations := []struct{ id, ns, owner string }{
		{"m-root", "workspace:root-1", "alpha"},
		{"m-child1", "workspace:child-1", "alpha-child"},
		{"m-child2", "workspace:child-2", "alpha-grandchild"},
	}
	for _, exp := range expectations {
		entry, found := exported[exp.id]
		if !found {
			t.Errorf("export missing memory %s — children's private memories silently dropped", exp.id)
			continue
		}
		if entry.Namespace != exp.ns {
			t.Errorf("memory %s namespace = %q, want %q", exp.id, entry.Namespace, exp.ns)
		}
		if entry.WorkspaceName != exp.owner {
			t.Errorf("memory %s owner = %q, want %q", exp.id, entry.WorkspaceName, exp.owner)
		}
	}
}
// TestPickOwnerForNamespace exercises the namespace→workspace_name
// attribution helper added in I3: an exact workspace:<id> match wins,
// every other namespace falls back to the first member, and an empty
// member list yields "".
func TestPickOwnerForNamespace(t *testing.T) {
	members := []workspaceRow{
		{ID: "root-1", Name: "alpha", RootID: "root-1"},
		{ID: "child-1", Name: "alpha-child", RootID: "root-1"},
	}
	tests := []struct{ name, ns, want string }{
		{name: "workspace ns matches member id", ns: "workspace:child-1", want: "alpha-child"},
		{name: "workspace ns no match → first", ns: "workspace:foreign", want: "alpha"},
		{name: "team ns → first member of root group", ns: "team:root-1", want: "alpha"},
		{name: "org ns → first member", ns: "org:root-1", want: "alpha"},
		{name: "custom ns → first member", ns: "custom:foo", want: "alpha"},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := pickOwnerForNamespace(tt.ns, members)
			if got != tt.want {
				t.Errorf("pickOwnerForNamespace(%q) = %q, want %q", tt.ns, got, tt.want)
			}
		})
	}

	// With no members there is nobody to attribute to.
	if got := pickOwnerForNamespace("workspace:abc", nil); got != "" {
		t.Errorf("empty members must return \"\", got %q", got)
	}
}
// --- Helper functions ---
func TestLegacyScopeFromNamespace(t *testing.T) {