feat(activity): chat_upload_receive flat-upload-manifest arm for attachments projection
Lint shellcheck (arm64 pilot) / shellcheck-arm64 (pilot) (push) Waiting to run
Block internal-flavored paths / Block forbidden paths (push) Successful in 4s
CI / Detect changes (push) Successful in 7s
CI / Python Lint & Test (push) Successful in 5s
E2E API Smoke Test / detect-changes (push) Successful in 7s
Handlers Postgres Integration / detect-changes (push) Successful in 7s
Harness Replays / detect-changes (push) Successful in 7s
E2E Staging Canvas (Playwright) / detect-changes (push) Successful in 8s
E2E Chat / detect-changes (push) Successful in 10s
Lint no tenant GITEA or GITHUB token write / Scan for repo-host token write into tenant workspace surface (push) Successful in 5s
Secret scan / Scan diff for credential-shaped strings (push) Successful in 5s
Lint forbidden tenant-env keys / Scan workspace_secrets writers for forbidden env keys (push) Successful in 5s
CI / Shellcheck (E2E scripts) (push) Successful in 13s
publish-workspace-server-image / build-and-push (push) Successful in 2m59s
CI / Platform (Go) (push) Successful in 5m7s
CI / Canvas (Next.js) (push) Successful in 6m7s
CI / all-required (push) Successful in 6m56s
Harness Replays / Harness Replays (push) Successful in 2s
E2E API Smoke Test / E2E API Smoke Test (push) Successful in 1m14s
E2E Staging Canvas (Playwright) / Canvas tabs E2E (push) Successful in 6s
Handlers Postgres Integration / Handlers Postgres Integration (push) Successful in 1m27s
publish-workspace-server-image / Production auto-deploy (push) Successful in 7m44s
gate-check-v3 / gate-check (push) Successful in 47s
main-red-watchdog / watchdog (push) Successful in 2m16s
E2E Chat / E2E Chat (push) Successful in 3m47s
CI / Canvas Deploy Reminder (push) Successful in 3s
Sweep stale Cloudflare DNS records / Sweep CF orphans (push) Successful in 7s
ci-required-drift / drift (push) Successful in 1m8s
Sweep stale e2e-* orgs (staging) / Sweep e2e orgs (push) Successful in 6s
Sweep stale AWS Secrets Manager secrets / Sweep AWS Secrets Manager (push) Successful in 8s
Staging SaaS smoke (every 30 min) / Staging SaaS smoke (push) Failing after 8m4s
Continuous synthetic E2E (staging) / Synthetic E2E against staging (push) Failing after 15m34s

Extends extractAttachmentsFromRequestBody to handle canvas-direct file pastes / drag-drops (method=chat_upload_receive) where request_body is a flat upload manifest {uri, name, size, file_id, mimeType} with no parts[] wrapper.

- New extractAttachmentFromFlatUploadManifest fallback arm (after the existing message-parts arm)
- mimeType (canvas camelCase) -> mime_type (snake_case) normalization
- kindFromMimeType: image/audio/video prefix -> matching kind, else file
- Min-info skip when neither uri nor name present
- message-parts arm takes precedence (pinned by test)

Approved by core-be + core-qa on b6f2b90e9d, gated on required-context CI / all-required (pull_request) == success (per fixed dispatch hygiene that reads BP required_status_check_contexts rather than combined_state).

Downstream: workspace-runtime#37 follow-up arm mirrors this for Python pre-L1 platforms; channel-plugin one-liner adds ?include=peer_info to pollWorkspace so the adapter actually receives the L1 projection.
Co-authored-by: hongming-pc2 <hongming-pc2@moleculesai.app>
Co-committed-by: hongming-pc2 <hongming-pc2@moleculesai.app>
This commit was merged in pull request #1657.
This commit is contained in:
2026-05-22 00:01:48 +00:00
parent 9981a5099a
commit a356bc94f3
2 changed files with 354 additions and 15 deletions
+100 -15
View File
@@ -68,27 +68,43 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
}
// extractAttachmentsFromRequestBody walks a JSON-RPC a2a inbound body to
// surface attachments (file/image/audio parts) as a flat `attachments[]`
// projection so callers don't have to drill into
// `request_body.params.message.parts[]` themselves.
// surface attachments (file/image/audio/video) as a flat `attachments[]`
// projection so callers don't have to drill into the request_body shape
// themselves.
//
// Shape of an a2a inbound request_body that carries attachments:
// Two body shapes are walked in order:
//
// {"jsonrpc":"2.0","method":"message/send","params":{
// "message":{"parts":[
// {"kind":"text", "text":"hi"},
// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}},
// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}},
// ]}}}
// 1. a2a-sdk v1 message-part envelope (peer_agent inbound):
//
// {"jsonrpc":"2.0","method":"message/send","params":{
// "message":{"parts":[
// {"kind":"text", "text":"hi"},
// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}},
// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}},
// ]}}}
//
// 2. canvas chat_upload_receive flat manifest (canvas_user upload):
//
// {"uri":"platform-pending:<ws>/<file>",
// "name":"pasted.png",
// "size":12345,
// "file_id":"<uuid>",
// "mimeType":"image/png"}
//
// The canvas upload pipe writes a single manifest directly at the
// root of request_body (no JSON-RPC envelope) with camelCase
// `mimeType`. We normalize to snake_case `mime_type` on the way out
// so every downstream adaptor (channel / telegram / codex / hermes)
// sees one wire shape regardless of which inbound shape produced it.
//
// Returns nil (omit-from-JSON) when the body has no attachments — the
// `?include=peer_info` envelope projects this as an array iff non-empty.
//
// Defensive on every step: any missing key / wrong-shape value returns
// nil instead of panicking. The activity_logs row could carry literally
// any JSON in request_body (legacy formats, future formats); we only
// commit to the documented a2a-sdk v1 message-part shape and silently
// skip anything else.
// Defensive on every step: any missing key / wrong-shape value falls
// through to the next arm or returns nil instead of panicking. The
// activity_logs row could carry literally any JSON in request_body
// (legacy formats, future formats); we only commit to the documented
// shapes and silently skip anything else.
func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
if len(raw) == 0 {
return nil
@@ -97,6 +113,20 @@ func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
if err := json.Unmarshal(raw, &body); err != nil {
return nil
}
if atts := extractAttachmentsFromMessageParts(body); len(atts) > 0 {
return atts
}
if att := extractAttachmentFromFlatUploadManifest(body); att != nil {
return []map[string]interface{}{att}
}
return nil
}
// extractAttachmentsFromMessageParts handles the a2a-sdk v1 shape:
// body.params.message.parts[]. Walks file/image/audio parts; honors v1
// `kind` and v0 `type` discriminators; accepts nested `.file` sub-object
// or inlined uri/mime_type/name on the part itself.
func extractAttachmentsFromMessageParts(body map[string]interface{}) []map[string]interface{} {
params, ok := body["params"].(map[string]interface{})
if !ok {
return nil
@@ -161,6 +191,61 @@ func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
return out
}
// extractAttachmentFromFlatUploadManifest handles the canvas
// chat_upload_receive shape: a single upload manifest at the root of
// request_body with no JSON-RPC envelope. Canvas uses camelCase
// `mimeType`; we normalize to snake_case `mime_type` on emit so the
// wire shape matches the message-parts arm. Kind is derived from the
// mime prefix (image/* → "image", audio/* → "audio", video/* → "video",
// anything else → "file") because the canvas upload row doesn't carry
// an explicit discriminator. Returns nil if neither `uri` nor `file_id`
// is present at the root (i.e. not a flat upload manifest).
func extractAttachmentFromFlatUploadManifest(body map[string]interface{}) map[string]interface{} {
uri, _ := body["uri"].(string)
fileID, _ := body["file_id"].(string)
if uri == "" && fileID == "" {
return nil
}
mimeType, _ := body["mimeType"].(string)
if mimeType == "" {
// Defensive: future canvas versions might emit snake_case directly.
mimeType, _ = body["mime_type"].(string)
}
name, _ := body["name"].(string)
// Apply the same minimum-info rule as the message-parts arm: a
// manifest with neither uri nor name is non-actionable; skip.
if uri == "" && name == "" {
return nil
}
att := map[string]interface{}{"kind": kindFromMimeType(mimeType)}
if uri != "" {
att["uri"] = uri
}
if mimeType != "" {
att["mime_type"] = mimeType
}
if name != "" {
att["name"] = name
}
return att
}
// kindFromMimeType derives the attachment `kind` discriminator from a
// MIME type. Used by the flat-upload-manifest arm where the source row
// has no explicit kind field.
func kindFromMimeType(mime string) string {
switch {
case strings.HasPrefix(mime, "image/"):
return "image"
case strings.HasPrefix(mime, "audio/"):
return "audio"
case strings.HasPrefix(mime, "video/"):
return "video"
default:
return "file"
}
}
// includeFlagSet returns true iff `flag` appears in the comma-separated
// `?include=` query value. Whitespace around entries is tolerated.
// Empty `include` returns false (existing back-compat shape).
@@ -426,6 +426,260 @@ func TestActivityList_IncludePeerInfo_UnknownFlagIgnored(t *testing.T) {
}
}
// ---------- flat upload manifest (chat_upload_receive) tests ----------
func TestKindFromMimeType(t *testing.T) {
cases := []struct {
mime string
want string
}{
{"image/png", "image"},
{"image/jpeg", "image"},
{"image/", "image"}, // prefix-only is still image
{"audio/mpeg", "audio"},
{"audio/wav", "audio"},
{"video/mp4", "video"},
{"video/webm", "video"},
{"application/pdf", "file"},
{"text/plain", "file"},
{"", "file"},
{"unknown", "file"},
{"image", "file"}, // no slash → not a prefix match
}
for _, tc := range cases {
if got := kindFromMimeType(tc.mime); got != tc.want {
t.Errorf("kindFromMimeType(%q) = %q, want %q", tc.mime, got, tc.want)
}
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_Image(t *testing.T) {
// Canvas chat_upload_receive shape: flat manifest at request_body
// root with camelCase mimeType. The empirical example was a PNG
// pasted into the canvas; surfaces here with kind=image,
// mime_type=image/png (snake-case normalized), uri preserved.
body := []byte(`{
"uri":"platform-pending:091a9180-/26111d48-",
"name":"pasted-2026-05-21T23-12-25-0-0.png",
"size":677133,
"file_id":"26111d48-",
"mimeType":"image/png"
}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts)
}
att := atts[0]
if att["kind"] != "image" {
t.Errorf("kind: want image, got %v", att["kind"])
}
if att["uri"] != "platform-pending:091a9180-/26111d48-" {
t.Errorf("uri: %v", att["uri"])
}
if att["mime_type"] != "image/png" {
t.Errorf("mime_type normalization (camelCase→snake_case) failed: %v", att["mime_type"])
}
if att["name"] != "pasted-2026-05-21T23-12-25-0-0.png" {
t.Errorf("name: %v", att["name"])
}
// camelCase `mimeType` MUST NOT leak into the projected envelope —
// only snake_case `mime_type` is the wire convention.
if _, present := att["mimeType"]; present {
t.Errorf("camelCase mimeType leaked into envelope: %v", att)
}
if _, present := att["file_id"]; present {
t.Errorf("file_id should not be surfaced on the attachment envelope (it's a canvas-internal id): %v", att)
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_Audio(t *testing.T) {
body := []byte(`{"uri":"platform-pending:ws/file","name":"voice.mp3","file_id":"abc","mimeType":"audio/mpeg"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 || atts[0]["kind"] != "audio" {
t.Fatalf("want audio kind, got %v", atts)
}
if atts[0]["mime_type"] != "audio/mpeg" {
t.Errorf("mime_type: %v", atts[0]["mime_type"])
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_Video(t *testing.T) {
body := []byte(`{"uri":"platform-pending:ws/file","name":"clip.mp4","file_id":"abc","mimeType":"video/mp4"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 || atts[0]["kind"] != "video" {
t.Fatalf("want video kind, got %v", atts)
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_GenericFile(t *testing.T) {
// application/pdf has no image/audio/video prefix → kind=file
body := []byte(`{"uri":"platform-pending:ws/file","name":"doc.pdf","file_id":"abc","mimeType":"application/pdf"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 || atts[0]["kind"] != "file" {
t.Fatalf("want file kind, got %v", atts)
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_NoMimeFallsToFile(t *testing.T) {
// No mimeType at all — kind defaults to "file", mime_type omitted.
body := []byte(`{"uri":"platform-pending:ws/file","name":"unknown.bin","file_id":"abc"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["kind"] != "file" {
t.Errorf("kind: want file (default), got %v", atts[0]["kind"])
}
if _, present := atts[0]["mime_type"]; present {
t.Errorf("mime_type should be omitted when source has none, got %v", atts[0]["mime_type"])
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_SnakeCaseMimeTypeAccepted(t *testing.T) {
// Defensive: a future canvas version (or non-canvas caller) that
// already emits snake_case mime_type should still be parsed.
body := []byte(`{"uri":"u","name":"n.png","mime_type":"image/png"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["mime_type"] != "image/png" || atts[0]["kind"] != "image" {
t.Errorf("snake_case mime_type not honored: %v", atts[0])
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_FileIDOnlyIsSkipped(t *testing.T) {
// file_id alone (no uri AND no name) is non-actionable — the
// downstream adaptor can't render a discoverable file from just an
// internal canvas id. Skip per the same minimum-info rule the
// message-parts arm applies to empty parts.
body := []byte(`{"file_id":"orphan-uuid","mimeType":"image/png"}`)
if got := extractAttachmentsFromRequestBody(body); got != nil {
t.Errorf("file_id-only manifest must be skipped, got %v", got)
}
}
func TestExtractAttachmentsFromRequestBody_FlatUpload_NameOnlyIsKept(t *testing.T) {
// Symmetric with the message-parts arm: a name without uri is still
// useful (the downstream adaptor can render "user uploaded foo.png").
body := []byte(`{"name":"only-name.bin","file_id":"abc","mimeType":"application/octet-stream"}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["name"] != "only-name.bin" {
t.Errorf("name not preserved: %v", atts[0])
}
if _, present := atts[0]["uri"]; present {
t.Errorf("uri should be omitted when absent in source, got %v", atts[0]["uri"])
}
}
func TestExtractAttachmentsFromRequestBody_MessagePartsTakesPrecedenceOverFlat(t *testing.T) {
// If a single request_body somehow has BOTH params.message.parts[]
// AND top-level uri/file_id (a pathological inbound), the
// message-parts arm wins — that's the documented inbound shape and
// it's been the only one historically extracted. The flat arm is a
// fallback for shapes that have NO parts.
body := []byte(`{
"uri":"platform-pending:should-not-win",
"file_id":"x",
"mimeType":"image/png",
"params":{"message":{"parts":[
{"kind":"file","file":{"uri":"workspace:should-win.pdf","mime_type":"application/pdf","name":"win.pdf"}}
]}}
}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment (from parts[]), got %d: %v", len(atts), atts)
}
if atts[0]["uri"] != "workspace:should-win.pdf" {
t.Errorf("message-parts arm did not take precedence: %v", atts[0])
}
}
func TestActivityList_IncludePeerInfo_ChatUploadReceiveCanvasRow(t *testing.T) {
// Wire-level integration: a canvas chat_upload_receive row (canvas
// user pasted an image) with source_id NULL (canvas message), flat
// upload manifest at request_body root. The `?include=peer_info`
// projection must surface attachments[] populated from the flat-
// upload-manifest arm while peer_name / peer_role / agent_card_url
// remain absent (canvas row has no peer).
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(`LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
"peer_name", "peer_role",
}).
// Empirical shape from 2026-05-21 ~23:12Z agents-team canvas paste.
AddRow("act-upload", "ws-1", "chat_upload_receive", nil, "ws-1",
"chat_upload_receive", "Canvas upload: pasted-2026-05-21T23-12-25-0-0.png",
[]byte(`{
"uri":"platform-pending:091a9180-b303-4a20-aefe-3a4a675b8aa4/26111d48-aaaa-bbbb-cccc-dddddddddddd",
"name":"pasted-2026-05-21T23-12-25-0-0.png",
"size":677133,
"file_id":"26111d48-aaaa-bbbb-cccc-dddddddddddd",
"mimeType":"image/png"
}`),
nil, nil, nil, "ok", nil, time.Now(),
nil, nil))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse: %v", err)
}
if len(resp) != 1 {
t.Fatalf("want 1 row, got %d", len(resp))
}
r := resp[0]
// Canvas row → no peer fields.
for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} {
if _, present := r[k]; present {
t.Errorf("%s must NOT appear on canvas upload row; got %v", k, r[k])
}
}
// attachments[] populated from the flat-upload arm.
atts, ok := r["attachments"].([]interface{})
if !ok {
t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"])
}
if len(atts) != 1 {
t.Fatalf("want 1 attachment from flat manifest, got %d: %v", len(atts), atts)
}
att := atts[0].(map[string]interface{})
if att["kind"] != "image" {
t.Errorf("kind: want image (image/png prefix), got %v", att["kind"])
}
if att["mime_type"] != "image/png" {
t.Errorf("mime_type wire shape: want snake_case image/png, got %v", att["mime_type"])
}
if att["uri"] != "platform-pending:091a9180-b303-4a20-aefe-3a4a675b8aa4/26111d48-aaaa-bbbb-cccc-dddddddddddd" {
t.Errorf("uri preserved verbatim: got %v", att["uri"])
}
if att["name"] != "pasted-2026-05-21T23-12-25-0-0.png" {
t.Errorf("name: %v", att["name"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// Sanity test using the existing test broadcaster setup — verifies the
// extractAttachments helper round-trips through json.Marshal cleanly
// (no map ordering issues, no type-coercion surprises).