Memory v2 fixup C1: backfill idempotency via MemoryWrite.id
Self-review (post-merge) flagged that the backfill claimed to be
idempotent on re-run but actually duplicates every row because the
plugin's INSERT uses gen_random_uuid() and ignores any id passed in.
Fix is contract-level: extend MemoryWrite with an optional `id`
idempotency key. When supplied, the plugin MUST treat the write as
upsert keyed on this id; when omitted, the plugin generates a fresh
UUID (production agent commits keep working unchanged).
Changes:
* docs/api-protocol/memory-plugin-v1.yaml: add id field with
description that flags it as idempotency key
* internal/memory/contract/contract.go: add ID to MemoryWrite struct,
update memory_write_minimal golden vector
* internal/memory/pgplugin/store.go: split CommitMemory into two
paths — upsert when body.ID set (INSERT ... ON CONFLICT (id) DO
UPDATE), plain INSERT otherwise
* cmd/memory-backfill/main.go: pass agent_memories.id to MemoryWrite,
fix the false comment about 409 deduplication
New tests:
* pgplugin: TestCommitMemory_WithIDUpserts pins the upsert SQL is
used when id is set; TestCommitMemory_UpsertScanError covers the
error branch
* backfill: TestBackfill_PassesSourceUUIDAsIdempotencyKey pins the
forwarding behavior; TestBackfill_RerunIsIdempotent simulates a
retry and asserts both runs pass the same uuid (plugin upsert is
what makes this safe)
Why this matters: operators retrying a failed backfill (which they
will — networks fail, transactions abort) would otherwise create N
duplicates per memory. The duplicates aren't visible until search
results show obvious dupes — debugging that under prod load is bad.
Production agent commits are unaffected: they leave id empty, the
plugin generates a fresh UUID via gen_random_uuid(), zero behavior
change for the hot path.
This commit is contained in:
parent
7cffff844b
commit
1e97fb9a16
@ -238,6 +238,15 @@ components:
|
||||
type: object
|
||||
required: [content, kind, source]
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
format: uuid
|
||||
nullable: true
|
||||
description: |
|
||||
Optional idempotency key. When supplied, the plugin MUST
|
||||
treat the write as upsert keyed on this id (re-running
|
||||
the same write does not duplicate). When omitted, the
|
||||
plugin generates a fresh UUID. Used by the backfill CLI.
|
||||
content:
|
||||
type: string
|
||||
minLength: 1
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
// memory-backfill is a one-shot CLI that copies rows from the legacy
|
||||
// agent_memories table into the v2 plugin via its HTTP API.
|
||||
// Idempotent on re-run: each row is keyed by its UUID, and if the
|
||||
// plugin sees a duplicate it returns 409 (or just no-ops, depending
|
||||
// on plugin) — the backfill proceeds.
|
||||
//
|
||||
// Idempotent on re-run: the backfill passes each source row's UUID
|
||||
// to the plugin's MemoryWrite.ID field, and the plugin upserts on
|
||||
// conflict. Re-running the backfill (whole or partial) updates rows
|
||||
// in place rather than duplicating.
|
||||
//
|
||||
// Usage:
|
||||
// memory-backfill -dry-run # count + diff
|
||||
@ -188,7 +190,11 @@ func backfill(ctx context.Context, cfg backfillConfig, stdout *os.File) (*backfi
|
||||
continue
|
||||
}
|
||||
|
||||
// Pass the source row's UUID as the idempotency key so re-runs
|
||||
// upsert in place. Without this, retries would duplicate every
|
||||
// memory.
|
||||
if _, err := cfg.Plugin.CommitMemory(ctx, ns, contract.MemoryWrite{
|
||||
ID: id,
|
||||
Content: content,
|
||||
Kind: contract.MemoryKindFact,
|
||||
Source: contract.MemorySourceAgent,
|
||||
|
||||
@ -16,8 +16,9 @@ import (
|
||||
|
||||
// stubBackfillPlugin records calls for assertions.
|
||||
type stubBackfillPlugin struct {
|
||||
upsertedNamespaces []string
|
||||
upsertedNamespaces []string
|
||||
committedNamespaces []string
|
||||
committedIDs []string // captures MemoryWrite.ID per call
|
||||
upsertErr error
|
||||
commitErr error
|
||||
}
|
||||
@ -29,12 +30,17 @@ func (s *stubBackfillPlugin) UpsertNamespace(_ context.Context, name string, _ c
|
||||
}
|
||||
return &contract.Namespace{Name: name, Kind: contract.NamespaceKindWorkspace}, nil
|
||||
}
|
||||
func (s *stubBackfillPlugin) CommitMemory(_ context.Context, ns string, _ contract.MemoryWrite) (*contract.MemoryWriteResponse, error) {
|
||||
func (s *stubBackfillPlugin) CommitMemory(_ context.Context, ns string, body contract.MemoryWrite) (*contract.MemoryWriteResponse, error) {
|
||||
s.committedNamespaces = append(s.committedNamespaces, ns)
|
||||
s.committedIDs = append(s.committedIDs, body.ID)
|
||||
if s.commitErr != nil {
|
||||
return nil, s.commitErr
|
||||
}
|
||||
return &contract.MemoryWriteResponse{ID: "out-1", Namespace: ns}, nil
|
||||
id := body.ID
|
||||
if id == "" {
|
||||
id = "out-1"
|
||||
}
|
||||
return &contract.MemoryWriteResponse{ID: id, Namespace: ns}, nil
|
||||
}
|
||||
|
||||
type stubBackfillResolver struct {
|
||||
@ -131,6 +137,66 @@ func TestNamespaceKindFromString(t *testing.T) {
|
||||
|
||||
// --- backfill (the workhorse) ---
|
||||
|
||||
// TestBackfill_PassesSourceUUIDAsIdempotencyKey pins the Critical-1
|
||||
// fix: backfill must forward agent_memories.id to MemoryWrite.ID so
|
||||
// re-runs upsert in place.
|
||||
func TestBackfill_PassesSourceUUIDAsIdempotencyKey(t *testing.T) {
|
||||
db, mock, _ := sqlmock.New()
|
||||
defer db.Close()
|
||||
now := time.Now().UTC()
|
||||
mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}).
|
||||
AddRow("source-uuid-A", "root-1", "fact 1", "LOCAL", now).
|
||||
AddRow("source-uuid-B", "root-1", "fact 2", "LOCAL", now))
|
||||
|
||||
plugin := &stubBackfillPlugin{}
|
||||
cfg := backfillConfig{DB: db, Plugin: plugin, Resolver: rootBackfillResolver(), Limit: 100}
|
||||
devnull, _ := os.Open(os.DevNull)
|
||||
defer devnull.Close()
|
||||
if _, err := backfill(context.Background(), cfg, devnull); err != nil {
|
||||
t.Fatalf("backfill: %v", err)
|
||||
}
|
||||
if len(plugin.committedIDs) != 2 {
|
||||
t.Fatalf("commits = %d", len(plugin.committedIDs))
|
||||
}
|
||||
if plugin.committedIDs[0] != "source-uuid-A" || plugin.committedIDs[1] != "source-uuid-B" {
|
||||
t.Errorf("committedIDs = %v; idempotency key not forwarded", plugin.committedIDs)
|
||||
}
|
||||
}
|
||||
|
||||
// TestBackfill_RerunIsIdempotent: same agent_memories rows backfilled
|
||||
// twice. Plugin sees the same UUIDs both times; without the fix the
|
||||
// plugin would generate fresh UUIDs and duplicate.
|
||||
func TestBackfill_RerunIsIdempotent(t *testing.T) {
|
||||
db, mock, _ := sqlmock.New()
|
||||
defer db.Close()
|
||||
now := time.Now().UTC()
|
||||
rows1 := sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}).
|
||||
AddRow("uuid-1", "root-1", "fact", "LOCAL", now)
|
||||
rows2 := sqlmock.NewRows([]string{"id", "workspace_id", "content", "scope", "created_at"}).
|
||||
AddRow("uuid-1", "root-1", "fact", "LOCAL", now)
|
||||
mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at").WillReturnRows(rows1)
|
||||
mock.ExpectQuery("SELECT id, workspace_id, content, scope, created_at").WillReturnRows(rows2)
|
||||
|
||||
plugin := &stubBackfillPlugin{}
|
||||
cfg := backfillConfig{DB: db, Plugin: plugin, Resolver: rootBackfillResolver(), Limit: 100}
|
||||
devnull, _ := os.Open(os.DevNull)
|
||||
defer devnull.Close()
|
||||
|
||||
if _, err := backfill(context.Background(), cfg, devnull); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := backfill(context.Background(), cfg, devnull); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(plugin.committedIDs) != 2 {
|
||||
t.Errorf("commits = %d, want 2", len(plugin.committedIDs))
|
||||
}
|
||||
if plugin.committedIDs[0] != "uuid-1" || plugin.committedIDs[1] != "uuid-1" {
|
||||
t.Errorf("ids = %v; both runs must pass uuid-1 (relies on plugin upsert for actual de-dup)", plugin.committedIDs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBackfill_HappyPath_Apply(t *testing.T) {
|
||||
db, mock, _ := sqlmock.New()
|
||||
defer db.Close()
|
||||
|
||||
@ -129,7 +129,14 @@ type NamespacePatch struct {
|
||||
// `Content` MUST be pre-redacted by workspace-server (SAFE-T1201).
|
||||
// Plugins do not run additional redaction; the workspace-server is the
|
||||
// security perimeter.
|
||||
//
|
||||
// `ID` is an optional idempotency key. When supplied, the plugin MUST
|
||||
// treat the write as upsert keyed on this id so re-running the same
|
||||
// write does not duplicate. The backfill CLI passes the source row's
|
||||
// UUID here; production agent commits leave it empty and the plugin
|
||||
// generates a fresh UUID.
|
||||
type MemoryWrite struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Content string `json:"content"`
|
||||
Kind MemoryKind `json:"kind"`
|
||||
Source MemorySource `json:"source"`
|
||||
|
||||
@ -342,6 +342,46 @@ func TestCommitMemory_StoreError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommitMemory_WithIDUpserts(t *testing.T) {
|
||||
// Idempotency-key path. When body.id is set, the store must use
|
||||
// the upsert SQL (INSERT ... ON CONFLICT DO UPDATE) so a re-run
|
||||
// updates in place instead of inserting a new row.
|
||||
db, mock := setupMockDB(t)
|
||||
h := newTestHandler(t, db, nil)
|
||||
mock.ExpectQuery("INSERT INTO memory_records.*ON CONFLICT").
|
||||
WithArgs("fixed-id-1", "workspace:abc", "fact x", "fact", "agent",
|
||||
sqlmock.AnyArg(), sqlmock.AnyArg(), false, sqlmock.AnyArg()).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id", "namespace"}).
|
||||
AddRow("fixed-id-1", "workspace:abc"))
|
||||
w := doRequest(h, "POST", "/v1/namespaces/workspace:abc/memories", contract.MemoryWrite{
|
||||
ID: "fixed-id-1",
|
||||
Content: "fact x",
|
||||
Kind: contract.MemoryKindFact,
|
||||
Source: contract.MemorySourceAgent,
|
||||
})
|
||||
if w.Code != 201 {
|
||||
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("upsert SQL not used: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommitMemory_UpsertScanError(t *testing.T) {
|
||||
db, mock := setupMockDB(t)
|
||||
h := newTestHandler(t, db, nil)
|
||||
mock.ExpectQuery("INSERT INTO memory_records.*ON CONFLICT").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"id"}). // wrong shape
|
||||
AddRow("x"))
|
||||
w := doRequest(h, "POST", "/v1/namespaces/workspace:abc/memories", contract.MemoryWrite{
|
||||
ID: "fixed-id-1",
|
||||
Content: "x", Kind: contract.MemoryKindFact, Source: contract.MemorySourceAgent,
|
||||
})
|
||||
if w.Code != 500 {
|
||||
t.Errorf("code = %d body=%s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestCommitMemory_WithEmbedding(t *testing.T) {
|
||||
db, mock := setupMockDB(t)
|
||||
h := newTestHandler(t, db, nil)
|
||||
|
||||
@ -122,6 +122,45 @@ func (s *Store) CommitMemory(ctx context.Context, namespace string, body contrac
|
||||
return nil, err
|
||||
}
|
||||
embedding := nullVectorString(body.Embedding)
|
||||
|
||||
// Two paths so that the upsert branch only fires when the caller
|
||||
// supplied an idempotency key. Production agent commits leave id
|
||||
// empty and rely on gen_random_uuid() — splitting the SQL avoids
|
||||
// adding a NULL guard inside the conflict target.
|
||||
if body.ID != "" {
|
||||
const upsertQuery = `
|
||||
INSERT INTO memory_records
|
||||
(id, namespace, content, kind, source, expires_at, propagation, pin, embedding)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::vector)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
namespace = EXCLUDED.namespace,
|
||||
content = EXCLUDED.content,
|
||||
kind = EXCLUDED.kind,
|
||||
source = EXCLUDED.source,
|
||||
expires_at = EXCLUDED.expires_at,
|
||||
propagation = EXCLUDED.propagation,
|
||||
pin = EXCLUDED.pin,
|
||||
embedding = EXCLUDED.embedding
|
||||
RETURNING id, namespace
|
||||
`
|
||||
row := s.db.QueryRowContext(ctx, upsertQuery,
|
||||
body.ID,
|
||||
namespace,
|
||||
body.Content,
|
||||
string(body.Kind),
|
||||
string(body.Source),
|
||||
nullTime(body.ExpiresAt),
|
||||
propagation,
|
||||
body.Pin,
|
||||
embedding,
|
||||
)
|
||||
var resp contract.MemoryWriteResponse
|
||||
if err := row.Scan(&resp.ID, &resp.Namespace); err != nil {
|
||||
return nil, fmt.Errorf("commit memory (upsert): %w", err)
|
||||
}
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
const query = `
|
||||
INSERT INTO memory_records
|
||||
(namespace, content, kind, source, expires_at, propagation, pin, embedding)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user