forked from molecule-ai/molecule-core
Merge pull request #2740 from Molecule-AI/feat/memory-v2-pr8-cutover
Memory v2 PR-8: cutover — admin export/import via plugin
This commit is contained in:
commit
8aea1f008c
@ -1,23 +1,82 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
mclient "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/client"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// envMemoryV2Cutover gates whether admin export/import routes through
|
||||
// the v2 plugin (PR-8 / RFC #2728). When unset, the legacy direct-DB
|
||||
// path runs unchanged so operators who haven't enabled the plugin
|
||||
// keep working.
|
||||
const envMemoryV2Cutover = "MEMORY_V2_CUTOVER"
|
||||
|
||||
// AdminMemoriesHandler provides bulk export/import of agent memories for
|
||||
// backup and restore across Docker rebuilds (issue #1051).
|
||||
type AdminMemoriesHandler struct{}
|
||||
//
|
||||
// PR-8 (RFC #2728): when wired with the v2 plugin via WithMemoryV2 AND
|
||||
// MEMORY_V2_CUTOVER is true, export reads from the plugin's namespaces
|
||||
// and import writes through the plugin. Both paths preserve the
|
||||
// SAFE-T1201 redaction shipped in F1084 + F1085.
|
||||
type AdminMemoriesHandler struct {
|
||||
plugin adminMemoriesPlugin
|
||||
resolver adminMemoriesResolver
|
||||
}
|
||||
|
||||
// adminMemoriesPlugin is the slice of the memory plugin client we
|
||||
// call from this handler.
|
||||
type adminMemoriesPlugin interface {
|
||||
CommitMemory(ctx context.Context, namespace string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error)
|
||||
Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error)
|
||||
UpsertNamespace(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error)
|
||||
}
|
||||
|
||||
// adminMemoriesResolver mirrors the namespace resolver methods this
|
||||
// handler calls.
|
||||
type adminMemoriesResolver interface {
|
||||
WritableNamespaces(ctx context.Context, workspaceID string) ([]namespace.Namespace, error)
|
||||
ReadableNamespaces(ctx context.Context, workspaceID string) ([]namespace.Namespace, error)
|
||||
}
|
||||
|
||||
// NewAdminMemoriesHandler constructs the handler.
|
||||
func NewAdminMemoriesHandler() *AdminMemoriesHandler {
|
||||
return &AdminMemoriesHandler{}
|
||||
}
|
||||
|
||||
// WithMemoryV2 attaches the v2 plugin + resolver. Production wiring
|
||||
// path; main.go calls this after Boot()-ing the plugin client.
|
||||
func (h *AdminMemoriesHandler) WithMemoryV2(plugin *mclient.Client, resolver *namespace.Resolver) *AdminMemoriesHandler {
|
||||
h.plugin = plugin
|
||||
h.resolver = resolver
|
||||
return h
|
||||
}
|
||||
|
||||
// withMemoryV2APIs is the test-only wiring that takes interfaces.
|
||||
func (h *AdminMemoriesHandler) withMemoryV2APIs(plugin adminMemoriesPlugin, resolver adminMemoriesResolver) *AdminMemoriesHandler {
|
||||
h.plugin = plugin
|
||||
h.resolver = resolver
|
||||
return h
|
||||
}
|
||||
|
||||
// cutoverActive reports whether the export/import path should route
|
||||
// through the v2 plugin.
|
||||
func (h *AdminMemoriesHandler) cutoverActive() bool {
|
||||
if os.Getenv(envMemoryV2Cutover) != "true" {
|
||||
return false
|
||||
}
|
||||
return h.plugin != nil && h.resolver != nil
|
||||
}
|
||||
|
||||
// memoryExportEntry is the JSON shape for a single exported memory.
|
||||
type memoryExportEntry struct {
|
||||
ID string `json:"id"`
|
||||
@ -36,9 +95,17 @@ type memoryExportEntry struct {
|
||||
// SECURITY (F1084 / #1131): applies redactSecrets to each content field
|
||||
// before returning so that any credentials stored before SAFE-T1201 (#838)
|
||||
// was applied do not leak out via the admin export endpoint.
|
||||
//
|
||||
// CUTOVER (PR-8 / RFC #2728): when MEMORY_V2_CUTOVER=true and the v2
|
||||
// plugin is wired, reads from the plugin instead of agent_memories.
|
||||
func (h *AdminMemoriesHandler) Export(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
if h.cutoverActive() {
|
||||
h.exportViaPlugin(c, ctx)
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT am.id, am.content, am.scope, am.namespace, am.created_at,
|
||||
w.name AS workspace_name
|
||||
@ -91,6 +158,9 @@ type memoryImportEntry struct {
|
||||
// before both the deduplication check and the INSERT so that imported memories
|
||||
// with embedded credentials cannot land unredacted in agent_memories (SAFE-T1201
|
||||
// parity with the commit_memory MCP bridge path).
|
||||
//
|
||||
// CUTOVER (PR-8 / RFC #2728): when MEMORY_V2_CUTOVER=true and the v2
|
||||
// plugin is wired, writes through the plugin instead of agent_memories.
|
||||
func (h *AdminMemoriesHandler) Import(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
@ -100,6 +170,11 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if h.cutoverActive() {
|
||||
h.importViaPlugin(c, ctx, entries)
|
||||
return
|
||||
}
|
||||
|
||||
imported := 0
|
||||
skipped := 0
|
||||
errors := 0
|
||||
@ -175,3 +250,193 @@ func (h *AdminMemoriesHandler) Import(c *gin.Context) {
|
||||
"total": len(entries),
|
||||
})
|
||||
}
|
||||
|
||||
// exportViaPlugin reads memories from the v2 plugin and emits them in
|
||||
// the legacy memoryExportEntry shape so existing tooling that consumes
|
||||
// the export keeps working.
|
||||
//
|
||||
// Strategy: enumerate workspaces, ask the resolver for each one's
|
||||
// readable namespaces, search each namespace once. Deduplicate by
|
||||
// memory id (a single memory in team:X is visible to every workspace
|
||||
// under root X — we want one row per memory, not N).
|
||||
func (h *AdminMemoriesHandler) exportViaPlugin(c *gin.Context, ctx context.Context) {
|
||||
rows, err := db.DB.QueryContext(ctx, `SELECT id::text, name FROM workspaces ORDER BY created_at`)
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover): workspaces query: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "export query failed"})
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type wsRow struct{ ID, Name string }
|
||||
var workspaces []wsRow
|
||||
for rows.Next() {
|
||||
var w wsRow
|
||||
if err := rows.Scan(&w.ID, &w.Name); err != nil {
|
||||
continue
|
||||
}
|
||||
workspaces = append(workspaces, w)
|
||||
}
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
memories := make([]memoryExportEntry, 0)
|
||||
for _, w := range workspaces {
|
||||
readable, err := h.resolver.ReadableNamespaces(ctx, w.ID)
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover) workspace=%s: resolve: %v", w.Name, err)
|
||||
continue
|
||||
}
|
||||
nsList := make([]string, len(readable))
|
||||
for i, ns := range readable {
|
||||
nsList[i] = ns.Name
|
||||
}
|
||||
if len(nsList) == 0 {
|
||||
continue
|
||||
}
|
||||
resp, err := h.plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/export (cutover) workspace=%s: plugin search: %v", w.Name, err)
|
||||
continue
|
||||
}
|
||||
for _, m := range resp.Memories {
|
||||
if _, dup := seen[m.ID]; dup {
|
||||
continue
|
||||
}
|
||||
seen[m.ID] = struct{}{}
|
||||
redacted, _ := redactSecrets(w.Name, m.Content)
|
||||
memories = append(memories, memoryExportEntry{
|
||||
ID: m.ID,
|
||||
Content: redacted,
|
||||
Scope: legacyScopeFromNamespace(m.Namespace),
|
||||
Namespace: m.Namespace,
|
||||
CreatedAt: m.CreatedAt,
|
||||
WorkspaceName: w.Name,
|
||||
})
|
||||
}
|
||||
}
|
||||
c.JSON(http.StatusOK, memories)
|
||||
}
|
||||
|
||||
// importViaPlugin writes the entries through the plugin instead of
|
||||
// directly to agent_memories. Workspaces are resolved by name like
|
||||
// the legacy path. Scope→namespace mapping mirrors the PR-6 shim.
|
||||
func (h *AdminMemoriesHandler) importViaPlugin(c *gin.Context, ctx context.Context, entries []memoryImportEntry) {
|
||||
imported := 0
|
||||
skipped := 0
|
||||
errs := 0
|
||||
|
||||
for _, entry := range entries {
|
||||
var workspaceID string
|
||||
if err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT id::text FROM workspaces WHERE name = $1 LIMIT 1`,
|
||||
entry.WorkspaceName,
|
||||
).Scan(&workspaceID); err != nil {
|
||||
log.Printf("admin/memories/import (cutover): workspace %q not found, skipping", entry.WorkspaceName)
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// Redact BEFORE the plugin sees it (SAFE-T1201 parity).
|
||||
content, _ := redactSecrets(workspaceID, entry.Content)
|
||||
|
||||
ns, err := h.scopeToWritableNamespaceForImport(ctx, workspaceID, entry.Scope)
|
||||
if err != nil {
|
||||
log.Printf("admin/memories/import (cutover): %v", err)
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
// Idempotent namespace upsert before commit.
|
||||
if _, err := h.plugin.UpsertNamespace(ctx, ns, contract.NamespaceUpsert{
|
||||
Kind: namespaceKindFromLegacyScope(entry.Scope),
|
||||
}); err != nil {
|
||||
log.Printf("admin/memories/import (cutover): upsert ns %s: %v", ns, err)
|
||||
errs++
|
||||
continue
|
||||
}
|
||||
|
||||
if _, err := h.plugin.CommitMemory(ctx, ns, contract.MemoryWrite{
|
||||
Content: content,
|
||||
Kind: contract.MemoryKindFact,
|
||||
Source: contract.MemorySourceAgent,
|
||||
}); err != nil {
|
||||
log.Printf("admin/memories/import (cutover): commit %s: %v", ns, err)
|
||||
errs++
|
||||
continue
|
||||
}
|
||||
imported++
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"imported": imported,
|
||||
"skipped": skipped,
|
||||
"errors": errs,
|
||||
"total": len(entries),
|
||||
})
|
||||
}
|
||||
|
||||
// scopeToWritableNamespaceForImport mirrors the PR-6 shim translation.
|
||||
// Returns the namespace string the resolver picks for the requested
|
||||
// scope; errors out cleanly on GLOBAL or unmapped values so importing
|
||||
// a malformed entry doesn't crash the run.
|
||||
func (h *AdminMemoriesHandler) scopeToWritableNamespaceForImport(ctx context.Context, workspaceID, scope string) (string, error) {
|
||||
writable, err := h.resolver.WritableNamespaces(ctx, workspaceID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
wantKind := contract.NamespaceKindWorkspace
|
||||
switch strings.ToUpper(scope) {
|
||||
case "", "LOCAL":
|
||||
wantKind = contract.NamespaceKindWorkspace
|
||||
case "TEAM":
|
||||
wantKind = contract.NamespaceKindTeam
|
||||
case "GLOBAL":
|
||||
wantKind = contract.NamespaceKindOrg
|
||||
default:
|
||||
return "", &skipImport{reason: "unknown scope: " + scope}
|
||||
}
|
||||
for _, ns := range writable {
|
||||
if ns.Kind == wantKind {
|
||||
return ns.Name, nil
|
||||
}
|
||||
}
|
||||
return "", &skipImport{reason: "no writable namespace of kind " + string(wantKind)}
|
||||
}
|
||||
|
||||
// skipImport is a typed error so the caller can distinguish "skip
|
||||
// this entry" from a hard failure.
|
||||
type skipImport struct{ reason string }
|
||||
|
||||
func (e *skipImport) Error() string { return "skip: " + e.reason }
|
||||
|
||||
// legacyScopeFromNamespace reverses the namespace→scope mapping for
|
||||
// the export shape. Mirrors namespaceKindToLegacyScope from the PR-6
|
||||
// shim but is lifted out so admin_memories doesn't depend on the MCP
|
||||
// handler's helpers.
|
||||
func legacyScopeFromNamespace(ns string) string {
|
||||
switch {
|
||||
case strings.HasPrefix(ns, "workspace:"):
|
||||
return "LOCAL"
|
||||
case strings.HasPrefix(ns, "team:"):
|
||||
return "TEAM"
|
||||
case strings.HasPrefix(ns, "org:"):
|
||||
return "GLOBAL"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// namespaceKindFromLegacyScope returns the contract.NamespaceKind for
|
||||
// a legacy scope value. Unknown defaults to workspace so importing
|
||||
// an unexpected row still produces a typed namespace.
|
||||
func namespaceKindFromLegacyScope(scope string) contract.NamespaceKind {
|
||||
switch strings.ToUpper(scope) {
|
||||
case "TEAM":
|
||||
return contract.NamespaceKindTeam
|
||||
case "GLOBAL":
|
||||
return contract.NamespaceKindOrg
|
||||
default:
|
||||
return contract.NamespaceKindWorkspace
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,604 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
||||
platformdb "github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/namespace"
|
||||
)
|
||||
|
||||
// --- stubs ---
|
||||
|
||||
type stubAdminPlugin struct {
|
||||
upserts []string
|
||||
commits []commitRecord
|
||||
searches []contract.SearchRequest
|
||||
commitFn func(ctx context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error)
|
||||
searchFn func(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error)
|
||||
upsertFn func(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error)
|
||||
}
|
||||
|
||||
type commitRecord struct {
|
||||
NS string
|
||||
Content string
|
||||
}
|
||||
|
||||
func (s *stubAdminPlugin) UpsertNamespace(ctx context.Context, name string, body contract.NamespaceUpsert) (*contract.Namespace, error) {
|
||||
s.upserts = append(s.upserts, name)
|
||||
if s.upsertFn != nil {
|
||||
return s.upsertFn(ctx, name, body)
|
||||
}
|
||||
return &contract.Namespace{Name: name, Kind: body.Kind, CreatedAt: time.Now().UTC()}, nil
|
||||
}
|
||||
func (s *stubAdminPlugin) CommitMemory(ctx context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) {
|
||||
s.commits = append(s.commits, commitRecord{NS: ns, Content: body.Content})
|
||||
if s.commitFn != nil {
|
||||
return s.commitFn(ctx, ns, body)
|
||||
}
|
||||
return &contract.MemoryWriteResponse{ID: "out-1", Namespace: ns}, nil
|
||||
}
|
||||
func (s *stubAdminPlugin) Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
s.searches = append(s.searches, body)
|
||||
if s.searchFn != nil {
|
||||
return s.searchFn(ctx, body)
|
||||
}
|
||||
return &contract.SearchResponse{}, nil
|
||||
}
|
||||
|
||||
type stubAdminResolver struct {
|
||||
readable []namespace.Namespace
|
||||
writable []namespace.Namespace
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *stubAdminResolver) ReadableNamespaces(_ context.Context, _ string) ([]namespace.Namespace, error) {
|
||||
return s.readable, s.err
|
||||
}
|
||||
func (s *stubAdminResolver) WritableNamespaces(_ context.Context, _ string) ([]namespace.Namespace, error) {
|
||||
return s.writable, s.err
|
||||
}
|
||||
|
||||
func adminRootResolver() *stubAdminResolver {
|
||||
return &stubAdminResolver{
|
||||
readable: []namespace.Namespace{
|
||||
{Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
|
||||
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
|
||||
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
|
||||
},
|
||||
writable: []namespace.Namespace{
|
||||
{Name: "workspace:root-1", Kind: contract.NamespaceKindWorkspace, Writable: true},
|
||||
{Name: "team:root-1", Kind: contract.NamespaceKindTeam, Writable: true},
|
||||
{Name: "org:root-1", Kind: contract.NamespaceKindOrg, Writable: true},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// installMockDB swaps platformdb.DB with a sqlmock for a test.
|
||||
func installMockDB(t *testing.T) sqlmock.Sqlmock {
|
||||
t.Helper()
|
||||
mockDB, mock, err := sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("sqlmock new: %v", err)
|
||||
}
|
||||
prev := platformdb.DB
|
||||
platformdb.DB = mockDB
|
||||
t.Cleanup(func() {
|
||||
_ = mockDB.Close()
|
||||
platformdb.DB = prev
|
||||
})
|
||||
return mock
|
||||
}
|
||||
|
||||
// --- cutoverActive ---
|
||||
|
||||
func TestCutoverActive(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
envVal string
|
||||
plugin adminMemoriesPlugin
|
||||
resolver adminMemoriesResolver
|
||||
want bool
|
||||
}{
|
||||
{"env unset", "", &stubAdminPlugin{}, adminRootResolver(), false},
|
||||
{"env true but unwired", "true", nil, nil, false},
|
||||
{"env false", "false", &stubAdminPlugin{}, adminRootResolver(), false},
|
||||
{"env true wired", "true", &stubAdminPlugin{}, adminRootResolver(), true},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, tc.envVal)
|
||||
h := &AdminMemoriesHandler{plugin: tc.plugin, resolver: tc.resolver}
|
||||
if got := h.cutoverActive(); got != tc.want {
|
||||
t.Errorf("got %v, want %v", got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- WithMemoryV2 wiring ---
|
||||
|
||||
func TestWithMemoryV2_AttachesDeps(t *testing.T) {
|
||||
h := NewAdminMemoriesHandler().WithMemoryV2(nil, nil)
|
||||
// Both nil pointers — wiring still attaches them; cutoverActive
|
||||
// reports false because the interface values are nil.
|
||||
if h.plugin == nil && h.resolver == nil {
|
||||
// expected
|
||||
}
|
||||
}
|
||||
|
||||
func TestWithMemoryV2APIs_AttachesDeps(t *testing.T) {
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, adminRootResolver())
|
||||
if h.plugin == nil || h.resolver == nil {
|
||||
t.Error("withMemoryV2APIs must attach both interfaces")
|
||||
}
|
||||
}
|
||||
|
||||
// --- Export via plugin ---
|
||||
|
||||
func TestExport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
return &contract.SearchResponse{Memories: []contract.Memory{
|
||||
{ID: "mem-1", Namespace: "workspace:root-1", Content: "fact x", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
|
||||
{ID: "mem-2", Namespace: "team:root-1", Content: "team y", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
|
||||
}}, nil
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
var entries []memoryExportEntry
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &entries); err != nil {
|
||||
t.Fatalf("unmarshal: %v", err)
|
||||
}
|
||||
if len(entries) != 2 {
|
||||
t.Errorf("entries = %d", len(entries))
|
||||
}
|
||||
// Legacy scope label must be in the export
|
||||
scopes := map[string]bool{}
|
||||
for _, e := range entries {
|
||||
scopes[e.Scope] = true
|
||||
}
|
||||
if !scopes["LOCAL"] || !scopes["TEAM"] {
|
||||
t.Errorf("expected LOCAL+TEAM scopes, got %v", scopes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_DeduplicatesByMemoryID(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
|
||||
// Two workspaces, both will see the same team-shared memory.
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha").
|
||||
AddRow("ws-2", "beta"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, body contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
return &contract.SearchResponse{Memories: []contract.Memory{
|
||||
{ID: "mem-shared", Namespace: "team:root-1", Content: "team-fact", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
|
||||
}}, nil
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
var entries []memoryExportEntry
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &entries)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("dedup failed; got %d entries, want 1", len(entries))
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_SkipsWorkspaceWhenResolverFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
resolver := &stubAdminResolver{err: errors.New("resolver dead")}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver)
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
// Should still 200 with empty memories — failure is per-workspace.
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_SkipsWorkspaceWhenPluginSearchFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
return nil, errors.New("plugin dead")
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("code = %d", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_WorkspacesQueryFails(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnError(errors.New("db dead"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("code = %d, want 500", w.Code)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_EmptyReadable(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
|
||||
resolver := &stubAdminResolver{readable: []namespace.Namespace{}}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, resolver)
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("code = %d", w.Code)
|
||||
}
|
||||
if !strings.Contains(w.Body.String(), "[]") {
|
||||
t.Errorf("expected empty array, got %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestExport_RedactsSecretsInPluginPath(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text, name FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
|
||||
AddRow("ws-1", "alpha"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
searchFn: func(_ context.Context, _ contract.SearchRequest) (*contract.SearchResponse, error) {
|
||||
return &contract.SearchResponse{Memories: []contract.Memory{
|
||||
{ID: "mem-1", Namespace: "workspace:root-1", Content: "API_KEY=sk-1234567890abcdefghijk0123456789", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, CreatedAt: time.Now().UTC()},
|
||||
}}, nil
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if strings.Contains(w.Body.String(), "sk-1234567890abcdef") {
|
||||
t.Errorf("export leaked unredacted secret: %s", w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
// --- Import via plugin ---
|
||||
|
||||
func TestImport_RoutesThroughPluginWhenCutoverActive(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WithArgs("alpha").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "fact x", Scope: "LOCAL", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if len(plugin.commits) != 1 {
|
||||
t.Errorf("commits = %d, want 1", len(plugin.commits))
|
||||
}
|
||||
if plugin.commits[0].NS != "workspace:root-1" {
|
||||
t.Errorf("ns = %q", plugin.commits[0].NS)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_SkipsUnknownWorkspace(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WithArgs("ghost").
|
||||
WillReturnError(errors.New("no rows"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "x", Scope: "LOCAL", WorkspaceName: "ghost"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
var resp map[string]int
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["skipped"] != 1 || resp["imported"] != 0 {
|
||||
t.Errorf("resp = %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_PluginUpsertNamespaceError(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
upsertFn: func(_ context.Context, _ string, _ contract.NamespaceUpsert) (*contract.Namespace, error) {
|
||||
return nil, errors.New("upsert dead")
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
var resp map[string]int
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["errors"] != 1 || resp["imported"] != 0 {
|
||||
t.Errorf("resp = %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_PluginCommitError(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{
|
||||
commitFn: func(_ context.Context, _ string, _ contract.MemoryWrite) (*contract.MemoryWriteResponse, error) {
|
||||
return nil, errors.New("commit dead")
|
||||
},
|
||||
}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
var resp map[string]int
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["errors"] != 1 {
|
||||
t.Errorf("resp = %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_RedactsBeforePluginSeesContent(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "API_KEY=sk-1234567890abcdefghijk0123456789", Scope: "LOCAL", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
if len(plugin.commits) != 1 {
|
||||
t.Fatalf("commits = %d", len(plugin.commits))
|
||||
}
|
||||
if strings.Contains(plugin.commits[0].Content, "sk-1234567890") {
|
||||
t.Errorf("plugin received unredacted content: %q", plugin.commits[0].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_SkipsUnknownScope(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, adminRootResolver())
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "x", Scope: "WEIRD", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
var resp map[string]int
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["skipped"] != 1 {
|
||||
t.Errorf("resp = %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestImport_SkipsWhenResolverErrors(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "true")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT id::text FROM workspaces").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("root-1"))
|
||||
|
||||
plugin := &stubAdminPlugin{}
|
||||
resolver := &stubAdminResolver{err: errors.New("dead")}
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(plugin, resolver)
|
||||
|
||||
body, _ := json.Marshal([]memoryImportEntry{
|
||||
{Content: "x", Scope: "LOCAL", WorkspaceName: "alpha"},
|
||||
})
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/memories/import", bytes.NewReader(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
h.Import(c)
|
||||
|
||||
var resp map[string]int
|
||||
_ = json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if resp["skipped"] != 1 {
|
||||
t.Errorf("resp = %v", resp)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Helper functions ---
|
||||
|
||||
func TestLegacyScopeFromNamespace(t *testing.T) {
|
||||
cases := []struct {
|
||||
in string
|
||||
want string
|
||||
}{
|
||||
{"workspace:abc", "LOCAL"},
|
||||
{"team:abc", "TEAM"},
|
||||
{"org:abc", "GLOBAL"},
|
||||
{"custom:abc", ""},
|
||||
{"", ""},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := legacyScopeFromNamespace(tc.in); got != tc.want {
|
||||
t.Errorf("legacyScopeFromNamespace(%q) = %q, want %q", tc.in, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNamespaceKindFromLegacyScope(t *testing.T) {
|
||||
cases := []struct {
|
||||
in string
|
||||
want contract.NamespaceKind
|
||||
}{
|
||||
{"LOCAL", contract.NamespaceKindWorkspace},
|
||||
{"local", contract.NamespaceKindWorkspace},
|
||||
{"TEAM", contract.NamespaceKindTeam},
|
||||
{"GLOBAL", contract.NamespaceKindOrg},
|
||||
{"weird", contract.NamespaceKindWorkspace},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
if got := namespaceKindFromLegacyScope(tc.in); got != tc.want {
|
||||
t.Errorf("namespaceKindFromLegacyScope(%q) = %q, want %q", tc.in, got, tc.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSkipImport_ErrorMessage(t *testing.T) {
|
||||
e := &skipImport{reason: "unknown scope: WEIRD"}
|
||||
if !strings.Contains(e.Error(), "unknown scope: WEIRD") {
|
||||
t.Errorf("Error() = %q", e.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// --- Confirm legacy paths still work when env is unset ---
|
||||
|
||||
func TestExport_LegacyPathWhenCutoverInactive(t *testing.T) {
|
||||
t.Setenv(envMemoryV2Cutover, "")
|
||||
mock := installMockDB(t)
|
||||
mock.ExpectQuery("SELECT am.id, am.content, am.scope, am.namespace").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "content", "scope", "namespace", "created_at", "workspace_name"}))
|
||||
|
||||
h := NewAdminMemoriesHandler().withMemoryV2APIs(&stubAdminPlugin{}, adminRootResolver())
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/admin/memories/export", nil)
|
||||
h.Export(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("legacy SQL path not exercised: %v", err)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user