diff --git a/docs/api-protocol/memory-plugin-v1.yaml b/docs/api-protocol/memory-plugin-v1.yaml index 92c8842b..95884f58 100644 --- a/docs/api-protocol/memory-plugin-v1.yaml +++ b/docs/api-protocol/memory-plugin-v1.yaml @@ -238,6 +238,15 @@ components: type: object required: [content, kind, source] properties: + id: + type: string + format: uuid + nullable: true + description: | + Optional idempotency key. When supplied, the plugin MUST + treat the write as upsert keyed on this id (re-running + the same write does not duplicate). When omitted, the + plugin generates a fresh UUID. Used by the backfill CLI. content: type: string minLength: 1 diff --git a/workspace-server/cmd/memory-backfill/main.go b/workspace-server/cmd/memory-backfill/main.go index 96ef7d21..362a3f22 100644 --- a/workspace-server/cmd/memory-backfill/main.go +++ b/workspace-server/cmd/memory-backfill/main.go @@ -1,8 +1,10 @@ // memory-backfill is a one-shot CLI that copies rows from the legacy // agent_memories table into the v2 plugin via its HTTP API. -// Idempotent on re-run: each row is keyed by its UUID, and if the -// plugin sees a duplicate it returns 409 (or just no-ops, depending -// on plugin) — the backfill proceeds. +// +// Idempotent on re-run: the backfill passes each source row's UUID +// to the plugin's MemoryWrite.ID field, and the plugin upserts on +// conflict. Re-running the backfill (whole or partial) updates rows +// in place rather than duplicating. // // Usage: // memory-backfill -dry-run # count + diff @@ -188,7 +190,11 @@ func backfill(ctx context.Context, cfg backfillConfig, stdout *os.File) (*backfi continue } + // Pass the source row's UUID as the idempotency key so re-runs + // upsert in place. Without this, retries would duplicate every + // memory. if _, err := cfg.Plugin.CommitMemory(ctx, ns, contract.MemoryWrite{ + ID: id, Content: content, Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, diff --git a/workspace-server/cmd/memory-backfill/main_test.go b/workspace-server/cmd/memory-backfill/main_test.go index a71347ab..667b3f5e 100644 --- a/workspace-server/cmd/memory-backfill/main_test.go +++ b/workspace-server/cmd/memory-backfill/main_test.go @@ -16,8 +16,9 @@ import ( // stubBackfillPlugin records calls for assertions. type stubBackfillPlugin struct { - upsertedNamespaces []string + upsertedNamespaces []string committedNamespaces []string + committedIDs []string // captures MemoryWrite.ID per call upsertErr error commitErr error } @@ -29,12 +30,17 @@ func (s *stubBackfillPlugin) UpsertNamespace(_ context.Context, name string, _ c } return &contract.Namespace{Name: name, Kind: contract.NamespaceKindWorkspace}, nil } -func (s *stubBackfillPlugin) CommitMemory(_ context.Context, ns string, _ contract.MemoryWrite) (*contract.MemoryWriteResponse, error) { +func (s *stubBackfillPlugin) CommitMemory(_ context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) { s.committedNamespaces = append(s.committedNamespaces, ns) + s.committedIDs = append(s.committedIDs, body.ID) if s.commitErr != nil { return nil, s.commitErr } - return &contract.MemoryWriteResponse{ID: "out-1", Namespace: ns}, nil + id := body.ID + if id == "" { + id = "out-1" + } + return &contract.MemoryWriteResponse{ID: id, Namespace: ns}, nil } type stubBackfillResolver struct { @@ -131,6 +137,66 @@ func TestNamespaceKindFromString(t *testing.T) { // --- backfill (the workhorse) --- +// TestBackfill_PassesSourceUUIDAsIdempotencyKey pins the Critical-1 +// fix: backfill must forward agent_memories.id to MemoryWrite.ID so +// re-runs upsert in place. +func TestBackfill_PassesSourceUUIDAsIdempotencyKey(t *testing.T) { + db, mock, _ := sqlmock.New() + defer db.Close() + now := time.Now().UTC() + mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at"). + WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}). + AddRow("source-uuid-A", "root-1", "fact 1", "LOCAL", now). + AddRow("source-uuid-B", "root-1", "fact 2", "LOCAL", now)) + + plugin := &stubBackfillPlugin{} + cfg := backfillConfig{DB: db, Plugin: plugin, Resolver: rootBackfillResolver(), Limit: 100} + devnull, _ := os.Open(os.DevNull) + defer devnull.Close() + if _, err := backfill(context.Background(), cfg, devnull); err != nil { + t.Fatalf("backfill: %v", err) + } + if len(plugin.committedIDs) != 2 { + t.Fatalf("commits = %d", len(plugin.committedIDs)) + } + if plugin.committedIDs[0] != "source-uuid-A" || plugin.committedIDs[1] != "source-uuid-B" { + t.Errorf("committedIDs = %v; idempotency key not forwarded", plugin.committedIDs) + } +} + +// TestBackfill_RerunIsIdempotent: same agent_memories rows backfilled +// twice. Plugin sees the same UUIDs both times; without the fix the +// plugin would generate fresh UUIDs and duplicate. +func TestBackfill_RerunIsIdempotent(t *testing.T) { + db, mock, _ := sqlmock.New() + defer db.Close() + now := time.Now().UTC() + rows1 := sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}). + AddRow("uuid-1", "root-1", "fact", "LOCAL", now) + rows2 := sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}). + AddRow("uuid-1", "root-1", "fact", "LOCAL", now) + mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at").WillReturnRows(rows1) + mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at").WillReturnRows(rows2) + + plugin := &stubBackfillPlugin{} + cfg := backfillConfig{DB: db, Plugin: plugin, Resolver: rootBackfillResolver(), Limit: 100} + devnull, _ := os.Open(os.DevNull) + defer devnull.Close() + + if _, err := backfill(context.Background(), cfg, devnull); err != nil { + t.Fatal(err) + } + if _, err := backfill(context.Background(), cfg, devnull); err != nil { + t.Fatal(err) + } + if len(plugin.committedIDs) != 2 { + t.Errorf("commits = %d, want 2", len(plugin.committedIDs)) + } + if plugin.committedIDs[0] != "uuid-1" || plugin.committedIDs[1] != "uuid-1" { + t.Errorf("ids = %v; both runs must pass uuid-1 (relies on plugin upsert for actual de-dup)", plugin.committedIDs) + } +} + func TestBackfill_HappyPath_Apply(t *testing.T) { db, mock, _ := sqlmock.New() defer db.Close() diff --git a/workspace-server/internal/memory/contract/contract.go b/workspace-server/internal/memory/contract/contract.go index 2e913159..828abe5d 100644 --- a/workspace-server/internal/memory/contract/contract.go +++ b/workspace-server/internal/memory/contract/contract.go @@ -129,7 +129,14 @@ type NamespacePatch struct { // `Content` MUST be pre-redacted by workspace-server (SAFE-T1201). // Plugins do not run additional redaction; the workspace-server is the // security perimeter. +// +// `ID` is an optional idempotency key. When supplied, the plugin MUST +// treat the write as upsert keyed on this id so re-running the same +// write does not duplicate. The backfill CLI passes the source row's +// UUID here; production agent commits leave it empty and the plugin +// generates a fresh UUID. type MemoryWrite struct { + ID string `json:"id,omitempty"` Content string `json:"content"` Kind MemoryKind `json:"kind"` Source MemorySource `json:"source"` diff --git a/workspace-server/internal/memory/pgplugin/handlers_test.go b/workspace-server/internal/memory/pgplugin/handlers_test.go index 0be41136..ff683224 100644 --- a/workspace-server/internal/memory/pgplugin/handlers_test.go +++ b/workspace-server/internal/memory/pgplugin/handlers_test.go @@ -342,6 +342,46 @@ func TestCommitMemory_StoreError(t *testing.T) { } } +func TestCommitMemory_WithIDUpserts(t *testing.T) { + // Idempotency-key path. When body.id is set, the store must use + // the upsert SQL (INSERT ... ON CONFLICT DO UPDATE) so a re-run + // updates in place instead of inserting a new row. + db, mock := setupMockDB(t) + h := newTestHandler(t, db, nil) + mock.ExpectQuery("INSERT INTO memory_records.*ON CONFLICT"). + WithArgs("fixed-id-1", "workspace:abc", "fact x", "fact", "agent", + sqlmock.AnyArg(), sqlmock.AnyArg(), false, sqlmock.AnyArg()). + WillReturnRows(sqlmock.NewRows([]string{"id", "namespace"}). + AddRow("fixed-id-1", "workspace:abc")) + w := doRequest(h, "POST", "/v1/namespaces/workspace:abc/memories", contract.MemoryWrite{ + ID: "fixed-id-1", + Content: "fact x", + Kind: contract.MemoryKindFact, + Source: contract.MemorySourceAgent, + }) + if w.Code != 201 { + t.Errorf("code = %d body=%s", w.Code, w.Body.String()) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("upsert SQL not used: %v", err) + } +} + +func TestCommitMemory_UpsertScanError(t *testing.T) { + db, mock := setupMockDB(t) + h := newTestHandler(t, db, nil) + mock.ExpectQuery("INSERT INTO memory_records.*ON CONFLICT"). + WillReturnRows(sqlmock.NewRows([]string{"id"}). // wrong shape + AddRow("x")) + w := doRequest(h, "POST", "/v1/namespaces/workspace:abc/memories", contract.MemoryWrite{ + ID: "fixed-id-1", + Content: "x", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent, + }) + if w.Code != 500 { + t.Errorf("code = %d body=%s", w.Code, w.Body.String()) + } +} + func TestCommitMemory_WithEmbedding(t *testing.T) { db, mock := setupMockDB(t) h := newTestHandler(t, db, nil) diff --git a/workspace-server/internal/memory/pgplugin/store.go b/workspace-server/internal/memory/pgplugin/store.go index 170abc4d..6896dc75 100644 --- a/workspace-server/internal/memory/pgplugin/store.go +++ b/workspace-server/internal/memory/pgplugin/store.go @@ -122,6 +122,45 @@ func (s *Store) CommitMemory(ctx context.Context, namespace string, body contrac return nil, err } embedding := nullVectorString(body.Embedding) + + // Two paths so that the upsert branch only fires when the caller + // supplied an idempotency key. Production agent commits leave id + // empty and rely on gen_random_uuid() — splitting the SQL avoids + // adding a NULL guard inside the conflict target. + if body.ID != "" { + const upsertQuery = ` + INSERT INTO memory_records + (id, namespace, content, kind, source, expires_at, propagation, pin, embedding) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::vector) + ON CONFLICT (id) DO UPDATE SET + namespace = EXCLUDED.namespace, + content = EXCLUDED.content, + kind = EXCLUDED.kind, + source = EXCLUDED.source, + expires_at = EXCLUDED.expires_at, + propagation = EXCLUDED.propagation, + pin = EXCLUDED.pin, + embedding = EXCLUDED.embedding + RETURNING id, namespace + ` + row := s.db.QueryRowContext(ctx, upsertQuery, + body.ID, + namespace, + body.Content, + string(body.Kind), + string(body.Source), + nullTime(body.ExpiresAt), + propagation, + body.Pin, + embedding, + ) + var resp contract.MemoryWriteResponse + if err := row.Scan(&resp.ID, &resp.Namespace); err != nil { + return nil, fmt.Errorf("commit memory (upsert): %w", err) + } + return &resp, nil + } + const query = ` INSERT INTO memory_records (namespace, content, kind, source, expires_at, propagation, pin, embedding)