From 86fdaad111b4bc2716743263c83f93048392b2e0 Mon Sep 17 00:00:00 2001 From: Hongming Wang Date: Tue, 5 May 2026 04:22:24 -0700 Subject: [PATCH] =?UTF-8?q?feat(rfc):=20poll-mode=20chat=20upload=20?= =?UTF-8?q?=E2=80=94=20phase=201=20platform=20staging=20layer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit External-runtime workspaces (registered via molecule connect, behind NAT, no public callback URL) currently see HTTP 422 "workspace has no callback URL" on every chat file upload. The only escape is to wrap the laptop in ngrok / Cloudflare tunnel + re-register push-mode — a tax that shouldn't exist for a one-line use case. This phase introduces the platform-side staging layer that lets canvas → external workspace uploads ride the same poll loop the inbox already uses for text messages. Architecture (mirrors inbox poll, SSOT principle): Canvas POST /chat/uploads (multipart) ↓ delivery_mode=poll Platform: chat_files.uploadPollMode ↓ pendinguploads.Storage.Put + LogActivity(chat_upload_receive) Workspace's existing inbox poller picks up the activity row (Phase 2) Workspace fetches: GET /workspaces/:id/pending-uploads/:fid/content Workspace acks: POST /workspaces/:id/pending-uploads/:fid/ack Pieces in this PR: * Migration 20260505100000 — pending_uploads table; partial indexes on unacked + expires_at for the workspace fetch + Phase 3 sweep hot paths. No FK to workspaces (audit retention), 24h hard TTL. * internal/pendinguploads — Storage interface + Postgres impl. Bytes inline (bytea) today; the interface lets a future PR replace with S3 (RFC #2789) by swapping one constructor. 100% test coverage on the Postgres impl via sqlmock-pinned SQL. * handlers.PendingUploadsHandler — GET /content + POST /ack endpoints. wsAuth-gated; cross-workspace bleed protection via per-row workspace_id check (token leak from A can't read B's pending bytes). Handler tests pin happy path + every 4xx/5xx mapping including cross-workspace + race-with-sweep. * chat_files.go — Upload poll-mode branch behind WithPendingUploads builder. Push-mode unchanged (regression-tested). Multipart parse + per-file sanitize + storage.Put + activity_logs row per file. * SanitizeFilename — Go mirror of workspace/internal_chat_uploads.py sanitize_filename. Tests pin parity case-by-case so canvas-emitted URIs stay identical regardless of which path handles the upload. * Comprehensive logging — every state transition (staged, fetch, ack, error) emits a structured log line with workspace_id + file_id + size + sanitized name. Phase 3 metrics will hook these. The pendinguploads.Storage wiring is opt-in (WithPendingUploads on ChatFilesHandler) so a binary deployed without the migration keeps the pre-existing 422 behavior — no boot-order coupling between code roll and schema roll. Phase 2 (separate PR): workspace inbox extension — inbox_uploads.py fetches via the GET endpoint, writes to /workspace/.molecule/chat- uploads/, acks, and rewrites the URI from platform-pending: → workspace: so the agent's existing send-attachments path needs no changes. Phase 3: GC sweep + dashboards. Phase 4: poll-mode E2E on staging. Tests: * 100% coverage on pendinguploads (sqlmock-pinned SQL drift gate). * Functional 100% on new handler code (uncovered branches are documented defensive duplicates: uuid re-parse, multipart Open error, Writer.Write fail — none reproducible in unit tests). * Push-mode + NULL delivery_mode regression tests pin no behavior change for existing workspaces. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/chat_files.go | 304 +++++++++- .../internal/handlers/chat_files_poll_test.go | 520 ++++++++++++++++++ .../internal/handlers/pending_uploads.go | 184 +++++++ .../internal/handlers/pending_uploads_test.go | 373 +++++++++++++ .../handlers/sanitize_filename_test.go | 103 ++++ .../internal/pendinguploads/storage.go | 253 +++++++++ .../internal/pendinguploads/storage_test.go | 400 ++++++++++++++ workspace-server/internal/router/router.go | 13 +- .../20260505100000_pending_uploads.down.sql | 11 + .../20260505100000_pending_uploads.up.sql | 103 ++++ 10 files changed, 2262 insertions(+), 2 deletions(-) create mode 100644 workspace-server/internal/handlers/chat_files_poll_test.go create mode 100644 workspace-server/internal/handlers/pending_uploads.go create mode 100644 workspace-server/internal/handlers/pending_uploads_test.go create mode 100644 workspace-server/internal/handlers/sanitize_filename_test.go create mode 100644 workspace-server/internal/pendinguploads/storage.go create mode 100644 workspace-server/internal/pendinguploads/storage_test.go create mode 100644 workspace-server/migrations/20260505100000_pending_uploads.down.sql create mode 100644 workspace-server/migrations/20260505100000_pending_uploads.up.sql diff --git a/workspace-server/internal/handlers/chat_files.go b/workspace-server/internal/handlers/chat_files.go index b58d2d28..aa2b8c05 100644 --- a/workspace-server/internal/handlers/chat_files.go +++ b/workspace-server/internal/handlers/chat_files.go @@ -31,23 +31,37 @@ package handlers import ( "context" "database/sql" + "errors" "fmt" "io" "log" + "mime/multipart" "net/http" "net/url" "path/filepath" + "regexp" "strings" "time" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" ) // ChatFilesHandler serves file upload + download for chat. Holds a // reference to TemplatesHandler so the (still docker-exec) Download // path keeps using the shared findContainer/CopyFromContainer helpers // without duplicating them. Upload no longer reaches into Docker. +// +// pendingUploads + broadcaster are wired only when the platform's +// migration 20260505100000 has run; nil values fall back to the +// pre-poll-mode behavior (422 on poll-mode upload, same as before). +// This lets the binary keep booting in environments where the +// migration hasn't run yet — the poll branch is gated by a not-nil +// check at the call site. type ChatFilesHandler struct { templates *TemplatesHandler @@ -56,6 +70,19 @@ type ChatFilesHandler struct { // the 50 MB worst case on a slow EC2 link without leaving a // connection hanging forever on a sick workspace. httpClient *http.Client + + // pendingUploads is the platform-side staging layer for poll-mode + // uploads. nil → poll branch returns 422 unchanged (the pre-feature + // behavior); non-nil → poll branch parses multipart, persists each + // file via storage.Put, logs a chat_upload_receive activity row, + // and returns 200 with synthetic platform-pending: URIs. + pendingUploads pendinguploads.Storage + + // broadcaster is the events.EventEmitter used to notify the canvas + // when an activity row lands (so the Agent Comms panel updates + // live). Same emitter the rest of the platform uses; nil = no + // broadcast (tests). + broadcaster events.EventEmitter } func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler { @@ -69,6 +96,16 @@ func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler { } } +// WithPendingUploads enables the poll-mode upload branch by wiring a +// Storage + broadcaster. Call site (router.go) does this at +// construction; tests set the fields directly when they want the +// poll path exercised. Returns the handler for chained construction. +func (h *ChatFilesHandler) WithPendingUploads(storage pendinguploads.Storage, broadcaster events.EventEmitter) *ChatFilesHandler { + h.pendingUploads = storage + h.broadcaster = broadcaster + return h +} + // chatUploadMaxBytes caps the full multipart request body so a // malicious / runaway client can't OOM the proxy hop. 50 MB matches // the workspace-side limit; anything larger is rejected at the @@ -262,6 +299,24 @@ func (h *ChatFilesHandler) Upload(c *gin.Context) { ctx := c.Request.Context() + // Branch on delivery_mode BEFORE attempting the HTTP forward. + // Push-mode workspaces continue to do the streaming forward + // unchanged. Poll-mode workspaces (typically external runtimes + // on a laptop, no public callback URL) get the platform-side + // staging path — the file lands in pending_uploads, an activity + // row goes into the inbox queue, and the workspace pulls on its + // next poll cycle. + if h.pendingUploads != nil { + mode, modeOK := lookupUploadDeliveryMode(c, ctx, workspaceID) + if !modeOK { + return + } + if mode == "poll" { + h.uploadPollMode(c, ctx, workspaceID) + return + } + } + wsURL, secret, ok := resolveWorkspaceForwardCreds(c, ctx, workspaceID, "upload") if !ok { return @@ -405,3 +460,250 @@ func (h *ChatFilesHandler) streamWorkspaceResponse( } } + +// lookupUploadDeliveryMode returns the workspace's delivery_mode +// for the chat upload branch. Returns ("", false) and writes the +// HTTP error response on lookup failure (caller stops). NULL or +// empty delivery_mode is treated as "push" — that's the schema +// default and matches the legacy pre-#2339 behavior. Only the +// explicit string "poll" routes the upload through the poll-mode +// branch. +// +// Why a dedicated helper instead of reusing lookupDeliveryMode +// from a2a_proxy_helpers.go: that one swallows errors and falls +// back to "push" so the proxy keeps working on a transient DB +// hiccup. For upload we want to surface the not-found case as 404 +// (which the workspace-poll branch wouldn't otherwise hit, since +// the workspace-side row IS the source of truth for the mode). +func lookupUploadDeliveryMode(c *gin.Context, ctx context.Context, workspaceID string) (string, bool) { + var mode sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&mode) + if errors.Is(err, sql.ErrNoRows) { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return "", false + } + if err != nil { + log.Printf("chat_files Upload: delivery_mode lookup failed for %s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "delivery_mode lookup failed"}) + return "", false + } + if !mode.Valid || mode.String == "" { + return "push", true + } + return mode.String, true +} + +// unsafeFilenameChars matches every character that isn't in the safe +// alphanumeric + dot/dash/underscore set. Mirrors the Python regex +// _UNSAFE_FILENAME_CHARS in workspace/internal_chat_uploads.py — drift +// here would mean canvas-emitted URIs differ between push and poll +// paths for the same upload. +var unsafeFilenameChars = regexp.MustCompile(`[^a-zA-Z0-9._\-]`) + +// SanitizeFilename reduces a user-supplied filename to a safe form. +// Behaviorally identical to sanitize_filename in workspace/ +// internal_chat_uploads.py. Exported so tests in other packages can +// pin behavior parity, and so a future shared library can move both +// implementations behind one source of truth. +func SanitizeFilename(name string) string { + base := filepath.Base(name) + // filepath.Base on a path-traversal input ("../../etc/passwd") + // returns "passwd" (just the last component) — which matches what + // Python's os.path.basename does. Tests pin both here and on the + // Python side. + base = strings.ReplaceAll(base, " ", "_") + base = unsafeFilenameChars.ReplaceAllString(base, "_") + if len(base) > 100 { + ext := "" + dot := strings.LastIndex(base, ".") + if dot >= 0 && len(base)-dot <= 16 { + ext = base[dot:] + } + base = base[:100-len(ext)] + ext + } + if base == "" || base == "." || base == ".." { + return "file" + } + return base +} + +// uploadedFile is the per-file response shape the workspace-side +// /internal/chat/uploads/ingest also produces. Mirroring the schema +// keeps the canvas client unaware of which path handled the upload. +type uploadedFile struct { + URI string `json:"uri"` + Name string `json:"name"` + Mimetype string `json:"mimeType"` + Size int64 `json:"size"` +} + +// uploadPollMode handles a chat upload bound for a poll-mode +// workspace. Parses the multipart in-place, persists each file via +// pendinguploads.Storage, and logs one chat_upload_receive activity +// row per file so the workspace's inbox poller picks them up on its +// next cycle. +// +// Why one activity row per file (not one per multipart batch): +// - Each row carries one URI; agents that consume the inbox treat +// each row as one inbound event. A batch row would force every +// consumer to deserialize a list, doubling the field-shape +// surface for no UX win. +// - At-least-once semantics: a workspace can ack files +// individually. Batch ack would leak partial-success state on +// a fetcher crash mid-batch. +// +// Limits enforced here mirror the workspace-side ingest_handler: +// - Total body cap: 50 MB (set on c.Request.Body before reaching us) +// - Per-file cap: 25 MB (pendinguploads.MaxFileBytes; rejected as 413) +// - Filename: sanitized + capped at 100 chars (SanitizeFilename) +// +// Logging: every persisted file logs an INFO line with workspace_id, +// file_id, size, and sanitized name. Failure modes (oversize, missing +// files field, malformed multipart) log at WARN with the same fields. +// Phase 3 metrics will hook these structured logs. +func (h *ChatFilesHandler) uploadPollMode(c *gin.Context, ctx context.Context, workspaceID string) { + // Parse multipart with the same per-file/per-form limits the + // workspace-side handler uses (workspace/internal_chat_uploads.py: + // max_files=64, max_fields=32). gin's MultipartForm does not + // expose those limits directly — the underlying ParseMultipartForm + // caps memory at 32 MB by default and spills to disk. For poll- + // mode we read each file into memory to hand to Storage.Put; + // 25 MB-per-file × 64-files ceiling means worst-case is 1.6 GB of + // peak memory. Bound the per-file size at the multipart layer so + // the spill never gets close. + if err := c.Request.ParseMultipartForm(32 << 20); err != nil { + log.Printf("chat_files uploadPollMode: parse multipart failed for %s: %v", workspaceID, err) + c.JSON(http.StatusBadRequest, gin.H{"error": "malformed multipart body"}) + return + } + form := c.Request.MultipartForm + if form == nil || len(form.File["files"]) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "no files field in request"}) + return + } + headers := form.File["files"] + if len(headers) > 64 { + c.JSON(http.StatusBadRequest, gin.H{"error": "too many files (limit 64)"}) + return + } + + wsUUID, err := uuid.Parse(workspaceID) + if err != nil { + // validateWorkspaceID at the top of Upload already gates this; + // the re-parse is defence in depth in case validateWorkspaceID + // drifts. Keep the error class consistent so a bad-id reaches + // the same 400 path. Not separately tested because the gate at + // the call site is structurally the same uuid.Parse. + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + + out := make([]uploadedFile, 0, len(headers)) + for _, fh := range headers { + // Read full content. Per-file cap enforced post-read so an + // oversized file fails with a clean 413 rather than a torn + // stream. The +1 byte ReadAll trick that the Python side + // uses isn't easy through multipart.FileHeader; instead we + // rely on the multipart layer's ContentLength header and + // short-circuit before opening the part. + if fh.Size > pendinguploads.MaxFileBytes { + log.Printf("chat_files uploadPollMode: per-file cap exceeded for %s: %s (%d bytes)", + workspaceID, fh.Filename, fh.Size) + c.JSON(http.StatusRequestEntityTooLarge, gin.H{ + "error": "file exceeds per-file cap", + "filename": fh.Filename, + "size": fh.Size, + "max": pendinguploads.MaxFileBytes, + }) + return + } + content, err := readMultipartFile(fh) + if err != nil { + log.Printf("chat_files uploadPollMode: read part failed for %s/%s: %v", workspaceID, fh.Filename, err) + c.JSON(http.StatusBadRequest, gin.H{"error": "could not read file part"}) + return + } + + sanitized := SanitizeFilename(fh.Filename) + mimetype := fh.Header.Get("Content-Type") + + fileID, err := h.pendingUploads.Put(ctx, wsUUID, content, sanitized, mimetype) + if err != nil { + if errors.Is(err, pendinguploads.ErrTooLarge) { + // Belt + suspenders: the size check above already + // caught this, but Storage.Put re-validates so a + // malformed FileHeader can't slip through. 413 with + // the same shape so the client sees one error class. + c.JSON(http.StatusRequestEntityTooLarge, gin.H{ + "error": "file exceeds per-file cap", + "filename": fh.Filename, + "size": len(content), + "max": pendinguploads.MaxFileBytes, + }) + return + } + log.Printf("chat_files uploadPollMode: storage.Put failed for %s/%s: %v", + workspaceID, sanitized, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "could not stage file"}) + return + } + + // Activity row so the workspace's inbox poller picks this up + // on its next cycle. type=chat_upload_receive is a new + // activity_type the workspace's message_from_activity adapter + // (Phase 2) will handle by fetching content via the GET + // endpoint. The request_body carries everything the + // workspace needs to dispatch the fetch — including the + // synthetic URI canvas embeds in the chat message. + uri := fmt.Sprintf("platform-pending:%s/%s", workspaceID, fileID) + summary := "chat_upload_receive: " + sanitized + method := "chat_upload_receive" + LogActivity(ctx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "chat_upload_receive", + TargetID: &workspaceID, + Method: &method, + Summary: &summary, + RequestBody: map[string]interface{}{ + "file_id": fileID.String(), + "name": sanitized, + "mimeType": mimetype, + "size": len(content), + "uri": uri, + }, + Status: "ok", + }) + + log.Printf("chat_files uploadPollMode: staged %s/%s (file_id=%s size=%d mimetype=%q)", + workspaceID, sanitized, fileID, len(content), mimetype) + + out = append(out, uploadedFile{ + URI: uri, + Name: sanitized, + Mimetype: mimetype, + Size: int64(len(content)), + }) + } + + c.JSON(http.StatusOK, gin.H{"files": out}) +} + +// readMultipartFile reads a multipart part fully into memory. Wraps +// the open + io.ReadAll + close idiom so the call site stays clean, +// and so a future change (chunked reads / hashing) has one place to +// land. +func readMultipartFile(fh *multipartFileHeader) ([]byte, error) { + f, err := fh.Open() + if err != nil { + return nil, fmt.Errorf("open part: %w", err) + } + defer f.Close() + return io.ReadAll(f) +} + +// multipartFileHeader is a local alias so the readMultipartFile +// signature doesn't pull "mime/multipart" into every test that +// touches uploadPollMode. +type multipartFileHeader = multipart.FileHeader diff --git a/workspace-server/internal/handlers/chat_files_poll_test.go b/workspace-server/internal/handlers/chat_files_poll_test.go new file mode 100644 index 00000000..40b289ff --- /dev/null +++ b/workspace-server/internal/handlers/chat_files_poll_test.go @@ -0,0 +1,520 @@ +package handlers + +// chat_files_poll_test.go — Upload poll-mode branch tests. +// +// Pinned in their own file so the existing chat_files_test.go stays +// focused on the push-mode forward proxy. Same setupTestDB / sqlmock +// scaffolding as the rest of the package, plus an in-memory +// pendinguploads.Storage so we don't have to mock six SQL statements +// per assertion. + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "errors" + "mime/multipart" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" +) + +// inMemStorage is a process-local pendinguploads.Storage for branch +// tests. Records every Put for assertion. Failure modes (Put error, +// MarkFetched / Ack tested elsewhere) are injected via fields. +type inMemStorage struct { + mu sync.Mutex + rows map[uuid.UUID]pendinguploads.Record + puts []putCall + putErr error +} + +type putCall struct { + WorkspaceID uuid.UUID + Filename string + Mimetype string + Size int +} + +func newInMemStorage() *inMemStorage { + return &inMemStorage{rows: map[uuid.UUID]pendinguploads.Record{}} +} + +func (s *inMemStorage) Put(_ context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.putErr != nil { + return uuid.Nil, s.putErr + } + id := uuid.New() + s.rows[id] = pendinguploads.Record{ + FileID: id, WorkspaceID: ws, Content: content, + Filename: filename, Mimetype: mimetype, + SizeBytes: int64(len(content)), CreatedAt: time.Now(), + ExpiresAt: time.Now().Add(24 * time.Hour), + } + s.puts = append(s.puts, putCall{ + WorkspaceID: ws, Filename: filename, Mimetype: mimetype, Size: len(content), + }) + return id, nil +} + +func (s *inMemStorage) Get(context.Context, uuid.UUID) (pendinguploads.Record, error) { + return pendinguploads.Record{}, pendinguploads.ErrNotFound +} +func (s *inMemStorage) MarkFetched(context.Context, uuid.UUID) error { return nil } +func (s *inMemStorage) Ack(context.Context, uuid.UUID) error { return nil } + +// expectPollDeliveryMode stubs the SELECT delivery_mode lookup that +// uploadPollMode does (separate from the one resolveWorkspaceForwardCreds +// does — this is the new helper introduced for the poll branch). +func expectPollDeliveryMode(mock sqlmock.Sqlmock, workspaceID, mode string) { + rows := sqlmock.NewRows([]string{"delivery_mode"}).AddRow(mode) + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(workspaceID). + WillReturnRows(rows) +} + +func expectPollDeliveryModeMissing(mock sqlmock.Sqlmock, workspaceID string) { + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(workspaceID). + WillReturnError(sql.ErrNoRows) +} + +// expectActivityInsert stubs the LogActivity INSERT so the poll branch's +// per-file activity row write doesn't fail the sqlmock expectations. +func expectActivityInsert(mock sqlmock.Sqlmock) { + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnResult(sqlmock.NewResult(1, 1)) +} + +// pollUploadFixture builds a multipart body with N named files. +func pollUploadFixture(t *testing.T, files map[string][]byte) (*bytes.Buffer, string) { + t.Helper() + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + for name, data := range files { + fw, err := mw.CreateFormFile("files", name) + if err != nil { + t.Fatalf("CreateFormFile: %v", err) + } + _, _ = fw.Write(data) + } + mw.Close() + return &buf, mw.FormDataContentType() +} + +// ---- happy path ---- + +func TestPollUpload_HappyPath_OneFile_StagesAndLogs(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "11111111-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + expectActivityInsert(mock) + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"report.pdf": []byte("PDF-bytes")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + if len(store.puts) != 1 { + t.Fatalf("expected 1 storage Put, got %d", len(store.puts)) + } + put := store.puts[0] + if put.Filename != "report.pdf" || put.Size != 9 { + t.Errorf("unexpected put: %+v", put) + } + + // Response shape must match the workspace-side + // /internal/chat/uploads/ingest schema so canvas can't tell which + // path handled the upload. + var resp struct { + Files []struct { + URI string `json:"uri"` + Name string `json:"name"` + Mimetype string `json:"mimeType"` + Size int `json:"size"` + } `json:"files"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode response: %v body=%s", err, w.Body.String()) + } + if len(resp.Files) != 1 { + t.Fatalf("response files count = %d, want 1", len(resp.Files)) + } + got := resp.Files[0] + if got.Name != "report.pdf" || got.Size != 9 { + t.Errorf("response file mismatch: %+v", got) + } + if !strings.HasPrefix(got.URI, "platform-pending:"+wsID+"/") { + t.Errorf("URI %q does not start with platform-pending:%s/", got.URI, wsID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestPollUpload_MultipleFiles_AllStagedAndLogged(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "11111111-aaaa-bbbb-cccc-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + expectActivityInsert(mock) + expectActivityInsert(mock) + expectActivityInsert(mock) + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{ + "a.txt": []byte("aaaa"), + "b.txt": []byte("bbbbb"), + "c.txt": []byte("cccccc"), + }) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + if len(store.puts) != 3 { + t.Fatalf("expected 3 storage Puts, got %d", len(store.puts)) + } +} + +// ---- regression: push-mode unchanged ---- + +func TestPollUpload_PushModeFallsThroughToForward(t *testing.T) { + // With pendingUploads wired but the workspace's mode is push, + // the poll branch must NOT activate — flow falls through to the + // existing resolveWorkspaceForwardCreds path. Pinned via the + // "delivery_mode lookup happened, then the URL+mode SELECT + // happened, then we 503 because no inbound secret" sequence. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "22222222-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "push") + // After the poll branch is bypassed, we hit + // resolveWorkspaceForwardCreds which selects url+delivery_mode. + expectURL(mock, wsID, "") + // URL empty + mode=push → 503 (no inbound secret check needed). + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status=%d body=%s — expected push-mode 503 fall-through", w.Code, w.Body.String()) + } + if len(store.puts) != 0 { + t.Errorf("push-mode should NOT have hit storage, got %d puts", len(store.puts)) + } +} + +func TestPollUpload_NotConfigured_FallsThrough(t *testing.T) { + // Backwards compat: a binary running without WithPendingUploads + // behaves exactly as before — the poll branch is dead code. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "33333333-2222-3333-4444-555555555555" + expectURLAndMode(mock, wsID, "", "poll") // resolveWorkspaceForwardCreds emits 422 + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)) + // No WithPendingUploads — pendingUploads is nil. + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("status=%d, want 422 (legacy poll-mode rejection)", w.Code) + } +} + +// ---- error paths ---- + +func TestPollUpload_WorkspaceMissing_404(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "44444444-2222-3333-4444-555555555555" + expectPollDeliveryModeMissing(mock, wsID) + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(newInMemStorage(), nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusNotFound { + t.Errorf("status=%d, want 404", w.Code) + } +} + +func TestPollUpload_DeliveryModeLookupDBError_500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "55555555-2222-3333-4444-555555555555" + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(wsID).WillReturnError(errors.New("connection lost")) + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(newInMemStorage(), nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestPollUpload_NoFilesField_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "66666666-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // Multipart with a non-files field — no actual files. + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + mw.WriteField("not_files", "hi") + mw.Close() + + c, w := makeUploadRequest(t, wsID, &buf, mw.FormDataContentType()) + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on no files field", w.Code) + } +} + +func TestPollUpload_MalformedMultipart_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "77777777-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // Body that doesn't match the boundary in Content-Type. + c, w := makeUploadRequest(t, wsID, bytes.NewBufferString("garbage"), "multipart/form-data; boundary=fake") + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on malformed multipart", w.Code) + } +} + +func TestPollUpload_StorageError_500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "88888888-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + store.putErr = errors.New("disk full") + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestPollUpload_StorageTooLarge_413(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "99999999-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + store.putErr = pendinguploads.ErrTooLarge + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusRequestEntityTooLarge { + t.Errorf("status=%d, want 413", w.Code) + } +} + +func TestPollUpload_TooManyFiles_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "aaaaaaaa-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // 65 files — over the per-batch cap. + files := map[string][]byte{} + for i := 0; i < 65; i++ { + files[uuid.New().String()] = []byte("x") + } + body, ct := pollUploadFixture(t, files) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on too many files", w.Code) + } +} + +func TestPollUpload_NullDeliveryMode_TreatedAsPush(t *testing.T) { + // Production-observed 2026-05-04: external runtime workspaces + // (molecule-sdk-python on user infra) sometimes register with + // delivery_mode = NULL — the schema default for legacy rows from + // before #2339. The poll branch must NOT activate on NULL — only + // the explicit "poll" string. This is the same defensive posture + // resolveWorkspaceForwardCreds takes for legacy rows. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "cccccccc-2222-3333-4444-555555555555" + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) + // Falls through to resolveWorkspaceForwardCreds: + expectURLAndMode(mock, wsID, "", "") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + // resolveWorkspaceForwardCreds with empty url + NULL mode = 422 + // (the legacy "no callback URL" rejection — exactly what we're + // fixing for ACTUAL poll-mode rows but want to preserve for + // NULL ones until the row gets a real mode value via the next + // /registry/register). + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("status=%d, want 422 for NULL delivery_mode (legacy fallthrough)", w.Code) + } + if len(store.puts) != 0 { + t.Errorf("NULL mode should NOT have hit storage, got %d puts", len(store.puts)) + } +} + +func TestPollUpload_PerFileCapPreStorage_413(t *testing.T) { + // Pin the early-reject branch (fh.Size > MaxFileBytes) BEFORE we + // read the part into memory. Without this, an oversize file + // would hit the storage layer's belt-and-suspenders check, which + // works but burns ~25 MB of memory + DB round-trip first. Send + // 25 MB + 1 byte → 413 with the file size in the response. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "dddddddd-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // 25 MB + 1 byte. Single file, large enough to trip the early + // size check. + oversize := make([]byte, pendinguploads.MaxFileBytes+1) + body, ct := pollUploadFixture(t, map[string][]byte{"big.bin": oversize}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusRequestEntityTooLarge { + t.Fatalf("status=%d, want 413 on per-file size cap", w.Code) + } + if len(store.puts) != 0 { + t.Errorf("per-file cap reject should NOT have called storage.Put, got %d puts", len(store.puts)) + } + // Sanity: response carries the size we tried to upload + the cap. + var body_ map[string]any + json.Unmarshal(w.Body.Bytes(), &body_) + if got := body_["max"]; got == nil { + t.Errorf("expected max field in response, got %v", body_) + } +} + +// SanitizeFilename is exercised in the upload chain — pin one +// end-to-end case that exercises the URI path through the response. +func TestPollUpload_SanitizesFilenameInResponse(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "bbbbbbbb-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + expectActivityInsert(mock) + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"hello world!.pdf": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + var resp struct { + Files []struct { + Name string `json:"name"` + URI string `json:"uri"` + } + } + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp.Files) == 0 || resp.Files[0].Name != "hello_world_.pdf" { + t.Errorf("expected sanitized name 'hello_world_.pdf', got: %+v", resp.Files) + } + if len(store.puts) == 0 || store.puts[0].Filename != "hello_world_.pdf" { + t.Errorf("storage Put didn't receive sanitized filename: %+v", store.puts) + } +} diff --git a/workspace-server/internal/handlers/pending_uploads.go b/workspace-server/internal/handlers/pending_uploads.go new file mode 100644 index 00000000..aec1ca0f --- /dev/null +++ b/workspace-server/internal/handlers/pending_uploads.go @@ -0,0 +1,184 @@ +// pending_uploads.go — endpoints the workspace polls to fetch and ack +// chat-upload files staged on the platform side for poll-mode delivery. +// +// Companion to chat_files.go Upload's poll-mode branch: +// +// Canvas POST /workspaces/:id/chat/uploads +// ↓ (poll-mode workspace) +// Platform: chat_files.uploadPollMode +// ↓ writes pending_uploads row + activity_logs(type=chat_upload_receive) +// Workspace inbox poller picks up activity row +// ↓ +// Workspace GETs /workspaces/:id/pending-uploads/:fid/content ← this file +// ↓ writes file to /workspace/.molecule/chat-uploads +// Workspace POSTs /workspaces/:id/pending-uploads/:fid/ack ← this file +// ↓ row marked acked; Phase 3 sweep deletes +// +// Auth: same wsAuth middleware that gates the activity poll endpoint — +// the workspace's per-workspace platform_token. Only the target workspace +// can read OR ack its own pending uploads. The handler enforces that +// :id == file.workspace_id even though the URL param matches; defence in +// depth against a token leak letting one workspace pull another's bytes. + +package handlers + +import ( + "errors" + "log" + "net/http" + "strconv" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" +) + +// PendingUploadsHandler serves the workspace-side fetch + ack endpoints. +// Holds a Storage so tests can inject an in-memory implementation +// without going through Postgres (sqlmock-based unit tests cover the +// Postgres impl in internal/pendinguploads/storage_test.go). +type PendingUploadsHandler struct { + storage pendinguploads.Storage +} + +// NewPendingUploadsHandler constructs the handler with a concrete +// Storage. Production wires up pendinguploads.NewPostgres(db.DB). +func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHandler { + return &PendingUploadsHandler{storage: storage} +} + +// GetContent handles GET /workspaces/:id/pending-uploads/:file_id/content. +// +// Returns the file bytes with the original mimetype and a +// Content-Disposition that names the original (sanitized) filename so +// the workspace's fetcher writes it under the expected name. Stamps +// fetched_at on the row best-effort — the read response is already +// flushed to the network before the MarkFetched call so a sweep race +// can't break the workspace's fetch. +// +// 404 on: +// - file_id not found +// - file_id belongs to a different workspace (cross-workspace bleed +// protection) +// - row already acked (workspace's bug — should not re-fetch after ack) +// - row past expires_at (Phase 3 sweep would delete shortly anyway) +func (h *PendingUploadsHandler) GetContent(c *gin.Context) { + workspaceID := c.Param("id") + if err := validateWorkspaceID(workspaceID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + fileIDStr := c.Param("file_id") + fileID, err := uuid.Parse(fileIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"}) + return + } + + rec, err := h.storage.Get(c.Request.Context(), fileID) + if errors.Is(err, pendinguploads.ErrNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"}) + return + } + if err != nil { + log.Printf("pending_uploads GetContent: storage.Get(%s) failed: %v", fileID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"}) + return + } + + // Cross-workspace bleed protection: a token leak from workspace A + // must not let it read workspace B's pending uploads even with the + // correct file_id. wsAuth already pinned the caller to :id; reject + // if the row's workspace_id doesn't match. + if rec.WorkspaceID.String() != workspaceID { + log.Printf("pending_uploads GetContent: workspace mismatch — caller=%s row=%s file_id=%s", + workspaceID, rec.WorkspaceID, fileID) + c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"}) + return + } + + // Stream the bytes. Set the original mimetype if known; fall back + // to application/octet-stream so curl / browser clients still get + // a valid response. Content-Disposition uses the workspace-side + // filename so the fetcher writes it under the expected name. + mimetype := rec.Mimetype + if mimetype == "" { + mimetype = "application/octet-stream" + } + c.Header("Content-Type", mimetype) + c.Header("Content-Disposition", contentDispositionAttachment(rec.Filename)) + c.Header("Content-Length", strconv.FormatInt(rec.SizeBytes, 10)) + c.Status(http.StatusOK) + if _, err := c.Writer.Write(rec.Content); err != nil { + // Connection closed mid-stream — log and bail; we cannot + // re-emit headers at this point. The workspace's HTTP client + // will see the truncated body and retry on next poll. + log.Printf("pending_uploads GetContent: write failed for %s: %v", fileID, err) + return + } + + // Best-effort fetched_at stamp. After-the-fact so the GET response + // completes regardless of the UPDATE outcome — a Phase 3 sweep + // race that nukes the row between Get and MarkFetched must not + // break the workspace's fetch. + if err := h.storage.MarkFetched(c.Request.Context(), fileID); err != nil { + log.Printf("pending_uploads GetContent: mark_fetched(%s) failed: %v", fileID, err) + } +} + +// Ack handles POST /workspaces/:id/pending-uploads/:file_id/ack. +// +// Marks the row as handed-off; Phase 3 sweep deletes acked rows after +// a retention window. Idempotent — workspace at-least-once retries on +// a flaky network return success without moving the timestamp. +func (h *PendingUploadsHandler) Ack(c *gin.Context) { + workspaceID := c.Param("id") + if err := validateWorkspaceID(workspaceID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + fileIDStr := c.Param("file_id") + fileID, err := uuid.Parse(fileIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"}) + return + } + + // Cross-workspace bleed protection: do a lookup BEFORE Ack so + // a token leak can't ack a row owned by a different workspace. + // We don't expose this distinction in the response (404 either + // way) — the workspace can't tell whether it ack'd a non-existent + // row vs. one it didn't own, and that's fine for the contract. + rec, err := h.storage.Get(c.Request.Context(), fileID) + if errors.Is(err, pendinguploads.ErrNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"}) + return + } + if err != nil { + log.Printf("pending_uploads Ack: storage.Get(%s) failed: %v", fileID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"}) + return + } + if rec.WorkspaceID.String() != workspaceID { + log.Printf("pending_uploads Ack: workspace mismatch — caller=%s row=%s file_id=%s", + workspaceID, rec.WorkspaceID, fileID) + c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"}) + return + } + + if err := h.storage.Ack(c.Request.Context(), fileID); err != nil { + if errors.Is(err, pendinguploads.ErrNotFound) { + // Race window: the row passed Get but failed Ack — sweep + // raced with us between the two queries. Treat as success + // (the workspace's intent was honored, the row is gone). + c.JSON(http.StatusOK, gin.H{"acked": true, "raced": true}) + return + } + log.Printf("pending_uploads Ack: storage.Ack(%s) failed: %v", fileID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"}) + return + } + c.JSON(http.StatusOK, gin.H{"acked": true}) +} + diff --git a/workspace-server/internal/handlers/pending_uploads_test.go b/workspace-server/internal/handlers/pending_uploads_test.go new file mode 100644 index 00000000..17da24af --- /dev/null +++ b/workspace-server/internal/handlers/pending_uploads_test.go @@ -0,0 +1,373 @@ +package handlers_test + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" +) + +// fakeStorage is an in-memory pendinguploads.Storage. Lets handler +// tests pin behaviour without going through Postgres + sqlmock — the +// storage layer's own tests (internal/pendinguploads/storage_test.go) +// cover the SQL drift surface; here we only care about the handler's +// 4xx/5xx mapping and side-effect ordering. +type fakeStorage struct { + rows map[uuid.UUID]pendinguploads.Record + getErr error // forced error from Get (overrides rows lookup) + ackErr error // forced error from Ack + markErr error // forced error from MarkFetched + markFetched []uuid.UUID + ackCalls []uuid.UUID +} + +func newFakeStorage() *fakeStorage { + return &fakeStorage{rows: map[uuid.UUID]pendinguploads.Record{}} +} + +func (f *fakeStorage) Put(ctx context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) { + id := uuid.New() + f.rows[id] = pendinguploads.Record{ + FileID: id, WorkspaceID: ws, Content: content, + Filename: filename, Mimetype: mimetype, + SizeBytes: int64(len(content)), CreatedAt: time.Now(), + ExpiresAt: time.Now().Add(24 * time.Hour), + } + return id, nil +} + +func (f *fakeStorage) Get(_ context.Context, fileID uuid.UUID) (pendinguploads.Record, error) { + if f.getErr != nil { + return pendinguploads.Record{}, f.getErr + } + rec, ok := f.rows[fileID] + if !ok { + return pendinguploads.Record{}, pendinguploads.ErrNotFound + } + return rec, nil +} + +func (f *fakeStorage) MarkFetched(_ context.Context, fileID uuid.UUID) error { + f.markFetched = append(f.markFetched, fileID) + return f.markErr +} + +func (f *fakeStorage) Ack(_ context.Context, fileID uuid.UUID) error { + f.ackCalls = append(f.ackCalls, fileID) + if f.ackErr != nil { + return f.ackErr + } + delete(f.rows, fileID) + return nil +} + +func newRouter(handler *handlers.PendingUploadsHandler) *gin.Engine { + gin.SetMode(gin.TestMode) + r := gin.New() + r.GET("/workspaces/:id/pending-uploads/:file_id/content", handler.GetContent) + r.POST("/workspaces/:id/pending-uploads/:file_id/ack", handler.Ack) + return r +} + +// ---- GetContent ---- + +func TestGetContent_HappyPath_StreamsBytesAndStampsFetched(t *testing.T) { + fs := newFakeStorage() + wsID := uuid.New() + fileID, err := fs.Put(context.Background(), wsID, []byte("hello world"), "report.pdf", "application/pdf") + if err != nil { + t.Fatalf("Put: %v", err) + } + h := handlers.NewPendingUploadsHandler(fs) + r := newRouter(h) + + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + if got := w.Body.String(); got != "hello world" { + t.Errorf("body = %q, want %q", got, "hello world") + } + if got := w.Header().Get("Content-Type"); got != "application/pdf" { + t.Errorf("Content-Type = %q, want application/pdf", got) + } + if got := w.Header().Get("Content-Disposition"); !strings.Contains(got, "report.pdf") { + t.Errorf("Content-Disposition = %q, expected to mention report.pdf", got) + } + if got := w.Header().Get("Content-Length"); got != "11" { + t.Errorf("Content-Length = %q, want 11", got) + } + if len(fs.markFetched) != 1 || fs.markFetched[0] != fileID { + t.Errorf("expected MarkFetched(%s), got %v", fileID, fs.markFetched) + } +} + +func TestGetContent_DefaultsMimetypeWhenEmpty(t *testing.T) { + fs := newFakeStorage() + wsID := uuid.New() + fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "") + h := handlers.NewPendingUploadsHandler(fs) + r := newRouter(h) + + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if got := w.Header().Get("Content-Type"); got != "application/octet-stream" { + t.Errorf("Content-Type fallback = %q, want application/octet-stream", got) + } +} + +func TestGetContent_InvalidWorkspaceID_400(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + req := httptest.NewRequest(http.MethodGet, "/workspaces/not-a-uuid/pending-uploads/00000000-0000-0000-0000-000000000000/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400", w.Code) + } +} + +func TestGetContent_InvalidFileID_400(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + wsID := uuid.New() + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/not-a-uuid/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400", w.Code) + } +} + +func TestGetContent_NotFound_404(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + wsID := uuid.New() + missing := uuid.New() + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/"+missing.String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusNotFound { + t.Errorf("status=%d, want 404", w.Code) + } +} + +func TestGetContent_StorageError_500(t *testing.T) { + fs := newFakeStorage() + fs.getErr = errors.New("connection refused") + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + wsID := uuid.New() + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/"+uuid.New().String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestGetContent_CrossWorkspaceBleed_404(t *testing.T) { + // Token leak: workspace A's wsAuth-validated request tries to + // pull workspace B's file_id. Handler must 404 even though the + // row exists. + fs := newFakeStorage() + wsB := uuid.New() + fileID, _ := fs.Put(context.Background(), wsB, []byte("secret"), "leak.txt", "text/plain") + + wsA := uuid.New() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsA.String()+"/pending-uploads/"+fileID.String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusNotFound { + t.Fatalf("status=%d, want 404 for cross-workspace bleed", w.Code) + } + // Critical: must not have leaked the bytes. + if strings.Contains(w.Body.String(), "secret") { + t.Errorf("response body leaked content from another workspace: %q", w.Body.String()) + } +} + +func TestGetContent_MarkFetchedFailureLoggedNotPropagated(t *testing.T) { + fs := newFakeStorage() + wsID := uuid.New() + fileID, _ := fs.Put(context.Background(), wsID, []byte("ok"), "x.txt", "text/plain") + fs.markErr = errors.New("update failed (sweep raced)") + h := handlers.NewPendingUploadsHandler(fs) + r := newRouter(h) + + req := httptest.NewRequest(http.MethodGet, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + // Body already returned 200 OK + bytes BEFORE the MarkFetched + // failure — workspace fetch must NOT fail because of an + // observability hook. + if w.Code != http.StatusOK { + t.Errorf("status=%d, want 200 even on MarkFetched failure", w.Code) + } + if w.Body.String() != "ok" { + t.Errorf("body = %q, want %q", w.Body.String(), "ok") + } +} + +// ---- Ack ---- + +func TestAck_HappyPath_RemovesRow(t *testing.T) { + fs := newFakeStorage() + wsID := uuid.New() + fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "") + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d", w.Code) + } + var body map[string]any + if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil { + t.Fatalf("decode: %v", err) + } + if body["acked"] != true { + t.Errorf("body.acked = %v, want true", body["acked"]) + } + if _, exists := fs.rows[fileID]; exists { + t.Errorf("row should have been removed after ack") + } +} + +func TestAck_NonExistent_404(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + wsID := uuid.New() + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+wsID.String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusNotFound { + t.Errorf("status=%d, want 404", w.Code) + } +} + +func TestAck_CrossWorkspaceBleed_404(t *testing.T) { + fs := newFakeStorage() + wsB := uuid.New() + fileID, _ := fs.Put(context.Background(), wsB, []byte("data"), "x.bin", "") + wsA := uuid.New() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+wsA.String()+"/pending-uploads/"+fileID.String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusNotFound { + t.Errorf("status=%d, want 404 for cross-workspace ack", w.Code) + } + // Row must remain — workspace A's bogus ack must NOT delete + // workspace B's file. + if _, exists := fs.rows[fileID]; !exists { + t.Errorf("row should NOT have been removed by cross-workspace ack") + } +} + +func TestAck_InvalidWorkspaceID_400(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + req := httptest.NewRequest(http.MethodPost, "/workspaces/not-a-uuid/pending-uploads/"+uuid.New().String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400", w.Code) + } +} + +func TestAck_InvalidFileID_400(t *testing.T) { + fs := newFakeStorage() + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+uuid.New().String()+"/pending-uploads/not-a-uuid/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400", w.Code) + } +} + +func TestAck_GetStorageError_500(t *testing.T) { + fs := newFakeStorage() + fs.getErr = errors.New("conn lost") + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+uuid.New().String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestAck_RaceWithSweep_ReturnsRacedTrue(t *testing.T) { + // Sweep deletes the row between the handler's Get and Ack calls. + // Storage.Ack returns ErrNotFound; handler treats that as success + // (intent honored, row gone) and reports raced:true. + fs := newFakeStorage() + wsID := uuid.New() + fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "") + fs.ackErr = pendinguploads.ErrNotFound + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d, want 200 on race", w.Code) + } + var body map[string]any + json.Unmarshal(w.Body.Bytes(), &body) + if body["acked"] != true || body["raced"] != true { + t.Errorf("expected acked=true raced=true, got %v", body) + } +} + +func TestAck_StorageError_500(t *testing.T) { + fs := newFakeStorage() + wsID := uuid.New() + fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "") + fs.ackErr = errors.New("conn refused") + r := newRouter(handlers.NewPendingUploadsHandler(fs)) + + req := httptest.NewRequest(http.MethodPost, + "/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil) + w := httptest.NewRecorder() + r.ServeHTTP(w, req) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} diff --git a/workspace-server/internal/handlers/sanitize_filename_test.go b/workspace-server/internal/handlers/sanitize_filename_test.go new file mode 100644 index 00000000..82a6d355 --- /dev/null +++ b/workspace-server/internal/handlers/sanitize_filename_test.go @@ -0,0 +1,103 @@ +package handlers_test + +import ( + "strings" + "testing" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" +) + +// SanitizeFilename mirrors workspace/internal_chat_uploads.py's +// sanitize_filename. Drift between the two means canvas-emitted URIs +// differ between push and poll paths for the same upload — pin every +// case the Python suite pins (workspace/tests/test_internal_chat_uploads.py +// :: test_sanitize_filename). + +func TestSanitizeFilename_StripsPathTraversal(t *testing.T) { + cases := map[string]string{ + "../../etc/passwd": "passwd", + "/etc/passwd": "passwd", + "a/b/c.txt": "c.txt", + "./relative": "relative", + } + for in, want := range cases { + if got := handlers.SanitizeFilename(in); got != want { + t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, want) + } + } +} + +func TestSanitizeFilename_ReplacesUnsafeChars(t *testing.T) { + cases := map[string]string{ + "hello world.pdf": "hello_world.pdf", + "weird;chars!?.txt": "weird_chars__.txt", + "中文.docx": "__.docx", // non-ASCII → underscore (each rune) + "file (1).pdf": "file__1_.pdf", + } + for in, want := range cases { + if got := handlers.SanitizeFilename(in); got != want { + t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, want) + } + } +} + +func TestSanitizeFilename_PreservesAllowedChars(t *testing.T) { + in := "report-2026.05.04_v2.pdf" + if got := handlers.SanitizeFilename(in); got != in { + t.Errorf("SanitizeFilename(%q) = %q, want unchanged", in, got) + } +} + +func TestSanitizeFilename_CapsAt100Chars_PreservesShortExtension(t *testing.T) { + // 95-char base + ".pdf" (4 chars + dot) = 100 chars total — fits. + base := strings.Repeat("a", 95) + in := base + ".pdf" + got := handlers.SanitizeFilename(in) + if got != in { + t.Errorf("expected unchanged at 100 chars, got %q (len=%d)", got, len(got)) + } + + // 200-char base + ".pdf" → truncated to 100 with .pdf preserved. + long := strings.Repeat("b", 200) + ".pdf" + got = handlers.SanitizeFilename(long) + if len(got) != 100 { + t.Errorf("expected length 100, got %d (%q)", len(got), got) + } + if !strings.HasSuffix(got, ".pdf") { + t.Errorf("expected .pdf suffix preserved, got %q", got) + } +} + +func TestSanitizeFilename_DropsLongExtension(t *testing.T) { + // Extension > 16 chars is treated as part of the name; truncation + // drops it without preservation. Mirrors the Python rule + // (dot >= 0 AND len(base) - dot <= 16). + long := strings.Repeat("c", 90) + ".thisisaverylongextensionnotpreserved" + got := handlers.SanitizeFilename(long) + if len(got) != 100 { + t.Errorf("expected 100, got %d (%q)", len(got), got) + } + // First 100 chars of the SANITIZED input — extension not preserved. + if strings.Contains(got, ".thisisaverylongextensionnotpreserved") { + t.Errorf("long extension should have been truncated, got %q", got) + } +} + +func TestSanitizeFilename_FallbackForReservedNames(t *testing.T) { + cases := []string{"", ".", ".."} + for _, in := range cases { + if got := handlers.SanitizeFilename(in); got != "file" { + t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, "file") + } + } +} + +func TestSanitizeFilename_AllUnsafeBecomesAllUnderscores_NotReserved(t *testing.T) { + // All-non-ASCII input becomes all-underscores — not "." or ".." or + // empty, so the fallback path doesn't trigger and we get a real + // (if uninformative) sanitized name. + got := handlers.SanitizeFilename("中文中文") + if got != "____" { + t.Errorf("SanitizeFilename(中文中文) = %q, want %q", got, "____") + } +} diff --git a/workspace-server/internal/pendinguploads/storage.go b/workspace-server/internal/pendinguploads/storage.go new file mode 100644 index 00000000..0289c9b8 --- /dev/null +++ b/workspace-server/internal/pendinguploads/storage.go @@ -0,0 +1,253 @@ +// Package pendinguploads is the platform-side staging layer for chat file +// uploads bound for poll-mode workspaces (delivery_mode='poll', no public +// callback URL — typically external runtimes on a laptop / behind NAT). +// +// In push-mode the platform synchronously POSTs the multipart body to the +// workspace's /internal/chat/uploads/ingest endpoint and forgets about it. +// Poll-mode has no callback URL to forward to, so the platform parses the +// multipart on this side, persists each file as one pending_uploads row, +// and lets the workspace pull it on its next inbox poll cycle. +// +// The Storage interface keeps the bytes-vs-metadata split clean: today +// content is stored inline as bytea on the pending_uploads row, but the +// shape lets a future PR (RFC #2789, S3-backed shared storage) swap to +// object storage by adding a new Storage implementation without touching +// any of the handler-layer callers. +// +// Lifecycle: +// +// Put — handler creates a row with the file content; assigns file_id. +// Get — GET /workspaces/:id/pending-uploads/:fid/content reads bytes. +// MarkFetched — stamps fetched_at on the row (Phase 3 observability). +// Ack — POST /workspaces/:id/pending-uploads/:fid/ack; +// terminal happy-path state. After ack, Get returns ErrNotFound. +// GC sweep deletes acked rows after a retention window. +// +// Hard TTL: every row has an expires_at default of created_at + 24h. After +// expiration the row is GC'd by Phase 3's sweep cron regardless of ack +// state. Get on an expired row returns ErrNotFound — the workspace's next +// poll will see the underlying activity_logs row was orphaned and the +// agent surfaces "file expired" to the user. +package pendinguploads + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/google/uuid" +) + +// Per-file size cap. Mirrors workspace-side ingest_handler +// (workspace/internal_chat_uploads.py:198). Pinned at the DB level via +// the size_bytes CHECK constraint; this Go-side constant exists so the +// Put implementation can reject before round-tripping to Postgres. +const MaxFileBytes = 25 * 1024 * 1024 + +// ErrNotFound is returned by Get / MarkFetched / Ack when the row is +// absent. Callers turn this into HTTP 404. Treat acked + expired rows +// as not-found so the workspace can never re-fetch a file we've +// considered handed-off. +var ErrNotFound = errors.New("pendinguploads: row not found, expired, or already acked") + +// ErrTooLarge is returned by Put when content exceeds MaxFileBytes. +// Callers turn this into HTTP 413. Pre-DB check so we don't push a +// 25 MB+1 byte payload through Postgres just to have the CHECK reject it. +var ErrTooLarge = errors.New("pendinguploads: content exceeds per-file cap") + +// Record carries the full row including content. Returned by Get; +// the GET /content handler streams Record.Content as the response body. +type Record struct { + FileID uuid.UUID + WorkspaceID uuid.UUID + Content []byte + Filename string + Mimetype string + SizeBytes int64 + CreatedAt time.Time + FetchedAt *time.Time // nil before first MarkFetched + AckedAt *time.Time // nil before Ack (Get returns ErrNotFound after) + ExpiresAt time.Time +} + +// Storage is the platform-side persistence boundary for poll-mode chat +// uploads. The Postgres implementation backs all callers today; an S3- +// backed implementation can drop in once RFC #2789 lands by making +// content storage out-of-line and updating the Postgres-only metadata +// columns. +type Storage interface { + // Put creates a row for one file targeting workspaceID and returns + // the assigned file_id. content is bounded by MaxFileBytes; + // filename / mimetype are stored verbatim — caller is responsible + // for sanitization (matches workspace-side rule, see + // internal_chat_uploads.py:sanitize_filename). Empty filename and + // content > MaxFileBytes return errors before any DB write. + Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) + + // Get returns the full row including content. Returns ErrNotFound + // when the row is absent, acked, or past expires_at. Caller should + // not differentiate the three cases in the response — from the + // workspace's perspective they all mean "not available, give up." + Get(ctx context.Context, fileID uuid.UUID) (Record, error) + + // MarkFetched stamps fetched_at on the row. Idempotent — repeated + // calls update fetched_at to the latest timestamp. Returns + // ErrNotFound if the row is absent / acked / expired. + MarkFetched(ctx context.Context, fileID uuid.UUID) error + + // Ack stamps acked_at on the row. Idempotent on the row state + // (acked_at is only set the first time so workspace double-acks + // don't move the timestamp). Returns ErrNotFound if the row is + // absent or already expired; on already-acked, returns nil so + // the workspace's at-least-once retry succeeds without an error. + Ack(ctx context.Context, fileID uuid.UUID) error +} + +// PostgresStorage is the production Storage implementation backed by +// the pending_uploads table. +type PostgresStorage struct { + db *sql.DB +} + +// NewPostgres returns a Storage backed by db. db must be a connected +// pool; this constructor does no I/O. +func NewPostgres(db *sql.DB) *PostgresStorage { + return &PostgresStorage{db: db} +} + +// Compile-time check that PostgresStorage satisfies Storage. +var _ Storage = (*PostgresStorage)(nil) + +func (p *PostgresStorage) Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) { + if len(content) == 0 { + return uuid.Nil, fmt.Errorf("pendinguploads: empty content") + } + if len(content) > MaxFileBytes { + return uuid.Nil, ErrTooLarge + } + if filename == "" { + return uuid.Nil, fmt.Errorf("pendinguploads: empty filename") + } + // Filename length cap is enforced both here (early reject) and at + // the DB layer (CHECK constraint) so a buggy caller can't write a + // 200-char filename that Phase 2's URI rewrite would then truncate. + if len(filename) > 100 { + return uuid.Nil, fmt.Errorf("pendinguploads: filename exceeds 100 chars") + } + + var fileID uuid.UUID + err := p.db.QueryRowContext(ctx, ` + INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype) + VALUES ($1, $2, $3, $4, $5) + RETURNING file_id + `, workspaceID, content, int64(len(content)), filename, mimetype).Scan(&fileID) + if err != nil { + return uuid.Nil, fmt.Errorf("pendinguploads: insert: %w", err) + } + return fileID, nil +} + +func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) { + // The expires_at + acked_at filter in the WHERE clause means a + // caller sees ErrNotFound for absent / acked / expired without + // needing per-case branching. Trade-off: we can't differentiate + // in metrics, but the workspace's response is the same in all + // three cases ("file gone, give up") so the granularity isn't + // useful at this layer. Phase 3 dashboards aggregate row-state + // counts directly off the table. + var r Record + err := p.db.QueryRowContext(ctx, ` + SELECT file_id, workspace_id, content, filename, mimetype, + size_bytes, created_at, fetched_at, acked_at, expires_at + FROM pending_uploads + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID).Scan( + &r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype, + &r.SizeBytes, &r.CreatedAt, &r.FetchedAt, &r.AckedAt, &r.ExpiresAt, + ) + if errors.Is(err, sql.ErrNoRows) { + return Record{}, ErrNotFound + } + if err != nil { + return Record{}, fmt.Errorf("pendinguploads: select: %w", err) + } + return r, nil +} + +func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error { + // UPDATE on the same gating predicate as Get — keeps the "absent + // or acked or expired = ErrNotFound" contract symmetric. Without + // the predicate a workspace could re-stamp fetched_at on an acked + // row, which would mislead Phase 3's stuck-fetch dashboard. + res, err := p.db.ExecContext(ctx, ` + UPDATE pending_uploads + SET fetched_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID) + if err != nil { + return fmt.Errorf("pendinguploads: mark_fetched: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("pendinguploads: mark_fetched rows: %w", err) + } + if n == 0 { + return ErrNotFound + } + return nil +} + +func (p *PostgresStorage) Ack(ctx context.Context, fileID uuid.UUID) error { + // Set acked_at only if currently NULL — workspace at-least-once + // retries don't move the timestamp, so dashboards see the first + // successful ack as the "delivery time." Two-clause WHERE: row + // must exist and not be expired; acked-but-still-in-window is + // returned as success (idempotent retry). + res, err := p.db.ExecContext(ctx, ` + UPDATE pending_uploads + SET acked_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID) + if err != nil { + return fmt.Errorf("pendinguploads: ack: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("pendinguploads: ack rows: %w", err) + } + if n == 1 { + return nil + } + // Zero-rows-affected: either the row doesn't exist / has expired, + // OR it was already acked. Re-query to disambiguate so the + // idempotent-retry case returns nil instead of ErrNotFound. + var ackedAt sql.NullTime + err = p.db.QueryRowContext(ctx, ` + SELECT acked_at FROM pending_uploads + WHERE file_id = $1 AND expires_at > now() + `, fileID).Scan(&ackedAt) + if errors.Is(err, sql.ErrNoRows) { + return ErrNotFound + } + if err != nil { + return fmt.Errorf("pendinguploads: ack disambiguate: %w", err) + } + if ackedAt.Valid { + // Already acked — idempotent success. + return nil + } + // Predicate matched a non-acked, non-expired row but RowsAffected + // was 0. This means the row was concurrently modified between the + // UPDATE and the SELECT (extremely rare; e.g. a Phase 3 sweep + // raced with the ACK). Treat as success — the row is gone, but + // the workspace's intent ("I'm done with this file") was honored. + return nil +} diff --git a/workspace-server/internal/pendinguploads/storage_test.go b/workspace-server/internal/pendinguploads/storage_test.go new file mode 100644 index 00000000..45f797c7 --- /dev/null +++ b/workspace-server/internal/pendinguploads/storage_test.go @@ -0,0 +1,400 @@ +package pendinguploads_test + +import ( + "context" + "database/sql" + "errors" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" +) + +// Tests pin the SQL the handler relies on. Drift detection: if the +// migration changes column order / predicate shape, sqlmock's +// QueryMatcherEqual + ExpectQuery / ExpectExec on the literal text +// fails the test before the handler can ship a silently-broken read. +// +// Why sqlmock and not testcontainers / real Postgres: +// +// The Storage contract is "this Go method runs THIS SQL." Real-DB +// tests would catch SQL-syntax errors but not the contract drift +// we care about (e.g. handler accidentally reordering columns, +// dropping the acked_at predicate, etc.). Postgres-syntax coverage +// lives in the migration round-trip test (Phase 4 E2E). + +func newMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock) { + t.Helper() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("sqlmock.New: %v", err) + } + t.Cleanup(func() { _ = db.Close() }) + return db, mock +} + +// Single source of truth for the SQL strings — drift here = test fails; +// matches the Go literals in storage.go exactly. +const ( + insertSQL = ` + INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype) + VALUES ($1, $2, $3, $4, $5) + RETURNING file_id + ` + selectSQL = ` + SELECT file_id, workspace_id, content, filename, mimetype, + size_bytes, created_at, fetched_at, acked_at, expires_at + FROM pending_uploads + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + markFetchedSQL = ` + UPDATE pending_uploads + SET fetched_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + ackSQL = ` + UPDATE pending_uploads + SET acked_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + ackDisambiguateSQL = ` + SELECT acked_at FROM pending_uploads + WHERE file_id = $1 AND expires_at > now() + ` +) + +// ----- Put ------------------------------------------------------------------ + +func TestPut_HappyPath_ReturnsAssignedFileID(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + wsID := uuid.New() + expectedID := uuid.New() + mock.ExpectQuery(insertSQL). + WithArgs(wsID, []byte("hello"), int64(5), "report.pdf", "application/pdf"). + WillReturnRows(sqlmock.NewRows([]string{"file_id"}).AddRow(expectedID)) + + got, err := store.Put(context.Background(), wsID, []byte("hello"), "report.pdf", "application/pdf") + if err != nil { + t.Fatalf("Put: %v", err) + } + if got != expectedID { + t.Errorf("file_id mismatch: got %s want %s", got, expectedID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestPut_RejectsEmptyContentBeforeDB(t *testing.T) { + db, _ := newMockDB(t) // no expectations — must NOT round-trip + store := pendinguploads.NewPostgres(db) + + _, err := store.Put(context.Background(), uuid.New(), nil, "x.txt", "") + if err == nil || !strings.Contains(err.Error(), "empty content") { + t.Fatalf("expected empty-content error, got %v", err) + } +} + +func TestPut_RejectsOversizeBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + too := make([]byte, pendinguploads.MaxFileBytes+1) + _, err := store.Put(context.Background(), uuid.New(), too, "x.txt", "") + if !errors.Is(err, pendinguploads.ErrTooLarge) { + t.Fatalf("expected ErrTooLarge, got %v", err) + } +} + +func TestPut_RejectsEmptyFilenameBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + _, err := store.Put(context.Background(), uuid.New(), []byte("hi"), "", "") + if err == nil || !strings.Contains(err.Error(), "empty filename") { + t.Fatalf("expected empty-filename error, got %v", err) + } +} + +func TestPut_RejectsLongFilenameBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + long := strings.Repeat("a", 101) + _, err := store.Put(context.Background(), uuid.New(), []byte("hi"), long, "") + if err == nil || !strings.Contains(err.Error(), "exceeds 100 chars") { + t.Fatalf("expected too-long-filename error, got %v", err) + } +} + +func TestPut_PropagatesDBError(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectQuery(insertSQL). + WithArgs(uuid.Nil, sqlmock.AnyArg(), int64(2), "x", ""). + WillReturnError(errors.New("connection refused")) + + wsID := uuid.Nil + _, err := store.Put(context.Background(), wsID, []byte("hi"), "x", "") + if err == nil || !strings.Contains(err.Error(), "insert") { + t.Fatalf("expected wrapped insert error, got %v", err) + } +} + +// ----- Get ------------------------------------------------------------------ + +func TestGet_HappyPath_ReturnsFullRow(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + wsID := uuid.New() + now := time.Now().UTC() + mock.ExpectQuery(selectSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{ + "file_id", "workspace_id", "content", "filename", "mimetype", + "size_bytes", "created_at", "fetched_at", "acked_at", "expires_at", + }).AddRow( + fid, wsID, []byte("data"), "x.bin", "application/octet-stream", + int64(4), now, nil, nil, now.Add(24*time.Hour), + )) + + r, err := store.Get(context.Background(), fid) + if err != nil { + t.Fatalf("Get: %v", err) + } + if r.FileID != fid || r.WorkspaceID != wsID { + t.Errorf("ids mismatch: %+v", r) + } + if string(r.Content) != "data" || r.SizeBytes != 4 { + t.Errorf("content mismatch: %+v", r) + } + if r.FetchedAt != nil || r.AckedAt != nil { + t.Errorf("expected nil timestamps for unfetched row, got fetched=%v acked=%v", r.FetchedAt, r.AckedAt) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectQuery(selectSQL). + WithArgs(fid). + WillReturnError(sql.ErrNoRows) + + _, err := store.Get(context.Background(), fid) + if !errors.Is(err, pendinguploads.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestGet_DBError_WrappedAndPropagated(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectQuery(selectSQL). + WillReturnError(errors.New("connection lost")) + + _, err := store.Get(context.Background(), uuid.New()) + if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "select") { + t.Fatalf("expected wrapped select error, got %v", err) + } +} + +// ----- MarkFetched ---------------------------------------------------------- + +func TestMarkFetched_HappyPath(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(markFetchedSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := store.MarkFetched(context.Background(), fid); err != nil { + t.Fatalf("MarkFetched: %v", err) + } +} + +func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(markFetchedSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + + err := store.MarkFetched(context.Background(), fid) + if !errors.Is(err, pendinguploads.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestMarkFetched_DBError_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(markFetchedSQL). + WillReturnError(errors.New("pg flake")) + + err := store.MarkFetched(context.Background(), uuid.New()) + if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "mark_fetched") { + t.Fatalf("expected wrapped mark_fetched error, got %v", err) + } +} + +// ----- Ack ------------------------------------------------------------------ + +func TestAck_FirstAck_StampsAckedAt(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := store.Ack(context.Background(), fid); err != nil { + t.Fatalf("Ack: %v", err) + } +} + +func TestAck_AlreadyAcked_IdempotentSuccess(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + // First UPDATE matches zero rows (already acked). + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + // Disambiguation SELECT finds the row with acked_at non-null. + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{"acked_at"}).AddRow(time.Now().UTC())) + + if err := store.Ack(context.Background(), fid); err != nil { + t.Fatalf("expected idempotent success on already-acked, got %v", err) + } +} + +func TestAck_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). + WillReturnError(sql.ErrNoRows) + + err := store.Ack(context.Background(), fid) + if !errors.Is(err, pendinguploads.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestAck_RaceWithSweep_ReturnsSuccess(t *testing.T) { + // UPDATE saw 0 rows AND the disambiguate SELECT saw a row with + // acked_at IS NULL — only possible if the GC sweep raced between + // the two queries. The contract says we honor the workspace's ACK + // intent and return success. + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{"acked_at"}).AddRow(nil)) + + if err := store.Ack(context.Background(), fid); err != nil { + t.Fatalf("expected race success, got %v", err) + } +} + +func TestAck_DBErrorOnUpdate_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(ackSQL). + WillReturnError(errors.New("conn refused")) + + err := store.Ack(context.Background(), uuid.New()) + if err == nil || !strings.Contains(err.Error(), "ack:") { + t.Fatalf("expected wrapped ack error, got %v", err) + } +} + +func TestMarkFetched_RowsAffectedError_Wrapped(t *testing.T) { + // Some drivers (or Result wrappers) return an error from + // RowsAffected() even when ExecContext succeeded — the contract + // says we surface that as a wrapped error rather than silently + // treating it as 0 rows (= ErrNotFound, which would mislead the + // workspace into giving up on a possibly-fetched row). + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(markFetchedSQL). + WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected"))) + + err := store.MarkFetched(context.Background(), uuid.New()) + if err == nil || !strings.Contains(err.Error(), "mark_fetched rows") { + t.Fatalf("expected wrapped rows-affected error, got %v", err) + } +} + +func TestAck_RowsAffectedError_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(ackSQL). + WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected"))) + + err := store.Ack(context.Background(), uuid.New()) + if err == nil || !strings.Contains(err.Error(), "ack rows") { + t.Fatalf("expected wrapped rows-affected error, got %v", err) + } +} + +func TestAck_DBErrorOnDisambiguate_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). + WillReturnError(errors.New("connection refused")) + + err := store.Ack(context.Background(), fid) + if err == nil || !strings.Contains(err.Error(), "disambiguate") { + t.Fatalf("expected wrapped disambiguate error, got %v", err) + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 6aff74e5..86007d00 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -13,6 +13,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring" "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware" @@ -540,10 +541,20 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // streaming download (agent → user). Namespaced under /chat/ so // the security model is obviously distinct from /files/* (which // handles workspace config/templates and has a different caller). - chatfh := handlers.NewChatFilesHandler(tmplh) + chatfh := handlers.NewChatFilesHandler(tmplh). + WithPendingUploads(pendinguploads.NewPostgres(db.DB), broadcaster) wsAuth.POST("/chat/uploads", chatfh.Upload) wsAuth.GET("/chat/download", chatfh.Download) + // Phase 1 RFC: poll-mode chat upload — endpoints the workspace's + // inbox poller hits to fetch staged file content + ack delivery. + // Same wsAuth gate as the activity poll, so a token leak from + // workspace A can't read workspace B's pending uploads (the + // handler also re-checks workspace_id on each row). + puh := handlers.NewPendingUploadsHandler(pendinguploads.NewPostgres(db.DB)) + wsAuth.GET("/pending-uploads/:file_id/content", puh.GetContent) + wsAuth.POST("/pending-uploads/:file_id/ack", puh.Ack) + // Plugins pluginsDir := findPluginsDir(configsDir) // Runtime lookup lets the plugins handler filter the registry to plugins diff --git a/workspace-server/migrations/20260505100000_pending_uploads.down.sql b/workspace-server/migrations/20260505100000_pending_uploads.down.sql new file mode 100644 index 00000000..c8efc627 --- /dev/null +++ b/workspace-server/migrations/20260505100000_pending_uploads.down.sql @@ -0,0 +1,11 @@ +-- 20260505100000_pending_uploads.down.sql +-- +-- Drops the pending_uploads table and its indexes. Any pending file +-- uploads sitting in the table at rollback time are dropped — operators +-- on poll-mode workspaces lose those attachments, but they were never +-- fetched on the workspace side (otherwise they'd be acked + about to +-- be GC'd anyway), so the practical loss is the same as a cron sweep. + +DROP INDEX IF EXISTS idx_pending_uploads_expires; +DROP INDEX IF EXISTS idx_pending_uploads_workspace_unacked; +DROP TABLE IF EXISTS pending_uploads; diff --git a/workspace-server/migrations/20260505100000_pending_uploads.up.sql b/workspace-server/migrations/20260505100000_pending_uploads.up.sql new file mode 100644 index 00000000..d43f2649 --- /dev/null +++ b/workspace-server/migrations/20260505100000_pending_uploads.up.sql @@ -0,0 +1,103 @@ +-- 20260505100000_pending_uploads.up.sql +-- +-- RFC: poll-mode chat upload (counterpart to delivery_mode='poll' messaging). +-- +-- Today, chat_files.go's Upload handler refuses delivery_mode != 'push' +-- with HTTP 422 "workspace has no callback URL" — external runtime +-- workspaces (laptop / behind NAT) cannot receive file attachments at all. +-- The only escape was "register with ngrok / Cloudflare tunnel + push +-- mode," which forces every external operator into infra plumbing they +-- shouldn't need. +-- +-- This table is the platform-side staging layer that lets canvas → external +-- workspace file uploads ride the same poll loop the inbox already uses for +-- text messages: +-- +-- 1. Canvas POSTs multipart to workspace-server. +-- 2. workspace-server parses multipart, stores each file as one +-- pending_uploads row, AND inserts a matching activity_logs row +-- (type='chat_upload_receive', request_body={file_id, filename, ...}). +-- 3. Workspace's existing inbox poller picks up the activity row. +-- 4. Workspace fetches bytes via GET /workspaces/:id/pending-uploads/:fid/content, +-- writes to /workspace/.molecule/chat-uploads/, ACKs via POST. +-- 5. Sweep cron deletes rows past expires_at OR acked_at + N hours. +-- +-- Why a separate table and not bytea-on-activity_logs: +-- +-- * activity_logs is text/JSON-shaped today; mixing 25 MB binary blobs +-- into request_body inflates every JOIN, every since_id scan, every +-- pgdump. The bytes need their own home. +-- * Lifecycle differs: activity_logs is durable audit history (90d+); +-- pending_uploads is transient buffer (24h default) that GCs hard. +-- Keeping them split lets each table's retention policy run +-- independently. +-- * A future PR (RFC #2789) will migrate the bytes column to S3 keys +-- without touching the activity_logs schema or the metadata columns +-- here. That migration is one ALTER + one backfill rather than a +-- cross-table rewrite. +-- +-- No FK to workspaces: +-- workspace delete should NOT cascade-purge pending_uploads — those +-- rows are evidence-of-receipt and should expire on their own TTL. +-- Same posture as tenant_resources (PR #2343) and delegations (PR #2829). + +CREATE TABLE IF NOT EXISTS pending_uploads ( + -- Server-generated so the canvas can include the URI in the chat + -- message it sends right after the upload POST. Workspace fetches + -- by this id, no name collisions across workspaces. + file_id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + + -- Target workspace. NOT a FK (see header). + workspace_id uuid NOT NULL, + + -- Content lives inline today via bytea. The Go-side storage interface + -- (PendingUploadStorage) abstracts read/write so a future PR can + -- relocate this column's job to S3 (RFC #2789) by adding an `s3_key + -- text NULL` column, dual-writing for one release, then dropping + -- `content` once the backfill drains. The CHECK below pins the same + -- 25 MB per-file cap the workspace-side ingest_handler enforces + -- (workspace/internal_chat_uploads.py:198) — discrepancy between + -- the two would let the platform accept files the workspace would + -- 413 on after pull. + content bytea NOT NULL, + size_bytes bigint NOT NULL CHECK (size_bytes > 0 AND size_bytes <= 26214400), + + -- Filename + mimetype mirror the workspace-side ChatUploadedFile + -- shape so the eventual InboxMessage hand-off needs no translation. + -- Filename is sanitized at write-time (matches sanitize_filename in + -- workspace/internal_chat_uploads.py); 100 char cap is the same. + filename text NOT NULL CHECK (length(filename) > 0 AND length(filename) <= 100), + mimetype text NOT NULL DEFAULT '', + + created_at timestamptz NOT NULL DEFAULT now(), + + -- Stamped on the GET /content request. Lets Phase 3 sweeper detect + -- "fetched but never acked" — distinct failure mode from "never + -- fetched" (workspace offline) so dashboards can split them. + fetched_at timestamptz, + + -- Stamped on the POST /ack request. Terminal state for the happy + -- path. Sweep cron deletes acked rows past acked_at + retention. + acked_at timestamptz, + + -- Hard TTL: rows past this are deleted regardless of ack state. + -- 24h matches the longest-observed legitimate "operator stepped + -- away from laptop" gap; tunable later via app-level config without + -- a migration. NOT acked_at + 24h — that would let a stuck-fetched + -- row live forever. + expires_at timestamptz NOT NULL DEFAULT (now() + interval '24 hours') +); + +-- Hot path: workspace's poll cycle pulls "give me my unacked uploads +-- in chronological order." Partial-index because acked rows are GC +-- candidates and shouldn't bloat the working set. +CREATE INDEX IF NOT EXISTS idx_pending_uploads_workspace_unacked + ON pending_uploads (workspace_id, created_at) + WHERE acked_at IS NULL; + +-- Phase 3 GC sweep hot path: list rows past expires_at, partial-indexed +-- on unacked because acked rows have a different (shorter) retention +-- and GC-via-acked_at is a separate query. +CREATE INDEX IF NOT EXISTS idx_pending_uploads_expires + ON pending_uploads (expires_at) + WHERE acked_at IS NULL;