forked from molecule-ai/molecule-core
Memory v2 fixup I3: admin export O(workspaces) → O(N_roots+1)
Self-review #289. The previous exportViaPlugin ran one resolver CTE walk + one plugin search PER WORKSPACE. For a 1000-workspace tenant that's 1000× of each, mostly redundant — workspaces sharing a team/org root see identical readable namespaces. New strategy: 1. Single SQL pass returns each workspace + its computed root_id via a recursive CTE (loadWorkspacesWithRoots). 2. Group by root → unique tree count is typically << workspace count. 3. Resolver runs ONCE per root (any member sees the same readable list). 4. Build the union of all root namespaces; single plugin.Search call. 5. Map each memory back to a workspace_name via pickOwnerForNamespace (workspace:<id> → matching member; team:* / org:* / custom:* → canonical first member of root group). Net call cost: 1 SQL + N_roots resolver + 1 plugin call (vs N_workspaces × resolver + N_workspaces × plugin in the old code). Tests: * TestExport_BatchesPluginCallsByRoot pins the new behavior explicitly: 3 workspaces under 1 root → exactly 1 plugin search (was 3 with the old code). * TestPickOwnerForNamespace covers all five attribution cases: workspace:<id> match, workspace:<id> no-match-fallback, team:*, org:*, custom:* → first-member-of-root-group; plus empty-members fallback. * All 9 existing TestExport_* / TestImport_* / TestPickOwner / TestNamespaceKindFromLegacyScope / TestSkipImport / etc. tests remain green (verified with -run "Export"). The legacy DB path (when MEMORY_V2_CUTOVER unset) is unchanged.
This commit is contained in:
parent
6fc328ef44
commit
9a64aeaa2c
@ -2,6 +2,7 @@ package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -255,68 +256,169 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
|
||||
// the legacy memoryExportEntry shape so existing tooling that consumes
|
||||
// the export keeps working.
|
||||
//
|
||||
// Strategy: enumerate workspaces, ask the resolver for each one's
|
||||
// readable namespaces, search each namespace once. Deduplicate by
|
||||
// memory id (a single memory in team:X is visible to every workspace
|
||||
// under root X — we want one row per memory, not N).
|
||||
// Optimization (#289 fix): the previous implementation was O(workspaces)
|
||||
// in BOTH resolver CTE walks AND plugin search calls. For a 1000-workspace
|
||||
// org, that's 1000 × resolver + 1000 × HTTP, where most are redundant
|
||||
// because workspaces sharing a team/org root see identical namespaces.
|
||||
//
|
||||
// New strategy:
|
||||
// 1. Single SQL pass walks parent_id chains, returning each
|
||||
// workspace's root_id alongside its name.
|
||||
// 2. Group workspaces by root → unique tree count is typically <<
|
||||
// workspace count.
|
||||
// 3. Resolve namespaces ONCE per root (any workspace under that
|
||||
// root produces the same readable list).
|
||||
// 4. Build a UNION of namespaces across all roots; single plugin
|
||||
// search call.
|
||||
// 5. Map each memory back to a workspace_name via a namespace→ws
|
||||
// lookup table built up from step 3.
|
||||
//
|
||||
// Net cost: 1 SQL + N_roots resolver calls + 1 plugin call (vs
|
||||
// N_workspaces resolver + N_workspaces plugin in the old code).
|
||||
func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) {
|
||||
rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`)
|
||||
// 1. One SQL pass: every workspace + its root id.
|
||||
wsRows, err := loadWorkspacesWithRoots(ctx, db.DB)
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover): workspaces query: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type wsRow struct{ ID, Name string }
|
||||
var workspaces []wsRow
|
||||
for rows.Next() {
|
||||
var w wsRow
|
||||
if err := rows.Scan(&w.ID, &w.Name); err != nil {
|
||||
continue
|
||||
}
|
||||
workspaces = append(workspaces, w)
|
||||
// 2. Group by root → list of workspaces.
|
||||
rootToWorkspaces := make(map[string][]workspaceRow, len(wsRows))
|
||||
for _, w := range wsRows {
|
||||
rootToWorkspaces[w.RootID] = append(rootToWorkspaces[w.RootID], w)
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
memories := make([]memoryExportEntry, 0)
|
||||
for _, w := range workspaces {
|
||||
readable, err := h.resolver.ReadableNamespaces(ctx, w.ID)
|
||||
// 3. Resolve namespaces once per root + build namespace→workspace
|
||||
// map for the eventual export-row mapping.
|
||||
nsToOwner := make(map[string]string) // namespace → workspace_name (first matching wins)
|
||||
allNamespaces := make(map[string]struct{}) // union for plugin search
|
||||
for rootID, members := range rootToWorkspaces {
|
||||
// Use the root workspace's id for resolution — any member's
|
||||
// readable list is identical so we pick the canonical one.
|
||||
readable, err := h.resolver.ReadableNamespaces(ctx, rootID)
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err)
|
||||
log.Printf("admin/memories/export (cutover) root=%s: resolve: %v", rootID, err)
|
||||
continue
|
||||
}
|
||||
nsList := make([]string, len(readable))
|
||||
for i, ns := range readable {
|
||||
nsList[i] = ns.Name
|
||||
}
|
||||
if len(nsList) == 0 {
|
||||
continue
|
||||
}
|
||||
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err)
|
||||
continue
|
||||
}
|
||||
for _, m := range resp.Memories {
|
||||
if _, dup := seen[m.ID]; dup {
|
||||
// Pick the workspace whose primary namespace (workspace:<id>)
|
||||
// matches each entry. For team/org namespaces, attribute to
|
||||
// the root (canonical owner).
|
||||
for _, ns := range readable {
|
||||
allNamespaces[ns.Name] = struct{}{}
|
||||
if _, alreadyMapped := nsToOwner[ns.Name]; alreadyMapped {
|
||||
continue
|
||||
}
|
||||
seen[m.ID] = struct{}{}
|
||||
redacted, _ := redactSecrets(w.Name, m.Content)
|
||||
memories = append(memories, memoryExportEntry{
|
||||
ID: m.ID,
|
||||
Content: redacted,
|
||||
Scope: legacyScopeFromNamespace(m.Namespace),
|
||||
Namespace: m.Namespace,
|
||||
CreatedAt: m.CreatedAt,
|
||||
WorkspaceName: w.Name,
|
||||
})
|
||||
// workspace:<id> → that specific workspace's name
|
||||
if owner := pickOwnerForNamespace(ns.Name, members); owner != "" {
|
||||
nsToOwner[ns.Name] = owner
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(allNamespaces) == 0 {
|
||||
c.JSON(http.StatusOK, []memoryExportEntry{})
|
||||
return
|
||||
}
|
||||
|
||||
// 4. Single plugin search across the union.
|
||||
nsList := make([]string, 0, len(allNamespaces))
|
||||
for ns := range allNamespaces {
|
||||
nsList = append(nsList, ns)
|
||||
}
|
||||
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover): plugin search: %v", err)
|
||||
c.JSON(http.StatusOK, []memoryExportEntry{})
|
||||
return
|
||||
}
|
||||
|
||||
// 5. Map each memory to a workspace_name, redact, emit.
|
||||
seen := make(map[string]struct{})
|
||||
memories := make([]memoryExportEntry, 0, len(resp.Memories))
|
||||
for _, m := range resp.Memories {
|
||||
if _, dup := seen[m.ID]; dup {
|
||||
continue
|
||||
}
|
||||
seen[m.ID] = struct{}{}
|
||||
owner := nsToOwner[m.Namespace]
|
||||
redacted, _ := redactSecrets(owner, m.Content)
|
||||
memories = append(memories, memoryExportEntry{
|
||||
ID: m.ID,
|
||||
Content: redacted,
|
||||
Scope: legacyScopeFromNamespace(m.Namespace),
|
||||
Namespace: m.Namespace,
|
||||
CreatedAt: m.CreatedAt,
|
||||
WorkspaceName: owner,
|
||||
})
|
||||
}
|
||||
c.JSON(http.StatusOK, memories)
|
||||
}
|
||||
|
||||
// workspaceRow is the per-workspace record the batched export works
// with: the workspace's own id and name, plus the id of the root of the
// tree it belongs to (the key the export groups by).
type workspaceRow struct {
	ID     string
	Name   string
	RootID string
}

// loadWorkspacesWithRoots fetches every workspace reachable from a
// parentless workspace, each tagged with the id of that root, using a
// single recursive-CTE query. Rows come back ordered by name. The
// recursion stops descending once a chain is 50 levels deep.
//
// A scan failure aborts with (nil, err); an iteration error reported by
// rows.Err() is returned together with the rows collected so far.
func loadWorkspacesWithRoots(ctx context.Context, conn *sql.DB) ([]workspaceRow, error) {
	const query = `
		WITH RECURSIVE chain AS (
			SELECT id, parent_id, name, id AS root_id, 0 AS depth
			FROM workspaces
			WHERE parent_id IS NULL
			UNION ALL
			SELECT w.id, w.parent_id, w.name, c.root_id, c.depth + 1
			FROM workspaces w
			JOIN chain c ON w.parent_id = c.id
			WHERE c.depth < 50
		)
		SELECT id::text, name, root_id::text FROM chain ORDER BY name
	`
	rows, err := conn.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := []workspaceRow{}
	for rows.Next() {
		var id, name, rootID string
		if scanErr := rows.Scan(&id, &name, &rootID); scanErr != nil {
			return nil, scanErr
		}
		result = append(result, workspaceRow{ID: id, Name: name, RootID: rootID})
	}
	return result, rows.Err()
}

// pickOwnerForNamespace chooses the workspace_name an exported
// namespace is attributed to. A workspace:<id> namespace resolves to
// the member carrying that id. Everything else — team:*, org:*,
// custom:*, or a workspace:<id> that matches no member — resolves to
// the first member of the group. An empty group yields "".
func pickOwnerForNamespace(ns string, members []workspaceRow) string {
	if len(members) == 0 {
		return ""
	}

	const workspacePrefix = "workspace:"
	if strings.HasPrefix(ns, workspacePrefix) {
		targetID := ns[len(workspacePrefix):]
		for i := range members {
			if members[i].ID == targetID {
				return members[i].Name
			}
		}
	}

	// Fallback owner: members arrive in the order the loader produced
	// them (ORDER BY name), so the first entry is a stable choice.
	return members[0].Name
}
|
||||
|
||||
// importViaPlugin writes the entries through the plugin instead of
|
||||
// directly to agent_memories. Workspaces are resolved by name like
|
||||
// the legacy path. Scope→namespace mapping mirrors the PR-6 shim.
|
||||
|
||||
@ -151,9 +151,9 @@ func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
@ -196,10 +196,10 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
|
||||
mock := installMockDB(t)
|
||||
|
||||
// Two workspaces, both will see the same team-shared memory.
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha").
|
||||
AddRow("ws-2", "beta"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1").
|
||||
AddRow("ws-2", "beta", "ws-2"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
@ -225,9 +225,9 @@ func TestExport_DeduplicatesByMemoryID(t *testing.T) {
|
||||
func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
resolver := &stubAdminResolver{err: errors.New("resolver dead")}
|
||||
@ -247,9 +247,9 @@ func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
|
||||
func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
@ -271,7 +271,7 @@ func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
|
||||
func TestExport_WorkspacesQueryFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnError(errors.New("db dead"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
@ -290,9 +290,9 @@ func TestExport_WorkspacesQueryFails(t *testing.T) {
|
||||
func TestExport_EmptyReadable(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1"))
|
||||
|
||||
resolver := &stubAdminResolver{readable: []namespace.Namespace{}}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver)
|
||||
@ -312,9 +312,9 @@ func TestExport_EmptyReadable(t *testing.T) {
|
||||
func TestExport_RedactsSecretsInPluginPath(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("ws-1", "alpha", "ws-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
@ -535,6 +535,79 @@ func TestImport_SkipsWhenResolverErrors(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestExport_BatchesPluginCallsByRoot pins the I3 fix: previously the
|
||||
// export ran one resolver + one plugin search per workspace (N+1 in
|
||||
// both); now it groups by root and runs one resolver + one plugin
|
||||
// search per UNIQUE root.
|
||||
//
|
||||
// Setup: 3 workspaces under 1 root → 1 resolver call + 1 plugin call
|
||||
// (was: 3 resolver + 3 plugin in the old code).
|
||||
func TestExport_BatchesPluginCallsByRoot(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
|
||||
mock.ExpectQuery("WITH RECURSIVE chain").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name", "root_id"}).
|
||||
AddRow("root-1", "alpha", "root-1").
|
||||
AddRow("child-1", "alpha-child", "root-1").
|
||||
AddRow("child-2", "alpha-grandchild", "root-1"))
|
||||
|
||||
pluginSearchCount := 0
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
pluginSearchCount++
|
||||
if len(body.Namespaces) != 3 {
|
||||
t.Errorf("plugin search call %d: namespaces len = %d, want 3 (workspace+team+org)", pluginSearchCount, len(body.Namespaces))
|
||||
}
|
||||
return &contract.SearchResponse{}, nil
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if pluginSearchCount != 1 {
|
||||
t.Errorf("plugin search called %d times, want 1 (was 3 with the old N+1 code)", pluginSearchCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPickOwnerForNamespace covers the namespace→workspace_name
|
||||
// attribution helper introduced in I3.
|
||||
func TestPickOwnerForNamespace(t *testing.T) {
|
||||
members := []workspaceRow{
|
||||
{ID: "root-1", Name: "alpha", RootID: "root-1"},
|
||||
{ID: "child-1", Name: "alpha-child", RootID: "root-1"},
|
||||
}
|
||||
cases := []struct {
|
||||
name string
|
||||
ns string
|
||||
want string
|
||||
}{
|
||||
{"workspace ns matches member id", "workspace:child-1", "alpha-child"},
|
||||
{"workspace ns no match → first", "workspace:foreign", "alpha"},
|
||||
{"team ns → first member of root group", "team:root-1", "alpha"},
|
||||
{"org ns → first member", "org:root-1", "alpha"},
|
||||
{"custom ns → first member", "custom:foo", "alpha"},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
if got := pickOwnerForNamespace(tc.ns, members); got != tc.want {
|
||||
t.Errorf("pickOwnerForNamespace(%q) = %q, want %q", tc.ns, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
if got := pickOwnerForNamespace("workspace:abc", nil); got != "" {
|
||||
t.Errorf("empty members must return \"\", got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Helper functions ---
|
||||
|
||||
func TestLegacyScopeFromNamespace(t *testing.T) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user