molecule-core/workspace-server/cmd/memory-backfill/verify.go
Hongming Wang 656a02fae4 fix(textutil): SSOT for rune-safe string truncation, fix 3 audit-gap bugs
Closes #2962.

## Why

Six per-package `truncate` helpers had drifted into independent
re-implementations of the same idea. Three of them (delegation.go,
memory/client/client.go, memory-backfill/verify.go) used
`s[:max] + "…"` byte-slice form, which on a multi-byte codepoint at
byte `max` produces invalid UTF-8 → Postgres `text`/`jsonb` rejects
the INSERT silently → `delegation` / `activity_logs` row never lands
→ audit gap.

Three other helpers (delegation_ledger.go #2962, agent_message_writer.go
#2959, scheduler.go #2026) had each been fixed in isolation with three
slightly different rune-safe shapes — confirming this is a class of
bug, not a single instance.

## What

New package `internal/textutil` with three rune-safe functions:

- `TruncateBytes(s, maxBytes)` — byte-cap, "…" marker. Used by 5
  callers writing into byte-bounded columns / log lines.
- `TruncateBytesNoMarker(s, maxBytes)` — byte-cap, no marker. Used by
  delegation_ledger.go where the storage already conveys "preview"
  and an extra ellipsis would push the result over the column cap.
- `TruncateRunes(s, maxRunes)` — rune-cap, "…" marker. Used by
  agent_message_writer.go where the cap is in display chars (UI
  summary), not bytes.

All three guarantee `utf8.ValidString(out)` for any `utf8.ValidString(in)`.
Inputs already invalid go through `sanitizeUTF8` at the call site
boundary (scheduler.go preserved this defense-in-depth).

## Migration map

| Old | New | Behavior change |
|---|---|---|
| `delegation_ledger.truncatePreview` | `textutil.TruncateBytesNoMarker(s, 4096)` | none |
| `agent_message_writer.truncatePreviewRunes` | `textutil.TruncateRunes(s, n)` | none |
| `scheduler.truncate` | `textutil.TruncateBytes(s, n)` | "..." → "…" (3 bytes either way; single-glyph display) |
| `delegation.truncate` | `textutil.TruncateBytes(s, n)` | bug fix + ellipsis swap |
| `memory/client.truncate` | `textutil.TruncateBytes(s, n)` | bug fix |
| `memory-backfill.truncate` | `textutil.TruncateBytes(s, n)` | bug fix |

Five separate `truncate*` helpers + their per-package tests removed.
Net: 12 files / +427 / -255.

## Tests

- `internal/textutil/truncate_test.go` — 27 table-test cases + 145
  fuzz-invariant cases asserting `utf8.ValidString` and byte-cap
  invariants on every output.
- `delegation_ledger_test.go TestLedgerInsert_TruncatesOversizedPreview`
  strengthened with `capValidUTF8Matcher` so the SQL-write argument
  is asserted to be valid UTF-8 + within cap (not just `AnyArg()`).
  Mutation-tested: replacing the SSOT call with byte-slice form makes
  this test fail loud.

## Compatibility

- All callers internal; no external API surface change.
- Ellipsis swap "..." → "…": same byte budget (3 bytes), single-glyph
  display. No alerting/grep on either marker in this codebase
  (verified). Canvas renders both correctly.
- DB column widths unchanged (4096 / 80 / 200 / 256 / 300 — all
  preserved in the migrations).

## Security

Fixes a silent INSERT-failure mode that hid `activity_logs` /
`delegations` rows containing peer-controlled text. The class of input
that triggered it (CJK, emoji, accented Latin) is normal user content,
not malicious — but the symptom (audit gap) makes incident
reconstruction harder. Helper is pure-function over `string`; no
secrets / PII / auth handling involved. Untrusted input is handled
identically to before, just rune-aligned now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:01:21 -07:00

197 lines
5.9 KiB
Go

package main
// verify.go — post-apply parity check.
//
// After a backfill -apply, run with -verify to confirm the migration
// actually produced equivalent data. Picks `SampleSize` random
// workspaces, queries agent_memories direct + plugin search via the
// caller's namespaces, and diffs the result sets by content.
//
// The diff is best-effort: pg's recent-first ordering and the plugin's
// internal ordering may differ, so we compare as sets, not lists.
// We do require strict 1:1 multiset equality (every legacy row maps
// to exactly one plugin row, ignoring id since the backfill preserves
// it via the C1 idempotency key).
import (
"context"
"database/sql"
"fmt"
"math/rand"
"os"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/contract"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/textutil"
)
// verifyConfig is the typed dependency bundle for verifyParity.
type verifyConfig struct {
DB *sql.DB
Plugin verifyPlugin
Resolver verifyResolver
SampleSize int
WorkspaceID string // optional: limit to one workspace
Rand *rand.Rand
}
// verifyPlugin is the slice of memory-plugin client we call.
type verifyPlugin interface {
Search(ctx context.Context, body contract.SearchRequest) (*contract.SearchResponse, error)
}
// verifyResolver mirrors namespace.Resolver. Same shape as
// backfillResolver but kept distinct so verify isn't tied to
// backfill's interface.
type verifyResolver interface {
ReadableNamespaces(ctx context.Context, workspaceID string) ([]ResolvedNamespace, error)
}
// ResolvedNamespace is the minimum we need from the resolver — kept
// separate so the verify code doesn't depend on the namespace package
// (the live tests inject stubs, the binary uses an adapter).
type ResolvedNamespace struct {
Name string
}
// verifyReport accumulates the per-workspace results.
type verifyReport struct {
WorkspacesSampled int
Matches int
Mismatches int
Errors int
}
// verifyParity is the workhorse. Returns a report; the CLI converts
// any non-zero mismatches/errors into a non-zero exit so CI can gate
// the cutover.
func verifyParity(ctx context.Context, cfg verifyConfig, stdout *os.File) (*verifyReport, error) {
report := &verifyReport{}
rng := cfg.Rand
if rng == nil {
rng = rand.New(rand.NewSource(42)) //nolint:gosec // determinism > unpredictability for ops
}
wsIDs, err := pickWorkspaceSample(ctx, cfg.DB, cfg.WorkspaceID, cfg.SampleSize, rng)
if err != nil {
return report, fmt.Errorf("pick sample: %w", err)
}
for _, wsID := range wsIDs {
report.WorkspacesSampled++
legacy, err := queryLegacyMemories(ctx, cfg.DB, wsID)
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s legacy query: %v\n", wsID, err)
report.Errors++
continue
}
readable, err := cfg.Resolver.ReadableNamespaces(ctx, wsID)
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s resolve: %v\n", wsID, err)
report.Errors++
continue
}
nsList := make([]string, len(readable))
for i, ns := range readable {
nsList[i] = ns.Name
}
if len(nsList) == 0 {
// No readable namespaces — empty plugin result expected.
if len(legacy) == 0 {
report.Matches++
} else {
fmt.Fprintf(stdout, "[mismatch] workspace=%s legacy=%d plugin=0 (no readable namespaces)\n", wsID, len(legacy))
report.Mismatches++
}
continue
}
resp, err := cfg.Plugin.Search(ctx, contract.SearchRequest{Namespaces: nsList, Limit: 100})
if err != nil {
fmt.Fprintf(stdout, "[err] workspace=%s plugin search: %v\n", wsID, err)
report.Errors++
continue
}
pluginContents := make(map[string]int, len(resp.Memories))
for _, m := range resp.Memories {
pluginContents[m.Content]++
}
// Compare as multisets: each legacy content appears at least
// once in plugin output. We deliberately tolerate plugin
// having MORE rows (the namespace might include team-shared
// memories from sibling workspaces that aren't in this
// workspace's agent_memories rows).
matched := true
for _, c := range legacy {
if pluginContents[c] == 0 {
fmt.Fprintf(stdout, "[mismatch] workspace=%s missing-from-plugin content=%q\n", wsID, textutil.TruncateBytes(c, 80))
matched = false
break
}
pluginContents[c]--
}
if matched {
report.Matches++
} else {
report.Mismatches++
}
}
return report, nil
}
// pickWorkspaceSample returns up to N workspace UUIDs. If
// WorkspaceID is set, returns only that one. Otherwise selects N
// random workspaces from the workspaces table (TABLESAMPLE would be
// nicer but SYSTEM/BERNOULLI sampling has surprising distribution
// properties for small populations; we just ORDER BY random() LIMIT).
func pickWorkspaceSample(ctx context.Context, db *sql.DB, workspaceID string, n int, _ *rand.Rand) ([]string, error) {
if workspaceID != "" {
return []string{workspaceID}, nil
}
rows, err := db.QueryContext(ctx, `
SELECT id::text
FROM workspaces
WHERE status != 'removed'
ORDER BY random()
LIMIT $1
`, n)
if err != nil {
return nil, err
}
defer rows.Close()
out := make([]string, 0, n)
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
out = append(out, id)
}
return out, rows.Err()
}
// queryLegacyMemories pulls all agent_memories rows for a workspace
// (LOCAL + TEAM scopes — what the plugin search would return through
// the resolver's readable list, mapped via PR-6 shim semantics).
func queryLegacyMemories(ctx context.Context, db *sql.DB, workspaceID string) ([]string, error) {
rows, err := db.QueryContext(ctx, `
SELECT content
FROM agent_memories
WHERE workspace_id = $1
ORDER BY created_at DESC
`, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []string{}
for rows.Next() {
var c string
if err := rows.Scan(&c); err != nil {
return nil, err
}
out = append(out, c)
}
return out, rows.Err()
}
// truncation moved to internal/textutil.TruncateBytes (#2962 SSOT).