fix: keep pending uploads readable after ack #1849

Merged
hongming merged 1 commits from fix/pending-upload-preview-after-ack into main 2026-05-25 14:43:41 +00:00
3 changed files with 47 additions and 19 deletions
@@ -61,8 +61,12 @@ func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHan
// - file_id not found
// - file_id belongs to a different workspace (cross-workspace bleed
// protection)
// - row already acked (workspace's bug — should not re-fetch after ack)
// - row past expires_at (Phase 3 sweep would delete shortly anyway)
//
// Acked rows are intentionally still readable until the sweeper's
// ack-retention window elapses. Canvas chat history persists
// platform-pending: URIs; after a poll-mode workspace acks the handoff,
// a browser refresh still needs to preview/download the attachment.
func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
workspaceID := c.Param("id")
if err := validateWorkspaceID(workspaceID); err != nil {
@@ -78,7 +82,7 @@ func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
rec, err := h.storage.Get(c.Request.Context(), fileID)
if errors.Is(err, pendinguploads.ErrNotFound) {
c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found or expired"})
return
}
if err != nil {
@@ -181,4 +185,3 @@ func (h *PendingUploadsHandler) Ack(c *gin.Context) {
}
c.JSON(http.StatusOK, gin.H{"acked": true})
}
@@ -320,20 +320,18 @@ func putBatchInsertRows(ctx context.Context, tx *sql.Tx, workspaceID uuid.UUID,
}
func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) {
// The expires_at + acked_at filter in the WHERE clause means a
// caller sees ErrNotFound for absent / acked / expired without
// needing per-case branching. Trade-off: we can't differentiate
// in metrics, but the workspace's response is the same in all
// three cases ("file gone, give up") so the granularity isn't
// useful at this layer. Phase 3 dashboards aggregate row-state
// counts directly off the table.
// The expires_at filter keeps hard-TTL semantics while allowing
// acked rows to remain readable during the ack-retention window.
// Canvas chat history stores platform-pending: URIs; after the
// poll-mode workspace acks the upload, refreshed browser previews
// still need to fetch the same bytes until the sweeper reclaims
// the acked row.
var r Record
err := p.db.QueryRowContext(ctx, `
SELECT file_id, workspace_id, content, filename, mimetype,
size_bytes, created_at, fetched_at, acked_at, expires_at
FROM pending_uploads
WHERE file_id = $1
AND acked_at IS NULL
AND expires_at > now()
`, fileID).Scan(
&r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype,
@@ -349,15 +347,14 @@ func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, er
}
func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error {
// UPDATE on the same gating predicate as Get — keeps the "absent
// or acked or expired = ErrNotFound" contract symmetric. Without
// the predicate a workspace could re-stamp fetched_at on an acked
// row, which would mislead Phase 3's stuck-fetch dashboard.
// UPDATE on the same expiry predicate as Get. This may re-stamp
// fetched_at on an acked row when the canvas previews an attachment
// after refresh, which is fine: acked_at remains the delivery-time
// signal and the sweeper still deletes by acked_at retention.
res, err := p.db.ExecContext(ctx, `
UPDATE pending_uploads
SET fetched_at = now()
WHERE file_id = $1
AND acked_at IS NULL
AND expires_at > now()
`, fileID)
if err != nil {
@@ -50,14 +50,12 @@ const (
size_bytes, created_at, fetched_at, acked_at, expires_at
FROM pending_uploads
WHERE file_id = $1
AND acked_at IS NULL
AND expires_at > now()
`
markFetchedSQL = `
UPDATE pending_uploads
SET fetched_at = now()
WHERE file_id = $1
AND acked_at IS NULL
AND expires_at > now()
`
ackSQL = `
@@ -203,6 +201,36 @@ func TestGet_HappyPath_ReturnsFullRow(t *testing.T) {
}
}
func TestGet_AckedRowWithinRetentionStillReturnsFullRow(t *testing.T) {
db, mock := newMockDB(t)
store := pendinguploads.NewPostgres(db)
fid := uuid.New()
wsID := uuid.New()
now := time.Now().UTC()
ackedAt := now.Add(-5 * time.Minute)
mock.ExpectQuery(selectSQL).
WithArgs(fid).
WillReturnRows(sqlmock.NewRows([]string{
"file_id", "workspace_id", "content", "filename", "mimetype",
"size_bytes", "created_at", "fetched_at", "acked_at", "expires_at",
}).AddRow(
fid, wsID, []byte("data"), "x.bin", "application/octet-stream",
int64(4), now, now, ackedAt, now.Add(24*time.Hour),
))
r, err := store.Get(context.Background(), fid)
if err != nil {
t.Fatalf("Get acked row: %v", err)
}
if r.AckedAt == nil || !r.AckedAt.Equal(ackedAt) {
t.Fatalf("acked_at not preserved: %+v", r.AckedAt)
}
if string(r.Content) != "data" {
t.Errorf("content mismatch: %q", string(r.Content))
}
}
func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) {
db, mock := newMockDB(t)
store := pendinguploads.NewPostgres(db)
@@ -247,7 +275,7 @@ func TestMarkFetched_HappyPath(t *testing.T) {
}
}
func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) {
func TestMarkFetched_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) {
db, mock := newMockDB(t)
store := pendinguploads.NewPostgres(db)