fix: keep pending uploads readable after ack #1849
@@ -61,8 +61,12 @@ func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHan
|
||||
// - file_id not found
|
||||
// - file_id belongs to a different workspace (cross-workspace bleed
|
||||
// protection)
|
||||
// - row already acked (workspace's bug — should not re-fetch after ack)
|
||||
// - row past expires_at (Phase 3 sweep would delete shortly anyway)
|
||||
//
|
||||
// Acked rows are intentionally still readable until the sweeper's
|
||||
// ack-retention window elapses. Canvas chat history persists
|
||||
// platform-pending: URIs; after a poll-mode workspace acks the handoff,
|
||||
// a browser refresh still needs to preview/download the attachment.
|
||||
func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
if err := validateWorkspaceID(workspaceID); err != nil {
|
||||
@@ -78,7 +82,7 @@ func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
|
||||
|
||||
rec, err := h.storage.Get(c.Request.Context(), fileID)
|
||||
if errors.Is(err, pendinguploads.ErrNotFound) {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found or expired"})
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
@@ -181,4 +185,3 @@ func (h *PendingUploadsHandler) Ack(c *gin.Context) {
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{"acked": true})
|
||||
}
|
||||
|
||||
|
||||
@@ -320,20 +320,18 @@ func putBatchInsertRows(ctx context.Context, tx *sql.Tx, workspaceID uuid.UUID,
|
||||
}
|
||||
|
||||
func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) {
|
||||
// The expires_at + acked_at filter in the WHERE clause means a
|
||||
// caller sees ErrNotFound for absent / acked / expired without
|
||||
// needing per-case branching. Trade-off: we can't differentiate
|
||||
// in metrics, but the workspace's response is the same in all
|
||||
// three cases ("file gone, give up") so the granularity isn't
|
||||
// useful at this layer. Phase 3 dashboards aggregate row-state
|
||||
// counts directly off the table.
|
||||
// The expires_at filter keeps hard-TTL semantics while allowing
|
||||
// acked rows to remain readable during the ack-retention window.
|
||||
// Canvas chat history stores platform-pending: URIs; after the
|
||||
// poll-mode workspace acks the upload, refreshed browser previews
|
||||
// still need to fetch the same bytes until the sweeper reclaims
|
||||
// the acked row.
|
||||
var r Record
|
||||
err := p.db.QueryRowContext(ctx, `
|
||||
SELECT file_id, workspace_id, content, filename, mimetype,
|
||||
size_bytes, created_at, fetched_at, acked_at, expires_at
|
||||
FROM pending_uploads
|
||||
WHERE file_id = $1
|
||||
AND acked_at IS NULL
|
||||
AND expires_at > now()
|
||||
`, fileID).Scan(
|
||||
&r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype,
|
||||
@@ -349,15 +347,14 @@ func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, er
|
||||
}
|
||||
|
||||
func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error {
|
||||
// UPDATE on the same gating predicate as Get — keeps the "absent
|
||||
// or acked or expired = ErrNotFound" contract symmetric. Without
|
||||
// the predicate a workspace could re-stamp fetched_at on an acked
|
||||
// row, which would mislead Phase 3's stuck-fetch dashboard.
|
||||
// UPDATE on the same expiry predicate as Get. This may re-stamp
|
||||
// fetched_at on an acked row when the canvas previews an attachment
|
||||
// after refresh, which is fine: acked_at remains the delivery-time
|
||||
// signal and the sweeper still deletes by acked_at retention.
|
||||
res, err := p.db.ExecContext(ctx, `
|
||||
UPDATE pending_uploads
|
||||
SET fetched_at = now()
|
||||
WHERE file_id = $1
|
||||
AND acked_at IS NULL
|
||||
AND expires_at > now()
|
||||
`, fileID)
|
||||
if err != nil {
|
||||
|
||||
@@ -50,14 +50,12 @@ const (
|
||||
size_bytes, created_at, fetched_at, acked_at, expires_at
|
||||
FROM pending_uploads
|
||||
WHERE file_id = $1
|
||||
AND acked_at IS NULL
|
||||
AND expires_at > now()
|
||||
`
|
||||
markFetchedSQL = `
|
||||
UPDATE pending_uploads
|
||||
SET fetched_at = now()
|
||||
WHERE file_id = $1
|
||||
AND acked_at IS NULL
|
||||
AND expires_at > now()
|
||||
`
|
||||
ackSQL = `
|
||||
@@ -203,6 +201,36 @@ func TestGet_HappyPath_ReturnsFullRow(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet_AckedRowWithinRetentionStillReturnsFullRow(t *testing.T) {
|
||||
db, mock := newMockDB(t)
|
||||
store := pendinguploads.NewPostgres(db)
|
||||
|
||||
fid := uuid.New()
|
||||
wsID := uuid.New()
|
||||
now := time.Now().UTC()
|
||||
ackedAt := now.Add(-5 * time.Minute)
|
||||
mock.ExpectQuery(selectSQL).
|
||||
WithArgs(fid).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"file_id", "workspace_id", "content", "filename", "mimetype",
|
||||
"size_bytes", "created_at", "fetched_at", "acked_at", "expires_at",
|
||||
}).AddRow(
|
||||
fid, wsID, []byte("data"), "x.bin", "application/octet-stream",
|
||||
int64(4), now, now, ackedAt, now.Add(24*time.Hour),
|
||||
))
|
||||
|
||||
r, err := store.Get(context.Background(), fid)
|
||||
if err != nil {
|
||||
t.Fatalf("Get acked row: %v", err)
|
||||
}
|
||||
if r.AckedAt == nil || !r.AckedAt.Equal(ackedAt) {
|
||||
t.Fatalf("acked_at not preserved: %+v", r.AckedAt)
|
||||
}
|
||||
if string(r.Content) != "data" {
|
||||
t.Errorf("content mismatch: %q", string(r.Content))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) {
|
||||
db, mock := newMockDB(t)
|
||||
store := pendinguploads.NewPostgres(db)
|
||||
@@ -247,7 +275,7 @@ func TestMarkFetched_HappyPath(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) {
|
||||
func TestMarkFetched_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) {
|
||||
db, mock := newMockDB(t)
|
||||
store := pendinguploads.NewPostgres(db)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user