Merge pull request #3014 from Molecule-AI/test/cross-table-atomicity-integ-149-followup

test(chat-uploads): integration test for cross-table atomicity (#149 follow-up)
This commit is contained in:
Hongming Wang 2026-05-06 05:05:49 +00:00 committed by GitHub
commit c53155ec5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 217 additions and 11 deletions

View File

@ -121,8 +121,16 @@ jobs:
# Per-migration result is logged so a failed migration that
# SHOULD have been replayable surfaces in the CI log instead
# of silently failing.
# Apply both *.sql (legacy, lives next to its module) and
# *.up.sql (newer up/down convention) in a single
# lexicographically-sorted pass. Excluding *.down.sql so the
# newest-naming-convention pairs don't undo themselves mid-run.
# Pre-#149-followup this loop only globbed *.up.sql, which
# silently skipped 001_workspaces.sql + 009_activity_logs.sql
# — fine while no integration test depended on those tables,
# not fine once a cross-table atomicity test came in.
set +e
for migration in migrations/*.up.sql; do
for migration in $(ls migrations/*.sql 2>/dev/null | grep -v '\.down\.sql$' | sort); do
if psql -h localhost -U postgres -d molecule -v ON_ERROR_STOP=1 \
-f "$migration" >/dev/null 2>&1; then
echo "✓ $(basename "$migration")"
@ -132,16 +140,19 @@ jobs:
done
set -e
# Sanity: the delegations table MUST exist for the integration
# tests to be meaningful. Hard-fail if 049 didn't land — that
# would be a real regression we want loud.
if ! psql -h localhost -U postgres -d molecule -tA \
-c "SELECT 1 FROM information_schema.tables WHERE table_name = 'delegations'" \
| grep -q 1; then
echo "::error::delegations table missing after migration replay — handler integration tests would be meaningless"
exit 1
fi
echo "✓ delegations table present"
# Sanity: the delegations + workspaces + activity_logs tables
# MUST exist for the integration tests to be meaningful. Hard-
# fail if any didn't land — that would be a real regression we
# want loud.
for tbl in delegations workspaces activity_logs pending_uploads; do
if ! psql -h localhost -U postgres -d molecule -tA \
-c "SELECT 1 FROM information_schema.tables WHERE table_name = '$tbl'" \
| grep -q 1; then
echo "::error::$tbl table missing after migration replay — handler integration tests would be meaningless"
exit 1
fi
echo "✓ $tbl table present"
done
- if: needs.detect-changes.outputs.handlers == 'true'
name: Run integration tests

View File

@ -451,6 +451,201 @@ func TestIntegration_PendingUploads_AckedIndexExists(t *testing.T) {
}
}
// TestIntegration_PollUpload_AtomicRollback_AcrossBothTables proves the
// #149 cross-table contract at the database layer: when PutBatchTx and
// LogActivityTx run in the same caller-owned Tx and an activity INSERT
// fails after some rows have already been INSERTed, Rollback unwinds
// BOTH tables, leaving zero rows.
//
// Coverage map (#149):
//   - chat_files_poll_test.go's TestPollUpload_AtomicRollbackOnActivityInsertFailure
//     uses sqlmock to prove the Go handler issues Begin / N inserts /
//     Rollback in the right order (no Commit on failure path).
//   - This integration test proves the helpers + real Postgres compose
//     correctly: rollback after a mid-Tx activity insert failure
//     actually reverts BOTH the prior activity row AND the
//     pending_uploads rows from PutBatchTx.
//   - The pre-existing TestIntegration_PendingUploads_PutBatch_AtomicRollback
//     covers the pending_uploads-only case.
//
// Failure injection: a NUL byte in `summary` (TEXT column) — lib/pq
// rejects it at the protocol layer. Same trick the existing PutBatch
// AtomicRollback test uses for the pending_uploads INSERT.
func TestIntegration_PollUpload_AtomicRollback_AcrossBothTables(t *testing.T) {
	conn := integrationDB_PendingUploads(t)
	ctx := context.Background()

	// activity_logs has a FK to workspaces(id) — seed a real row so
	// non-failing inserts succeed. Wipe activity_logs + this workspaces
	// row at end so the next test sees a clean slate (the integrationDB
	// helper only wipes pending_uploads).
	wsID := uuid.New()
	if _, err := conn.ExecContext(ctx,
		`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-rollback')`, wsID,
	); err != nil {
		t.Fatalf("seed workspace: %v", err)
	}
	t.Cleanup(func() {
		// Delete activity_logs first (it carries the FK to workspaces),
		// then the workspace row itself. Errors are best-effort ignored:
		// this is cleanup, not an assertion.
		_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
		_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
	})

	store := pendinguploads.NewPostgres(conn)

	// Mirror uploadPollMode's Tx shape: BeginTx → PutBatchTx → N ×
	// LogActivityTx → Commit (or Rollback on failure).
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		t.Fatalf("BeginTx: %v", err)
	}
	// Safety net: any t.Fatalf below exits this test before the explicit
	// Rollback, which would otherwise leave the Tx open — pinning a pool
	// connection and row locks that can block the t.Cleanup DELETEs.
	// After the explicit Rollback succeeds, this deferred second call
	// just returns sql.ErrTxDone, which we deliberately ignore.
	defer func() { _ = tx.Rollback() }()

	items := []pendinguploads.PutItem{
		{Content: []byte("first"), Filename: "a.txt", Mimetype: "text/plain"},
		{Content: []byte("second"), Filename: "b.txt", Mimetype: "text/plain"},
	}
	fileIDs, err := store.PutBatchTx(ctx, tx, wsID, items)
	if err != nil {
		t.Fatalf("PutBatchTx: %v", err)
	}
	if len(fileIDs) != 2 {
		t.Fatalf("len(fileIDs) = %d, want 2", len(fileIDs))
	}

	// First activity insert succeeds — would commit if not for the
	// rollback that the second insert's failure forces.
	wsIDStr := wsID.String()
	method := "chat_upload_receive"
	okSummary := "chat_upload_receive: a.txt"
	if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
		WorkspaceID:  wsIDStr,
		ActivityType: "a2a_receive",
		TargetID:     &wsIDStr,
		Method:       &method,
		Summary:      &okSummary,
		Status:       "ok",
	}); err != nil {
		t.Fatalf("first LogActivityTx (should succeed): %v", err)
	}

	// Second activity insert: NUL byte in summary triggers lib/pq
	// "invalid byte sequence for encoding UTF8: 0x00" — the canonical
	// "DB-side error after some Tx work has already happened" we want.
	badSummary := "chat_upload_receive: b\x00.txt"
	_, err = LogActivityTx(ctx, tx, nil, ActivityParams{
		WorkspaceID:  wsIDStr,
		ActivityType: "a2a_receive",
		TargetID:     &wsIDStr,
		Method:       &method,
		Summary:      &badSummary,
		Status:       "ok",
	})
	if err == nil {
		t.Fatal("expected error from NUL-byte summary, got nil")
	}

	// Caller (uploadPollMode in production) rolls back on the error.
	if rbErr := tx.Rollback(); rbErr != nil {
		t.Fatalf("Rollback: %v", rbErr)
	}

	// THE assertion this test exists for: BOTH tables must have zero
	// rows for this workspace. Pre-#149 the activity_logs row from the
	// first insert would persist (separate fire-and-forget INSERT) and
	// pending_uploads would also persist (committed by PutBatch's own
	// Tx). Post-#149 the shared Tx + Rollback unwinds both.
	var puCount, alCount int
	if err := conn.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
	).Scan(&puCount); err != nil {
		t.Fatalf("count pending_uploads: %v", err)
	}
	if err := conn.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
	).Scan(&alCount); err != nil {
		t.Fatalf("count activity_logs: %v", err)
	}
	if puCount != 0 {
		t.Errorf("pending_uploads leaked %d row(s) after Rollback — #149 regression", puCount)
	}
	if alCount != 0 {
		t.Errorf("activity_logs leaked %d row(s) after Rollback — #149 regression "+
			"(THIS is the scenario the ticket called out: pre-fix, the first activity row "+
			"committed in its own implicit Tx, leaving an orphan)", alCount)
	}
}
// TestIntegration_PollUpload_HappyPath_AcrossBothTables is the positive
// counterpart to the rollback test: when nothing fails, both tables
// commit together and the row counts match (3 pending_uploads rows and
// 3 activity_logs rows for the seeded workspace).
func TestIntegration_PollUpload_HappyPath_AcrossBothTables(t *testing.T) {
	conn := integrationDB_PendingUploads(t)
	ctx := context.Background()

	// Seed a workspace row to satisfy the activity_logs FK; clean both
	// tables up afterward (the integrationDB helper only wipes
	// pending_uploads).
	wsID := uuid.New()
	if _, err := conn.ExecContext(ctx,
		`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-happy')`, wsID,
	); err != nil {
		t.Fatalf("seed workspace: %v", err)
	}
	t.Cleanup(func() {
		_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
		_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
	})

	store := pendinguploads.NewPostgres(conn)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		t.Fatalf("BeginTx: %v", err)
	}
	// Safety net: a t.Fatalf between here and Commit would otherwise
	// leave the Tx open, pinning a pool connection plus uncommitted
	// FK-referencing rows that can block the t.Cleanup DELETE on
	// workspaces. After a successful Commit this deferred Rollback is a
	// no-op returning sql.ErrTxDone, which we deliberately ignore.
	defer func() { _ = tx.Rollback() }()

	items := []pendinguploads.PutItem{
		{Content: []byte("a"), Filename: "a.txt", Mimetype: "text/plain"},
		{Content: []byte("b"), Filename: "b.txt", Mimetype: "text/plain"},
		{Content: []byte("c"), Filename: "c.txt", Mimetype: "text/plain"},
	}
	if _, err := store.PutBatchTx(ctx, tx, wsID, items); err != nil {
		t.Fatalf("PutBatchTx: %v", err)
	}

	// One activity row per uploaded file, all inside the same Tx —
	// mirroring uploadPollMode's production shape.
	wsIDStr := wsID.String()
	method := "chat_upload_receive"
	for _, it := range items {
		summary := "chat_upload_receive: " + it.Filename
		if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
			WorkspaceID:  wsIDStr,
			ActivityType: "a2a_receive",
			TargetID:     &wsIDStr,
			Method:       &method,
			Summary:      &summary,
			Status:       "ok",
		}); err != nil {
			t.Fatalf("LogActivityTx %q: %v", it.Filename, err)
		}
	}
	if err := tx.Commit(); err != nil {
		t.Fatalf("Commit: %v", err)
	}

	// Both tables must show exactly the committed counts.
	var puCount, alCount int
	if err := conn.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
	).Scan(&puCount); err != nil {
		t.Fatalf("count pending_uploads: %v", err)
	}
	if err := conn.QueryRowContext(ctx,
		`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
	).Scan(&alCount); err != nil {
		t.Fatalf("count activity_logs: %v", err)
	}
	if puCount != 3 {
		t.Errorf("pending_uploads count = %d, want 3", puCount)
	}
	if alCount != 3 {
		t.Errorf("activity_logs count = %d, want 3", alCount)
	}
}
func TestIntegration_PendingUploads_GetIgnoresExpiredAndAcked(t *testing.T) {
conn := integrationDB_PendingUploads(t)
store := pendinguploads.NewPostgres(conn)