Merge pull request #2877 from Molecule-AI/feat/poll-mode-chat-upload-phase1

feat(rfc): poll-mode chat upload — phase 1 platform staging layer
This commit is contained in:
Hongming Wang 2026-05-05 11:32:10 +00:00 committed by GitHub
commit 243f9bc2b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 2332 additions and 2 deletions

View File

@ -31,23 +31,37 @@ package handlers
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"log"
	"mime/multipart"
	"net/http"
	"net/url"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/google/uuid"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
)
// ChatFilesHandler serves file upload + download for chat. Holds a
// reference to TemplatesHandler so the (still docker-exec) Download
// path keeps using the shared findContainer/CopyFromContainer helpers
// without duplicating them. Upload no longer reaches into Docker.
//
// pendingUploads + broadcaster are wired only when the platform's
// migration 20260505100000 has run; nil values fall back to the
// pre-poll-mode behavior (422 on poll-mode upload, same as before).
// This lets the binary keep booting in environments where the
// migration hasn't run yet — the poll branch is gated by a not-nil
// check at the call site.
type ChatFilesHandler struct {
templates *TemplatesHandler
@ -56,6 +70,19 @@ type ChatFilesHandler struct {
// the 50 MB worst case on a slow EC2 link without leaving a
// connection hanging forever on a sick workspace.
httpClient *http.Client
// pendingUploads is the platform-side staging layer for poll-mode
// uploads. nil → poll branch returns 422 unchanged (the pre-feature
// behavior); non-nil → poll branch parses multipart, persists each
// file via storage.Put, logs a chat_upload_receive activity row,
// and returns 200 with synthetic platform-pending: URIs.
pendingUploads pendinguploads.Storage
// broadcaster is the events.EventEmitter used to notify the canvas
// when an activity row lands (so the Agent Comms panel updates
// live). Same emitter the rest of the platform uses; nil = no
// broadcast (tests).
broadcaster events.EventEmitter
}
func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler {
@ -69,6 +96,16 @@ func NewChatFilesHandler(t *TemplatesHandler) *ChatFilesHandler {
}
}
// WithPendingUploads enables the poll-mode upload branch by wiring a
// pendinguploads.Storage plus an events.EventEmitter. router.go calls
// this once at construction; tests that want the poll path exercised
// set the fields directly instead. Returns h so construction can be
// chained.
func (h *ChatFilesHandler) WithPendingUploads(storage pendinguploads.Storage, broadcaster events.EventEmitter) *ChatFilesHandler {
	h.broadcaster = broadcaster
	h.pendingUploads = storage
	return h
}
// chatUploadMaxBytes caps the full multipart request body so a
// malicious / runaway client can't OOM the proxy hop. 50 MB matches
// the workspace-side limit; anything larger is rejected at the
@ -262,6 +299,24 @@ func (h *ChatFilesHandler) Upload(c *gin.Context) {
ctx := c.Request.Context()
// Branch on delivery_mode BEFORE attempting the HTTP forward.
// Push-mode workspaces continue to do the streaming forward
// unchanged. Poll-mode workspaces (typically external runtimes
// on a laptop, no public callback URL) get the platform-side
// staging path — the file lands in pending_uploads, an activity
// row goes into the inbox queue, and the workspace pulls on its
// next poll cycle.
if h.pendingUploads != nil {
mode, modeOK := lookupUploadDeliveryMode(c, ctx, workspaceID)
if !modeOK {
return
}
if mode == "poll" {
h.uploadPollMode(c, ctx, workspaceID)
return
}
}
wsURL, secret, ok := resolveWorkspaceForwardCreds(c, ctx, workspaceID, "upload")
if !ok {
return
@ -405,3 +460,251 @@ func (h *ChatFilesHandler) streamWorkspaceResponse(
}
}
// lookupUploadDeliveryMode fetches the workspace's delivery_mode for
// the chat upload branch. On lookup failure it writes the HTTP error
// response itself and returns ("", false) so the caller simply stops.
// A NULL or empty delivery_mode maps to "push" — the schema default,
// matching the legacy pre-#2339 behavior. Only the literal string
// "poll" routes the upload through the poll-mode branch.
//
// Deliberately NOT the same helper as lookupDeliveryMode in
// a2a_proxy_helpers.go: that one swallows errors and falls back to
// "push" so the proxy keeps working through a transient DB hiccup.
// For upload we want the not-found case surfaced as 404 (which the
// workspace-poll branch wouldn't otherwise hit, since the
// workspace-side row IS the source of truth for the mode).
func lookupUploadDeliveryMode(c *gin.Context, ctx context.Context, workspaceID string) (string, bool) {
	var mode sql.NullString
	err := db.DB.QueryRowContext(ctx,
		`SELECT delivery_mode FROM workspaces WHERE id = $1`, workspaceID,
	).Scan(&mode)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		c.JSON(http.StatusNotFound, gin.H{"error": "workspace not found"})
		return "", false
	case err != nil:
		log.Printf("chat_files Upload: delivery_mode lookup failed for %s: %v", workspaceID, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "delivery_mode lookup failed"})
		return "", false
	}
	if mode.Valid && mode.String != "" {
		return mode.String, true
	}
	return "push", true
}
// unsafeFilenameChars matches every character that isn't in the safe
// alphanumeric + dot/dash/underscore set. Mirrors the Python regex
// _UNSAFE_FILENAME_CHARS in workspace/internal_chat_uploads.py — drift
// here would mean canvas-emitted URIs differ between push and poll
// paths for the same upload.
var unsafeFilenameChars = regexp.MustCompile(`[^a-zA-Z0-9._\-]`)
// SanitizeFilename reduces a user-supplied filename to a safe form.
// Behaviorally identical to sanitize_filename in workspace/
// internal_chat_uploads.py. Exported so tests in other packages can
// pin behavior parity, and so a future shared library can move both
// implementations behind one source of truth.
func SanitizeFilename(name string) string {
base := filepath.Base(name)
// filepath.Base on a path-traversal input ("../../etc/passwd")
// returns "passwd" (just the last component) — which matches what
// Python's os.path.basename does. Tests pin both here and on the
// Python side.
base = strings.ReplaceAll(base, " ", "_")
base = unsafeFilenameChars.ReplaceAllString(base, "_")
if len(base) > 100 {
ext := ""
dot := strings.LastIndex(base, ".")
if dot >= 0 && len(base)-dot <= 16 {
ext = base[dot:]
}
base = base[:100-len(ext)] + ext
}
if base == "" || base == "." || base == ".." {
return "file"
}
return base
}
// uploadedFile is the per-file response shape the workspace-side
// /internal/chat/uploads/ingest also produces. Mirroring the schema
// keeps the canvas client unaware of which path handled the upload.
type uploadedFile struct {
	URI      string `json:"uri"`      // synthetic "platform-pending:<workspace>/<file_id>" URI
	Name     string `json:"name"`     // sanitized filename (SanitizeFilename output)
	Mimetype string `json:"mimeType"` // Content-Type of the multipart part (may be empty)
	Size     int64  `json:"size"`     // staged content length in bytes
}
// uploadPollMode handles a chat upload bound for a poll-mode
// workspace. Parses the multipart in-place, persists each file via
// pendinguploads.Storage, and logs one chat_upload_receive activity
// row per file so the workspace's inbox poller picks them up on its
// next cycle.
//
// Why one activity row per file (not one per multipart batch):
//   - Each row carries one URI; agents that consume the inbox treat
//     each row as one inbound event. A batch row would force every
//     consumer to deserialize a list, doubling the field-shape
//     surface for no UX win.
//   - At-least-once semantics: a workspace can ack files
//     individually. Batch ack would leak partial-success state on
//     a fetcher crash mid-batch.
//
// Limits enforced here mirror the workspace-side ingest_handler:
//   - Total body cap: 50 MB (set on c.Request.Body before reaching us)
//   - Per-file cap: 25 MB (pendinguploads.MaxFileBytes; rejected as 413)
//   - Filename: sanitized + capped at 100 chars (SanitizeFilename)
//
// NOTE(review): the loop returns on the FIRST failing file, so files
// staged earlier in the same batch remain in pending_uploads (and
// their activity rows were already written). Until the Phase 3 sweep
// lands, a mid-batch failure leaves those rows orphaned — confirm
// that's acceptable, or stage-then-commit in Phase 2.
//
// Logging: every persisted file logs an INFO line with workspace_id,
// file_id, size, and sanitized name. Failure modes (oversize, missing
// files field, malformed multipart) log at WARN with the same fields.
// Phase 3 metrics will hook these structured logs.
func (h *ChatFilesHandler) uploadPollMode(c *gin.Context, ctx context.Context, workspaceID string) {
	// Parse multipart with the same per-file/per-form limits the
	// workspace-side handler uses (workspace/internal_chat_uploads.py:
	// max_files=64, max_fields=32). gin's MultipartForm does not
	// expose those limits directly — the underlying ParseMultipartForm
	// is given a 32 MB memory budget here and spills larger parts to
	// disk. For poll-mode we read each file into memory to hand to
	// Storage.Put; the 25 MB-per-file × 64-files ceiling means the
	// worst-case is 1.6 GB of peak memory. The per-file size bound
	// below keeps any single read well under that.
	if err := c.Request.ParseMultipartForm(32 << 20); err != nil {
		log.Printf("chat_files uploadPollMode: parse multipart failed for %s: %v", workspaceID, err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "malformed multipart body"})
		return
	}
	form := c.Request.MultipartForm
	if form == nil || len(form.File["files"]) == 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "no files field in request"})
		return
	}
	headers := form.File["files"]
	if len(headers) > 64 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "too many files (limit 64)"})
		return
	}
	wsUUID, err := uuid.Parse(workspaceID)
	if err != nil {
		// validateWorkspaceID at the top of Upload already gates this;
		// the re-parse is defence in depth in case validateWorkspaceID
		// drifts. Keep the error class consistent so a bad-id reaches
		// the same 400 path. Not separately tested because the gate at
		// the call site is structurally the same uuid.Parse.
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
		return
	}
	out := make([]uploadedFile, 0, len(headers))
	for _, fh := range headers {
		// Per-file cap enforced BEFORE reading the part: fh.Size is
		// filled in by Go's multipart parser from the bytes actually
		// received (not a client-supplied header), so an oversized
		// file short-circuits to a clean 413 without ever being
		// buffered in memory — no need for the +1-byte ReadAll trick
		// the Python side uses.
		if fh.Size > pendinguploads.MaxFileBytes {
			log.Printf("chat_files uploadPollMode: per-file cap exceeded for %s: %s (%d bytes)",
				workspaceID, fh.Filename, fh.Size)
			c.JSON(http.StatusRequestEntityTooLarge, gin.H{
				"error":    "file exceeds per-file cap",
				"filename": fh.Filename,
				"size":     fh.Size,
				"max":      pendinguploads.MaxFileBytes,
			})
			return
		}
		content, err := readMultipartFile(fh)
		if err != nil {
			log.Printf("chat_files uploadPollMode: read part failed for %s/%s: %v", workspaceID, fh.Filename, err)
			c.JSON(http.StatusBadRequest, gin.H{"error": "could not read file part"})
			return
		}
		sanitized := SanitizeFilename(fh.Filename)
		mimetype := fh.Header.Get("Content-Type")
		fileID, err := h.pendingUploads.Put(ctx, wsUUID, content, sanitized, mimetype)
		if err != nil {
			if errors.Is(err, pendinguploads.ErrTooLarge) {
				// Belt + suspenders: the size check above already
				// caught this, but Storage.Put re-validates so a
				// malformed FileHeader can't slip through. 413 with
				// the same shape so the client sees one error class.
				c.JSON(http.StatusRequestEntityTooLarge, gin.H{
					"error":    "file exceeds per-file cap",
					"filename": fh.Filename,
					"size":     len(content),
					"max":      pendinguploads.MaxFileBytes,
				})
				return
			}
			log.Printf("chat_files uploadPollMode: storage.Put failed for %s/%s: %v",
				workspaceID, sanitized, err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "could not stage file"})
			return
		}
		// Activity row so the workspace's inbox poller picks this up
		// on its next cycle. activity_type=a2a_receive (NOT a new
		// type) so the existing poll filter
		// `?type=a2a_receive` catches it without poll-side changes;
		// method=chat_upload_receive is the discriminator the
		// workspace's adapter (Phase 2) uses to route to the upload
		// fetcher instead of the agent's message handler. Same
		// shape as A2A's tasks/send vs message/send method split.
		uri := fmt.Sprintf("platform-pending:%s/%s", workspaceID, fileID)
		summary := "chat_upload_receive: " + sanitized
		method := "chat_upload_receive"
		LogActivity(ctx, h.broadcaster, ActivityParams{
			WorkspaceID:  workspaceID,
			ActivityType: "a2a_receive",
			TargetID:     &workspaceID,
			Method:       &method,
			Summary:      &summary,
			RequestBody: map[string]interface{}{
				"file_id":  fileID.String(),
				"name":     sanitized,
				"mimeType": mimetype,
				"size":     len(content),
				"uri":      uri,
			},
			Status: "ok",
		})
		log.Printf("chat_files uploadPollMode: staged %s/%s (file_id=%s size=%d mimetype=%q)",
			workspaceID, sanitized, fileID, len(content), mimetype)
		out = append(out, uploadedFile{
			URI:      uri,
			Name:     sanitized,
			Mimetype: mimetype,
			Size:     int64(len(content)),
		})
	}
	c.JSON(http.StatusOK, gin.H{"files": out})
}
// readMultipartFile reads a multipart part fully into memory. Wraps
// the open + io.ReadAll + close idiom so the call site stays clean,
// and so a future change (chunked reads / hashing) has one place to
// land.
func readMultipartFile(fh *multipartFileHeader) ([]byte, error) {
f, err := fh.Open()
if err != nil {
return nil, fmt.Errorf("open part: %w", err)
}
defer f.Close()
return io.ReadAll(f)
}
// multipartFileHeader is a local alias so the readMultipartFile
// signature doesn't pull "mime/multipart" into every test that
// touches uploadPollMode.
type multipartFileHeader = multipart.FileHeader

View File

@ -0,0 +1,589 @@
package handlers
// chat_files_poll_test.go — Upload poll-mode branch tests.
//
// Pinned in their own file so the existing chat_files_test.go stays
// focused on the push-mode forward proxy. Same setupTestDB / sqlmock
// scaffolding as the rest of the package, plus an in-memory
// pendinguploads.Storage so we don't have to mock six SQL statements
// per assertion.
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"mime/multipart"
"net/http"
"strings"
"sync"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/google/uuid"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
)
// inMemStorage is a process-local pendinguploads.Storage for branch
// tests. Records every Put for assertion. Failure modes (Put error,
// MarkFetched / Ack tested elsewhere) are injected via fields.
type inMemStorage struct {
	mu     sync.Mutex                          // guards rows and puts
	rows   map[uuid.UUID]pendinguploads.Record // staged content keyed by generated file ID
	puts   []putCall                           // every successful Put, in call order
	putErr error                               // non-nil → Put fails with this error (injection point)
}

// putCall captures the arguments of one storage Put for assertions.
type putCall struct {
	WorkspaceID uuid.UUID
	Filename    string
	Mimetype    string
	Size        int
}

// newInMemStorage returns an empty store with an initialized rows map.
func newInMemStorage() *inMemStorage {
	return &inMemStorage{rows: map[uuid.UUID]pendinguploads.Record{}}
}

// Put stages content under a fresh UUID and records the call, or
// fails with the injected putErr. Mirrors the real storage's record
// shape (24 h expiry) so Get-path tests elsewhere stay realistic.
func (s *inMemStorage) Put(_ context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.putErr != nil {
		return uuid.Nil, s.putErr
	}
	id := uuid.New()
	s.rows[id] = pendinguploads.Record{
		FileID: id, WorkspaceID: ws, Content: content,
		Filename: filename, Mimetype: mimetype,
		SizeBytes: int64(len(content)), CreatedAt: time.Now(),
		ExpiresAt: time.Now().Add(24 * time.Hour),
	}
	s.puts = append(s.puts, putCall{
		WorkspaceID: ws, Filename: filename, Mimetype: mimetype, Size: len(content),
	})
	return id, nil
}

// Get always misses — the upload branch under test never reads back.
func (s *inMemStorage) Get(context.Context, uuid.UUID) (pendinguploads.Record, error) {
	return pendinguploads.Record{}, pendinguploads.ErrNotFound
}

// MarkFetched and Ack are no-ops; the fetch/ack flow is covered by the
// pending_uploads handler tests, not these upload-branch tests.
func (s *inMemStorage) MarkFetched(context.Context, uuid.UUID) error { return nil }
func (s *inMemStorage) Ack(context.Context, uuid.UUID) error         { return nil }
// expectPollDeliveryMode stubs the SELECT delivery_mode lookup that
// uploadPollMode does (separate from the one resolveWorkspaceForwardCreds
// does — this is the new helper introduced for the poll branch).
func expectPollDeliveryMode(mock sqlmock.Sqlmock, workspaceID, mode string) {
	rows := sqlmock.NewRows([]string{"delivery_mode"}).AddRow(mode)
	mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`).
		WithArgs(workspaceID).
		WillReturnRows(rows)
}

// expectPollDeliveryModeMissing makes the same lookup report no such
// workspace (sql.ErrNoRows → the handler's 404 path).
func expectPollDeliveryModeMissing(mock sqlmock.Sqlmock, workspaceID string) {
	mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`).
		WithArgs(workspaceID).
		WillReturnError(sql.ErrNoRows)
}

// expectActivityInsert stubs the LogActivity INSERT so the poll branch's
// per-file activity row write doesn't fail the sqlmock expectations.
// Loose variant — args unchecked; one call per expected file.
func expectActivityInsert(mock sqlmock.Sqlmock) {
	mock.ExpectExec(`INSERT INTO activity_logs`).
		WillReturnResult(sqlmock.NewResult(1, 1))
}

// expectActivityInsertWithTypeAndMethod is a strict variant that pins
// the activity_type and method positional args. Used in the discriminator
// regression test below — the workspace inbox poller filters
// `?type=a2a_receive`, so writing any other activity_type silently breaks
// poll-mode delivery without a build/test error. Pin the two discriminator
// fields so a refactor that flips activity_type back to a custom value is
// caught here instead of at runtime by a confused poller.
//
// Positional args (LogActivity uses ExecContext with 12 positional params):
// $1 workspace_id, $2 activity_type, $3 source_id, $4 target_id,
// $5 method, $6 summary, $7 request_body, $8 response_body,
// $9 tool_trace, $10 duration_ms, $11 status, $12 error_detail.
func expectActivityInsertWithTypeAndMethod(mock sqlmock.Sqlmock, workspaceID, activityType, method string) {
	mock.ExpectExec(`INSERT INTO activity_logs`).
		WithArgs(
			workspaceID,      // $1 workspace_id
			activityType,     // $2 activity_type ← pinned
			sqlmock.AnyArg(), // $3 source_id
			sqlmock.AnyArg(), // $4 target_id (workspaceID, but already covered)
			method,           // $5 method ← pinned
			sqlmock.AnyArg(), // $6 summary
			sqlmock.AnyArg(), // $7 request_body
			sqlmock.AnyArg(), // $8 response_body
			sqlmock.AnyArg(), // $9 tool_trace
			sqlmock.AnyArg(), // $10 duration_ms
			sqlmock.AnyArg(), // $11 status
			sqlmock.AnyArg(), // $12 error_detail
		).
		WillReturnResult(sqlmock.NewResult(1, 1))
}
// pollUploadFixture builds a multipart body with N named files.
func pollUploadFixture(t *testing.T, files map[string][]byte) (*bytes.Buffer, string) {
t.Helper()
var buf bytes.Buffer
mw := multipart.NewWriter(&buf)
for name, data := range files {
fw, err := mw.CreateFormFile("files", name)
if err != nil {
t.Fatalf("CreateFormFile: %v", err)
}
_, _ = fw.Write(data)
}
mw.Close()
return &buf, mw.FormDataContentType()
}
// ---- happy path ----

// TestPollUpload_HappyPath_OneFile_StagesAndLogs drives one file
// through the full poll branch: delivery_mode lookup → storage.Put →
// activity INSERT → 200, and pins the response JSON schema.
func TestPollUpload_HappyPath_OneFile_StagesAndLogs(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "11111111-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	expectActivityInsert(mock)
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"report.pdf": []byte("PDF-bytes")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	if len(store.puts) != 1 {
		t.Fatalf("expected 1 storage Put, got %d", len(store.puts))
	}
	put := store.puts[0]
	// "PDF-bytes" is 9 bytes — pins that Put received the raw content.
	if put.Filename != "report.pdf" || put.Size != 9 {
		t.Errorf("unexpected put: %+v", put)
	}
	// Response shape must match the workspace-side
	// /internal/chat/uploads/ingest schema so canvas can't tell which
	// path handled the upload.
	var resp struct {
		Files []struct {
			URI      string `json:"uri"`
			Name     string `json:"name"`
			Mimetype string `json:"mimeType"`
			Size     int    `json:"size"`
		} `json:"files"`
	}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("decode response: %v body=%s", err, w.Body.String())
	}
	if len(resp.Files) != 1 {
		t.Fatalf("response files count = %d, want 1", len(resp.Files))
	}
	got := resp.Files[0]
	if got.Name != "report.pdf" || got.Size != 9 {
		t.Errorf("response file mismatch: %+v", got)
	}
	if !strings.HasPrefix(got.URI, "platform-pending:"+wsID+"/") {
		t.Errorf("URI %q does not start with platform-pending:%s/", got.URI, wsID)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}

// TestPollUpload_MultipleFiles_AllStagedAndLogged pins one Put and one
// activity INSERT per file — three files, three of each. Map iteration
// order is random, so only counts (not ordering) are asserted.
func TestPollUpload_MultipleFiles_AllStagedAndLogged(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "11111111-aaaa-bbbb-cccc-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	expectActivityInsert(mock)
	expectActivityInsert(mock)
	expectActivityInsert(mock)
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{
		"a.txt": []byte("aaaa"),
		"b.txt": []byte("bbbbb"),
		"c.txt": []byte("cccccc"),
	})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	if len(store.puts) != 3 {
		t.Fatalf("expected 3 storage Puts, got %d", len(store.puts))
	}
}
// ---- regression: push-mode unchanged ----

func TestPollUpload_PushModeFallsThroughToForward(t *testing.T) {
	// With pendingUploads wired but the workspace's mode is push,
	// the poll branch must NOT activate — flow falls through to the
	// existing resolveWorkspaceForwardCreds path. Pinned via the
	// "delivery_mode lookup happened, then the URL+mode SELECT
	// happened, then we 503 because no inbound secret" sequence.
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "22222222-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "push")
	// After the poll branch is bypassed, we hit
	// resolveWorkspaceForwardCreds which selects url+delivery_mode.
	expectURL(mock, wsID, "")
	// URL empty + mode=push → 503 (no inbound secret check needed).
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusServiceUnavailable {
		t.Fatalf("status=%d body=%s — expected push-mode 503 fall-through", w.Code, w.Body.String())
	}
	if len(store.puts) != 0 {
		t.Errorf("push-mode should NOT have hit storage, got %d puts", len(store.puts))
	}
}

func TestPollUpload_NotConfigured_FallsThrough(t *testing.T) {
	// Backwards compat: a binary running without WithPendingUploads
	// behaves exactly as before — the poll branch is dead code.
	// Note: no expectPollDeliveryMode here, because with a nil
	// pendingUploads the handler never runs the new lookup at all.
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "33333333-2222-3333-4444-555555555555"
	expectURLAndMode(mock, wsID, "", "poll") // resolveWorkspaceForwardCreds emits 422
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil))
	// No WithPendingUploads — pendingUploads is nil.
	body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusUnprocessableEntity {
		t.Errorf("status=%d, want 422 (legacy poll-mode rejection)", w.Code)
	}
}
// ---- error paths ----

// TestPollUpload_WorkspaceMissing_404 pins the poll branch's dedicated
// lookup surfacing sql.ErrNoRows as a 404 (unlike the proxy helper,
// which would swallow it and default to push).
func TestPollUpload_WorkspaceMissing_404(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "44444444-2222-3333-4444-555555555555"
	expectPollDeliveryModeMissing(mock, wsID)
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(newInMemStorage(), nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusNotFound {
		t.Errorf("status=%d, want 404", w.Code)
	}
}

// TestPollUpload_DeliveryModeLookupDBError_500 pins that a non-ErrNoRows
// DB failure during the mode lookup is a 500, not a silent push fallback.
func TestPollUpload_DeliveryModeLookupDBError_500(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "55555555-2222-3333-4444-555555555555"
	mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`).
		WithArgs(wsID).WillReturnError(errors.New("connection lost"))
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(newInMemStorage(), nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x": []byte("d")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusInternalServerError {
		t.Errorf("status=%d, want 500", w.Code)
	}
}

// TestPollUpload_NoFilesField_400 pins the "valid multipart, but no
// 'files' field" rejection.
func TestPollUpload_NoFilesField_400(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "66666666-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	// Multipart with a non-files field — no actual files.
	var buf bytes.Buffer
	mw := multipart.NewWriter(&buf)
	mw.WriteField("not_files", "hi")
	mw.Close()
	c, w := makeUploadRequest(t, wsID, &buf, mw.FormDataContentType())
	h.Upload(c)
	if w.Code != http.StatusBadRequest {
		t.Errorf("status=%d, want 400 on no files field", w.Code)
	}
}

// TestPollUpload_MalformedMultipart_400 pins the ParseMultipartForm
// failure path.
func TestPollUpload_MalformedMultipart_400(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "77777777-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	// Body that doesn't match the boundary in Content-Type.
	c, w := makeUploadRequest(t, wsID, bytes.NewBufferString("garbage"), "multipart/form-data; boundary=fake")
	h.Upload(c)
	if w.Code != http.StatusBadRequest {
		t.Errorf("status=%d, want 400 on malformed multipart", w.Code)
	}
}
// TestPollUpload_StorageError_500 pins that a generic Storage.Put
// failure maps to a 500.
func TestPollUpload_StorageError_500(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "88888888-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	store.putErr = errors.New("disk full")
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusInternalServerError {
		t.Errorf("status=%d, want 500", w.Code)
	}
}

// TestPollUpload_StorageTooLarge_413 pins the belt-and-suspenders
// branch: Storage.Put itself reporting ErrTooLarge (here injected,
// since the 4-byte file wouldn't trip the pre-read size check).
func TestPollUpload_StorageTooLarge_413(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "99999999-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	store.putErr = pendinguploads.ErrTooLarge
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusRequestEntityTooLarge {
		t.Errorf("status=%d, want 413", w.Code)
	}
}

// TestPollUpload_TooManyFiles_400 pins the 64-files-per-batch cap.
func TestPollUpload_TooManyFiles_400(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "aaaaaaaa-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	// 65 files — over the per-batch cap. Random UUID names keep the
	// map keys unique.
	files := map[string][]byte{}
	for i := 0; i < 65; i++ {
		files[uuid.New().String()] = []byte("x")
	}
	body, ct := pollUploadFixture(t, files)
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusBadRequest {
		t.Errorf("status=%d, want 400 on too many files", w.Code)
	}
}
func TestPollUpload_NullDeliveryMode_TreatedAsPush(t *testing.T) {
	// Production-observed 2026-05-04: external runtime workspaces
	// (molecule-sdk-python on user infra) sometimes register with
	// delivery_mode = NULL — the schema default for legacy rows from
	// before #2339. The poll branch must NOT activate on NULL — only
	// the explicit "poll" string. This is the same defensive posture
	// resolveWorkspaceForwardCreds takes for legacy rows.
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "cccccccc-2222-3333-4444-555555555555"
	mock.ExpectQuery(`SELECT delivery_mode FROM workspaces WHERE id = \$1`).
		WithArgs(wsID).
		WillReturnRows(sqlmock.NewRows([]string{"delivery_mode"}).AddRow(nil))
	// Falls through to resolveWorkspaceForwardCreds:
	expectURLAndMode(mock, wsID, "", "")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x.bin": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	// resolveWorkspaceForwardCreds with empty url + NULL mode = 422
	// (the legacy "no callback URL" rejection — exactly what we're
	// fixing for ACTUAL poll-mode rows but want to preserve for
	// NULL ones until the row gets a real mode value via the next
	// /registry/register).
	if w.Code != http.StatusUnprocessableEntity {
		t.Errorf("status=%d, want 422 for NULL delivery_mode (legacy fallthrough)", w.Code)
	}
	if len(store.puts) != 0 {
		t.Errorf("NULL mode should NOT have hit storage, got %d puts", len(store.puts))
	}
}

func TestPollUpload_PerFileCapPreStorage_413(t *testing.T) {
	// Pin the early-reject branch (fh.Size > MaxFileBytes) BEFORE we
	// read the part into memory. Without this, an oversize file
	// would hit the storage layer's belt-and-suspenders check, which
	// works but burns ~25 MB of memory + DB round-trip first. Send
	// 25 MB + 1 byte → 413 with the file size in the response.
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "dddddddd-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	// 25 MB + 1 byte. Single file, large enough to trip the early
	// size check.
	oversize := make([]byte, pendinguploads.MaxFileBytes+1)
	body, ct := pollUploadFixture(t, map[string][]byte{"big.bin": oversize})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusRequestEntityTooLarge {
		t.Fatalf("status=%d, want 413 on per-file size cap", w.Code)
	}
	if len(store.puts) != 0 {
		t.Errorf("per-file cap reject should NOT have called storage.Put, got %d puts", len(store.puts))
	}
	// Sanity: response carries the size we tried to upload + the cap.
	// Unmarshal error deliberately ignored — a garbage body fails the
	// nil-check on "max" right below anyway.
	var body_ map[string]any
	json.Unmarshal(w.Body.Bytes(), &body_)
	if got := body_["max"]; got == nil {
		t.Errorf("expected max field in response, got %v", body_)
	}
}
// SanitizeFilename is exercised in the upload chain — pin one
// end-to-end case that exercises the URI path through the response.
func TestPollUpload_SanitizesFilenameInResponse(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "bbbbbbbb-2222-3333-4444-555555555555"
	expectPollDeliveryMode(mock, wsID, "poll")
	expectActivityInsert(mock)
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	// "hello world!.pdf" → space and "!" both become "_".
	body, ct := pollUploadFixture(t, map[string][]byte{"hello world!.pdf": []byte("data")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	var resp struct {
		Files []struct {
			Name string `json:"name"`
			URI  string `json:"uri"`
		}
	}
	// Unmarshal error ignored — an undecodable body leaves Files
	// empty and fails the length check below.
	json.Unmarshal(w.Body.Bytes(), &resp)
	if len(resp.Files) == 0 || resp.Files[0].Name != "hello_world_.pdf" {
		t.Errorf("expected sanitized name 'hello_world_.pdf', got: %+v", resp.Files)
	}
	if len(store.puts) == 0 || store.puts[0].Filename != "hello_world_.pdf" {
		t.Errorf("storage Put didn't receive sanitized filename: %+v", store.puts)
	}
}

// TestPollUpload_ActivityRowDiscriminator pins the
// activity_type / method shape that the workspace inbox poller depends
// on. The poller filters `GET /workspaces/:id/activity?type=a2a_receive`
// so the handler MUST write activity_type=a2a_receive (NOT a custom
// type), and use method=chat_upload_receive as the
// upload-vs-message-vs-task discriminator.
//
// Why pinned: a previous iteration of this handler used
// activity_type="chat_upload_receive" — silently invisible to the
// existing poller. The branch passed every push-mode test, every
// storage test, and every per-file content test; the bug only
// surfaced at runtime when the workspace polled and got nothing.
// Encode the contract in a unit test so the next refactor can't
// re-break it without a red CI.
func TestPollUpload_ActivityRowDiscriminator(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	wsID := "abc12345-6789-4abc-8def-000000000999"
	expectPollDeliveryMode(mock, wsID, "poll")
	expectActivityInsertWithTypeAndMethod(mock, wsID, "a2a_receive", "chat_upload_receive")
	store := newInMemStorage()
	h := NewChatFilesHandler(NewTemplatesHandler(t.TempDir(), nil)).
		WithPendingUploads(store, nil)
	body, ct := pollUploadFixture(t, map[string][]byte{"x.pdf": []byte("xx")})
	c, w := makeUploadRequest(t, wsID, body, ct)
	h.Upload(c)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}

View File

@ -0,0 +1,184 @@
// pending_uploads.go — endpoints the workspace polls to fetch and ack
// chat-upload files staged on the platform side for poll-mode delivery.
//
// Companion to chat_files.go Upload's poll-mode branch:
//
// Canvas POST /workspaces/:id/chat/uploads
// ↓ (poll-mode workspace)
// Platform: chat_files.uploadPollMode
// ↓ writes pending_uploads row + activity_logs(type=chat_upload_receive)
// Workspace inbox poller picks up activity row
// ↓
// Workspace GETs /workspaces/:id/pending-uploads/:fid/content ← this file
// ↓ writes file to /workspace/.molecule/chat-uploads
// Workspace POSTs /workspaces/:id/pending-uploads/:fid/ack ← this file
// ↓ row marked acked; Phase 3 sweep deletes
//
// Auth: same wsAuth middleware that gates the activity poll endpoint —
// the workspace's per-workspace platform_token. Only the target workspace
// can read OR ack its own pending uploads. The handler enforces that
// :id == file.workspace_id even though the URL param matches; defence in
// depth against a token leak letting one workspace pull another's bytes.
package handlers
import (
"errors"
"log"
"net/http"
"strconv"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
)
// PendingUploadsHandler serves the workspace-side fetch + ack endpoints.
// Holds a Storage so tests can inject an in-memory implementation
// without going through Postgres (sqlmock-based unit tests cover the
// Postgres impl in internal/pendinguploads/storage_test.go).
type PendingUploadsHandler struct {
	// storage is the persistence boundary for pending-upload rows.
	// Never nil after NewPendingUploadsHandler.
	storage pendinguploads.Storage
}
// NewPendingUploadsHandler builds a handler over the given concrete
// Storage implementation. Production wiring passes
// pendinguploads.NewPostgres(db.DB); tests pass an in-memory double.
func NewPendingUploadsHandler(storage pendinguploads.Storage) *PendingUploadsHandler {
	h := new(PendingUploadsHandler)
	h.storage = storage
	return h
}
// GetContent handles GET /workspaces/:id/pending-uploads/:file_id/content.
//
// Returns the file bytes with the original mimetype and a
// Content-Disposition that names the original (sanitized) filename so
// the workspace's fetcher writes it under the expected name. Stamps
// fetched_at on the row best-effort — the read response is already
// flushed to the network before the MarkFetched call so a sweep race
// can't break the workspace's fetch.
//
// 404 on:
//   - file_id not found
//   - file_id belongs to a different workspace (cross-workspace bleed
//     protection)
//   - row already acked (workspace's bug — should not re-fetch after ack)
//   - row past expires_at (Phase 3 sweep would delete shortly anyway)
func (h *PendingUploadsHandler) GetContent(c *gin.Context) {
	workspaceID := c.Param("id")
	if err := validateWorkspaceID(workspaceID); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
		return
	}
	fileIDStr := c.Param("file_id")
	fileID, err := uuid.Parse(fileIDStr)
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"})
		return
	}
	rec, err := h.storage.Get(c.Request.Context(), fileID)
	if errors.Is(err, pendinguploads.ErrNotFound) {
		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
		return
	}
	if err != nil {
		log.Printf("pending_uploads GetContent: storage.Get(%s) failed: %v", fileID, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
		return
	}
	// Cross-workspace bleed protection: a token leak from workspace A
	// must not let it read workspace B's pending uploads even with the
	// correct file_id. wsAuth already pinned the caller to :id; reject
	// if the row's workspace_id doesn't match.
	if rec.WorkspaceID.String() != workspaceID {
		log.Printf("pending_uploads GetContent: workspace mismatch — caller=%s row=%s file_id=%s",
			workspaceID, rec.WorkspaceID, fileID)
		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"})
		return
	}
	// Stream the bytes. Set the original mimetype if known; fall back
	// to application/octet-stream so curl / browser clients still get
	// a valid response. Content-Disposition uses the workspace-side
	// filename so the fetcher writes it under the expected name.
	mimetype := rec.Mimetype
	if mimetype == "" {
		mimetype = "application/octet-stream"
	}
	c.Header("Content-Type", mimetype)
	c.Header("Content-Disposition", contentDispositionAttachment(rec.Filename))
	// FIX: advertise the length of the bytes we actually write, not
	// the DB's size_bytes column. If the two ever drift (bad writer,
	// future out-of-line content storage), a mismatched Content-Length
	// corrupts the HTTP framing for the workspace's client — the body
	// would be truncated or the connection stalled waiting for bytes
	// that never come.
	c.Header("Content-Length", strconv.Itoa(len(rec.Content)))
	c.Status(http.StatusOK)
	if _, err := c.Writer.Write(rec.Content); err != nil {
		// Connection closed mid-stream — log and bail; we cannot
		// re-emit headers at this point. The workspace's HTTP client
		// will see the truncated body and retry on next poll.
		log.Printf("pending_uploads GetContent: write failed for %s: %v", fileID, err)
		return
	}
	// Best-effort fetched_at stamp. After-the-fact so the GET response
	// completes regardless of the UPDATE outcome — a Phase 3 sweep
	// race that nukes the row between Get and MarkFetched must not
	// break the workspace's fetch.
	if err := h.storage.MarkFetched(c.Request.Context(), fileID); err != nil {
		log.Printf("pending_uploads GetContent: mark_fetched(%s) failed: %v", fileID, err)
	}
}
// Ack handles POST /workspaces/:id/pending-uploads/:file_id/ack.
//
// Marks the row as handed-off; Phase 3 sweep deletes acked rows after
// a retention window. Idempotent — workspace at-least-once retries on
// a flaky network return success without moving the timestamp.
func (h *PendingUploadsHandler) Ack(c *gin.Context) {
	wsParam := c.Param("id")
	if validateWorkspaceID(wsParam) != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid workspace ID"})
		return
	}
	fileID, parseErr := uuid.Parse(c.Param("file_id"))
	if parseErr != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid file_id"})
		return
	}
	// Ownership lookup BEFORE the ack write: a leaked token must not
	// be able to ack a row owned by a different workspace. The
	// response does not distinguish "missing" from "not yours" — a
	// 404 either way — and that ambiguity is fine for the contract.
	row, err := h.storage.Get(c.Request.Context(), fileID)
	switch {
	case errors.Is(err, pendinguploads.ErrNotFound):
		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found, expired, or already acked"})
		return
	case err != nil:
		log.Printf("pending_uploads Ack: storage.Get(%s) failed: %v", fileID, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
		return
	}
	if row.WorkspaceID.String() != wsParam {
		log.Printf("pending_uploads Ack: workspace mismatch — caller=%s row=%s file_id=%s",
			wsParam, row.WorkspaceID, fileID)
		c.JSON(http.StatusNotFound, gin.H{"error": "pending upload not found"})
		return
	}
	switch ackErr := h.storage.Ack(c.Request.Context(), fileID); {
	case ackErr == nil:
		c.JSON(http.StatusOK, gin.H{"acked": true})
	case errors.Is(ackErr, pendinguploads.ErrNotFound):
		// Race window: the row passed Get but vanished before Ack —
		// a sweep got there first. The workspace's intent ("I'm done
		// with this file") was honored, so report success.
		c.JSON(http.StatusOK, gin.H{"acked": true, "raced": true})
	default:
		log.Printf("pending_uploads Ack: storage.Ack(%s) failed: %v", fileID, ackErr)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "storage error"})
	}
}

View File

@ -0,0 +1,373 @@
package handlers_test
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
)
// fakeStorage is an in-memory pendinguploads.Storage. Lets handler
// tests pin behaviour without going through Postgres + sqlmock — the
// storage layer's own tests (internal/pendinguploads/storage_test.go)
// cover the SQL drift surface; here we only care about the handler's
// 4xx/5xx mapping and side-effect ordering.
type fakeStorage struct {
	rows        map[uuid.UUID]pendinguploads.Record // live rows keyed by file_id
	getErr      error                               // forced error from Get (overrides rows lookup)
	ackErr      error                               // forced error from Ack
	markErr     error                               // forced error from MarkFetched
	markFetched []uuid.UUID                         // file_ids passed to MarkFetched, in call order
	ackCalls    []uuid.UUID                         // file_ids passed to Ack, in call order
}
// newFakeStorage returns an empty in-memory Storage double.
func newFakeStorage() *fakeStorage {
	return &fakeStorage{rows: make(map[uuid.UUID]pendinguploads.Record)}
}
// Put stores one record under a fresh file_id, mirroring the columns
// the Postgres implementation persists (24h default expiry included).
func (f *fakeStorage) Put(ctx context.Context, ws uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) {
	id := uuid.New()
	now := time.Now()
	rec := pendinguploads.Record{
		FileID:      id,
		WorkspaceID: ws,
		Content:     content,
		Filename:    filename,
		Mimetype:    mimetype,
		SizeBytes:   int64(len(content)),
		CreatedAt:   now,
		ExpiresAt:   now.Add(24 * time.Hour),
	}
	f.rows[id] = rec
	return id, nil
}
// Get returns the stored record, the forced getErr if set, or
// ErrNotFound when the file_id is unknown.
func (f *fakeStorage) Get(_ context.Context, fileID uuid.UUID) (pendinguploads.Record, error) {
	if f.getErr != nil {
		return pendinguploads.Record{}, f.getErr
	}
	if rec, found := f.rows[fileID]; found {
		return rec, nil
	}
	return pendinguploads.Record{}, pendinguploads.ErrNotFound
}
// MarkFetched records the call, then reports the forced markErr (nil
// when unset).
func (f *fakeStorage) MarkFetched(_ context.Context, fileID uuid.UUID) error {
	f.markFetched = append(f.markFetched, fileID)
	if f.markErr != nil {
		return f.markErr
	}
	return nil
}
// Ack records the call; on the forced ackErr it leaves the row in
// place, otherwise it removes the row (mirrors "acked rows read as
// gone" in the real Storage).
func (f *fakeStorage) Ack(_ context.Context, fileID uuid.UUID) error {
	f.ackCalls = append(f.ackCalls, fileID)
	if err := f.ackErr; err != nil {
		return err
	}
	delete(f.rows, fileID)
	return nil
}
// newRouter wires the handler's two endpoints into a bare test-mode
// gin engine using the production route shapes.
func newRouter(h *handlers.PendingUploadsHandler) *gin.Engine {
	gin.SetMode(gin.TestMode)
	engine := gin.New()
	engine.GET("/workspaces/:id/pending-uploads/:file_id/content", h.GetContent)
	engine.POST("/workspaces/:id/pending-uploads/:file_id/ack", h.Ack)
	return engine
}
// ---- GetContent ----
// Happy path: bytes stream back with the stored mimetype, filename,
// and length, and fetched_at is stamped exactly once.
func TestGetContent_HappyPath_StreamsBytesAndStampsFetched(t *testing.T) {
	store := newFakeStorage()
	ws := uuid.New()
	fid, err := store.Put(context.Background(), ws, []byte("hello world"), "report.pdf", "application/pdf")
	if err != nil {
		t.Fatalf("Put: %v", err)
	}
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodGet,
			"/workspaces/"+ws.String()+"/pending-uploads/"+fid.String()+"/content", nil))
	if rr.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", rr.Code, rr.Body.String())
	}
	if body := rr.Body.String(); body != "hello world" {
		t.Errorf("body = %q, want %q", body, "hello world")
	}
	if ct := rr.Header().Get("Content-Type"); ct != "application/pdf" {
		t.Errorf("Content-Type = %q, want application/pdf", ct)
	}
	if cd := rr.Header().Get("Content-Disposition"); !strings.Contains(cd, "report.pdf") {
		t.Errorf("Content-Disposition = %q, expected to mention report.pdf", cd)
	}
	if cl := rr.Header().Get("Content-Length"); cl != "11" {
		t.Errorf("Content-Length = %q, want 11", cl)
	}
	if len(store.markFetched) != 1 || store.markFetched[0] != fid {
		t.Errorf("expected MarkFetched(%s), got %v", fid, store.markFetched)
	}
}
// An empty stored mimetype falls back to application/octet-stream.
func TestGetContent_DefaultsMimetypeWhenEmpty(t *testing.T) {
	fs := newFakeStorage()
	wsID := uuid.New()
	// FIX: check the Put error instead of discarding it — a failed
	// seed would otherwise surface as a misleading header mismatch.
	fileID, err := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
	if err != nil {
		t.Fatalf("Put: %v", err)
	}
	h := handlers.NewPendingUploadsHandler(fs)
	r := newRouter(h)
	req := httptest.NewRequest(http.MethodGet,
		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/content", nil)
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	// FIX: assert the status first so a 404/500 fails with the real
	// cause rather than an empty Content-Type comparison.
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	if got := w.Header().Get("Content-Type"); got != "application/octet-stream" {
		t.Errorf("Content-Type fallback = %q, want application/octet-stream", got)
	}
}
func TestGetContent_InvalidWorkspaceID_400(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
req := httptest.NewRequest(http.MethodGet, "/workspaces/not-a-uuid/pending-uploads/00000000-0000-0000-0000-000000000000/content", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("status=%d, want 400", w.Code)
}
}
func TestGetContent_InvalidFileID_400(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
wsID := uuid.New()
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID.String()+"/pending-uploads/not-a-uuid/content", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("status=%d, want 400", w.Code)
}
}
func TestGetContent_NotFound_404(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
wsID := uuid.New()
missing := uuid.New()
req := httptest.NewRequest(http.MethodGet,
"/workspaces/"+wsID.String()+"/pending-uploads/"+missing.String()+"/content", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("status=%d, want 404", w.Code)
}
}
// A non-sentinel storage error maps to 500.
func TestGetContent_StorageError_500(t *testing.T) {
	store := newFakeStorage()
	store.getErr = errors.New("connection refused")
	router := newRouter(handlers.NewPendingUploadsHandler(store))
	rr := httptest.NewRecorder()
	router.ServeHTTP(rr, httptest.NewRequest(http.MethodGet,
		"/workspaces/"+uuid.New().String()+"/pending-uploads/"+uuid.New().String()+"/content", nil))
	if rr.Code != http.StatusInternalServerError {
		t.Errorf("status=%d, want 500", rr.Code)
	}
}
func TestGetContent_CrossWorkspaceBleed_404(t *testing.T) {
	// Token leak: a request validated for workspace A tries to pull
	// workspace B's file_id. The handler must 404 even though the
	// row exists — and must not write a single content byte.
	store := newFakeStorage()
	owner := uuid.New()
	fid, _ := store.Put(context.Background(), owner, []byte("secret"), "leak.txt", "text/plain")
	intruder := uuid.New()
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodGet,
			"/workspaces/"+intruder.String()+"/pending-uploads/"+fid.String()+"/content", nil))
	if rr.Code != http.StatusNotFound {
		t.Fatalf("status=%d, want 404 for cross-workspace bleed", rr.Code)
	}
	// Critical: must not have leaked the bytes.
	if strings.Contains(rr.Body.String(), "secret") {
		t.Errorf("response body leaked content from another workspace: %q", rr.Body.String())
	}
}
func TestGetContent_MarkFetchedFailureLoggedNotPropagated(t *testing.T) {
	store := newFakeStorage()
	ws := uuid.New()
	fid, _ := store.Put(context.Background(), ws, []byte("ok"), "x.txt", "text/plain")
	store.markErr = errors.New("update failed (sweep raced)")
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodGet,
			"/workspaces/"+ws.String()+"/pending-uploads/"+fid.String()+"/content", nil))
	// The 200 + bytes are flushed BEFORE MarkFetched runs — an
	// observability-hook failure must not fail the workspace's fetch.
	if rr.Code != http.StatusOK {
		t.Errorf("status=%d, want 200 even on MarkFetched failure", rr.Code)
	}
	if rr.Body.String() != "ok" {
		t.Errorf("body = %q, want %q", rr.Body.String(), "ok")
	}
}
// ---- Ack ----
// Happy path: ack reports acked=true and the row disappears.
func TestAck_HappyPath_RemovesRow(t *testing.T) {
	store := newFakeStorage()
	ws := uuid.New()
	fid, _ := store.Put(context.Background(), ws, []byte("data"), "x.bin", "")
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodPost,
			"/workspaces/"+ws.String()+"/pending-uploads/"+fid.String()+"/ack", nil))
	if rr.Code != http.StatusOK {
		t.Fatalf("status=%d", rr.Code)
	}
	var payload map[string]any
	if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil {
		t.Fatalf("decode: %v", err)
	}
	if payload["acked"] != true {
		t.Errorf("body.acked = %v, want true", payload["acked"])
	}
	if _, stillThere := store.rows[fid]; stillThere {
		t.Errorf("row should have been removed after ack")
	}
}
func TestAck_NonExistent_404(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
wsID := uuid.New()
req := httptest.NewRequest(http.MethodPost,
"/workspaces/"+wsID.String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusNotFound {
t.Errorf("status=%d, want 404", w.Code)
}
}
// A workspace must not be able to ack a row it does not own: 404,
// and the owner's row survives.
func TestAck_CrossWorkspaceBleed_404(t *testing.T) {
	store := newFakeStorage()
	owner := uuid.New()
	fid, _ := store.Put(context.Background(), owner, []byte("data"), "x.bin", "")
	intruder := uuid.New()
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodPost,
			"/workspaces/"+intruder.String()+"/pending-uploads/"+fid.String()+"/ack", nil))
	if rr.Code != http.StatusNotFound {
		t.Errorf("status=%d, want 404 for cross-workspace ack", rr.Code)
	}
	// The bogus ack from the wrong workspace must leave the owner's
	// row untouched.
	if _, exists := store.rows[fid]; !exists {
		t.Errorf("row should NOT have been removed by cross-workspace ack")
	}
}
func TestAck_InvalidWorkspaceID_400(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
req := httptest.NewRequest(http.MethodPost, "/workspaces/not-a-uuid/pending-uploads/"+uuid.New().String()+"/ack", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("status=%d, want 400", w.Code)
}
}
func TestAck_InvalidFileID_400(t *testing.T) {
fs := newFakeStorage()
r := newRouter(handlers.NewPendingUploadsHandler(fs))
req := httptest.NewRequest(http.MethodPost,
"/workspaces/"+uuid.New().String()+"/pending-uploads/not-a-uuid/ack", nil)
w := httptest.NewRecorder()
r.ServeHTTP(w, req)
if w.Code != http.StatusBadRequest {
t.Errorf("status=%d, want 400", w.Code)
}
}
// A failure in the ownership lookup (Get) maps to 500.
func TestAck_GetStorageError_500(t *testing.T) {
	store := newFakeStorage()
	store.getErr = errors.New("conn lost")
	router := newRouter(handlers.NewPendingUploadsHandler(store))
	rr := httptest.NewRecorder()
	router.ServeHTTP(rr, httptest.NewRequest(http.MethodPost,
		"/workspaces/"+uuid.New().String()+"/pending-uploads/"+uuid.New().String()+"/ack", nil))
	if rr.Code != http.StatusInternalServerError {
		t.Errorf("status=%d, want 500", rr.Code)
	}
}
func TestAck_RaceWithSweep_ReturnsRacedTrue(t *testing.T) {
	// Sweep deletes the row between the handler's Get and Ack calls.
	// Storage.Ack returns ErrNotFound; handler treats that as success
	// (intent honored, row gone) and reports raced:true.
	fs := newFakeStorage()
	wsID := uuid.New()
	fileID, _ := fs.Put(context.Background(), wsID, []byte("data"), "x.bin", "")
	fs.ackErr = pendinguploads.ErrNotFound
	r := newRouter(handlers.NewPendingUploadsHandler(fs))
	req := httptest.NewRequest(http.MethodPost,
		"/workspaces/"+wsID.String()+"/pending-uploads/"+fileID.String()+"/ack", nil)
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d, want 200 on race", w.Code)
	}
	var body map[string]any
	// FIX: check the decode error so a malformed body fails as a
	// decode failure, not a misleading acked/raced mismatch.
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("decode: %v (body=%s)", err, w.Body.String())
	}
	if body["acked"] != true || body["raced"] != true {
		t.Errorf("expected acked=true raced=true, got %v", body)
	}
}
// A non-sentinel failure in the ack write maps to 500.
func TestAck_StorageError_500(t *testing.T) {
	store := newFakeStorage()
	ws := uuid.New()
	fid, _ := store.Put(context.Background(), ws, []byte("data"), "x.bin", "")
	store.ackErr = errors.New("conn refused")
	rr := httptest.NewRecorder()
	newRouter(handlers.NewPendingUploadsHandler(store)).ServeHTTP(rr,
		httptest.NewRequest(http.MethodPost,
			"/workspaces/"+ws.String()+"/pending-uploads/"+fid.String()+"/ack", nil))
	if rr.Code != http.StatusInternalServerError {
		t.Errorf("status=%d, want 500", rr.Code)
	}
}

View File

@ -0,0 +1,103 @@
package handlers_test
import (
"strings"
"testing"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
)
// SanitizeFilename mirrors workspace/internal_chat_uploads.py's
// sanitize_filename. If the two drift, canvas-emitted URIs for the
// same upload differ between push and poll paths — so every case the
// Python suite pins (workspace/tests/test_internal_chat_uploads.py
// :: test_sanitize_filename) is pinned here as well.
func TestSanitizeFilename_StripsPathTraversal(t *testing.T) {
	for input, expected := range map[string]string{
		"../../etc/passwd": "passwd",
		"/etc/passwd":      "passwd",
		"a/b/c.txt":        "c.txt",
		"./relative":       "relative",
	} {
		if out := handlers.SanitizeFilename(input); out != expected {
			t.Errorf("SanitizeFilename(%q) = %q, want %q", input, out, expected)
		}
	}
}
// Unsafe characters collapse to underscores, one per rune.
func TestSanitizeFilename_ReplacesUnsafeChars(t *testing.T) {
	for input, expected := range map[string]string{
		"hello world.pdf":   "hello_world.pdf",
		"weird;chars!?.txt": "weird_chars__.txt",
		"中文.docx":           "__.docx", // each non-ASCII rune → one underscore
		"file (1).pdf":      "file__1_.pdf",
	} {
		if out := handlers.SanitizeFilename(input); out != expected {
			t.Errorf("SanitizeFilename(%q) = %q, want %q", input, out, expected)
		}
	}
}
// A name made only of allowed characters passes through untouched.
func TestSanitizeFilename_PreservesAllowedChars(t *testing.T) {
	const name = "report-2026.05.04_v2.pdf"
	if out := handlers.SanitizeFilename(name); out != name {
		t.Errorf("SanitizeFilename(%q) = %q, want unchanged", name, out)
	}
}
func TestSanitizeFilename_CapsAt100Chars_PreservesShortExtension(t *testing.T) {
	// Exactly 100 chars (95 'a' + ".pdf") fits and passes through.
	exact := strings.Repeat("a", 95) + ".pdf"
	if out := handlers.SanitizeFilename(exact); out != exact {
		t.Errorf("expected unchanged at 100 chars, got %q (len=%d)", out, len(out))
	}
	// An oversized base is cut down to 100 total with ".pdf" kept.
	oversized := strings.Repeat("b", 200) + ".pdf"
	out := handlers.SanitizeFilename(oversized)
	if len(out) != 100 {
		t.Errorf("expected length 100, got %d (%q)", len(out), out)
	}
	if !strings.HasSuffix(out, ".pdf") {
		t.Errorf("expected .pdf suffix preserved, got %q", out)
	}
}
func TestSanitizeFilename_DropsLongExtension(t *testing.T) {
	// An extension longer than 16 chars counts as part of the name:
	// truncation to 100 drops it rather than preserving it. Mirrors
	// the Python rule (dot >= 0 AND len(base) - dot <= 16).
	input := strings.Repeat("c", 90) + ".thisisaverylongextensionnotpreserved"
	out := handlers.SanitizeFilename(input)
	if len(out) != 100 {
		t.Errorf("expected 100, got %d (%q)", len(out), out)
	}
	// Result is the first 100 chars of the SANITIZED input — the
	// extension must not survive.
	if strings.Contains(out, ".thisisaverylongextensionnotpreserved") {
		t.Errorf("long extension should have been truncated, got %q", out)
	}
}
// Empty and dot-only names collapse to the literal fallback "file".
func TestSanitizeFilename_FallbackForReservedNames(t *testing.T) {
	for _, reserved := range []string{"", ".", ".."} {
		if out := handlers.SanitizeFilename(reserved); out != "file" {
			t.Errorf("SanitizeFilename(%q) = %q, want %q", reserved, out, "file")
		}
	}
}
func TestSanitizeFilename_AllUnsafeBecomesAllUnderscores_NotReserved(t *testing.T) {
	// Four non-ASCII runes map to four underscores — never "", "."
	// or "..", so the reserved-name fallback stays untriggered and a
	// real (if uninformative) name comes back.
	if out := handlers.SanitizeFilename("中文中文"); out != "____" {
		t.Errorf("SanitizeFilename(中文中文) = %q, want %q", out, "____")
	}
}

View File

@ -0,0 +1,253 @@
// Package pendinguploads is the platform-side staging layer for chat file
// uploads bound for poll-mode workspaces (delivery_mode='poll', no public
// callback URL — typically external runtimes on a laptop / behind NAT).
//
// In push-mode the platform synchronously POSTs the multipart body to the
// workspace's /internal/chat/uploads/ingest endpoint and forgets about it.
// Poll-mode has no callback URL to forward to, so the platform parses the
// multipart on this side, persists each file as one pending_uploads row,
// and lets the workspace pull it on its next inbox poll cycle.
//
// The Storage interface keeps the bytes-vs-metadata split clean: today
// content is stored inline as bytea on the pending_uploads row, but the
// shape lets a future PR (RFC #2789, S3-backed shared storage) swap to
// object storage by adding a new Storage implementation without touching
// any of the handler-layer callers.
//
// Lifecycle:
//
// Put — handler creates a row with the file content; assigns file_id.
// Get — GET /workspaces/:id/pending-uploads/:fid/content reads bytes.
// MarkFetched — stamps fetched_at on the row (Phase 3 observability).
// Ack — POST /workspaces/:id/pending-uploads/:fid/ack;
// terminal happy-path state. After ack, Get returns ErrNotFound.
// GC sweep deletes acked rows after a retention window.
//
// Hard TTL: every row has an expires_at default of created_at + 24h. After
// expiration the row is GC'd by Phase 3's sweep cron regardless of ack
// state. Get on an expired row returns ErrNotFound — the workspace's next
// poll will see the underlying activity_logs row was orphaned and the
// agent surfaces "file expired" to the user.
package pendinguploads
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/google/uuid"
)
// MaxFileBytes is the per-file size cap (25 MiB) for poll-mode chat
// uploads. Mirrors workspace-side ingest_handler
// (workspace/internal_chat_uploads.py:198). Pinned at the DB level via
// the size_bytes CHECK constraint; this Go-side constant exists so the
// Put implementation can reject before round-tripping to Postgres.
const MaxFileBytes = 25 * 1024 * 1024
// ErrNotFound is returned by Get / MarkFetched / Ack when the row is
// absent. Callers turn this into HTTP 404. Acked and expired rows are
// deliberately folded into the same sentinel so the workspace can
// never re-fetch a file the platform considers handed-off.
var ErrNotFound = errors.New("pendinguploads: row not found, expired, or already acked")
// ErrTooLarge is returned by Put when content exceeds MaxFileBytes.
// Callers turn this into HTTP 413. This is a pre-DB check so we don't
// push a 25 MB+1 byte payload through Postgres just to have the CHECK
// constraint reject it.
var ErrTooLarge = errors.New("pendinguploads: content exceeds per-file cap")
// Record carries the full row including content. Returned by Get;
// the GET /content handler streams Record.Content as the response body.
type Record struct {
	FileID      uuid.UUID  // DB-assigned primary key
	WorkspaceID uuid.UUID  // owning workspace; handlers enforce caller == owner
	Content     []byte     // file bytes, stored inline (bytea) today
	Filename    string     // sanitized, workspace-side name
	Mimetype    string     // may be empty; handlers fall back to octet-stream
	SizeBytes   int64      // len(Content) as recorded at Put time
	CreatedAt   time.Time  // row creation timestamp
	FetchedAt   *time.Time // nil before first MarkFetched
	AckedAt     *time.Time // nil before Ack (Get returns ErrNotFound after)
	ExpiresAt   time.Time  // hard TTL; rows past this read as ErrNotFound
}
// Storage is the platform-side persistence boundary for poll-mode chat
// uploads. The Postgres implementation backs all callers today; an S3-
// backed implementation can drop in once RFC #2789 lands by making
// content storage out-of-line and updating the Postgres-only metadata
// columns.
type Storage interface {
	// Put creates a row for one file targeting workspaceID and returns
	// the assigned file_id. content is bounded by MaxFileBytes;
	// filename / mimetype are stored verbatim — caller is responsible
	// for sanitization (matches workspace-side rule, see
	// internal_chat_uploads.py:sanitize_filename). Empty filename and
	// content > MaxFileBytes return errors before any DB write.
	Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error)
	// Get returns the full row including content. Returns ErrNotFound
	// when the row is absent, acked, or past expires_at. Caller should
	// not differentiate the three cases in the response — from the
	// workspace's perspective they all mean "not available, give up."
	Get(ctx context.Context, fileID uuid.UUID) (Record, error)
	// MarkFetched stamps fetched_at on the row. Idempotent — repeated
	// calls update fetched_at to the latest timestamp. Returns
	// ErrNotFound if the row is absent / acked / expired.
	MarkFetched(ctx context.Context, fileID uuid.UUID) error
	// Ack stamps acked_at on the row. Idempotent on the row state
	// (acked_at is only set the first time so workspace double-acks
	// don't move the timestamp). Returns ErrNotFound if the row is
	// absent or already expired; on already-acked, returns nil so
	// the workspace's at-least-once retry succeeds without an error.
	Ack(ctx context.Context, fileID uuid.UUID) error
}
// PostgresStorage is the production Storage implementation backed by
// the pending_uploads table. Safe for concurrent use to the extent the
// underlying *sql.DB pool is.
type PostgresStorage struct {
	db *sql.DB // connected pool; never nil after NewPostgres
}
// NewPostgres returns a Storage backed by db. db must be a connected
// pool; this constructor does no I/O.
func NewPostgres(db *sql.DB) *PostgresStorage {
return &PostgresStorage{db: db}
}
// Compile-time check that PostgresStorage satisfies Storage.
var _ Storage = (*PostgresStorage)(nil)
// Put validates content/filename bounds client-side, inserts one
// pending_uploads row, and returns the DB-assigned file_id. Returns
// ErrTooLarge when content exceeds MaxFileBytes; plain errors for
// empty content and empty/over-long filenames. created_at and
// expires_at come from column defaults (created_at + 24h per the
// package doc), so the INSERT names only caller-supplied columns.
func (p *PostgresStorage) Put(ctx context.Context, workspaceID uuid.UUID, content []byte, filename, mimetype string) (uuid.UUID, error) {
	if len(content) == 0 {
		return uuid.Nil, fmt.Errorf("pendinguploads: empty content")
	}
	// Pre-DB reject: don't ship an oversized payload to Postgres just
	// to have the size_bytes CHECK constraint bounce it.
	if len(content) > MaxFileBytes {
		return uuid.Nil, ErrTooLarge
	}
	if filename == "" {
		return uuid.Nil, fmt.Errorf("pendinguploads: empty filename")
	}
	// Filename length cap is enforced both here (early reject) and at
	// the DB layer (CHECK constraint) so a buggy caller can't write a
	// 200-char filename that Phase 2's URI rewrite would then truncate.
	// NOTE(review): len() counts bytes, not runes — matches the
	// byte-oriented cap in the sanitizer tests, but confirm against
	// the DB constraint's semantics.
	if len(filename) > 100 {
		return uuid.Nil, fmt.Errorf("pendinguploads: filename exceeds 100 chars")
	}
	var fileID uuid.UUID
	err := p.db.QueryRowContext(ctx, `
		INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype)
		VALUES ($1, $2, $3, $4, $5)
		RETURNING file_id
	`, workspaceID, content, int64(len(content)), filename, mimetype).Scan(&fileID)
	if err != nil {
		return uuid.Nil, fmt.Errorf("pendinguploads: insert: %w", err)
	}
	return fileID, nil
}
// Get returns the full row, content included, for a live (non-acked,
// non-expired) pending upload; all other states surface as ErrNotFound.
//
// The expires_at + acked_at filter in the WHERE clause means a
// caller sees ErrNotFound for absent / acked / expired without
// needing per-case branching. Trade-off: we can't differentiate
// in metrics, but the workspace's response is the same in all
// three cases ("file gone, give up") so the granularity isn't
// useful at this layer. Phase 3 dashboards aggregate row-state
// counts directly off the table.
func (p *PostgresStorage) Get(ctx context.Context, fileID uuid.UUID) (Record, error) {
	var r Record
	err := p.db.QueryRowContext(ctx, `
		SELECT file_id, workspace_id, content, filename, mimetype,
		       size_bytes, created_at, fetched_at, acked_at, expires_at
		FROM pending_uploads
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`, fileID).Scan(
		// Scan order must track the SELECT column list exactly.
		&r.FileID, &r.WorkspaceID, &r.Content, &r.Filename, &r.Mimetype,
		&r.SizeBytes, &r.CreatedAt, &r.FetchedAt, &r.AckedAt, &r.ExpiresAt,
	)
	if errors.Is(err, sql.ErrNoRows) {
		return Record{}, ErrNotFound
	}
	if err != nil {
		return Record{}, fmt.Errorf("pendinguploads: select: %w", err)
	}
	return r, nil
}
// MarkFetched stamps fetched_at = now() on the row, returning
// ErrNotFound when no live row matched.
//
// UPDATE on the same gating predicate as Get — keeps the "absent
// or acked or expired = ErrNotFound" contract symmetric. Without
// the predicate a workspace could re-stamp fetched_at on an acked
// row, which would mislead Phase 3's stuck-fetch dashboard.
func (p *PostgresStorage) MarkFetched(ctx context.Context, fileID uuid.UUID) error {
	res, err := p.db.ExecContext(ctx, `
		UPDATE pending_uploads
		SET fetched_at = now()
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`, fileID)
	if err != nil {
		return fmt.Errorf("pendinguploads: mark_fetched: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("pendinguploads: mark_fetched rows: %w", err)
	}
	// Zero rows: the predicate filtered the row out (absent / acked /
	// expired) — surface the shared sentinel.
	if n == 0 {
		return ErrNotFound
	}
	return nil
}
// Ack stamps acked_at on a live row; idempotent for workspace
// at-least-once retries (a second ack returns nil without moving the
// timestamp). Returns ErrNotFound for absent or expired rows.
//
// Set acked_at only if currently NULL — workspace at-least-once
// retries don't move the timestamp, so dashboards see the first
// successful ack as the "delivery time." Two-clause WHERE: row
// must exist and not be expired; acked-but-still-in-window is
// returned as success (idempotent retry).
func (p *PostgresStorage) Ack(ctx context.Context, fileID uuid.UUID) error {
	res, err := p.db.ExecContext(ctx, `
		UPDATE pending_uploads
		SET acked_at = now()
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`, fileID)
	if err != nil {
		return fmt.Errorf("pendinguploads: ack: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("pendinguploads: ack rows: %w", err)
	}
	if n == 1 {
		return nil
	}
	// Zero-rows-affected: either the row doesn't exist / has expired,
	// OR it was already acked. Re-query to disambiguate so the
	// idempotent-retry case returns nil instead of ErrNotFound.
	var ackedAt sql.NullTime
	err = p.db.QueryRowContext(ctx, `
		SELECT acked_at FROM pending_uploads
		WHERE file_id = $1 AND expires_at > now()
	`, fileID).Scan(&ackedAt)
	if errors.Is(err, sql.ErrNoRows) {
		return ErrNotFound
	}
	if err != nil {
		return fmt.Errorf("pendinguploads: ack disambiguate: %w", err)
	}
	if ackedAt.Valid {
		// Already acked — idempotent success.
		return nil
	}
	// Predicate matched a non-acked, non-expired row but RowsAffected
	// was 0. This means the row was concurrently modified between the
	// UPDATE and the SELECT (extremely rare; e.g. a Phase 3 sweep
	// raced with the ACK). Treat as success — the row is gone, but
	// the workspace's intent ("I'm done with this file") was honored.
	return nil
}

View File

@ -0,0 +1,400 @@
package pendinguploads_test
import (
"context"
"database/sql"
"errors"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/google/uuid"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
)
// Tests pin the SQL the handler relies on. Drift detection: if the
// migration changes column order / predicate shape, sqlmock's
// QueryMatcherEqual + ExpectQuery / ExpectExec on the literal text
// fails the test before the handler can ship a silently-broken read.
//
// Why sqlmock and not testcontainers / real Postgres:
//
// The Storage contract is "this Go method runs THIS SQL." Real-DB
// tests would catch SQL-syntax errors but not the contract drift
// we care about (e.g. handler accidentally reordering columns,
// dropping the acked_at predicate, etc.). Postgres-syntax coverage
// lives in the migration round-trip test (Phase 4 E2E).
// newMockDB opens a sqlmock-backed *sql.DB configured for exact-string
// query matching (QueryMatcherEqual) and registers cleanup so every
// test gets a fresh, self-closing pair.
func newMockDB(t *testing.T) (*sql.DB, sqlmock.Sqlmock) {
	t.Helper()
	conn, mk, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
	if err != nil {
		t.Fatalf("sqlmock.New: %v", err)
	}
	t.Cleanup(func() { _ = conn.Close() })
	return conn, mk
}
// Single source of truth for the SQL strings — drift here = test fails;
// matches the Go literals in storage.go exactly.
//
// NOTE(review): these are compared via QueryMatcherEqual, so even a
// whitespace difference against storage.go fails a test — keep the two
// files byte-for-byte in sync when either changes.
const (
	// insertSQL: Put's INSERT; file_id is server-generated and returned.
	insertSQL = `
		INSERT INTO pending_uploads (workspace_id, content, size_bytes, filename, mimetype)
		VALUES ($1, $2, $3, $4, $5)
		RETURNING file_id
	`
	// selectSQL: Get's gated read — absent / acked / expired rows all
	// fall outside the predicate and surface as ErrNotFound.
	selectSQL = `
		SELECT file_id, workspace_id, content, filename, mimetype,
		       size_bytes, created_at, fetched_at, acked_at, expires_at
		FROM pending_uploads
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`
	// markFetchedSQL: stamps fetched_at under the same predicate as Get.
	markFetchedSQL = `
		UPDATE pending_uploads
		SET fetched_at = now()
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`
	// ackSQL: first-ack UPDATE; zero rows triggers the disambiguation
	// SELECT below.
	ackSQL = `
		UPDATE pending_uploads
		SET acked_at = now()
		WHERE file_id = $1
		  AND acked_at IS NULL
		  AND expires_at > now()
	`
	// ackDisambiguateSQL: distinguishes "already acked" (idempotent
	// success) from "absent / expired" (ErrNotFound).
	ackDisambiguateSQL = `
		SELECT acked_at FROM pending_uploads
		WHERE file_id = $1 AND expires_at > now()
	`
)
// ----- Put ------------------------------------------------------------------
// Round-trips a small file through Put and verifies the server-assigned
// file_id comes back unchanged, with every mock expectation consumed.
func TestPut_HappyPath_ReturnsAssignedFileID(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	ws := uuid.New()
	want := uuid.New()
	rows := sqlmock.NewRows([]string{"file_id"}).AddRow(want)
	mk.ExpectQuery(insertSQL).
		WithArgs(ws, []byte("hello"), int64(5), "report.pdf", "application/pdf").
		WillReturnRows(rows)
	got, err := s.Put(context.Background(), ws, []byte("hello"), "report.pdf", "application/pdf")
	if err != nil {
		t.Fatalf("Put: %v", err)
	}
	if got != want {
		t.Errorf("file_id mismatch: got %s want %s", got, want)
	}
	if err := mk.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}
// A nil payload is a client-side validation error caught before any
// SQL is issued — the mock carries zero expectations, so a round-trip
// would fail the test on its own.
func TestPut_RejectsEmptyContentBeforeDB(t *testing.T) {
	conn, _ := newMockDB(t) // zero expectations: any round-trip fails
	s := pendinguploads.NewPostgres(conn)
	_, err := s.Put(context.Background(), uuid.New(), nil, "x.txt", "")
	if err == nil || !strings.Contains(err.Error(), "empty content") {
		t.Fatalf("expected empty-content error, got %v", err)
	}
}
// One byte past MaxFileBytes must trip ErrTooLarge before the DB is
// touched (no expectations are registered on the mock).
func TestPut_RejectsOversizeBeforeDB(t *testing.T) {
	conn, _ := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	oversize := make([]byte, pendinguploads.MaxFileBytes+1)
	_, err := s.Put(context.Background(), uuid.New(), oversize, "x.txt", "")
	if !errors.Is(err, pendinguploads.ErrTooLarge) {
		t.Fatalf("expected ErrTooLarge, got %v", err)
	}
}
// A blank filename is rejected client-side, before any SQL runs.
func TestPut_RejectsEmptyFilenameBeforeDB(t *testing.T) {
	conn, _ := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	_, err := s.Put(context.Background(), uuid.New(), []byte("hi"), "", "")
	if err == nil || !strings.Contains(err.Error(), "empty filename") {
		t.Fatalf("expected empty-filename error, got %v", err)
	}
}
// 101 characters is one past the cap the migration also pins via
// CHECK (length(filename) <= 100); validation must trip before the DB.
func TestPut_RejectsLongFilenameBeforeDB(t *testing.T) {
	conn, _ := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	name := strings.Repeat("a", 101)
	_, err := s.Put(context.Background(), uuid.New(), []byte("hi"), name, "")
	if err == nil || !strings.Contains(err.Error(), "exceeds 100 chars") {
		t.Fatalf("expected too-long-filename error, got %v", err)
	}
}
// A driver-level failure on the INSERT must surface as a wrapped
// "insert" error. The workspace id is hoisted so the expectation and
// the call site can't silently diverge.
func TestPut_PropagatesDBError(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	ws := uuid.Nil
	mk.ExpectQuery(insertSQL).
		WithArgs(ws, sqlmock.AnyArg(), int64(2), "x", "").
		WillReturnError(errors.New("connection refused"))
	_, err := s.Put(context.Background(), ws, []byte("hi"), "x", "")
	if err == nil || !strings.Contains(err.Error(), "insert") {
		t.Fatalf("expected wrapped insert error, got %v", err)
	}
}
// ----- Get ------------------------------------------------------------------
// Pins the full ten-column row shape Get scans, including the nil
// fetched_at / acked_at of a never-fetched row.
func TestGet_HappyPath_ReturnsFullRow(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id, ws := uuid.New(), uuid.New()
	now := time.Now().UTC()
	cols := []string{
		"file_id", "workspace_id", "content", "filename", "mimetype",
		"size_bytes", "created_at", "fetched_at", "acked_at", "expires_at",
	}
	rows := sqlmock.NewRows(cols).AddRow(
		id, ws, []byte("data"), "x.bin", "application/octet-stream",
		int64(4), now, nil, nil, now.Add(24*time.Hour),
	)
	mk.ExpectQuery(selectSQL).WithArgs(id).WillReturnRows(rows)
	rec, err := s.Get(context.Background(), id)
	if err != nil {
		t.Fatalf("Get: %v", err)
	}
	if rec.FileID != id || rec.WorkspaceID != ws {
		t.Errorf("ids mismatch: %+v", rec)
	}
	if string(rec.Content) != "data" || rec.SizeBytes != 4 {
		t.Errorf("content mismatch: %+v", rec)
	}
	if rec.FetchedAt != nil || rec.AckedAt != nil {
		t.Errorf("expected nil timestamps for unfetched row, got fetched=%v acked=%v", rec.FetchedAt, rec.AckedAt)
	}
	if err := mk.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}
// sql.ErrNoRows from the SELECT must be translated to the package
// sentinel ErrNotFound.
func TestGet_AbsentRow_ReturnsErrNotFound(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id := uuid.New()
	mk.ExpectQuery(selectSQL).WithArgs(id).WillReturnError(sql.ErrNoRows)
	if _, err := s.Get(context.Background(), id); !errors.Is(err, pendinguploads.ErrNotFound) {
		t.Fatalf("expected ErrNotFound, got %v", err)
	}
}
// A non-ErrNoRows driver failure must NOT masquerade as ErrNotFound —
// it surfaces as a wrapped "select" error instead.
func TestGet_DBError_WrappedAndPropagated(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	mk.ExpectQuery(selectSQL).WillReturnError(errors.New("connection lost"))
	_, err := s.Get(context.Background(), uuid.New())
	if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "select") {
		t.Fatalf("expected wrapped select error, got %v", err)
	}
}
// ----- MarkFetched ----------------------------------------------------------
// TestMarkFetched_HappyPath verifies a one-row UPDATE yields nil.
// ExpectationsWereMet is checked (consistent with the Put/Get
// happy-path tests): without it, a regression where MarkFetched
// returns nil without ever issuing the UPDATE would still pass,
// since the nil-error assertion alone can't detect a skipped Exec.
func TestMarkFetched_HappyPath(t *testing.T) {
	db, mock := newMockDB(t)
	store := pendinguploads.NewPostgres(db)
	fid := uuid.New()
	mock.ExpectExec(markFetchedSQL).
		WithArgs(fid).
		WillReturnResult(sqlmock.NewResult(0, 1))
	if err := store.MarkFetched(context.Background(), fid); err != nil {
		t.Fatalf("MarkFetched: %v", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}
// Zero rows affected (absent, already-acked, or expired — the
// predicate merges all three) must map to ErrNotFound.
func TestMarkFetched_AbsentOrAckedOrExpired_ReturnsErrNotFound(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id := uuid.New()
	mk.ExpectExec(markFetchedSQL).WithArgs(id).WillReturnResult(sqlmock.NewResult(0, 0))
	if err := s.MarkFetched(context.Background(), id); !errors.Is(err, pendinguploads.ErrNotFound) {
		t.Fatalf("expected ErrNotFound, got %v", err)
	}
}
// A driver failure on the UPDATE surfaces as a wrapped "mark_fetched"
// error, never as ErrNotFound.
func TestMarkFetched_DBError_Wrapped(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	mk.ExpectExec(markFetchedSQL).WillReturnError(errors.New("pg flake"))
	err := s.MarkFetched(context.Background(), uuid.New())
	if err == nil || errors.Is(err, pendinguploads.ErrNotFound) || !strings.Contains(err.Error(), "mark_fetched") {
		t.Fatalf("expected wrapped mark_fetched error, got %v", err)
	}
}
// ----- Ack ------------------------------------------------------------------
// TestAck_FirstAck_StampsAckedAt: a one-row UPDATE is the happy path
// and returns nil with no disambiguation SELECT. ExpectationsWereMet
// is checked (consistent with the Put/Get happy-path tests) so an Ack
// that returned nil without issuing the UPDATE would fail rather than
// silently pass.
func TestAck_FirstAck_StampsAckedAt(t *testing.T) {
	db, mock := newMockDB(t)
	store := pendinguploads.NewPostgres(db)
	fid := uuid.New()
	mock.ExpectExec(ackSQL).
		WithArgs(fid).
		WillReturnResult(sqlmock.NewResult(0, 1))
	if err := store.Ack(context.Background(), fid); err != nil {
		t.Fatalf("Ack: %v", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}
// TestAck_AlreadyAcked_IdempotentSuccess pins the retry contract: a
// zero-row UPDATE followed by a disambiguation SELECT that finds a
// non-NULL acked_at must report success, not ErrNotFound.
// ExpectationsWereMet additionally proves BOTH statements ran — an Ack
// that skipped the disambiguation and returned nil early would
// otherwise pass this test.
func TestAck_AlreadyAcked_IdempotentSuccess(t *testing.T) {
	db, mock := newMockDB(t)
	store := pendinguploads.NewPostgres(db)
	fid := uuid.New()
	// First UPDATE matches zero rows (already acked).
	mock.ExpectExec(ackSQL).
		WithArgs(fid).
		WillReturnResult(sqlmock.NewResult(0, 0))
	// Disambiguation SELECT finds the row with acked_at non-null.
	mock.ExpectQuery(ackDisambiguateSQL).
		WithArgs(fid).
		WillReturnRows(sqlmock.NewRows([]string{"acked_at"}).AddRow(time.Now().UTC()))
	if err := store.Ack(context.Background(), fid); err != nil {
		t.Fatalf("expected idempotent success on already-acked, got %v", err)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("expectations: %v", err)
	}
}
// Zero-row UPDATE plus a disambiguation SELECT that finds nothing
// (row absent or expired) must surface ErrNotFound.
func TestAck_AbsentOrExpired_ReturnsErrNotFound(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id := uuid.New()
	mk.ExpectExec(ackSQL).WithArgs(id).WillReturnResult(sqlmock.NewResult(0, 0))
	mk.ExpectQuery(ackDisambiguateSQL).WithArgs(id).WillReturnError(sql.ErrNoRows)
	if err := s.Ack(context.Background(), id); !errors.Is(err, pendinguploads.ErrNotFound) {
		t.Fatalf("expected ErrNotFound, got %v", err)
	}
}
// The UPDATE matched nothing, yet the disambiguation SELECT returned a
// row whose acked_at is still NULL — only possible if the GC sweep
// raced between the two statements. The contract says we honor the
// workspace's ACK intent and return success.
func TestAck_RaceWithSweep_ReturnsSuccess(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id := uuid.New()
	mk.ExpectExec(ackSQL).WithArgs(id).WillReturnResult(sqlmock.NewResult(0, 0))
	rows := sqlmock.NewRows([]string{"acked_at"}).AddRow(nil)
	mk.ExpectQuery(ackDisambiguateSQL).WithArgs(id).WillReturnRows(rows)
	if err := s.Ack(context.Background(), id); err != nil {
		t.Fatalf("expected race success, got %v", err)
	}
}
// A driver failure on the UPDATE itself is wrapped with the "ack:"
// prefix and propagated; no disambiguation SELECT is attempted.
func TestAck_DBErrorOnUpdate_Wrapped(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	mk.ExpectExec(ackSQL).WillReturnError(errors.New("conn refused"))
	err := s.Ack(context.Background(), uuid.New())
	if err == nil || !strings.Contains(err.Error(), "ack:") {
		t.Fatalf("expected wrapped ack error, got %v", err)
	}
}
// Some drivers (or Result wrappers) error from RowsAffected() even
// when ExecContext succeeded. The contract surfaces that as a wrapped
// error rather than folding it into the zero-rows case — returning
// ErrNotFound there would mislead the workspace into abandoning a
// possibly-fetched row.
func TestMarkFetched_RowsAffectedError_Wrapped(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	mk.ExpectExec(markFetchedSQL).
		WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected")))
	err := s.MarkFetched(context.Background(), uuid.New())
	if err == nil || !strings.Contains(err.Error(), "mark_fetched rows") {
		t.Fatalf("expected wrapped rows-affected error, got %v", err)
	}
}
// Same RowsAffected-failure contract as MarkFetched, on the Ack path:
// the error is wrapped as "ack rows", not swallowed into ErrNotFound.
func TestAck_RowsAffectedError_Wrapped(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	mk.ExpectExec(ackSQL).
		WillReturnResult(sqlmock.NewErrorResult(errors.New("driver doesn't support RowsAffected")))
	err := s.Ack(context.Background(), uuid.New())
	if err == nil || !strings.Contains(err.Error(), "ack rows") {
		t.Fatalf("expected wrapped rows-affected error, got %v", err)
	}
}
// A driver failure on the disambiguation SELECT (after a zero-row
// UPDATE) is wrapped as a "disambiguate" error and propagated.
func TestAck_DBErrorOnDisambiguate_Wrapped(t *testing.T) {
	conn, mk := newMockDB(t)
	s := pendinguploads.NewPostgres(conn)
	id := uuid.New()
	mk.ExpectExec(ackSQL).WithArgs(id).WillReturnResult(sqlmock.NewResult(0, 0))
	mk.ExpectQuery(ackDisambiguateSQL).WithArgs(id).WillReturnError(errors.New("connection refused"))
	err := s.Ack(context.Background(), id)
	if err == nil || !strings.Contains(err.Error(), "disambiguate") {
		t.Fatalf("expected wrapped disambiguate error, got %v", err)
	}
}

View File

@ -13,6 +13,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/handlers"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
@ -540,10 +541,20 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// streaming download (agent → user). Namespaced under /chat/ so
// the security model is obviously distinct from /files/* (which
// handles workspace config/templates and has a different caller).
chatfh := handlers.NewChatFilesHandler(tmplh)
chatfh := handlers.NewChatFilesHandler(tmplh).
WithPendingUploads(pendinguploads.NewPostgres(db.DB), broadcaster)
wsAuth.POST("/chat/uploads", chatfh.Upload)
wsAuth.GET("/chat/download", chatfh.Download)
// Phase 1 RFC: poll-mode chat upload — endpoints the workspace's
// inbox poller hits to fetch staged file content + ack delivery.
// Same wsAuth gate as the activity poll, so a token leak from
// workspace A can't read workspace B's pending uploads (the
// handler also re-checks workspace_id on each row).
puh := handlers.NewPendingUploadsHandler(pendinguploads.NewPostgres(db.DB))
wsAuth.GET("/pending-uploads/:file_id/content", puh.GetContent)
wsAuth.POST("/pending-uploads/:file_id/ack", puh.Ack)
// Plugins
pluginsDir := findPluginsDir(configsDir)
// Runtime lookup lets the plugins handler filter the registry to plugins

View File

@ -0,0 +1,11 @@
-- 20260505100000_pending_uploads.down.sql
--
-- Drops the pending_uploads table and its indexes. Any pending file
-- uploads sitting in the table at rollback time are dropped — operators
-- on poll-mode workspaces lose those attachments, but they were never
-- fetched on the workspace side (otherwise they'd be acked + about to
-- be GC'd anyway), so the practical loss is the same as a cron sweep.
DROP INDEX IF EXISTS idx_pending_uploads_expires;
DROP INDEX IF EXISTS idx_pending_uploads_workspace_unacked;
DROP TABLE IF EXISTS pending_uploads;

View File

@ -0,0 +1,103 @@
-- 20260505100000_pending_uploads.up.sql
--
-- RFC: poll-mode chat upload (counterpart to delivery_mode='poll' messaging).
--
-- Today, chat_files.go's Upload handler refuses delivery_mode != 'push'
-- with HTTP 422 "workspace has no callback URL" — external runtime
-- workspaces (laptop / behind NAT) cannot receive file attachments at all.
-- The only escape was "register with ngrok / Cloudflare tunnel + push
-- mode," which forces every external operator into infra plumbing they
-- shouldn't need.
--
-- This table is the platform-side staging layer that lets canvas → external
-- workspace file uploads ride the same poll loop the inbox already uses for
-- text messages:
--
-- 1. Canvas POSTs multipart to workspace-server.
-- 2. workspace-server parses multipart, stores each file as one
-- pending_uploads row, AND inserts a matching activity_logs row
-- (type='chat_upload_receive', request_body={file_id, filename, ...}).
-- 3. Workspace's existing inbox poller picks up the activity row.
-- 4. Workspace fetches bytes via GET /workspaces/:id/pending-uploads/:fid/content,
-- writes to /workspace/.molecule/chat-uploads/, ACKs via POST.
-- 5. Sweep cron deletes rows past expires_at OR acked_at + N hours.
--
-- Why a separate table and not bytea-on-activity_logs:
--
-- * activity_logs is text/JSON-shaped today; mixing 25 MB binary blobs
-- into request_body inflates every JOIN, every since_id scan, every
-- pgdump. The bytes need their own home.
-- * Lifecycle differs: activity_logs is durable audit history (90d+);
-- pending_uploads is transient buffer (24h default) that GCs hard.
-- Keeping them split lets each table's retention policy run
-- independently.
-- * A future PR (RFC #2789) will migrate the bytes column to S3 keys
-- without touching the activity_logs schema or the metadata columns
-- here. That migration is one ALTER + one backfill rather than a
-- cross-table rewrite.
--
-- No FK to workspaces:
-- workspace delete should NOT cascade-purge pending_uploads — those
-- rows are evidence-of-receipt and should expire on their own TTL.
-- Same posture as tenant_resources (PR #2343) and delegations (PR #2829).
CREATE TABLE IF NOT EXISTS pending_uploads (
-- Server-generated so the canvas can include the URI in the chat
-- message it sends right after the upload POST. Workspace fetches
-- by this id, no name collisions across workspaces.
file_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
-- Target workspace. NOT a FK (see header).
workspace_id uuid NOT NULL,
-- Content lives inline today via bytea. The Go-side storage interface
-- (PendingUploadStorage) abstracts read/write so a future PR can
-- relocate this column's job to S3 (RFC #2789) by adding an `s3_key
-- text NULL` column, dual-writing for one release, then dropping
-- `content` once the backfill drains. The CHECK below pins the same
-- 25 MB per-file cap the workspace-side ingest_handler enforces
-- (workspace/internal_chat_uploads.py:198) — discrepancy between
-- the two would let the platform accept files the workspace would
-- 413 on after pull.
content bytea NOT NULL,
size_bytes bigint NOT NULL CHECK (size_bytes > 0 AND size_bytes <= 26214400),
-- Filename + mimetype mirror the workspace-side ChatUploadedFile
-- shape so the eventual InboxMessage hand-off needs no translation.
-- Filename is sanitized at write-time (matches sanitize_filename in
-- workspace/internal_chat_uploads.py); 100 char cap is the same.
filename text NOT NULL CHECK (length(filename) > 0 AND length(filename) <= 100),
mimetype text NOT NULL DEFAULT '',
created_at timestamptz NOT NULL DEFAULT now(),
-- Stamped on the GET /content request. Lets Phase 3 sweeper detect
-- "fetched but never acked" — distinct failure mode from "never
-- fetched" (workspace offline) so dashboards can split them.
fetched_at timestamptz,
-- Stamped on the POST /ack request. Terminal state for the happy
-- path. Sweep cron deletes acked rows past acked_at + retention.
acked_at timestamptz,
-- Hard TTL: rows past this are deleted regardless of ack state.
-- 24h matches the longest-observed legitimate "operator stepped
-- away from laptop" gap; tunable later via app-level config without
-- a migration. NOT acked_at + 24h — that would let a stuck-fetched
-- row live forever.
expires_at timestamptz NOT NULL DEFAULT (now() + interval '24 hours')
);
-- Hot path: workspace's poll cycle pulls "give me my unacked uploads
-- in chronological order." Partial-index because acked rows are GC
-- candidates and shouldn't bloat the working set.
CREATE INDEX IF NOT EXISTS idx_pending_uploads_workspace_unacked
ON pending_uploads (workspace_id, created_at)
WHERE acked_at IS NULL;
-- Phase 3 GC sweep hot path: list rows past expires_at, partial-indexed
-- on unacked because acked rows have a different (shorter) retention
-- and GC-via-acked_at is a separate query.
CREATE INDEX IF NOT EXISTS idx_pending_uploads_expires
ON pending_uploads (expires_at)
WHERE acked_at IS NULL;