diff --git a/workspace-server/internal/handlers/pending_uploads_integration_test.go b/workspace-server/internal/handlers/pending_uploads_integration_test.go
index 61c64f86..f54f8df0 100644
--- a/workspace-server/internal/handlers/pending_uploads_integration_test.go
+++ b/workspace-server/internal/handlers/pending_uploads_integration_test.go
@@ -451,6 +451,201 @@ func TestIntegration_PendingUploads_AckedIndexExists(t *testing.T) {
 	}
 }
 
+// TestIntegration_PollUpload_AtomicRollback_AcrossBothTables proves the
+// #149 cross-table contract at the database layer: when PutBatchTx and
+// LogActivityTx run in the same caller-owned Tx and an activity INSERT
+// fails after some rows have already been INSERTed, Rollback unwinds
+// BOTH tables, leaving zero rows.
+//
+// Coverage map (#149):
+//   - chat_files_poll_test.go's TestPollUpload_AtomicRollbackOnActivityInsertFailure
+//     uses sqlmock to prove the Go handler issues Begin / N inserts /
+//     Rollback in the right order (no Commit on failure path).
+//   - This integration test proves the helpers + real Postgres compose
+//     correctly: rollback after a mid-Tx activity insert failure
+//     actually reverts BOTH the prior activity row AND the
+//     pending_uploads rows from PutBatchTx.
+//   - The pre-existing TestIntegration_PendingUploads_PutBatch_AtomicRollback
+//     covers the pending_uploads-only case.
+//
+// Failure injection: a NUL byte in `summary` (TEXT column) — lib/pq
+// rejects it at the protocol layer. Same trick the existing PutBatch
+// AtomicRollback test uses for the pending_uploads INSERT.
+func TestIntegration_PollUpload_AtomicRollback_AcrossBothTables(t *testing.T) {
+	conn := integrationDB_PendingUploads(t)
+	ctx := context.Background()
+
+	// activity_logs has a FK to workspaces(id) — seed a real row so
+	// non-failing inserts succeed. Wipe activity_logs + this workspaces
+	// row at end so the next test sees a clean slate (the integrationDB
+	// helper only wipes pending_uploads).
+	wsID := uuid.New()
+	if _, err := conn.ExecContext(ctx,
+		`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-rollback')`, wsID,
+	); err != nil {
+		t.Fatalf("seed workspace: %v", err)
+	}
+	t.Cleanup(func() {
+		// CASCADE on workspaces FK deletes the activity_logs rows; explicit
+		// DELETE on activity_logs catches any rows that somehow leaked.
+		_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
+		_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
+	})
+
+	store := pendinguploads.NewPostgres(conn)
+
+	// Mirror uploadPollMode's Tx shape: BeginTx → PutBatchTx → N ×
+	// LogActivityTx → Commit (or Rollback on failure).
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		t.Fatalf("BeginTx: %v", err)
+	}
+
+	items := []pendinguploads.PutItem{
+		{Content: []byte("first"), Filename: "a.txt", Mimetype: "text/plain"},
+		{Content: []byte("second"), Filename: "b.txt", Mimetype: "text/plain"},
+	}
+	fileIDs, err := store.PutBatchTx(ctx, tx, wsID, items)
+	if err != nil {
+		t.Fatalf("PutBatchTx: %v", err)
+	}
+	if len(fileIDs) != 2 {
+		t.Fatalf("len(fileIDs) = %d, want 2", len(fileIDs))
+	}
+
+	// First activity insert succeeds — would commit if not for the
+	// rollback that the second insert's failure forces.
+	wsIDStr := wsID.String()
+	method := "chat_upload_receive"
+	okSummary := "chat_upload_receive: a.txt"
+	if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
+		WorkspaceID:  wsIDStr,
+		ActivityType: "a2a_receive",
+		TargetID:     &wsIDStr,
+		Method:       &method,
+		Summary:      &okSummary,
+		Status:       "ok",
+	}); err != nil {
+		t.Fatalf("first LogActivityTx (should succeed): %v", err)
+	}
+
+	// Second activity insert: NUL byte in summary triggers lib/pq
+	// "invalid byte sequence for encoding UTF8: 0x00" — the canonical
+	// "DB-side error after some Tx work has already happened" we want.
+	badSummary := "chat_upload_receive: b\x00.txt"
+	_, err = LogActivityTx(ctx, tx, nil, ActivityParams{
+		WorkspaceID:  wsIDStr,
+		ActivityType: "a2a_receive",
+		TargetID:     &wsIDStr,
+		Method:       &method,
+		Summary:      &badSummary,
+		Status:       "ok",
+	})
+	if err == nil {
+		t.Fatal("expected error from NUL-byte summary, got nil")
+	}
+
+	// Caller (uploadPollMode in production) rolls back on the error.
+	if rbErr := tx.Rollback(); rbErr != nil {
+		t.Fatalf("Rollback: %v", rbErr)
+	}
+
+	// THE assertion this test exists for: BOTH tables must have zero
+	// rows for this workspace. Pre-#149 the activity_logs row from the
+	// first insert would persist (separate fire-and-forget INSERT) and
+	// pending_uploads would also persist (committed by PutBatch's own
+	// Tx). Post-#149 the shared Tx + Rollback unwinds both.
+	var puCount, alCount int
+	if err := conn.QueryRowContext(ctx,
+		`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
+	).Scan(&puCount); err != nil {
+		t.Fatalf("count pending_uploads: %v", err)
+	}
+	if err := conn.QueryRowContext(ctx,
+		`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
+	).Scan(&alCount); err != nil {
+		t.Fatalf("count activity_logs: %v", err)
+	}
+	if puCount != 0 {
+		t.Errorf("pending_uploads leaked %d row(s) after Rollback — #149 regression", puCount)
+	}
+	if alCount != 0 {
+		t.Errorf("activity_logs leaked %d row(s) after Rollback — #149 regression "+
+			"(THIS is the scenario the ticket called out: pre-fix, the first activity row "+
+			"committed in its own implicit Tx, leaving an orphan)", alCount)
+	}
+}
+
+// TestIntegration_PollUpload_HappyPath_AcrossBothTables is the positive
+// counterpart to the rollback test: when nothing fails, both tables
+// commit together and the row counts match.
+func TestIntegration_PollUpload_HappyPath_AcrossBothTables(t *testing.T) {
+	conn := integrationDB_PendingUploads(t)
+	ctx := context.Background()
+
+	wsID := uuid.New()
+	if _, err := conn.ExecContext(ctx,
+		`INSERT INTO workspaces (id, name) VALUES ($1, 'test-149-happy')`, wsID,
+	); err != nil {
+		t.Fatalf("seed workspace: %v", err)
+	}
+	t.Cleanup(func() {
+		_, _ = conn.ExecContext(context.Background(), `DELETE FROM activity_logs WHERE workspace_id = $1`, wsID)
+		_, _ = conn.ExecContext(context.Background(), `DELETE FROM workspaces WHERE id = $1`, wsID)
+	})
+
+	store := pendinguploads.NewPostgres(conn)
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		t.Fatalf("BeginTx: %v", err)
+	}
+
+	items := []pendinguploads.PutItem{
+		{Content: []byte("a"), Filename: "a.txt", Mimetype: "text/plain"},
+		{Content: []byte("b"), Filename: "b.txt", Mimetype: "text/plain"},
+		{Content: []byte("c"), Filename: "c.txt", Mimetype: "text/plain"},
+	}
+	if _, err := store.PutBatchTx(ctx, tx, wsID, items); err != nil {
+		t.Fatalf("PutBatchTx: %v", err)
+	}
+	wsIDStr := wsID.String()
+	method := "chat_upload_receive"
+	for _, it := range items {
+		summary := "chat_upload_receive: " + it.Filename
+		if _, err := LogActivityTx(ctx, tx, nil, ActivityParams{
+			WorkspaceID:  wsIDStr,
+			ActivityType: "a2a_receive",
+			TargetID:     &wsIDStr,
+			Method:       &method,
+			Summary:      &summary,
+			Status:       "ok",
+		}); err != nil {
+			t.Fatalf("LogActivityTx %q: %v", it.Filename, err)
+		}
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("Commit: %v", err)
+	}
+
+	var puCount, alCount int
+	if err := conn.QueryRowContext(ctx,
+		`SELECT COUNT(*) FROM pending_uploads WHERE workspace_id = $1`, wsID,
+	).Scan(&puCount); err != nil {
+		t.Fatalf("count pending_uploads: %v", err)
+	}
+	if err := conn.QueryRowContext(ctx,
+		`SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1`, wsID,
+	).Scan(&alCount); err != nil {
+		t.Fatalf("count activity_logs: %v", err)
+	}
+	if puCount != 3 {
+		t.Errorf("pending_uploads count = %d, want 3", puCount)
+	}
+	if alCount != 3 {
+		t.Errorf("activity_logs count = %d, want 3", alCount)
+	}
+}
+
 func TestIntegration_PendingUploads_GetIgnoresExpiredAndAcked(t *testing.T) {
 	conn := integrationDB_PendingUploads(t)
 	store := pendinguploads.NewPostgres(conn)