forked from molecule-ai/molecule-core
test(chat-uploads): integration test for cross-table atomicity (#149 follow-up)
Adds two real-Postgres tests under //go:build integration: - TestIntegration_PollUpload_AtomicRollback_AcrossBothTables exercises the helpers in the same Tx shape uploadPollMode does (PutBatchTx + LogActivityTx + Rollback) and asserts COUNT(*)=0 on BOTH pending_uploads AND activity_logs after the rollback. Failure injection: NUL byte in `summary` triggers lib/pq protocol rejection on the second activity insert — same trick the existing PutBatch AtomicRollback test uses. - TestIntegration_PollUpload_HappyPath_AcrossBothTables is the positive counterpart — Commit lands N rows in both tables. Coverage rationale (post-PR-3010 review): - sqlmock unit test (TestPollUpload_AtomicRollbackOnActivityInsertFailure) proved the handler calls Begin/Exec/Exec-fail/Rollback in order. - Existing PutBatch integration test proved Postgres honors rollback for pending_uploads alone. - New tests close the cross-table gap: prove LogActivityTx + PutBatchTx + real Postgres MVCC compose correctly under rollback. A regression that made LogActivityTx silently route through db.DB instead of the passed tx would still pass the sqlmock test (the Begin/Commit/Rollback shape would look right) but would fail this integration test (the activity_logs row would survive the rollback). Verified locally: postgres:15-alpine + all migrations applied, both tests pass in 0.1s. Skips cleanly without INTEGRATION_DB_URL — CI already runs this file via the Handlers Postgres Integration job. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b759548822
commit
7a39a08837
@ -451,6 +451,201 @@ func TestIntegration_PendingUploads_AckedIndexExists(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_PollUpload_AtomicRollback_AcrossBothTables proves the
|
||||
// #149 cross-table contract at the database layer: when PutBatchTx and
|
||||
// LogActivityTx run in the same caller-owned Tx and an activity INSERT
|
||||
// fails after some rows have already been INSERTed, Rollback unwinds
|
||||
// BOTH tables, leaving zero rows.
|
||||
//
|
||||
// Coverage map (#149):
|
||||
// - chat_files_poll_test.go's TestPollUpload_AtomicRollbackOnActivityInsertFailure
|
||||
// uses sqlmock to prove the Go handler issues Begin / N inserts /
|
||||
// Rollback in the right order (no Commit on failure path).
|
||||
// - This integration test proves the helpers + real Postgres compose
|
||||
// correctly: rollback after a mid-Tx activity insert failure
|
||||
// actually reverts BOTH the prior activity row AND the
|
||||
// pending_uploads rows from PutBatchTx.
|
||||
// - The pre-existing TestIntegration_PendingUploads_PutBatch_AtomicRollback
|
||||
// covers the pending_uploads-only case.
|
||||
//
|
||||
// Failure injection: a NUL byte in `summary` (TEXT column) — lib/pq
|
||||
// rejects it at the protocol layer. Same trick the existing PutBatch
|
||||
// AtomicRollback test uses for the pending_uploads INSERT.
|
||||
func TestIntegration_PollUpload_AtomicRollback_AcrossBothTables(t *testing.T) {
|
||||
conn := integrationDB_PendingUploads(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// activity_logs has a FK to workspaces(id) — seed a real row so
|
||||
// non-failing inserts succeed. Wipe activity_logs + this workspaces
|
||||
// row at end so the next test sees a clean slate (the integrationDB
|
||||
// helper only wipes pending_uploads).
|
||||
wsID := uuid.New()
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-rollback')`, wsID,
|
||||
); err != nil {
|
||||
t.Fatalf("seed workspace: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
// CASCADE on workspaces FK deletes the activity_logs rows; explicit
|
||||
// DELETE on activity_logs catches any rows that somehow leaked.
|
||||
_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
|
||||
_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
|
||||
})
|
||||
|
||||
store := pendinguploads.NewPostgres(conn)
|
||||
|
||||
// Mirror uploadPollMode's Tx shape: BeginTx → PutBatchTx → N ×
|
||||
// LogActivityTx → Commit (or Rollback on failure).
|
||||
tx, err := conn.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("BeginTx: %v", err)
|
||||
}
|
||||
|
||||
items := []pendinguploads.PutItem{
|
||||
{Content: []byte("first"), Filename: "a.txt", Mimetype: "text/plain"},
|
||||
{Content: []byte("second"), Filename: "b.txt", Mimetype: "text/plain"},
|
||||
}
|
||||
fileIDs, err := store.PutBatchTx(ctx, tx, wsID, items)
|
||||
if err != nil {
|
||||
t.Fatalf("PutBatchTx: %v", err)
|
||||
}
|
||||
if len(fileIDs) != 2 {
|
||||
t.Fatalf("len(fileIDs) = %d, want 2", len(fileIDs))
|
||||
}
|
||||
|
||||
// First activity insert succeeds — would commit if not for the
|
||||
// rollback that the second insert's failure forces.
|
||||
wsIDStr := wsID.String()
|
||||
method := "chat_upload_receive"
|
||||
okSummary := "chat_upload_receive: a.txt"
|
||||
if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
|
||||
WorkspaceID: wsIDStr,
|
||||
ActivityType: "a2a_receive",
|
||||
TargetID: &wsIDStr,
|
||||
Method: &method,
|
||||
Summary: &okSummary,
|
||||
Status: "ok",
|
||||
}); err != nil {
|
||||
t.Fatalf("first LogActivityTx (should succeed): %v", err)
|
||||
}
|
||||
|
||||
// Second activity insert: NUL byte in summary triggers lib/pq
|
||||
// "invalid byte sequence for encoding UTF8: 0x00" — the canonical
|
||||
// "DB-side error after some Tx work has already happened" we want.
|
||||
badSummary := "chat_upload_receive: b\x00.txt"
|
||||
_, err = LogActivityTx(ctx, tx, nil, ActivityParams{
|
||||
WorkspaceID: wsIDStr,
|
||||
ActivityType: "a2a_receive",
|
||||
TargetID: &wsIDStr,
|
||||
Method: &method,
|
||||
Summary: &badSummary,
|
||||
Status: "ok",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected error from NUL-byte summary, got nil")
|
||||
}
|
||||
|
||||
// Caller (uploadPollMode in production) rolls back on the error.
|
||||
if rbErr := tx.Rollback(); rbErr != nil {
|
||||
t.Fatalf("Rollback: %v", rbErr)
|
||||
}
|
||||
|
||||
// THE assertion this test exists for: BOTH tables must have zero
|
||||
// rows for this workspace. Pre-#149 the activity_logs row from the
|
||||
// first insert would persist (separate fire-and-forget INSERT) and
|
||||
// pending_uploads would also persist (committed by PutBatch's own
|
||||
// Tx). Post-#149 the shared Tx + Rollback unwinds both.
|
||||
var puCount, alCount int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
|
||||
).Scan(&puCount); err != nil {
|
||||
t.Fatalf("count pending_uploads: %v", err)
|
||||
}
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
|
||||
).Scan(&alCount); err != nil {
|
||||
t.Fatalf("count activity_logs: %v", err)
|
||||
}
|
||||
if puCount != 0 {
|
||||
t.Errorf("pending_uploads leaked %d row(s) after Rollback — #149 regression", puCount)
|
||||
}
|
||||
if alCount != 0 {
|
||||
t.Errorf("activity_logs leaked %d row(s) after Rollback — #149 regression "+
|
||||
"(THIS is the scenario the ticket called out: pre-fix, the first activity row "+
|
||||
"committed in its own implicit Tx, leaving an orphan)", alCount)
|
||||
}
|
||||
}
|
||||
|
||||
// TestIntegration_PollUpload_HappyPath_AcrossBothTables is the positive
|
||||
// counterpart to the rollback test: when nothing fails, both tables
|
||||
// commit together and the row counts match.
|
||||
func TestIntegration_PollUpload_HappyPath_AcrossBothTables(t *testing.T) {
|
||||
conn := integrationDB_PendingUploads(t)
|
||||
ctx := context.Background()
|
||||
|
||||
wsID := uuid.New()
|
||||
if _, err := conn.ExecContext(ctx,
|
||||
`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-happy')`, wsID,
|
||||
); err != nil {
|
||||
t.Fatalf("seed workspace: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
|
||||
_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
|
||||
})
|
||||
|
||||
store := pendinguploads.NewPostgres(conn)
|
||||
tx, err := conn.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("BeginTx: %v", err)
|
||||
}
|
||||
|
||||
items := []pendinguploads.PutItem{
|
||||
{Content: []byte("a"), Filename: "a.txt", Mimetype: "text/plain"},
|
||||
{Content: []byte("b"), Filename: "b.txt", Mimetype: "text/plain"},
|
||||
{Content: []byte("c"), Filename: "c.txt", Mimetype: "text/plain"},
|
||||
}
|
||||
if _, err := store.PutBatchTx(ctx, tx, wsID, items); err != nil {
|
||||
t.Fatalf("PutBatchTx: %v", err)
|
||||
}
|
||||
wsIDStr := wsID.String()
|
||||
method := "chat_upload_receive"
|
||||
for _, it := range items {
|
||||
summary := "chat_upload_receive: " + it.Filename
|
||||
if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
|
||||
WorkspaceID: wsIDStr,
|
||||
ActivityType: "a2a_receive",
|
||||
TargetID: &wsIDStr,
|
||||
Method: &method,
|
||||
Summary: &summary,
|
||||
Status: "ok",
|
||||
}); err != nil {
|
||||
t.Fatalf("LogActivityTx %q: %v", it.Filename, err)
|
||||
}
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
t.Fatalf("Commit: %v", err)
|
||||
}
|
||||
|
||||
var puCount, alCount int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
|
||||
).Scan(&puCount); err != nil {
|
||||
t.Fatalf("count pending_uploads: %v", err)
|
||||
}
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
|
||||
).Scan(&alCount); err != nil {
|
||||
t.Fatalf("count activity_logs: %v", err)
|
||||
}
|
||||
if puCount != 3 {
|
||||
t.Errorf("pending_uploads count = %d, want 3", puCount)
|
||||
}
|
||||
if alCount != 3 {
|
||||
t.Errorf("activity_logs count = %d, want 3", alCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIntegration_PendingUploads_GetIgnoresExpiredAndAcked(t *testing.T) {
|
||||
conn := integrationDB_PendingUploads(t)
|
||||
store := pendinguploads.NewPostgres(conn)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user