diff --git a/workspace-server/internal/handlers/pending_uploads.go b/workspace-server/internal/handlers/pending_uploads.go index c7ac02513..a8bc06bca 100644 --- a/workspace-server/internal/handlers/pending_uploads.go +++ b/workspace-server/internal/handlers/pending_uploads.go @@ -61,8 +61,12 @@ func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHan // - file_id not found // - file_id belongs to a different workspace (cross-workspace bleed // protection) -// - row already acked (workspace's bug — should not re-fetch after ack) // - row past expires_at (Phase 3 sweep would delete shortly anyway) +// +// Acked rows are intentionally still readable until the sweeper's +// ack-retention window elapses. Canvas chat history persists +// platform-pending: URIs; after a poll-mode workspace acks the handoff, +// a browser refresh still needs to preview/download the attachment. func (h *PendingUploadsHandler) GetContent(c *gin.Context) { workspaceID := c.Param("id") if err := validateWorkspaceID(workspaceID); err != nil { @@ -78,7 +82,7 @@ func (h *PendingUploadsHandler) GetContent(c *gin.Context) { rec, err := h.storage.Get(c.Request.Context(), fileID) if errors.Is(err, pendinguploads.ErrNotFound) { - c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"}) + c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found or expired"}) return } if err != nil { @@ -181,4 +185,3 @@ func (h *PendingUploadsHandler) Ack(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{"acked": true}) } - diff --git a/workspace-server/internal/pendinguploads/storage.go b/workspace-server/internal/pendinguploads/storage.go index ad38479a6..66ec18b88 100644 --- a/workspace-server/internal/pendinguploads/storage.go +++ b/workspace-server/internal/pendinguploads/storage.go @@ -320,20 +320,18 @@ func putBatchInsertRows(ctx context.Context, tx *sql.Tx, workspaceID uuid.UUID, } func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) { - // The expires_at + acked_at filter in the WHERE clause means a - // caller sees ErrNotFound for absent / acked / expired without - // needing per-case branching. Trade-off: we can't differentiate - // in metrics, but the workspace's response is the same in all - // three cases ("file gone, give up") so the granularity isn't - // useful at this layer. Phase 3 dashboards aggregate row-state - // counts directly off the table. + // The expires_at filter keeps hard-TTL semantics while allowing + // acked rows to remain readable during the ack-retention window. + // Canvas chat history stores platform-pending: URIs; after the + // poll-mode workspace acks the upload, refreshed browser previews + // still need to fetch the same bytes until the sweeper reclaims + // the acked row. var r Record err := p.db.QueryRowContext(ctx, ` SELECT file_id, workspace_id, content, filename, mimetype, size_bytes, created_at, fetched_at, acked_at, expires_at FROM pending_uploads WHERE file_id = $1 - AND acked_at IS NULL AND expires_at > now() `, fileID).Scan( &r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype, @@ -349,15 +347,14 @@ func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, er } func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error { - // UPDATE on the same gating predicate as Get — keeps the "absent - // or acked or expired = ErrNotFound" contract symmetric. Without - // the predicate a workspace could re-stamp fetched_at on an acked - // row, which would mislead Phase 3's stuck-fetch dashboard. + // UPDATE on the same expiry predicate as Get. This may re-stamp + // fetched_at on an acked row when the canvas previews an attachment + // after refresh, which is fine: acked_at remains the delivery-time + // signal and the sweeper still deletes by acked_at retention. res, err := p.db.ExecContext(ctx, ` UPDATE pending_uploads SET fetched_at = now() WHERE file_id = $1 - AND acked_at IS NULL AND expires_at > now() `, fileID) if err != nil { diff --git a/workspace-server/internal/pendinguploads/storage_test.go b/workspace-server/internal/pendinguploads/storage_test.go index 60b5be90a..6924b1f09 100644 --- a/workspace-server/internal/pendinguploads/storage_test.go +++ b/workspace-server/internal/pendinguploads/storage_test.go @@ -50,14 +50,12 @@ const ( size_bytes, created_at, fetched_at, acked_at, expires_at FROM pending_uploads WHERE file_id = $1 - AND acked_at IS NULL AND expires_at > now() ` markFetchedSQL = ` UPDATE pending_uploads SET fetched_at = now() WHERE file_id = $1 - AND acked_at IS NULL AND expires_at > now() ` ackSQL = ` @@ -203,6 +201,36 @@ func TestGet_HappyPath_ReturnsFullRow(t *testing.T) { } } +func TestGet_AckedRowWithinRetentionStillReturnsFullRow(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + wsID := uuid.New() + now := time.Now().UTC() + ackedAt := now.Add(-5 * time.Minute) + mock.ExpectQuery(selectSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{ + "file_id", "workspace_id", "content", "filename", "mimetype", + "size_bytes", "created_at", "fetched_at", "acked_at", "expires_at", + }).AddRow( + fid, wsID, []byte("data"), "x.bin", "application/octet-stream", + int64(4), now, now, ackedAt, now.Add(24*time.Hour), + )) + + r, err := store.Get(context.Background(), fid) + if err != nil { + t.Fatalf("Get acked row: %v", err) + } + if r.AckedAt == nil || !r.AckedAt.Equal(ackedAt) { + t.Fatalf("acked_at not preserved: %+v", r.AckedAt) + } + if string(r.Content) != "data" { + t.Errorf("content mismatch: %q", string(r.Content)) + } +} + func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) { db, mock := newMockDB(t) store := pendinguploads.NewPostgres(db) @@ -247,7 +275,7 @@ func TestMarkFetched_HappyPath(t *testing.T) { } } -func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) { +func TestMarkFetched_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) { db, mock := newMockDB(t) store := pendinguploads.NewPostgres(db)