diff --git a/workspace-server/internal/handlers/chat_files.go b/workspace-server/internal/handlers/chat_files.go index b58d2d28..ccfa0d4c 100644 --- a/workspace-server/internal/handlers/chat_files.go +++ b/workspace-server/internal/handlers/chat_files.go @@ -31,23 +31,37 @@ package handlers import ( "context" "database/sql" + "errors" "fmt" "io" "log" + "mime/multipart" "net/http" "net/url" "path/filepath" + "regexp" "strings" "time" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" ) // ChatFilesHandler serves file upload + download for chat. Holds a // reference to TemplatesHandler so the (still docker-exec) Download // path keeps using the shared findContainer/CopyFromContainer helpers // without duplicating them. Upload no longer reaches into Docker. +// +// pendingUploads + broadcaster are wired only when the platform's +// migration 20260505100000 has run; nil values fall back to the +// pre-poll-mode behavior (422 on poll-mode upload, same as before). +// This lets the binary keep booting in environments where the +// migration hasn't run yet — the poll branch is gated by a not-nil +// check at the call site. type ChatFilesHandler struct { templates *TemplatesHandler @@ -56,6 +70,19 @@ type ChatFilesHandler struct { // the 50 MB worst case on a slow EC2 link without leaving a // connection hanging forever on a sick workspace. httpClient *http.Client + + // pendingUploads is the platform-side staging layer for poll-mode + // uploads. 
nil → poll branch returns 422 unchanged (the pre-feature + // behavior); non-nil → poll branch parses multipart, persists each + // file via storage.Put, logs a chat_upload_receive activity row, + // and returns 200 with synthetic platform-pending: URIs. + pendingUploads pendinguploads.Storage + + // broadcaster is the events.EventEmitter used to notify the canvas + // when an activity row lands (so the Agent Comms panel updates + // live). Same emitter the rest of the platform uses; nil = no + // broadcast (tests). + broadcaster events.EventEmitter } func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler { @@ -69,6 +96,16 @@ func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler { } } +// WithPendingUploads enables the poll-mode upload branch by wiring a +// Storage + broadcaster. Call site (router.go) does this at +// construction; tests set the fields directly when they want the +// poll path exercised. Returns the handler for chained construction. +func (h *ChatFilesHandler) WithPendingUploads(storage pendinguploads.Storage, broadcaster events.EventEmitter) *ChatFilesHandler { + h.pendingUploads = storage + h.broadcaster = broadcaster + return h +} + // chatUploadMaxBytes caps the full multipart request body so a // malicious / runaway client can't OOM the proxy hop. 50 MB matches // the workspace-side limit; anything larger is rejected at the @@ -262,6 +299,24 @@ func (h *ChatFilesHandler) Upload(c *gin.Context) { ctx := c.Request.Context() + // Branch on delivery_mode BEFORE attempting the HTTP forward. + // Push-mode workspaces continue to do the streaming forward + // unchanged. Poll-mode workspaces (typically external runtimes + // on a laptop, no public callback URL) get the platform-side + // staging path — the file lands in pending_uploads, an activity + // row goes into the inbox queue, and the workspace pulls on its + // next poll cycle. 
+ if h.pendingUploads != nil { + mode, modeOK := lookupUploadDeliveryMode(c, ctx, workspaceID) + if !modeOK { + return + } + if mode == "poll" { + h.uploadPollMode(c, ctx, workspaceID) + return + } + } + wsURL, secret, ok := resolveWorkspaceForwardCreds(c, ctx, workspaceID, "upload") if !ok { return @@ -405,3 +460,251 @@ func (h *ChatFilesHandler) streamWorkspaceResponse( } } + +// lookupUploadDeliveryMode returns the workspace's delivery_mode +// for the chat upload branch. Returns ("", false) and writes the +// HTTP error response on lookup failure (caller stops). NULL or +// empty delivery_mode is treated as "push" — that's the schema +// default and matches the legacy pre-#2339 behavior. Only the +// explicit string "poll" routes the upload through the poll-mode +// branch. +// +// Why a dedicated helper instead of reusing lookupDeliveryMode +// from a2a_proxy_helpers.go: that one swallows errors and falls +// back to "push" so the proxy keeps working on a transient DB +// hiccup. For upload we want to surface the not-found case as 404 +// (which the workspace-poll branch wouldn't otherwise hit, since +// the workspace-side row IS the source of truth for the mode). 
+func lookupUploadDeliveryMode(c *gin.Context, ctx context.Context, workspaceID string) (string, bool) { + var mode sql.NullString + err := db.DB.QueryRowContext(ctx, + `SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID, + ).Scan(&mode) + if errors.Is(err, sql.ErrNoRows) { + c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"}) + return "", false + } + if err != nil { + log.Printf("chat_files Upload: delivery_mode lookup failed for %s: %v", workspaceID, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "delivery_mode lookup failed"}) + return "", false + } + if !mode.Valid || mode.String == "" { + return "push", true + } + return mode.String, true +} + +// unsafeFilenameChars matches every character that isn't in the safe +// alphanumeric + dot/dash/underscore set. Mirrors the Python regex +// _UNSAFE_FILENAME_CHARS in workspace/internal_chat_uploads.py — drift +// here would mean canvas-emitted URIs differ between push and poll +// paths for the same upload. +var unsafeFilenameChars = regexp.MustCompile(`[^a-zA-Z0-9._\-]`) + +// SanitizeFilename reduces a user-supplied filename to a safe form. +// Behaviorally identical to sanitize_filename in workspace/ +// internal_chat_uploads.py. Exported so tests in other packages can +// pin behavior parity, and so a future shared library can move both +// implementations behind one source of truth. +func SanitizeFilename(name string) string { + base := filepath.Base(name) + // filepath.Base on a path-traversal input ("../../etc/passwd") + // returns "passwd" (just the last component) — which matches what + // Python's os.path.basename does. Tests pin both here and on the + // Python side. 
+ base = strings.ReplaceAll(base, " ", "_") + base = unsafeFilenameChars.ReplaceAllString(base, "_") + if len(base) > 100 { + ext := "" + dot := strings.LastIndex(base, ".") + if dot >= 0 && len(base)-dot <= 16 { + ext = base[dot:] + } + base = base[:100-len(ext)] + ext + } + if base == "" || base == "." || base == ".." { + return "file" + } + return base +} + +// uploadedFile is the per-file response shape the workspace-side +// /internal/chat/uploads/ingest also produces. Mirroring the schema +// keeps the canvas client unaware of which path handled the upload. +type uploadedFile struct { + URI string `json:"uri"` + Name string `json:"name"` + Mimetype string `json:"mimeType"` + Size int64 `json:"size"` +} + +// uploadPollMode handles a chat upload bound for a poll-mode +// workspace. Parses the multipart in-place, persists each file via +// pendinguploads.Storage, and logs one chat_upload_receive activity +// row per file so the workspace's inbox poller picks them up on its +// next cycle. +// +// Why one activity row per file (not one per multipart batch): +// - Each row carries one URI; agents that consume the inbox treat +// each row as one inbound event. A batch row would force every +// consumer to deserialize a list, doubling the field-shape +// surface for no UX win. +// - At-least-once semantics: a workspace can ack files +// individually. Batch ack would leak partial-success state on +// a fetcher crash mid-batch. +// +// Limits enforced here mirror the workspace-side ingest_handler: +// - Total body cap: 50 MB (set on c.Request.Body before reaching us) +// - Per-file cap: 25 MB (pendinguploads.MaxFileBytes; rejected as 413) +// - Filename: sanitized + capped at 100 chars (SanitizeFilename) +// +// Logging: every persisted file logs an INFO line with workspace_id, +// file_id, size, and sanitized name. Failure modes (oversize, missing +// files field, malformed multipart) log at WARN with the same fields. 
+// Phase 3 metrics will hook these structured logs. +func (h *ChatFilesHandler) uploadPollMode(c *gin.Context, ctx context.Context, workspaceID string) { + // Parse multipart with the same per-file/per-form limits the + // workspace-side handler uses (workspace/internal_chat_uploads.py: + // max_files=64, max_fields=32). gin's MultipartForm does not + // expose those limits directly — the underlying ParseMultipartForm + // caps memory at 32 MB by default and spills to disk. For poll- + // mode we read each file into memory to hand to Storage.Put; + // 25 MB-per-file × 64-files ceiling means worst-case is 1.6 GB of + // peak memory. Bound the per-file size at the multipart layer so + // the spill never gets close. + if err := c.Request.ParseMultipartForm(32 << 20); err != nil { + log.Printf("chat_files uploadPollMode: parse multipart failed for %s: %v", workspaceID, err) + c.JSON(http.StatusBadRequest, gin.H{"error": "malformed multipart body"}) + return + } + form := c.Request.MultipartForm + if form == nil || len(form.File["files"]) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "no files field in request"}) + return + } + headers := form.File["files"] + if len(headers) > 64 { + c.JSON(http.StatusBadRequest, gin.H{"error": "too many files (limit 64)"}) + return + } + + wsUUID, err := uuid.Parse(workspaceID) + if err != nil { + // validateWorkspaceID at the top of Upload already gates this; + // the re-parse is defence in depth in case validateWorkspaceID + // drifts. Keep the error class consistent so a bad-id reaches + // the same 400 path. Not separately tested because the gate at + // the call site is structurally the same uuid.Parse. + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"}) + return + } + + out := make([]uploadedFile, 0, len(headers)) + for _, fh := range headers { + // Read full content. Per-file cap enforced post-read so an + // oversized file fails with a clean 413 rather than a torn + // stream. 
The +1 byte ReadAll trick that the Python side + // uses isn't easy through multipart.FileHeader; instead we + // rely on the multipart layer's ContentLength header and + // short-circuit before opening the part. + if fh.Size > pendinguploads.MaxFileBytes { + log.Printf("chat_files uploadPollMode: per-file cap exceeded for %s: %s (%d bytes)", + workspaceID, fh.Filename, fh.Size) + c.JSON(http.StatusRequestEntityTooLarge, gin.H{ + "error": "file exceeds per-file cap", + "filename": fh.Filename, + "size": fh.Size, + "max": pendinguploads.MaxFileBytes, + }) + return + } + content, err := readMultipartFile(fh) + if err != nil { + log.Printf("chat_files uploadPollMode: read part failed for %s/%s: %v", workspaceID, fh.Filename, err) + c.JSON(http.StatusBadRequest, gin.H{"error": "could not read file part"}) + return + } + + sanitized := SanitizeFilename(fh.Filename) + mimetype := fh.Header.Get("Content-Type") + + fileID, err := h.pendingUploads.Put(ctx, wsUUID, content, sanitized, mimetype) + if err != nil { + if errors.Is(err, pendinguploads.ErrTooLarge) { + // Belt + suspenders: the size check above already + // caught this, but Storage.Put re-validates so a + // malformed FileHeader can't slip through. 413 with + // the same shape so the client sees one error class. + c.JSON(http.StatusRequestEntityTooLarge, gin.H{ + "error": "file exceeds per-file cap", + "filename": fh.Filename, + "size": len(content), + "max": pendinguploads.MaxFileBytes, + }) + return + } + log.Printf("chat_files uploadPollMode: storage.Put failed for %s/%s: %v", + workspaceID, sanitized, err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "could not stage file"}) + return + } + + // Activity row so the workspace's inbox poller picks this up + // on its next cycle. 
activity_type=a2a_receive (NOT a new + // type) so the existing poll filter + // `?type=a2a_receive` catches it without poll-side changes; + // method=chat_upload_receive is the discriminator the + // workspace's adapter (Phase 2) uses to route to the upload + // fetcher instead of the agent's message handler. Same + // shape as A2A's tasks/send vs message/send method split. + uri := fmt.Sprintf("platform-pending:%s/%s", workspaceID, fileID) + summary := "chat_upload_receive: " + sanitized + method := "chat_upload_receive" + LogActivity(ctx, h.broadcaster, ActivityParams{ + WorkspaceID: workspaceID, + ActivityType: "a2a_receive", + TargetID: &workspaceID, + Method: &method, + Summary: &summary, + RequestBody: map[string]interface{}{ + "file_id": fileID.String(), + "name": sanitized, + "mimeType": mimetype, + "size": len(content), + "uri": uri, + }, + Status: "ok", + }) + + log.Printf("chat_files uploadPollMode: staged %s/%s (file_id=%s size=%d mimetype=%q)", + workspaceID, sanitized, fileID, len(content), mimetype) + + out = append(out, uploadedFile{ + URI: uri, + Name: sanitized, + Mimetype: mimetype, + Size: int64(len(content)), + }) + } + + c.JSON(http.StatusOK, gin.H{"files": out}) +} + +// readMultipartFile reads a multipart part fully into memory. Wraps +// the open + io.ReadAll + close idiom so the call site stays clean, +// and so a future change (chunked reads / hashing) has one place to +// land. +func readMultipartFile(fh *multipartFileHeader) ([]byte, error) { + f, err := fh.Open() + if err != nil { + return nil, fmt.Errorf("open part: %w", err) + } + defer f.Close() + return io.ReadAll(f) +} + +// multipartFileHeader is a local alias so the readMultipartFile +// signature doesn't pull "mime/multipart" into every test that +// touches uploadPollMode. 
+type multipartFileHeader = multipart.FileHeader diff --git a/workspace-server/internal/handlers/chat_files_poll_test.go b/workspace-server/internal/handlers/chat_files_poll_test.go new file mode 100644 index 00000000..c064bd6a --- /dev/null +++ b/workspace-server/internal/handlers/chat_files_poll_test.go @@ -0,0 +1,589 @@ +package handlers + +// chat_files_poll_test.go — Upload poll-mode branch tests. +// +// Pinned in their own file so the existing chat_files_test.go stays +// focused on the push-mode forward proxy. Same setupTestDB / sqlmock +// scaffolding as the rest of the package, plus an in-memory +// pendinguploads.Storage so we don't have to mock six SQL statements +// per assertion. + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "errors" + "mime/multipart" + "net/http" + "strings" + "sync" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/google/uuid" + + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" +) + +// inMemStorage is a process-local pendinguploads.Storage for branch +// tests. Records every Put for assertion. Failure modes (Put error, +// MarkFetched / Ack tested elsewhere) are injected via fields. 
+type inMemStorage struct { + mu sync.Mutex + rows map[uuid.UUID]pendinguploads.Record + puts []putCall + putErr error +} + +type putCall struct { + WorkspaceID uuid.UUID + Filename string + Mimetype string + Size int +} + +func newInMemStorage() *inMemStorage { + return &inMemStorage{rows: map[uuid.UUID]pendinguploads.Record{}} +} + +func (s *inMemStorage) Put(_ context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) { + s.mu.Lock() + defer s.mu.Unlock() + if s.putErr != nil { + return uuid.Nil, s.putErr + } + id := uuid.New() + s.rows[id] = pendinguploads.Record{ + FileID: id, WorkspaceID: ws, Content: content, + Filename: filename, Mimetype: mimetype, + SizeBytes: int64(len(content)), CreatedAt: time.Now(), + ExpiresAt: time.Now().Add(24 * time.Hour), + } + s.puts = append(s.puts, putCall{ + WorkspaceID: ws, Filename: filename, Mimetype: mimetype, Size: len(content), + }) + return id, nil +} + +func (s *inMemStorage) Get(context.Context, uuid.UUID) (pendinguploads.Record, error) { + return pendinguploads.Record{}, pendinguploads.ErrNotFound +} +func (s *inMemStorage) MarkFetched(context.Context, uuid.UUID) error { return nil } +func (s *inMemStorage) Ack(context.Context, uuid.UUID) error { return nil } + +// expectPollDeliveryMode stubs the SELECT delivery_mode lookup that +// uploadPollMode does (separate from the one resolveWorkspaceForwardCreds +// does — this is the new helper introduced for the poll branch). +func expectPollDeliveryMode(mock sqlmock.Sqlmock, workspaceID, mode string) { + rows := sqlmock.NewRows([]string{"delivery_mode"}).AddRow(mode) + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(workspaceID). + WillReturnRows(rows) +} + +func expectPollDeliveryModeMissing(mock sqlmock.Sqlmock, workspaceID string) { + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(workspaceID). 
+ WillReturnError(sql.ErrNoRows) +} + +// expectActivityInsert stubs the LogActivity INSERT so the poll branch's +// per-file activity row write doesn't fail the sqlmock expectations. +func expectActivityInsert(mock sqlmock.Sqlmock) { + mock.ExpectExec(`INSERT INTO activity_logs`). + WillReturnResult(sqlmock.NewResult(1, 1)) +} + +// expectActivityInsertWithTypeAndMethod is a strict variant that pins +// the activity_type and method positional args. Used in the discriminator +// regression test below — the workspace inbox poller filters +// `?type=a2a_receive`, so writing any other activity_type silently breaks +// poll-mode delivery without a build/test error. Pin the two discriminator +// fields so a refactor that flips activity_type back to a custom value is +// caught here instead of at runtime by a confused poller. +// +// Positional args (LogActivity uses ExecContext with 12 positional params): +// $1 workspace_id, $2 activity_type, $3 source_id, $4 target_id, +// $5 method, $6 summary, $7 request_body, $8 response_body, +// $9 tool_trace, $10 duration_ms, $11 status, $12 error_detail. +func expectActivityInsertWithTypeAndMethod(mock sqlmock.Sqlmock, workspaceID, activityType, method string) { + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + workspaceID, // $1 workspace_id + activityType, // $2 activity_type ← pinned + sqlmock.AnyArg(), // $3 source_id + sqlmock.AnyArg(), // $4 target_id (workspaceID, but already covered) + method, // $5 method ← pinned + sqlmock.AnyArg(), // $6 summary + sqlmock.AnyArg(), // $7 request_body + sqlmock.AnyArg(), // $8 response_body + sqlmock.AnyArg(), // $9 tool_trace + sqlmock.AnyArg(), // $10 duration_ms + sqlmock.AnyArg(), // $11 status + sqlmock.AnyArg(), // $12 error_detail + ). + WillReturnResult(sqlmock.NewResult(1, 1)) +} + +// pollUploadFixture builds a multipart body with N named files. 
+func pollUploadFixture(t *testing.T, files map[string][]byte) (*bytes.Buffer, string) { + t.Helper() + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + for name, data := range files { + fw, err := mw.CreateFormFile("files", name) + if err != nil { + t.Fatalf("CreateFormFile: %v", err) + } + _, _ = fw.Write(data) + } + mw.Close() + return &buf, mw.FormDataContentType() +} + +// ---- happy path ---- + +func TestPollUpload_HappyPath_OneFile_StagesAndLogs(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "11111111-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + expectActivityInsert(mock) + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"report.pdf": []byte("PDF-bytes")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + if len(store.puts) != 1 { + t.Fatalf("expected 1 storage Put, got %d", len(store.puts)) + } + put := store.puts[0] + if put.Filename != "report.pdf" || put.Size != 9 { + t.Errorf("unexpected put: %+v", put) + } + + // Response shape must match the workspace-side + // /internal/chat/uploads/ingest schema so canvas can't tell which + // path handled the upload. 
+ var resp struct { + Files []struct { + URI string `json:"uri"` + Name string `json:"name"` + Mimetype string `json:"mimeType"` + Size int `json:"size"` + } `json:"files"` + } + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode response: %v body=%s", err, w.Body.String()) + } + if len(resp.Files) != 1 { + t.Fatalf("response files count = %d, want 1", len(resp.Files)) + } + got := resp.Files[0] + if got.Name != "report.pdf" || got.Size != 9 { + t.Errorf("response file mismatch: %+v", got) + } + if !strings.HasPrefix(got.URI, "platform-pending:"+wsID+"/") { + t.Errorf("URI %q does not start with platform-pending:%s/", got.URI, wsID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestPollUpload_MultipleFiles_AllStagedAndLogged(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "11111111-aaaa-bbbb-cccc-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + expectActivityInsert(mock) + expectActivityInsert(mock) + expectActivityInsert(mock) + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{ + "a.txt": []byte("aaaa"), + "b.txt": []byte("bbbbb"), + "c.txt": []byte("cccccc"), + }) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", w.Code, w.Body.String()) + } + if len(store.puts) != 3 { + t.Fatalf("expected 3 storage Puts, got %d", len(store.puts)) + } +} + +// ---- regression: push-mode unchanged ---- + +func TestPollUpload_PushModeFallsThroughToForward(t *testing.T) { + // With pendingUploads wired but the workspace's mode is push, + // the poll branch must NOT activate — flow falls through to the + // existing resolveWorkspaceForwardCreds path. 
Pinned via the + // "delivery_mode lookup happened, then the URL+mode SELECT + // happened, then we 503 because no inbound secret" sequence. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "22222222-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "push") + // After the poll branch is bypassed, we hit + // resolveWorkspaceForwardCreds which selects url+delivery_mode. + expectURL(mock, wsID, "") + // URL empty + mode=push → 503 (no inbound secret check needed). + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusServiceUnavailable { + t.Fatalf("status=%d body=%s — expected push-mode 503 fall-through", w.Code, w.Body.String()) + } + if len(store.puts) != 0 { + t.Errorf("push-mode should NOT have hit storage, got %d puts", len(store.puts)) + } +} + +func TestPollUpload_NotConfigured_FallsThrough(t *testing.T) { + // Backwards compat: a binary running without WithPendingUploads + // behaves exactly as before — the poll branch is dead code. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "33333333-2222-3333-4444-555555555555" + expectURLAndMode(mock, wsID, "", "poll") // resolveWorkspaceForwardCreds emits 422 + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)) + // No WithPendingUploads — pendingUploads is nil. 
+ + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusUnprocessableEntity { + t.Errorf("status=%d, want 422 (legacy poll-mode rejection)", w.Code) + } +} + +// ---- error paths ---- + +func TestPollUpload_WorkspaceMissing_404(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "44444444-2222-3333-4444-555555555555" + expectPollDeliveryModeMissing(mock, wsID) + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(newInMemStorage(), nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusNotFound { + t.Errorf("status=%d, want 404", w.Code) + } +} + +func TestPollUpload_DeliveryModeLookupDBError_500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "55555555-2222-3333-4444-555555555555" + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(wsID).WillReturnError(errors.New("connection lost")) + + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(newInMemStorage(), nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestPollUpload_NoFilesField_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "66666666-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // Multipart with a non-files field — no actual files. 
+ var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + mw.WriteField("not_files", "hi") + mw.Close() + + c, w := makeUploadRequest(t, wsID, &buf, mw.FormDataContentType()) + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on no files field", w.Code) + } +} + +func TestPollUpload_MalformedMultipart_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "77777777-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // Body that doesn't match the boundary in Content-Type. + c, w := makeUploadRequest(t, wsID, bytes.NewBufferString("garbage"), "multipart/form-data; boundary=fake") + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on malformed multipart", w.Code) + } +} + +func TestPollUpload_StorageError_500(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "88888888-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + store.putErr = errors.New("disk full") + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("status=%d, want 500", w.Code) + } +} + +func TestPollUpload_StorageTooLarge_413(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "99999999-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + store.putErr = pendinguploads.ErrTooLarge + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). 
+ WithPendingUploads(store, nil) + + body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")}) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusRequestEntityTooLarge { + t.Errorf("status=%d, want 413", w.Code) + } +} + +func TestPollUpload_TooManyFiles_400(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "aaaaaaaa-2222-3333-4444-555555555555" + expectPollDeliveryMode(mock, wsID, "poll") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). + WithPendingUploads(store, nil) + + // 65 files — over the per-batch cap. + files := map[string][]byte{} + for i := 0; i < 65; i++ { + files[uuid.New().String()] = []byte("x") + } + body, ct := pollUploadFixture(t, files) + c, w := makeUploadRequest(t, wsID, body, ct) + h.Upload(c) + + if w.Code != http.StatusBadRequest { + t.Errorf("status=%d, want 400 on too many files", w.Code) + } +} + +func TestPollUpload_NullDeliveryMode_TreatedAsPush(t *testing.T) { + // Production-observed 2026-05-04: external runtime workspaces + // (molecule-sdk-python on user infra) sometimes register with + // delivery_mode = NULL — the schema default for legacy rows from + // before #2339. The poll branch must NOT activate on NULL — only + // the explicit "poll" string. This is the same defensive posture + // resolveWorkspaceForwardCreds takes for legacy rows. + mock := setupTestDB(t) + setupTestRedis(t) + + wsID := "cccccccc-2222-3333-4444-555555555555" + mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil)) + // Falls through to resolveWorkspaceForwardCreds: + expectURLAndMode(mock, wsID, "", "") + + store := newInMemStorage() + h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)). 
+		WithPendingUploads(store, nil)
+
+	body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")})
+	c, w := makeUploadRequest(t, wsID, body, ct)
+	h.Upload(c)
+
+	// resolveWorkspaceForwardCreds with empty url + NULL mode = 422
+	// (the legacy "no callback URL" rejection — exactly what we're
+	// fixing for ACTUAL poll-mode rows but want to preserve for
+	// NULL ones until the row gets a real mode value via the next
+	// /registry/register).
+	if w.Code != http.StatusUnprocessableEntity {
+		t.Errorf("status=%d, want 422 for NULL delivery_mode (legacy fallthrough)", w.Code)
+	}
+	if len(store.puts) != 0 {
+		t.Errorf("NULL mode should NOT have hit storage, got %d puts", len(store.puts))
+	}
+}
+
+func TestPollUpload_PerFileCapPreStorage_413(t *testing.T) {
+	// Pin the early-reject branch (fh.Size > MaxFileBytes) BEFORE we
+	// read the part into memory. Without this, an oversize file
+	// would hit the storage layer's belt-and-suspenders check, which
+	// works but burns ~25 MB of memory + DB round-trip first. Send
+	// 25 MB + 1 byte → 413 with the file size in the response.
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	wsID := "dddddddd-2222-3333-4444-555555555555"
+	expectPollDeliveryMode(mock, wsID, "poll")
+
+	store := newInMemStorage()
+	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
+		WithPendingUploads(store, nil)
+
+	// 25 MB + 1 byte. Single file, large enough to trip the early
+	// size check.
+	oversize := make([]byte, pendinguploads.MaxFileBytes+1)
+	body, ct := pollUploadFixture(t, map[string][]byte{"big.bin": oversize})
+	c, w := makeUploadRequest(t, wsID, body, ct)
+	h.Upload(c)
+
+	if w.Code != http.StatusRequestEntityTooLarge {
+		t.Fatalf("status=%d, want 413 on per-file size cap", w.Code)
+	}
+	if len(store.puts) != 0 {
+		t.Errorf("per-file cap reject should NOT have called storage.Put, got %d puts", len(store.puts))
+	}
+	// Sanity: the 413 JSON body exposes the configured cap under "max".
+	var body_ map[string]any
+	json.Unmarshal(w.Body.Bytes(), &body_)
+	if got := body_["max"]; got == nil {
+		t.Errorf("expected max field in response, got %v", body_)
+	}
+}
+
+// SanitizeFilename is exercised in the upload chain — pin one
+// end-to-end case that exercises the URI path through the response.
+func TestPollUpload_SanitizesFilenameInResponse(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	wsID := "bbbbbbbb-2222-3333-4444-555555555555"
+	expectPollDeliveryMode(mock, wsID, "poll")
+	expectActivityInsert(mock)
+
+	store := newInMemStorage()
+	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
+		WithPendingUploads(store, nil)
+
+	body, ct := pollUploadFixture(t, map[string][]byte{"hello world!.pdf": []byte("data")})
+	c, w := makeUploadRequest(t, wsID, body, ct)
+	h.Upload(c)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
+	}
+	var resp struct {
+		Files []struct {
+			Name string `json:"name"`
+			URI  string `json:"uri"`
+		}
+	}
+	json.Unmarshal(w.Body.Bytes(), &resp)
+	if len(resp.Files) == 0 || resp.Files[0].Name != "hello_world_.pdf" {
+		t.Errorf("expected sanitized name 'hello_world_.pdf', got: %+v", resp.Files)
+	}
+	if len(store.puts) == 0 || store.puts[0].Filename != "hello_world_.pdf" {
+		t.Errorf("storage Put didn't receive sanitized filename: %+v", store.puts)
+	}
+}
+
+// TestPollUpload_ActivityRowDiscriminator pins the
+// activity_type / method shape that the workspace inbox poller depends
+// on. The poller filters `GET /workspaces/:id/activity?type=a2a_receive`
+// so the handler MUST write activity_type=a2a_receive (NOT a custom
+// type), and use method=chat_upload_receive as the
+// upload-vs-message-vs-task discriminator.
+//
+// Why pinned: a previous iteration of this handler used
+// activity_type="chat_upload_receive" — silently invisible to the
+// existing poller. The branch passed every push-mode test, every
+// storage test, and every per-file content test; the bug only
+// surfaced at runtime when the workspace polled and got nothing.
+// Encode the contract in a unit test so the next refactor can't
+// re-break it without a red CI.
+func TestPollUpload_ActivityRowDiscriminator(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+
+	wsID := "abc12345-6789-4abc-8def-000000000999"
+	expectPollDeliveryMode(mock, wsID, "poll")
+	expectActivityInsertWithTypeAndMethod(mock, wsID, "a2a_receive", "chat_upload_receive")
+
+	store := newInMemStorage()
+	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
+		WithPendingUploads(store, nil)
+
+	body, ct := pollUploadFixture(t, map[string][]byte{"x.pdf": []byte("xx")})
+	c, w := makeUploadRequest(t, wsID, body, ct)
+	h.Upload(c)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
+	}
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("expectations: %v", err)
+	}
+}
diff --git a/workspace-server/internal/handlers/pending_uploads.go b/workspace-server/internal/handlers/pending_uploads.go
new file mode 100644
index 00000000..aec1ca0f
--- /dev/null
+++ b/workspace-server/internal/handlers/pending_uploads.go
@@ -0,0 +1,184 @@
+// pending_uploads.go — endpoints the workspace polls to fetch and ack
+// chat-upload files staged on the platform side for poll-mode delivery.
+//
+// Companion to chat_files.go Upload's poll-mode branch:
+//
+//	Canvas POST /workspaces/:id/chat/uploads
+//	    ↓ (poll-mode workspace)
+//	Platform: chat_files.uploadPollMode
+//	    ↓ writes pending_uploads row + activity_logs(type=chat_upload_receive)
+//	Workspace inbox poller picks up activity row
+//	    ↓
+//	Workspace GETs /workspaces/:id/pending-uploads/:fid/content ← this file
+//	    ↓ writes file to /workspace/.molecule/chat-uploads
+//	Workspace POSTs /workspaces/:id/pending-uploads/:fid/ack ← this file
+//	    ↓ row marked acked; Phase 3 sweep deletes
+//
+// Auth: same wsAuth middleware that gates the activity poll endpoint —
+// the workspace's per-workspace platform_token. Only the target workspace
+// can read OR ack its own pending uploads. The handler enforces that
+// :id == file.workspace_id even though the URL param matches; defence in
+// depth against a token leak letting one workspace pull another's bytes.
+
+package handlers
+
+import (
+	"errors"
+	"log"
+	"net/http"
+	"strconv"
+
+	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
+)
+
+// PendingUploadsHandler serves the workspace-side fetch + ack endpoints.
+// Holds a Storage so tests can inject an in-memory implementation
+// without going through Postgres (sqlmock-based unit tests cover the
+// Postgres impl in internal/pendinguploads/storage_test.go).
+type PendingUploadsHandler struct {
+	storage pendinguploads.Storage
+}
+
+// NewPendingUploadsHandler constructs the handler with a concrete
+// Storage. Production wires up pendinguploads.NewPostgres(db.DB).
+func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHandler {
+	return &PendingUploadsHandler{storage: storage}
+}
+
+// GetContent handles GET /workspaces/:id/pending-uploads/:file_id/content.
+//
+// Returns the file bytes with the original mimetype and a
+// Content-Disposition that names the original (sanitized) filename so
+// the workspace's fetcher writes it under the expected name. Stamps
+// fetched_at on the row best-effort — the read response is already
+// flushed to the network before the MarkFetched call so a sweep race
+// can't break the workspace's fetch.
+//
+// 404 on:
+//   - file_id not found
+//   - file_id belongs to a different workspace (cross-workspace bleed
+//     protection)
+//   - row already acked (workspace's bug — should not re-fetch after ack)
+//   - row past expires_at (Phase 3 sweep would delete shortly anyway)
+func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
+	workspaceID := c.Param("id")
+	if err := validateWorkspaceID(workspaceID); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+		return
+	}
+	fileIDStr := c.Param("file_id")
+	fileID, err := uuid.Parse(fileIDStr)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"})
+		return
+	}
+
+	rec, err := h.storage.Get(c.Request.Context(), fileID)
+	if errors.Is(err, pendinguploads.ErrNotFound) {
+		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
+		return
+	}
+	if err != nil {
+		log.Printf("pending_uploads GetContent: storage.Get(%s) failed: %v", fileID, err)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
+		return
+	}
+
+	// Cross-workspace bleed protection: a token leak from workspace A
+	// must not let it read workspace B's pending uploads even with the
+	// correct file_id. wsAuth already pinned the caller to :id; reject
+	// if the row's workspace_id doesn't match.
+	if rec.WorkspaceID.String() != workspaceID {
+		log.Printf("pending_uploads GetContent: workspace mismatch — caller=%s row=%s file_id=%s",
+			workspaceID, rec.WorkspaceID, fileID)
+		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"})
+		return
+	}
+
+	// Stream the bytes. Set the original mimetype if known; fall back
+	// to application/octet-stream so curl / browser clients still get
+	// a valid response. Content-Disposition uses the workspace-side
+	// filename. NOTE(review): Content-Length is taken from rec.SizeBytes
+	// while the body written is rec.Content — assumed equal; confirm
+	// Put keeps size_bytes and content in sync.
+	mimetype := rec.Mimetype
+	if mimetype == "" {
+		mimetype = "application/octet-stream"
+	}
+	c.Header("Content-Type", mimetype)
+	c.Header("Content-Disposition", contentDispositionAttachment(rec.Filename))
+	c.Header("Content-Length", strconv.FormatInt(rec.SizeBytes, 10))
+	c.Status(http.StatusOK)
+	if _, err := c.Writer.Write(rec.Content); err != nil {
+		// Connection closed mid-stream — log and bail; we cannot
+		// re-emit headers at this point. The workspace's HTTP client
+		// will see the truncated body and retry on next poll.
+		log.Printf("pending_uploads GetContent: write failed for %s: %v", fileID, err)
+		return
+	}
+
+	// Best-effort fetched_at stamp. After-the-fact so the GET response
+	// completes regardless of the UPDATE outcome — a Phase 3 sweep
+	// race that nukes the row between Get and MarkFetched must not
+	// break the workspace's fetch.
+	if err := h.storage.MarkFetched(c.Request.Context(), fileID); err != nil {
+		log.Printf("pending_uploads GetContent: mark_fetched(%s) failed: %v", fileID, err)
+	}
+}
+
+// Ack handles POST /workspaces/:id/pending-uploads/:file_id/ack.
+//
+// Marks the row as handed-off; Phase 3 sweep deletes acked rows after
+// a retention window. Idempotent — workspace at-least-once retries on
+// a flaky network return success without moving the timestamp.
+func (h *PendingUploadsHandler) Ack(c *gin.Context) {
+	workspaceID := c.Param("id")
+	if err := validateWorkspaceID(workspaceID); err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
+		return
+	}
+	fileIDStr := c.Param("file_id")
+	fileID, err := uuid.Parse(fileIDStr)
+	if err != nil {
+		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"})
+		return
+	}
+
+	// Cross-workspace bleed protection: do a lookup BEFORE Ack so
+	// a token leak can't ack a row owned by a different workspace.
+	// We don't expose this distinction in the response (404 either
+	// way) — the workspace can't tell whether it ack'd a non-existent
+	// row vs. one it didn't own, and that's fine for the contract.
+	rec, err := h.storage.Get(c.Request.Context(), fileID)
+	if errors.Is(err, pendinguploads.ErrNotFound) {
+		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
+		return
+	}
+	if err != nil {
+		log.Printf("pending_uploads Ack: storage.Get(%s) failed: %v", fileID, err)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
+		return
+	}
+	if rec.WorkspaceID.String() != workspaceID {
+		log.Printf("pending_uploads Ack: workspace mismatch — caller=%s row=%s file_id=%s",
+			workspaceID, rec.WorkspaceID, fileID)
+		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"})
+		return
+	}
+
+	if err := h.storage.Ack(c.Request.Context(), fileID); err != nil {
+		if errors.Is(err, pendinguploads.ErrNotFound) {
+			// Race window: the row passed Get but failed Ack — sweep
+			// raced with us between the two queries. Treat as success
+			// (the workspace's intent was honored, the row is gone).
+			c.JSON(http.StatusOK, gin.H{"acked": true, "raced": true})
+			return
+		}
+		log.Printf("pending_uploads Ack: storage.Ack(%s) failed: %v", fileID, err)
+		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
+		return
+	}
+	c.JSON(http.StatusOK, gin.H{"acked": true})
+}
diff --git a/workspace-server/internal/handlers/pending_uploads_test.go b/workspace-server/internal/handlers/pending_uploads_test.go
new file mode 100644
index 00000000..17da24af
--- /dev/null
+++ b/workspace-server/internal/handlers/pending_uploads_test.go
@@ -0,0 +1,373 @@
+package handlers_test
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
+)
+
+// fakeStorage is an in-memory pendinguploads.Storage. Lets handler
+// tests pin behaviour without going through Postgres + sqlmock — the
+// storage layer's own tests (internal/pendinguploads/storage_test.go)
+// cover the SQL drift surface; here we only care about the handler's
+// 4xx/5xx mapping and side-effect ordering.
+type fakeStorage struct {
+	rows        map[uuid.UUID]pendinguploads.Record
+	getErr      error // forced error from Get (overrides rows lookup)
+	ackErr      error // forced error from Ack
+	markErr     error // forced error from MarkFetched
+	markFetched []uuid.UUID
+	ackCalls    []uuid.UUID
+}
+
+func newFakeStorage() *fakeStorage {
+	return &fakeStorage{rows: map[uuid.UUID]pendinguploads.Record{}}
+}
+
+func (f *fakeStorage) Put(ctx context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) {
+	id := uuid.New()
+	f.rows[id] = pendinguploads.Record{
+		FileID: id, WorkspaceID: ws, Content: content,
+		Filename: filename, Mimetype: mimetype,
+		SizeBytes: int64(len(content)), CreatedAt: time.Now(),
+		ExpiresAt: time.Now().Add(24 * time.Hour),
+	}
+	return id, nil
+}
+
+func (f *fakeStorage) Get(_ context.Context, fileID uuid.UUID) (pendinguploads.Record, error) {
+	if f.getErr != nil {
+		return pendinguploads.Record{}, f.getErr
+	}
+	rec, ok := f.rows[fileID]
+	if !ok {
+		return pendinguploads.Record{}, pendinguploads.ErrNotFound
+	}
+	return rec, nil
+}
+
+func (f *fakeStorage) MarkFetched(_ context.Context, fileID uuid.UUID) error {
+	f.markFetched = append(f.markFetched, fileID)
+	return f.markErr
+}
+
+func (f *fakeStorage) Ack(_ context.Context, fileID uuid.UUID) error {
+	f.ackCalls = append(f.ackCalls, fileID)
+	if f.ackErr != nil {
+		return f.ackErr
+	}
+	delete(f.rows, fileID)
+	return nil
+}
+
+func newRouter(handler *handlers.PendingUploadsHandler) *gin.Engine {
+	gin.SetMode(gin.TestMode)
+	r := gin.New()
+	r.GET("/workspaces/:id/pending-uploads/:file_id/content", handler.GetContent)
+	r.POST("/workspaces/:id/pending-uploads/:file_id/ack", handler.Ack)
+	return r
+}
+
+// ---- GetContent ----
+
+func TestGetContent_HappyPath_StreamsBytesAndStampsFetched(t *testing.T) {
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, err := fs.Put(context.Background(), wsID, []byte("hello world"), "report.pdf", "application/pdf")
+	if err != nil {
+		t.Fatalf("Put: %v", err)
+	}
+	h := handlers.NewPendingUploadsHandler(fs)
+	r := newRouter(h)
+
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
+	}
+	if got := w.Body.String(); got != "hello world" {
+		t.Errorf("body = %q, want %q", got, "hello world")
+	}
+	if got := w.Header().Get("Content-Type"); got != "application/pdf" {
+		t.Errorf("Content-Type = %q, want application/pdf", got)
+	}
+	if got := w.Header().Get("Content-Disposition"); !strings.Contains(got, "report.pdf") {
+		t.Errorf("Content-Disposition = %q, expected to mention report.pdf", got)
+	}
+	if got := w.Header().Get("Content-Length"); got != "11" {
+		t.Errorf("Content-Length = %q, want 11", got)
+	}
+	if len(fs.markFetched) != 1 || fs.markFetched[0] != fileID {
+		t.Errorf("expected MarkFetched(%s), got %v", fileID, fs.markFetched)
+	}
+}
+
+func TestGetContent_DefaultsMimetypeWhenEmpty(t *testing.T) {
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
+	h := handlers.NewPendingUploadsHandler(fs)
+	r := newRouter(h)
+
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if got := w.Header().Get("Content-Type"); got != "application/octet-stream" {
+		t.Errorf("Content-Type fallback = %q, want application/octet-stream", got)
+	}
+}
+
+func TestGetContent_InvalidWorkspaceID_400(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	req := httptest.NewRequest(http.MethodGet, "/workspaces/not-a-uuid/pending-uploads/00000000-0000-0000-0000-000000000000/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("status=%d, want 400", w.Code)
+	}
+}
+
+func TestGetContent_InvalidFileID_400(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	wsID := uuid.New()
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/not-a-uuid/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("status=%d, want 400", w.Code)
+	}
+}
+
+func TestGetContent_NotFound_404(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	wsID := uuid.New()
+	missing := uuid.New()
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+missing.String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusNotFound {
+		t.Errorf("status=%d, want 404", w.Code)
+	}
+}
+
+func TestGetContent_StorageError_500(t *testing.T) {
+	fs := newFakeStorage()
+	fs.getErr = errors.New("connection refused")
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	wsID := uuid.New()
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+uuid.New().String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusInternalServerError {
+		t.Errorf("status=%d, want 500", w.Code)
+	}
+}
+
+func TestGetContent_CrossWorkspaceBleed_404(t *testing.T) {
+	// Token leak: workspace A's wsAuth-validated request tries to
+	// pull workspace B's file_id. Handler must 404 even though the
+	// row exists.
+	fs := newFakeStorage()
+	wsB := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsB, []byte("secret"), "leak.txt", "text/plain")
+
+	wsA := uuid.New()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsA.String()+"/pending-uploads/"+fileID.String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusNotFound {
+		t.Fatalf("status=%d, want 404 for cross-workspace bleed", w.Code)
+	}
+	// Critical: must not have leaked the bytes.
+	if strings.Contains(w.Body.String(), "secret") {
+		t.Errorf("response body leaked content from another workspace: %q", w.Body.String())
+	}
+}
+
+func TestGetContent_MarkFetchedFailureLoggedNotPropagated(t *testing.T) {
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsID, []byte("ok"), "x.txt", "text/plain")
+	fs.markErr = errors.New("update failed (sweep raced)")
+	h := handlers.NewPendingUploadsHandler(fs)
+	r := newRouter(h)
+
+	req := httptest.NewRequest(http.MethodGet,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	// Body already returned 200 OK + bytes BEFORE the MarkFetched
+	// failure — workspace fetch must NOT fail because of an
+	// observability hook.
+	if w.Code != http.StatusOK {
+		t.Errorf("status=%d, want 200 even on MarkFetched failure", w.Code)
+	}
+	if w.Body.String() != "ok" {
+		t.Errorf("body = %q, want %q", w.Body.String(), "ok")
+	}
+}
+
+// ---- Ack ----
+
+func TestAck_HappyPath_RemovesRow(t *testing.T) {
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("status=%d", w.Code)
+	}
+	var body map[string]any
+	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	if body["acked"] != true {
+		t.Errorf("body.acked = %v, want true", body["acked"])
+	}
+	if _, exists := fs.rows[fileID]; exists {
+		t.Errorf("row should have been removed after ack")
+	}
+}
+
+func TestAck_NonExistent_404(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	wsID := uuid.New()
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusNotFound {
+		t.Errorf("status=%d, want 404", w.Code)
+	}
+}
+
+func TestAck_CrossWorkspaceBleed_404(t *testing.T) {
+	fs := newFakeStorage()
+	wsB := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsB, []byte("data"), "x.bin", "")
+	wsA := uuid.New()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+wsA.String()+"/pending-uploads/"+fileID.String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusNotFound {
+		t.Errorf("status=%d, want 404 for cross-workspace ack", w.Code)
+	}
+	// Row must remain — workspace A's bogus ack must NOT delete
+	// workspace B's file.
+	if _, exists := fs.rows[fileID]; !exists {
+		t.Errorf("row should NOT have been removed by cross-workspace ack")
+	}
+}
+
+func TestAck_InvalidWorkspaceID_400(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	req := httptest.NewRequest(http.MethodPost, "/workspaces/not-a-uuid/pending-uploads/"+uuid.New().String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("status=%d, want 400", w.Code)
+	}
+}
+
+func TestAck_InvalidFileID_400(t *testing.T) {
+	fs := newFakeStorage()
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+uuid.New().String()+"/pending-uploads/not-a-uuid/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusBadRequest {
+		t.Errorf("status=%d, want 400", w.Code)
+	}
+}
+
+func TestAck_GetStorageError_500(t *testing.T) {
+	fs := newFakeStorage()
+	fs.getErr = errors.New("conn lost")
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+uuid.New().String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+	if w.Code != http.StatusInternalServerError {
+		t.Errorf("status=%d, want 500", w.Code)
+	}
+}
+
+func TestAck_RaceWithSweep_ReturnsRacedTrue(t *testing.T) {
+	// Sweep deletes the row between the handler's Get and Ack calls.
+	// Storage.Ack returns ErrNotFound; handler treats that as success
+	// (intent honored, row gone) and reports raced:true.
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
+	fs.ackErr = pendinguploads.ErrNotFound
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusOK {
+		t.Fatalf("status=%d, want 200 on race", w.Code)
+	}
+	var body map[string]any
+	json.Unmarshal(w.Body.Bytes(), &body)
+	if body["acked"] != true || body["raced"] != true {
+		t.Errorf("expected acked=true raced=true, got %v", body)
+	}
+}
+
+func TestAck_StorageError_500(t *testing.T) {
+	fs := newFakeStorage()
+	wsID := uuid.New()
+	fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
+	fs.ackErr = errors.New("conn refused")
+	r := newRouter(handlers.NewPendingUploadsHandler(fs))
+
+	req := httptest.NewRequest(http.MethodPost,
+		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, req)
+
+	if w.Code != http.StatusInternalServerError {
+		t.Errorf("status=%d, want 500", w.Code)
+	}
+}
diff --git a/workspace-server/internal/handlers/sanitize_filename_test.go b/workspace-server/internal/handlers/sanitize_filename_test.go
new file mode 100644
index 00000000..82a6d355
--- /dev/null
+++ b/workspace-server/internal/handlers/sanitize_filename_test.go
@@ -0,0 +1,103 @@
+package handlers_test
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
+)
+
+// SanitizeFilename mirrors workspace/internal_chat_uploads.py's
+// sanitize_filename. Drift between the two means canvas-emitted URIs
+// differ between push and poll paths for the same upload — pin every
+// case the Python suite pins (workspace/tests/test_internal_chat_uploads.py
+// :: test_sanitize_filename).
+func TestSanitizeFilename_StripsPathTraversal(t *testing.T) {
+	cases := map[string]string{
+		"../../etc/passwd": "passwd",
+		"/etc/passwd":      "passwd",
+		"a/b/c.txt":        "c.txt",
+		"./relative":       "relative",
+	}
+	for in, want := range cases {
+		if got := handlers.SanitizeFilename(in); got != want {
+			t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, want)
+		}
+	}
+}
+
+func TestSanitizeFilename_ReplacesUnsafeChars(t *testing.T) {
+	cases := map[string]string{
+		"hello world.pdf":   "hello_world.pdf",
+		"weird;chars!?.txt": "weird_chars__.txt",
+		"中文.docx":           "__.docx", // non-ASCII → underscore (each rune)
+		"file (1).pdf":      "file__1_.pdf",
+	}
+	for in, want := range cases {
+		if got := handlers.SanitizeFilename(in); got != want {
+			t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, want)
+		}
+	}
+}
+
+func TestSanitizeFilename_PreservesAllowedChars(t *testing.T) {
+	in := "report-2026.05.04_v2.pdf"
+	if got := handlers.SanitizeFilename(in); got != in {
+		t.Errorf("SanitizeFilename(%q) = %q, want unchanged", in, got)
+	}
+}
+
+func TestSanitizeFilename_CapsAt100Chars_PreservesShortExtension(t *testing.T) {
+	// 95-char base + ".pdf" = 99 chars — under the 100-char cap, fits.
+	base := strings.Repeat("a", 95)
+	in := base + ".pdf"
+	got := handlers.SanitizeFilename(in)
+	if got != in {
+		t.Errorf("expected unchanged at 100 chars, got %q (len=%d)", got, len(got))
+	}
+
+	// 200-char base + ".pdf" → truncated to 100 with .pdf preserved.
+	long := strings.Repeat("b", 200) + ".pdf"
+	got = handlers.SanitizeFilename(long)
+	if len(got) != 100 {
+		t.Errorf("expected length 100, got %d (%q)", len(got), got)
+	}
+	if !strings.HasSuffix(got, ".pdf") {
+		t.Errorf("expected .pdf suffix preserved, got %q", got)
+	}
+}
+
+func TestSanitizeFilename_DropsLongExtension(t *testing.T) {
+	// Extension > 16 chars is treated as part of the name; truncation
+	// drops it without preservation. Mirrors the Python rule
+	// (dot >= 0 AND len(base) - dot <= 16).
+	long := strings.Repeat("c", 90) + ".thisisaverylongextensionnotpreserved"
+	got := handlers.SanitizeFilename(long)
+	if len(got) != 100 {
+		t.Errorf("expected 100, got %d (%q)", len(got), got)
+	}
+	// First 100 chars of the SANITIZED input — extension not preserved.
+	if strings.Contains(got, ".thisisaverylongextensionnotpreserved") {
+		t.Errorf("long extension should have been truncated, got %q", got)
+	}
+}
+
+func TestSanitizeFilename_FallbackForReservedNames(t *testing.T) {
+	cases := []string{"", ".", ".."}
+	for _, in := range cases {
+		if got := handlers.SanitizeFilename(in); got != "file" {
+			t.Errorf("SanitizeFilename(%q) = %q, want %q", in, got, "file")
+		}
+	}
+}
+
+func TestSanitizeFilename_AllUnsafeBecomesAllUnderscores_NotReserved(t *testing.T) {
+	// All-non-ASCII input becomes all-underscores — not "." or ".." or
+	// empty, so the fallback path doesn't trigger and we get a real
+	// (if uninformative) sanitized name.
+	got := handlers.SanitizeFilename("中文中文")
+	if got != "____" {
+		t.Errorf("SanitizeFilename(中文中文) = %q, want %q", got, "____")
+	}
+}
diff --git a/workspace-server/internal/pendinguploads/storage.go b/workspace-server/internal/pendinguploads/storage.go
new file mode 100644
index 00000000..0289c9b8
--- /dev/null
+++ b/workspace-server/internal/pendinguploads/storage.go
@@ -0,0 +1,253 @@
+// Package pendinguploads is the platform-side staging layer for chat file
+// uploads bound for poll-mode workspaces (delivery_mode='poll', no public
+// callback URL — typically external runtimes on a laptop / behind NAT).
+//
+// In push-mode the platform synchronously POSTs the multipart body to the
+// workspace's /internal/chat/uploads/ingest endpoint and forgets about it.
+// Poll-mode has no callback URL to forward to, so the platform parses the
+// multipart on this side, persists each file as one pending_uploads row,
+// and lets the workspace pull it on its next inbox poll cycle.
+// +// The Storage interface keeps the bytes-vs-metadata split clean: today +// content is stored inline as bytea on the pending_uploads row, but the +// shape lets a future PR (RFC #2789, S3-backed shared storage) swap to +// object storage by adding a new Storage implementation without touching +// any of the handler-layer callers. +// +// Lifecycle: +// +// Put — handler creates a row with the file content; assigns file_id. +// Get — GET /workspaces/:id/pending-uploads/:fid/content reads bytes. +// MarkFetched — stamps fetched_at on the row (Phase 3 observability). +// Ack — POST /workspaces/:id/pending-uploads/:fid/ack; +// terminal happy-path state. After ack, Get returns ErrNotFound. +// GC sweep deletes acked rows after a retention window. +// +// Hard TTL: every row has an expires_at default of created_at + 24h. After +// expiration the row is GC'd by Phase 3's sweep cron regardless of ack +// state. Get on an expired row returns ErrNotFound — the workspace's next +// poll will see the underlying activity_logs row was orphaned and the +// agent surfaces "file expired" to the user. +package pendinguploads + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/google/uuid" +) + +// Per-file size cap. Mirrors workspace-side ingest_handler +// (workspace/internal_chat_uploads.py:198). Pinned at the DB level via +// the size_bytes CHECK constraint; this Go-side constant exists so the +// Put implementation can reject before round-tripping to Postgres. +const MaxFileBytes = 25 * 1024 * 1024 + +// ErrNotFound is returned by Get / MarkFetched / Ack when the row is +// absent. Callers turn this into HTTP 404. Treat acked + expired rows +// as not-found so the workspace can never re-fetch a file we've +// considered handed-off. +var ErrNotFound = errors.New("pendinguploads: row not found, expired, or already acked") + +// ErrTooLarge is returned by Put when content exceeds MaxFileBytes. +// Callers turn this into HTTP 413. 
Pre-DB check so we don't push a +// 25 MB+1 byte payload through Postgres just to have the CHECK reject it. +var ErrTooLarge = errors.New("pendinguploads: content exceeds per-file cap") + +// Record carries the full row including content. Returned by Get; +// the GET /content handler streams Record.Content as the response body. +type Record struct { + FileID uuid.UUID + WorkspaceID uuid.UUID + Content []byte + Filename string + Mimetype string + SizeBytes int64 + CreatedAt time.Time + FetchedAt *time.Time // nil before first MarkFetched + AckedAt *time.Time // nil before Ack (Get returns ErrNotFound after) + ExpiresAt time.Time +} + +// Storage is the platform-side persistence boundary for poll-mode chat +// uploads. The Postgres implementation backs all callers today; an S3- +// backed implementation can drop in once RFC #2789 lands by making +// content storage out-of-line and updating the Postgres-only metadata +// columns. +type Storage interface { + // Put creates a row for one file targeting workspaceID and returns + // the assigned file_id. content is bounded by MaxFileBytes; + // filename / mimetype are stored verbatim — caller is responsible + // for sanitization (matches workspace-side rule, see + // internal_chat_uploads.py:sanitize_filename). Empty filename and + // content > MaxFileBytes return errors before any DB write. + Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) + + // Get returns the full row including content. Returns ErrNotFound + // when the row is absent, acked, or past expires_at. Caller should + // not differentiate the three cases in the response — from the + // workspace's perspective they all mean "not available, give up." + Get(ctx context.Context, fileID uuid.UUID) (Record, error) + + // MarkFetched stamps fetched_at on the row. Idempotent — repeated + // calls update fetched_at to the latest timestamp. Returns + // ErrNotFound if the row is absent / acked / expired. 
+ MarkFetched(ctx context.Context, fileID uuid.UUID) error + + // Ack stamps acked_at on the row. Idempotent on the row state + // (acked_at is only set the first time so workspace double-acks + // don't move the timestamp). Returns ErrNotFound if the row is + // absent or already expired; on already-acked, returns nil so + // the workspace's at-least-once retry succeeds without an error. + Ack(ctx context.Context, fileID uuid.UUID) error +} + +// PostgresStorage is the production Storage implementation backed by +// the pending_uploads table. +type PostgresStorage struct { + db *sql.DB +} + +// NewPostgres returns a Storage backed by db. db must be a connected +// pool; this constructor does no I/O. +func NewPostgres(db *sql.DB) *PostgresStorage { + return &PostgresStorage{db: db} +} + +// Compile-time check that PostgresStorage satisfies Storage. +var _ Storage = (*PostgresStorage)(nil) + +func (p *PostgresStorage) Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) { + if len(content) == 0 { + return uuid.Nil, fmt.Errorf("pendinguploads: empty content") + } + if len(content) > MaxFileBytes { + return uuid.Nil, ErrTooLarge + } + if filename == "" { + return uuid.Nil, fmt.Errorf("pendinguploads: empty filename") + } + // Filename length cap is enforced both here (early reject) and at + // the DB layer (CHECK constraint) so a buggy caller can't write a + // 200-char filename that Phase 2's URI rewrite would then truncate. 
+ if len(filename) > 100 { + return uuid.Nil, fmt.Errorf("pendinguploads: filename exceeds 100 chars") + } + + var fileID uuid.UUID + err := p.db.QueryRowContext(ctx, ` + INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype) + VALUES ($1, $2, $3, $4, $5) + RETURNING file_id + `, workspaceID, content, int64(len(content)), filename, mimetype).Scan(&fileID) + if err != nil { + return uuid.Nil, fmt.Errorf("pendinguploads: insert: %w", err) + } + return fileID, nil +} + +func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) { + // The expires_at + acked_at filter in the WHERE clause means a + // caller sees ErrNotFound for absent / acked / expired without + // needing per-case branching. Trade-off: we can't differentiate + // in metrics, but the workspace's response is the same in all + // three cases ("file gone, give up") so the granularity isn't + // useful at this layer. Phase 3 dashboards aggregate row-state + // counts directly off the table. + var r Record + err := p.db.QueryRowContext(ctx, ` + SELECT file_id, workspace_id, content, filename, mimetype, + size_bytes, created_at, fetched_at, acked_at, expires_at + FROM pending_uploads + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID).Scan( + &r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype, + &r.SizeBytes, &r.CreatedAt, &r.FetchedAt, &r.AckedAt, &r.ExpiresAt, + ) + if errors.Is(err, sql.ErrNoRows) { + return Record{}, ErrNotFound + } + if err != nil { + return Record{}, fmt.Errorf("pendinguploads: select: %w", err) + } + return r, nil +} + +func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error { + // UPDATE on the same gating predicate as Get — keeps the "absent + // or acked or expired = ErrNotFound" contract symmetric. Without + // the predicate a workspace could re-stamp fetched_at on an acked + // row, which would mislead Phase 3's stuck-fetch dashboard. 
+ res, err := p.db.ExecContext(ctx, ` + UPDATE pending_uploads + SET fetched_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID) + if err != nil { + return fmt.Errorf("pendinguploads: mark_fetched: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("pendinguploads: mark_fetched rows: %w", err) + } + if n == 0 { + return ErrNotFound + } + return nil +} + +func (p *PostgresStorage) Ack(ctx context.Context, fileID uuid.UUID) error { + // Set acked_at only if currently NULL — workspace at-least-once + // retries don't move the timestamp, so dashboards see the first + // successful ack as the "delivery time." Two-clause WHERE: row + // must exist and not be expired; acked-but-still-in-window is + // returned as success (idempotent retry). + res, err := p.db.ExecContext(ctx, ` + UPDATE pending_uploads + SET acked_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + `, fileID) + if err != nil { + return fmt.Errorf("pendinguploads: ack: %w", err) + } + n, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("pendinguploads: ack rows: %w", err) + } + if n == 1 { + return nil + } + // Zero-rows-affected: either the row doesn't exist / has expired, + // OR it was already acked. Re-query to disambiguate so the + // idempotent-retry case returns nil instead of ErrNotFound. + var ackedAt sql.NullTime + err = p.db.QueryRowContext(ctx, ` + SELECT acked_at FROM pending_uploads + WHERE file_id = $1 AND expires_at > now() + `, fileID).Scan(&ackedAt) + if errors.Is(err, sql.ErrNoRows) { + return ErrNotFound + } + if err != nil { + return fmt.Errorf("pendinguploads: ack disambiguate: %w", err) + } + if ackedAt.Valid { + // Already acked — idempotent success. + return nil + } + // Predicate matched a non-acked, non-expired row but RowsAffected + // was 0. This means the row was concurrently modified between the + // UPDATE and the SELECT (extremely rare; e.g. 
 the row was re-created
+	// or its expires_at moved between the statements). Treat as success
+	// — the workspace's intent ("I'm done with this file") was honored.
+	return nil
+}
diff --git a/workspace-server/internal/pendinguploads/storage_test.go b/workspace-server/internal/pendinguploads/storage_test.go
new file mode 100644
index 00000000..45f797c7
--- /dev/null
+++ b/workspace-server/internal/pendinguploads/storage_test.go
@@ -0,0 +1,400 @@
+package pendinguploads_test
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	"github.com/google/uuid"
+
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
+)
+
+// Tests pin the SQL the handler relies on. Drift detection: if the
+// migration changes column order / predicate shape, sqlmock's
+// QueryMatcherEqual + ExpectQuery / ExpectExec on the literal text
+// fails the test before the handler can ship a silently-broken read.
+//
+// Why sqlmock and not testcontainers / real Postgres:
+//
+// The Storage contract is "this Go method runs THIS SQL." Real-DB
+// tests would catch SQL-syntax errors but not the contract drift
+// we care about (e.g. handler accidentally reordering columns,
+// dropping the acked_at predicate, etc.). Postgres-syntax coverage
+// lives in the migration round-trip test (Phase 4 E2E).
+
+func newMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
+	t.Helper()
+	db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
+	if err != nil {
+		t.Fatalf("sqlmock.New: %v", err)
+	}
+	t.Cleanup(func() { _ = db.Close() })
+	return db, mock
+}
+
+// Single source of truth for the SQL strings — drift here = test fails;
+// matches the Go literals in storage.go exactly.
+const ( + insertSQL = ` + INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype) + VALUES ($1, $2, $3, $4, $5) + RETURNING file_id + ` + selectSQL = ` + SELECT file_id, workspace_id, content, filename, mimetype, + size_bytes, created_at, fetched_at, acked_at, expires_at + FROM pending_uploads + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + markFetchedSQL = ` + UPDATE pending_uploads + SET fetched_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + ackSQL = ` + UPDATE pending_uploads + SET acked_at = now() + WHERE file_id = $1 + AND acked_at IS NULL + AND expires_at > now() + ` + ackDisambiguateSQL = ` + SELECT acked_at FROM pending_uploads + WHERE file_id = $1 AND expires_at > now() + ` +) + +// ----- Put ------------------------------------------------------------------ + +func TestPut_HappyPath_ReturnsAssignedFileID(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + wsID := uuid.New() + expectedID := uuid.New() + mock.ExpectQuery(insertSQL). + WithArgs(wsID, []byte("hello"), int64(5), "report.pdf", "application/pdf"). 
+ WillReturnRows(sqlmock.NewRows([]string{"file_id"}).AddRow(expectedID)) + + got, err := store.Put(context.Background(), wsID, []byte("hello"), "report.pdf", "application/pdf") + if err != nil { + t.Fatalf("Put: %v", err) + } + if got != expectedID { + t.Errorf("file_id mismatch: got %s want %s", got, expectedID) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestPut_RejectsEmptyContentBeforeDB(t *testing.T) { + db, _ := newMockDB(t) // no expectations — must NOT round-trip + store := pendinguploads.NewPostgres(db) + + _, err := store.Put(context.Background(), uuid.New(), nil, "x.txt", "") + if err == nil || !strings.Contains(err.Error(), "empty content") { + t.Fatalf("expected empty-content error, got %v", err) + } +} + +func TestPut_RejectsOversizeBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + too := make([]byte, pendinguploads.MaxFileBytes+1) + _, err := store.Put(context.Background(), uuid.New(), too, "x.txt", "") + if !errors.Is(err, pendinguploads.ErrTooLarge) { + t.Fatalf("expected ErrTooLarge, got %v", err) + } +} + +func TestPut_RejectsEmptyFilenameBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + _, err := store.Put(context.Background(), uuid.New(), []byte("hi"), "", "") + if err == nil || !strings.Contains(err.Error(), "empty filename") { + t.Fatalf("expected empty-filename error, got %v", err) + } +} + +func TestPut_RejectsLongFilenameBeforeDB(t *testing.T) { + db, _ := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + long := strings.Repeat("a", 101) + _, err := store.Put(context.Background(), uuid.New(), []byte("hi"), long, "") + if err == nil || !strings.Contains(err.Error(), "exceeds 100 chars") { + t.Fatalf("expected too-long-filename error, got %v", err) + } +} + +func TestPut_PropagatesDBError(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + 
mock.ExpectQuery(insertSQL). + WithArgs(uuid.Nil, sqlmock.AnyArg(), int64(2), "x", ""). + WillReturnError(errors.New("connection refused")) + + wsID := uuid.Nil + _, err := store.Put(context.Background(), wsID, []byte("hi"), "x", "") + if err == nil || !strings.Contains(err.Error(), "insert") { + t.Fatalf("expected wrapped insert error, got %v", err) + } +} + +// ----- Get ------------------------------------------------------------------ + +func TestGet_HappyPath_ReturnsFullRow(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + wsID := uuid.New() + now := time.Now().UTC() + mock.ExpectQuery(selectSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{ + "file_id", "workspace_id", "content", "filename", "mimetype", + "size_bytes", "created_at", "fetched_at", "acked_at", "expires_at", + }).AddRow( + fid, wsID, []byte("data"), "x.bin", "application/octet-stream", + int64(4), now, nil, nil, now.Add(24*time.Hour), + )) + + r, err := store.Get(context.Background(), fid) + if err != nil { + t.Fatalf("Get: %v", err) + } + if r.FileID != fid || r.WorkspaceID != wsID { + t.Errorf("ids mismatch: %+v", r) + } + if string(r.Content) != "data" || r.SizeBytes != 4 { + t.Errorf("content mismatch: %+v", r) + } + if r.FetchedAt != nil || r.AckedAt != nil { + t.Errorf("expected nil timestamps for unfetched row, got fetched=%v acked=%v", r.FetchedAt, r.AckedAt) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("expectations: %v", err) + } +} + +func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectQuery(selectSQL). + WithArgs(fid). 
+ WillReturnError(sql.ErrNoRows) + + _, err := store.Get(context.Background(), fid) + if !errors.Is(err, pendinguploads.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestGet_DBError_WrappedAndPropagated(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectQuery(selectSQL). + WillReturnError(errors.New("connection lost")) + + _, err := store.Get(context.Background(), uuid.New()) + if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "select") { + t.Fatalf("expected wrapped select error, got %v", err) + } +} + +// ----- MarkFetched ---------------------------------------------------------- + +func TestMarkFetched_HappyPath(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(markFetchedSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := store.MarkFetched(context.Background(), fid); err != nil { + t.Fatalf("MarkFetched: %v", err) + } +} + +func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(markFetchedSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + + err := store.MarkFetched(context.Background(), fid) + if !errors.Is(err, pendinguploads.ErrNotFound) { + t.Fatalf("expected ErrNotFound, got %v", err) + } +} + +func TestMarkFetched_DBError_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(markFetchedSQL). 
+ WillReturnError(errors.New("pg flake")) + + err := store.MarkFetched(context.Background(), uuid.New()) + if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "mark_fetched") { + t.Fatalf("expected wrapped mark_fetched error, got %v", err) + } +} + +// ----- Ack ------------------------------------------------------------------ + +func TestAck_FirstAck_StampsAckedAt(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 1)) + + if err := store.Ack(context.Background(), fid); err != nil { + t.Fatalf("Ack: %v", err) + } +} + +func TestAck_AlreadyAcked_IdempotentSuccess(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + // First UPDATE matches zero rows (already acked). + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + // Disambiguation SELECT finds the row with acked_at non-null. + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). + WillReturnRows(sqlmock.NewRows([]string{"acked_at"}).AddRow(time.Now().UTC())) + + if err := store.Ack(context.Background(), fid); err != nil { + t.Fatalf("expected idempotent success on already-acked, got %v", err) + } +} + +func TestAck_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). 
+		WillReturnError(sql.ErrNoRows)
+
+	err := store.Ack(context.Background(), fid)
+	if !errors.Is(err, pendinguploads.ErrNotFound) {
+		t.Fatalf("expected ErrNotFound, got %v", err)
+	}
+}
+
+func TestAck_RaceWithSweep_ReturnsSuccess(t *testing.T) {
+	// UPDATE saw 0 rows AND the disambiguate SELECT saw a row with
+	// acked_at IS NULL — something raced between the two queries
+	// (e.g. the row reappeared or its expiry window moved). The
+	// contract says we honor the workspace's ACK intent and succeed.
+	db, mock := newMockDB(t)
+	store := pendinguploads.NewPostgres(db)
+
+	fid := uuid.New()
+	mock.ExpectExec(ackSQL).
+		WithArgs(fid).
+		WillReturnResult(sqlmock.NewResult(0, 0))
+	mock.ExpectQuery(ackDisambiguateSQL).
+		WithArgs(fid).
+		WillReturnRows(sqlmock.NewRows([]string{"acked_at"}).AddRow(nil))
+
+	if err := store.Ack(context.Background(), fid); err != nil {
+		t.Fatalf("expected race success, got %v", err)
+	}
+}
+
+func TestAck_DBErrorOnUpdate_Wrapped(t *testing.T) {
+	db, mock := newMockDB(t)
+	store := pendinguploads.NewPostgres(db)
+
+	mock.ExpectExec(ackSQL).
+		WillReturnError(errors.New("conn refused"))
+
+	err := store.Ack(context.Background(), uuid.New())
+	if err == nil || !strings.Contains(err.Error(), "ack:") {
+		t.Fatalf("expected wrapped ack error, got %v", err)
+	}
+}
+
+func TestMarkFetched_RowsAffectedError_Wrapped(t *testing.T) {
+	// Some drivers (or Result wrappers) return an error from
+	// RowsAffected() even when ExecContext succeeded — the contract
+	// says we surface that as a wrapped error rather than silently
+	// treating it as 0 rows (= ErrNotFound, which would mislead the
+	// workspace into giving up on a possibly-fetched row).
+	db, mock := newMockDB(t)
+	store := pendinguploads.NewPostgres(db)
+
+	mock.ExpectExec(markFetchedSQL).
+ WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected"))) + + err := store.MarkFetched(context.Background(), uuid.New()) + if err == nil || !strings.Contains(err.Error(), "mark_fetched rows") { + t.Fatalf("expected wrapped rows-affected error, got %v", err) + } +} + +func TestAck_RowsAffectedError_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + mock.ExpectExec(ackSQL). + WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected"))) + + err := store.Ack(context.Background(), uuid.New()) + if err == nil || !strings.Contains(err.Error(), "ack rows") { + t.Fatalf("expected wrapped rows-affected error, got %v", err) + } +} + +func TestAck_DBErrorOnDisambiguate_Wrapped(t *testing.T) { + db, mock := newMockDB(t) + store := pendinguploads.NewPostgres(db) + + fid := uuid.New() + mock.ExpectExec(ackSQL). + WithArgs(fid). + WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectQuery(ackDisambiguateSQL). + WithArgs(fid). 
+ WillReturnError(errors.New("connection refused")) + + err := store.Ack(context.Background(), fid) + if err == nil || !strings.Contains(err.Error(), "disambiguate") { + t.Fatalf("expected wrapped disambiguate error, got %v", err) + } +} diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go index 6aff74e5..86007d00 100644 --- a/workspace-server/internal/router/router.go +++ b/workspace-server/internal/router/router.go @@ -13,6 +13,7 @@ import ( "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring" "github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics" "github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware" @@ -540,10 +541,20 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi // streaming download (agent → user). Namespaced under /chat/ so // the security model is obviously distinct from /files/* (which // handles workspace config/templates and has a different caller). - chatfh := handlers.NewChatFilesHandler(tmplh) + chatfh := handlers.NewChatFilesHandler(tmplh). + WithPendingUploads(pendinguploads.NewPostgres(db.DB), broadcaster) wsAuth.POST("/chat/uploads", chatfh.Upload) wsAuth.GET("/chat/download", chatfh.Download) + // Phase 1 RFC: poll-mode chat upload — endpoints the workspace's + // inbox poller hits to fetch staged file content + ack delivery. + // Same wsAuth gate as the activity poll, so a token leak from + // workspace A can't read workspace B's pending uploads (the + // handler also re-checks workspace_id on each row). 
+ puh := handlers.NewPendingUploadsHandler(pendinguploads.NewPostgres(db.DB)) + wsAuth.GET("/pending-uploads/:file_id/content", puh.GetContent) + wsAuth.POST("/pending-uploads/:file_id/ack", puh.Ack) + // Plugins pluginsDir := findPluginsDir(configsDir) // Runtime lookup lets the plugins handler filter the registry to plugins diff --git a/workspace-server/migrations/20260505100000_pending_uploads.down.sql b/workspace-server/migrations/20260505100000_pending_uploads.down.sql new file mode 100644 index 00000000..c8efc627 --- /dev/null +++ b/workspace-server/migrations/20260505100000_pending_uploads.down.sql @@ -0,0 +1,11 @@ +-- 20260505100000_pending_uploads.down.sql +-- +-- Drops the pending_uploads table and its indexes. Any pending file +-- uploads sitting in the table at rollback time are dropped — operators +-- on poll-mode workspaces lose those attachments, but they were never +-- fetched on the workspace side (otherwise they'd be acked + about to +-- be GC'd anyway), so the practical loss is the same as a cron sweep. + +DROP INDEX IF EXISTS idx_pending_uploads_expires; +DROP INDEX IF EXISTS idx_pending_uploads_workspace_unacked; +DROP TABLE IF EXISTS pending_uploads; diff --git a/workspace-server/migrations/20260505100000_pending_uploads.up.sql b/workspace-server/migrations/20260505100000_pending_uploads.up.sql new file mode 100644 index 00000000..d43f2649 --- /dev/null +++ b/workspace-server/migrations/20260505100000_pending_uploads.up.sql @@ -0,0 +1,103 @@ +-- 20260505100000_pending_uploads.up.sql +-- +-- RFC: poll-mode chat upload (counterpart to delivery_mode='poll' messaging). +-- +-- Today, chat_files.go's Upload handler refuses delivery_mode != 'push' +-- with HTTP 422 "workspace has no callback URL" — external runtime +-- workspaces (laptop / behind NAT) cannot receive file attachments at all. 
+-- The only escape was "register with ngrok / Cloudflare tunnel + push +-- mode," which forces every external operator into infra plumbing they +-- shouldn't need. +-- +-- This table is the platform-side staging layer that lets canvas → external +-- workspace file uploads ride the same poll loop the inbox already uses for +-- text messages: +-- +-- 1. Canvas POSTs multipart to workspace-server. +-- 2. workspace-server parses multipart, stores each file as one +-- pending_uploads row, AND inserts a matching activity_logs row +-- (type='chat_upload_receive', request_body={file_id, filename, ...}). +-- 3. Workspace's existing inbox poller picks up the activity row. +-- 4. Workspace fetches bytes via GET /workspaces/:id/pending-uploads/:fid/content, +-- writes to /workspace/.molecule/chat-uploads/, ACKs via POST. +-- 5. Sweep cron deletes rows past expires_at OR acked_at + N hours. +-- +-- Why a separate table and not bytea-on-activity_logs: +-- +-- * activity_logs is text/JSON-shaped today; mixing 25 MB binary blobs +-- into request_body inflates every JOIN, every since_id scan, every +-- pgdump. The bytes need their own home. +-- * Lifecycle differs: activity_logs is durable audit history (90d+); +-- pending_uploads is transient buffer (24h default) that GCs hard. +-- Keeping them split lets each table's retention policy run +-- independently. +-- * A future PR (RFC #2789) will migrate the bytes column to S3 keys +-- without touching the activity_logs schema or the metadata columns +-- here. That migration is one ALTER + one backfill rather than a +-- cross-table rewrite. +-- +-- No FK to workspaces: +-- workspace delete should NOT cascade-purge pending_uploads — those +-- rows are evidence-of-receipt and should expire on their own TTL. +-- Same posture as tenant_resources (PR #2343) and delegations (PR #2829). 
+
+CREATE TABLE IF NOT EXISTS pending_uploads (
+    -- Server-generated so the canvas can include the URI in the chat
+    -- message it sends right after the upload POST. Workspace fetches
+    -- by this id, no name collisions across workspaces.
+    file_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+
+    -- Target workspace. NOT a FK (see header).
+    workspace_id uuid NOT NULL,
+
+    -- Content lives inline today via bytea. The Go-side storage interface
+    -- (pendinguploads.Storage) abstracts read/write so a future PR can
+    -- relocate this column's job to S3 (RFC #2789) by adding an `s3_key
+    -- text NULL` column, dual-writing for one release, then dropping
+    -- `content` once the backfill drains. The CHECK below pins the same
+    -- 25 MB per-file cap the workspace-side ingest_handler enforces
+    -- (workspace/internal_chat_uploads.py:198) — discrepancy between
+    -- the two would let the platform accept files the workspace would
+    -- 413 on after pull.
+    content bytea NOT NULL,
+    size_bytes bigint NOT NULL CHECK (size_bytes > 0 AND size_bytes <= 26214400),
+
+    -- Filename + mimetype mirror the workspace-side ChatUploadedFile
+    -- shape so the eventual InboxMessage hand-off needs no translation.
+    -- Filename is sanitized at write-time (matches sanitize_filename in
+    -- workspace/internal_chat_uploads.py); 100 char cap is the same.
+    filename text NOT NULL CHECK (length(filename) > 0 AND length(filename) <= 100),
+    mimetype text NOT NULL DEFAULT '',
+
+    created_at timestamptz NOT NULL DEFAULT now(),
+
+    -- Stamped on the GET /content request. Lets Phase 3 sweeper detect
+    -- "fetched but never acked" — distinct failure mode from "never
+    -- fetched" (workspace offline) so dashboards can split them.
+    fetched_at timestamptz,
+
+    -- Stamped on the POST /ack request. Terminal state for the happy
+    -- path. Sweep cron deletes acked rows past acked_at + retention.
+    acked_at timestamptz,
+
+    -- Hard TTL: rows past this are deleted regardless of ack state.
+ -- 24h matches the longest-observed legitimate "operator stepped + -- away from laptop" gap; tunable later via app-level config without + -- a migration. NOT acked_at + 24h — that would let a stuck-fetched + -- row live forever. + expires_at timestamptz NOT NULL DEFAULT (now() + interval '24 hours') +); + +-- Hot path: workspace's poll cycle pulls "give me my unacked uploads +-- in chronological order." Partial-index because acked rows are GC +-- candidates and shouldn't bloat the working set. +CREATE INDEX IF NOT EXISTS idx_pending_uploads_workspace_unacked + ON pending_uploads (workspace_id, created_at) + WHERE acked_at IS NULL; + +-- Phase 3 GC sweep hot path: list rows past expires_at, partial-indexed +-- on unacked because acked rows have a different (shorter) retention +-- and GC-via-acked_at is a separate query. +CREATE INDEX IF NOT EXISTS idx_pending_uploads_expires + ON pending_uploads (expires_at) + WHERE acked_at IS NULL;