From 0b572153ecefb0eca45f7f262fc9c1cb18c511c0 Mon Sep 17 00:00:00 2001 From: hongming-pc2 Date: Thu, 21 May 2026 14:14:08 -0700 Subject: [PATCH] =?UTF-8?q?feat(activity):=20=3Finclude=3Dpeer=5Finfo=20en?= =?UTF-8?q?richment=20=E2=80=94=20peer=5Fname/role/agent=5Fcard=5Furl=20+?= =?UTF-8?q?=20attachments[]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GET /workspaces/:id/activity gains a `?include=peer_info` query flag that, when set, returns: peer_name — w.name from LEFT JOIN workspaces ON source_id peer_role — w.role agent_card_url — externalPlatformURL + /registry/discover/ (mirrors molecule-ai-workspace-runtime a2a_client._agent_card_url_for shape: `{PLATFORM_URL}/registry/discover/{peer_id}`) attachments[] — flattened from request_body.params.message.parts[] for file / image / audio kinds; v1 `kind=` and v0 `type=` discriminators both honored, nested .file{uri,mime_type,name} and inlined-on-part both surface, malformed (no-uri-no-name) parts skipped The flag is additive + opt-in. Callers that don't pass `?include=peer_info` see the existing wire shape unchanged — the JOIN is omitted, column refs stay unqualified (back-compat with existing sqlmock-regex test fixtures), and no new envelope fields appear. When the flag IS set, column references in SELECT / WHERE / ORDER are qualified `activity_logs.` to disambiguate `id` / `created_at` against the joined workspaces table. The omit-when-absent rule is preserved end-to-end: - peer_name / peer_role are omitted from the envelope (not emitted as null literals) when the LEFT JOIN yields NULL (canvas message OR deleted peer workspace). - agent_card_url is omitted when source_id is absent (canvas messages legitimately have no peer identity). - attachments[] is omitted when there are no file/image/audio parts (text-only messages stay clean). This is Layer 1 of a 3-layer enrichment (sibling repos pending): L2 - molecule-ai-workspace-runtime: inbox.py defensive read of these row-level fields with registry-lookup fallback so Layer 2 ships pre-Layer-1 without regression L3 - molecule-mcp-claude-channel: adaptor envelope spreading the new fields into Claude Code notifications/claude/channel meta when present (CEO-A on Mac side) Tests ----- activity_peer_info_test.go covers: - extractAttachmentsFromRequestBody helper: nil/empty/non-json bodies, no-attachments (text only), v1 kind=file with nested .file shape, mixed image+audio, v0 type=file with inlined fields, no-uri-no-name parts skipped, malformed shapes return nil defensively, json.Marshal round-trip - includeFlagSet helper: table-driven over empty/single/multi- flag query strings, whitespace tolerance, exact-match discriminator (no substring matches) - List handler with ?include=peer_info: * regex-pins the LEFT JOIN keyword + the `w.name AS peer_name, w.role AS peer_role` aliases + `activity_logs.workspace_id` qualification (a future refactor that drops any of these fails the test) * canvas row (source_id NULL) preserves the row in the result set but omits peer_name/peer_role/agent_card_url * attachments[] surfaces from a request_body with a file part * agent_card_url composes correctly from X-Forwarded-Proto + Host + /registry/discover/ - List handler WITHOUT the flag: * regex pins unqualified `FROM activity_logs WHERE workspace_id = …` shape (back-compat) * envelope must NOT contain peer_name / peer_role / agent_card_url / attachments - Unknown flag (?include=bogus) silently ignored — additive convention; query stays back-compat shape Existing tests (TestActivityList_SourceCanvas, TestActivityList_ SourceAgent, TestActivityList_PeerIDFilter, etc.) continue to pass without modification — their sqlmock-regex fixtures match the unqualified-column shape preserved when ?include=peer_info is absent. References ---------- - Empirical probe today (CEO-A, canvas_user "hi" → workspace 30ba7f0b) showed the bare activity row only carries id / source_id / method / summary / request_body — without peer identity fields, Claude Code channel pushes have to display bare UUIDs instead of `[CEO Assistant (operator orchestrator)]`. This flag closes that gap structurally. - Sibling: molecule-ai-workspace-runtime a2a_client.py:380 `_agent_card_url_for` — shape the Layer 1 server-side derivation mirrors. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../internal/handlers/activity.go | 249 +++++++++- .../handlers/activity_peer_info_test.go | 447 ++++++++++++++++++ 2 files changed, 679 insertions(+), 17 deletions(-) create mode 100644 workspace-server/internal/handlers/activity_peer_info_test.go diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 18dc8fb86..4ab8c157d 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -67,7 +67,128 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { return &ActivityHandler{broadcaster: b} } -// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id= +// extractAttachmentsFromRequestBody walks a JSON-RPC a2a inbound body to +// surface attachments (file/image/audio parts) as a flat `attachments[]` +// projection so callers don't have to drill into +// `request_body.params.message.parts[]` themselves. +// +// Shape of an a2a inbound request_body that carries attachments: +// +// {"jsonrpc":"2.0","method":"message/send","params":{ +// "message":{"parts":[ +// {"kind":"text", "text":"hi"}, +// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}, +// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}}, +// ]}}} +// +// Returns nil (omit-from-JSON) when the body has no attachments — the +// `?include=peer_info` envelope projects this as an array iff non-empty. +// +// Defensive on every step: any missing key / wrong-shape value returns +// nil instead of panicking. The activity_logs row could carry literally +// any JSON in request_body (legacy formats, future formats); we only +// commit to the documented a2a-sdk v1 message-part shape and silently +// skip anything else. +func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} { + if len(raw) == 0 { + return nil + } + var body map[string]interface{} + if err := json.Unmarshal(raw, &body); err != nil { + return nil + } + params, ok := body["params"].(map[string]interface{}) + if !ok { + return nil + } + message, ok := params["message"].(map[string]interface{}) + if !ok { + return nil + } + parts, ok := message["parts"].([]interface{}) + if !ok { + return nil + } + out := make([]map[string]interface{}, 0) + for _, p := range parts { + part, ok := p.(map[string]interface{}) + if !ok { + continue + } + // a2a-sdk v1 uses "kind"; older v0 callers sent "type". Accept + // both for the discriminator — same defensive read pattern as + // the runtime-side extract_text helper. + kind, _ := part["kind"].(string) + if kind == "" { + kind, _ = part["type"].(string) + } + if kind != "file" && kind != "image" && kind != "audio" { + continue + } + // The file sub-object holds uri/mime_type/name. The a2a-sdk v1 + // shape nests under "file"; some legacy payloads inlined the + // fields onto the part itself. Support both. + var fileObj map[string]interface{} + if f, ok := part["file"].(map[string]interface{}); ok { + fileObj = f + } else { + fileObj = part + } + uri, _ := fileObj["uri"].(string) + mimeType, _ := fileObj["mime_type"].(string) + name, _ := fileObj["name"].(string) + // At minimum we need either a uri or a name to be useful. + // Empty-part entries are skipped (they're a malformed inbound + // — surface nothing rather than emit a no-info placeholder). + if uri == "" && name == "" { + continue + } + att := map[string]interface{}{"kind": kind} + if uri != "" { + att["uri"] = uri + } + if mimeType != "" { + att["mime_type"] = mimeType + } + if name != "" { + att["name"] = name + } + out = append(out, att) + } + if len(out) == 0 { + return nil + } + return out +} + +// includeFlagSet returns true iff `flag` appears in the comma-separated +// `?include=` query value. Whitespace around entries is tolerated. +// Empty `include` returns false (existing back-compat shape). +// +// The comma-separable form lets future fields ("attachments_only", +// "tool_trace_expanded", etc.) slot in without further URL-param creep. +func includeFlagSet(includeQuery, flag string) bool { + if includeQuery == "" || flag == "" { + return false + } + for _, raw := range strings.Split(includeQuery, ",") { + if strings.TrimSpace(raw) == flag { + return true + } + } + return false +} + +// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=&include= +// +// The `include` query param is comma-separable; today the only flag is +// `peer_info`, which enriches a2a_receive rows with `peer_name`, +// `peer_role`, `agent_card_url`, and an `attachments[]` projection (see +// extractAttachmentsFromRequestBody). It's additive + opt-in — existing +// callers that don't pass `?include=peer_info` see the unchanged shape. +// Surface for the layered enrichment that lets Claude Code channel +// pushes carry full sender identity instead of bare UUIDs (sibling +// repos: molecule-ai-workspace-runtime + molecule-mcp-claude-channel). // // since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'. // Optional, additive — callers that don't pass it get today's behavior (the @@ -102,6 +223,8 @@ func (h *ActivityHandler) List(c *gin.Context) { sinceSecsStr := c.Query("since_secs") sinceID := c.Query("since_id") beforeTSStr := c.Query("before_ts") // optional RFC3339 — return rows strictly older than this timestamp + include := c.Query("include") // comma-separated; today's only flag is "peer_info" + includePeerInfo := includeFlagSet(include, "peer_info") // Validate peer_id as a UUID at the trust boundary so a malformed // caller (the agent or a downstream MCP tool) can't smuggle SQL @@ -192,22 +315,60 @@ func (h *ActivityHandler) List(c *gin.Context) { usingCursor = true } - // Build query with optional filters - query := `SELECT id, workspace_id, activity_type, source_id, target_id, method, - summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at - FROM activity_logs WHERE workspace_id = $1` + // Build query with optional filters. When ?include=peer_info is set, + // LEFT JOIN workspaces ON activity_logs.source_id = w.id so we can + // surface w.name + w.role on the row. LEFT (not INNER) is required + // for two reasons: + // 1. Canvas rows have source_id IS NULL — those must still appear + // in the result set (with NULL peer_name/peer_role). + // 2. A peer workspace may have been deleted since the row was + // written (no FK constraint on activity_logs.source_id) — + // LEFT JOIN preserves the activity row with NULL peer fields + // rather than silently dropping the row. + // + // agent_card_url is NOT pulled from the workspaces table; it's + // computed server-side from externalPlatformURL + source_id at + // projection time (mirrors molecule-ai-workspace-runtime + // a2a_client._agent_card_url_for which constructs + // {PLATFORM_URL}/registry/discover/{peer_id}). + // + // Column qualification (`activity_logs.`) is added ONLY when + // the JOIN is present — disambiguates `id` / `created_at` which + // exist in both tables. When the JOIN is absent, unqualified + // column references preserve the exact wire-shape existing callers + // + existing test fixtures expect (back-compat). + actCol := "" + if includePeerInfo { + actCol = "activity_logs." + } + selectClause := `SELECT ` + actCol + `id, ` + actCol + `workspace_id, ` + actCol + `activity_type, ` + + actCol + `source_id, ` + actCol + `target_id, ` + actCol + `method, ` + + actCol + `summary, ` + actCol + `request_body, ` + actCol + `response_body, ` + + actCol + `tool_trace, ` + actCol + `duration_ms, ` + actCol + `status, ` + + actCol + `error_detail, ` + actCol + `created_at` + fromClause := ` FROM activity_logs` + if includePeerInfo { + selectClause += `, w.name AS peer_name, w.role AS peer_role` + fromClause += ` LEFT JOIN workspaces w ON w.id = activity_logs.source_id` + } + query := selectClause + fromClause + ` WHERE ` + actCol + `workspace_id = $1` args := []interface{}{workspaceID} argIdx := 2 + // WHERE/ORDER column refs use the same `actCol` qualifier prefix + // computed above — empty string when no JOIN (back-compat with + // existing wire shape + sqlmock-regex test fixtures), or + // `activity_logs.` when LEFT JOIN'd (disambiguates `id` / + // `created_at` between the two tables). if activityType != "" { - query += fmt.Sprintf(" AND activity_type = $%d", argIdx) + query += fmt.Sprintf(" AND "+actCol+"activity_type = $%d", argIdx) args = append(args, activityType) argIdx++ } if source == "canvas" { - query += " AND source_id IS NULL" + query += " AND " + actCol + "source_id IS NULL" } else if source == "agent" { - query += " AND source_id IS NOT NULL" + query += " AND " + actCol + "source_id IS NOT NULL" } else if source != "" { c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"}) return @@ -224,7 +385,7 @@ func (h *ActivityHandler) List(c *gin.Context) { // and avoids duplicate parameter binding (some drivers reject the // same arg slot reused, ours is fine but the explicit form is // clearer to read and matches the rest of the builder.) - query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx) + query += fmt.Sprintf(" AND ("+actCol+"source_id = $%d OR "+actCol+"target_id = $%d)", argIdx, argIdx) args = append(args, peerID) argIdx++ } @@ -232,7 +393,7 @@ func (h *ActivityHandler) List(c *gin.Context) { // Strictly older — never replay a row with the exact same // timestamp, mirrors the `created_at > cursorTime` shape // `since_id` uses for forward paging. - query += fmt.Sprintf(" AND created_at < $%d", argIdx) + query += fmt.Sprintf(" AND "+actCol+"created_at < $%d", argIdx) args = append(args, beforeTS) argIdx++ } @@ -241,13 +402,13 @@ func (h *ActivityHandler) List(c *gin.Context) { // interpolated into the SQL string. `make_interval(secs => $N)` // avoids the lib/pq quirk where INTERVAL '$N seconds' won't // substitute a placeholder inside the literal. - query += fmt.Sprintf(" AND created_at >= NOW() - make_interval(secs => $%d)", argIdx) + query += fmt.Sprintf(" AND "+actCol+"created_at >= NOW() - make_interval(secs => $%d)", argIdx) args = append(args, sinceSecs) argIdx++ } if usingCursor { // Strictly after — never replay the cursor row itself. - query += fmt.Sprintf(" AND created_at > $%d", argIdx) + query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx) args = append(args, cursorTime) argIdx++ } @@ -257,9 +418,9 @@ func (h *ActivityHandler) List(c *gin.Context) { // since_id) keeps DESC — that's the canvas/UI shape and changing it // would surprise existing callers. if usingCursor { - query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx) + query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx) } else { - query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx) + query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx) } args = append(args, limit) @@ -272,6 +433,14 @@ func (h *ActivityHandler) List(c *gin.Context) { } defer rows.Close() + // agent_card_url base computed once per request so we don't pay the + // header-read cost per row. Only meaningful when includePeerInfo is + // set; the empty string here is harmless when the flag is off. + var platformBase string + if includePeerInfo { + platformBase = externalPlatformURL(c) + } + activities := make([]map[string]interface{}, 0) for rows.Next() { var id, wsID, actType, status string @@ -279,10 +448,23 @@ func (h *ActivityHandler) List(c *gin.Context) { var reqBody, respBody, toolTrace []byte var durationMs *int var createdAt time.Time + // LEFT JOIN'd peer columns — pointer-string so a NULL row + // (canvas message OR deleted peer workspace) decodes as nil + // rather than empty-string. Only scanned when includePeerInfo + // is set (matched against the SELECT clause above). + var peerName, peerRole *string - if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, - &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil { - log.Printf("Activity scan error: %v", err) + var scanErr error + if includePeerInfo { + scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt, + &peerName, &peerRole) + } else { + scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method, + &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt) + } + if scanErr != nil { + log.Printf("Activity scan error: %v", scanErr) continue } @@ -308,6 +490,39 @@ func (h *ActivityHandler) List(c *gin.Context) { if toolTrace != nil { entry["tool_trace"] = json.RawMessage(toolTrace) } + + // peer_info enrichment (per ?include=peer_info). Only emit the + // new fields when the flag is set — back-compat for callers + // that don't request it. + if includePeerInfo { + // peer_name / peer_role: emit only when present (canvas + // rows have source_id IS NULL → peer_name is NULL by JOIN; + // also a peer workspace may have been deleted since the + // row was written → same NULL outcome). Omit-when-absent + // matches the Layer 3 adaptor's "spread when present" + // pattern; canvas_user rows legitimately have no peer_*. + if peerName != nil && *peerName != "" { + entry["peer_name"] = *peerName + } + if peerRole != nil && *peerRole != "" { + entry["peer_role"] = *peerRole + } + // agent_card_url: constructed server-side from + // externalPlatformURL + source_id. Mirrors the runtime- + // side helper a2a_client._agent_card_url_for which builds + // {PLATFORM_URL}/registry/discover/{peer_id}. Only set + // when source_id is present + non-empty. + if sourceID != nil && *sourceID != "" && platformBase != "" { + entry["agent_card_url"] = platformBase + "/registry/discover/" + *sourceID + } + // attachments: flatten file/image/audio parts from the + // request_body. nil when none — only project when + // non-empty so the omit-when-absent rule holds. + if atts := extractAttachmentsFromRequestBody(reqBody); len(atts) > 0 { + entry["attachments"] = atts + } + } + activities = append(activities, entry) } if err := rows.Err(); err != nil { diff --git a/workspace-server/internal/handlers/activity_peer_info_test.go b/workspace-server/internal/handlers/activity_peer_info_test.go new file mode 100644 index 000000000..646cfafe1 --- /dev/null +++ b/workspace-server/internal/handlers/activity_peer_info_test.go @@ -0,0 +1,447 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/gin-gonic/gin" +) + +// Tests for the `?include=peer_info` activity-feed enrichment. +// +// The enrichment is additive + opt-in. When the flag is absent, the +// existing tests (TestActivityList_SourceCanvas, etc.) prove the wire +// shape is unchanged. These tests prove: +// - When the flag IS set, the LEFT JOIN is issued and the SELECT +// adds w.name + w.role. +// - peer_name / peer_role surface from the joined row. +// - agent_card_url is composed server-side from +// externalPlatformURL + source_id and appears for non-canvas rows +// (source_id present). +// - attachments[] is projected from request_body.params.message.parts +// for file/image/audio parts. +// - Canvas rows (source_id NULL) do NOT get peer_name / peer_role / +// agent_card_url, but DO still appear in the result set (LEFT JOIN +// preserves them with NULL peer fields). +// - The `include` query param is comma-separable and only recognizes +// known flags. + +// ---------- includeFlagSet helper unit tests ---------- + +func TestIncludeFlagSet(t *testing.T) { + cases := []struct { + query string + flag string + want bool + }{ + {"", "peer_info", false}, + {"peer_info", "peer_info", true}, + {"peer_info,attachments", "peer_info", true}, + {"attachments,peer_info", "peer_info", true}, + {"attachments , peer_info ", "peer_info", true}, + {"peer_infos", "peer_info", false}, + {"peerinfo", "peer_info", false}, + {"peer_info", "", false}, + {",,", "peer_info", false}, + } + for _, tc := range cases { + got := includeFlagSet(tc.query, tc.flag) + if got != tc.want { + t.Errorf("includeFlagSet(%q, %q) = %v, want %v", tc.query, tc.flag, got, tc.want) + } + } +} + +// ---------- extractAttachmentsFromRequestBody unit tests ---------- + +func TestExtractAttachmentsFromRequestBody_Empty(t *testing.T) { + if got := extractAttachmentsFromRequestBody(nil); got != nil { + t.Errorf("nil body: want nil, got %v", got) + } + if got := extractAttachmentsFromRequestBody([]byte("")); got != nil { + t.Errorf("empty body: want nil, got %v", got) + } + if got := extractAttachmentsFromRequestBody([]byte("not json")); got != nil { + t.Errorf("non-json body: want nil, got %v", got) + } +} + +func TestExtractAttachmentsFromRequestBody_NoAttachments(t *testing.T) { + // Text-only message: no file/image/audio parts → nil + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`) + if got := extractAttachmentsFromRequestBody(body); got != nil { + t.Errorf("text-only: want nil, got %v", got) + } +} + +func TestExtractAttachmentsFromRequestBody_FileKindV1(t *testing.T) { + // a2a-sdk v1 shape: kind=file, file:{uri,mime_type,name} + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"kind":"text","text":"see attached"}, + {"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}} + ]}}}`) + atts := extractAttachmentsFromRequestBody(body) + if len(atts) != 1 { + t.Fatalf("want 1 attachment, got %d", len(atts)) + } + if atts[0]["kind"] != "file" { + t.Errorf("kind: want file, got %v", atts[0]["kind"]) + } + if atts[0]["uri"] != "workspace:foo.pdf" { + t.Errorf("uri mismatch: %v", atts[0]["uri"]) + } + if atts[0]["mime_type"] != "application/pdf" { + t.Errorf("mime_type mismatch: %v", atts[0]["mime_type"]) + } + if atts[0]["name"] != "foo.pdf" { + t.Errorf("name mismatch: %v", atts[0]["name"]) + } +} + +func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) { + // Mixed image + audio parts; both surface + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"kind":"image","file":{"uri":"workspace:a.png","mime_type":"image/png","name":"a.png"}}, + {"kind":"audio","file":{"uri":"workspace:b.mp3","mime_type":"audio/mpeg","name":"b.mp3"}} + ]}}}`) + atts := extractAttachmentsFromRequestBody(body) + if len(atts) != 2 { + t.Fatalf("want 2 attachments, got %d", len(atts)) + } + if atts[0]["kind"] != "image" || atts[1]["kind"] != "audio" { + t.Errorf("kind order: got %v / %v", atts[0]["kind"], atts[1]["kind"]) + } +} + +func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) { + // Legacy v0 shape: type=file (not kind), inlined fields (no nested .file) + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"type":"file","uri":"workspace:legacy.txt","mime_type":"text/plain","name":"legacy.txt"} + ]}}}`) + atts := extractAttachmentsFromRequestBody(body) + if len(atts) != 1 { + t.Fatalf("want 1 attachment, got %d", len(atts)) + } + if atts[0]["kind"] != "file" || atts[0]["uri"] != "workspace:legacy.txt" || atts[0]["name"] != "legacy.txt" { + t.Errorf("v0 part not surfaced: %v", atts[0]) + } +} + +func TestExtractAttachmentsFromRequestBody_SkipsEmptyParts(t *testing.T) { + // A "file" part with no uri AND no name is malformed — skip rather + // than emit a no-info entry. + body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"kind":"file","file":{}}, + {"kind":"file","file":{"name":"only-name.bin"}} + ]}}}`) + atts := extractAttachmentsFromRequestBody(body) + if len(atts) != 1 { + t.Fatalf("want 1 attachment (the named one), got %d", len(atts)) + } + if atts[0]["name"] != "only-name.bin" { + t.Errorf("expected only-name.bin, got %v", atts[0]) + } +} + +func TestExtractAttachmentsFromRequestBody_MalformedShape(t *testing.T) { + // Various malformed shapes return nil (defensive) + for _, b := range []string{ + `{}`, + `{"params":{}}`, + `{"params":{"message":{}}}`, + `{"params":{"message":{"parts":"not-a-list"}}}`, + `{"params":{"message":{"parts":[null,42,"string"]}}}`, + } { + if got := extractAttachmentsFromRequestBody([]byte(b)); got != nil { + t.Errorf("body %q: want nil, got %v", b, got) + } + } +} + +// ---------- Activity List ?include=peer_info handler tests ---------- + +func TestActivityList_IncludePeerInfo_IssuesLeftJoin(t *testing.T) { + // When ?include=peer_info is set, the query must: + // 1. SELECT include w.name + w.role aliased as peer_name/peer_role + // 2. FROM contains LEFT JOIN workspaces w ON w.id = activity_logs.source_id + // 3. WHERE uses qualified activity_logs.workspace_id (disambiguates + // from workspaces.id post-JOIN) + // + // Pin all three so a future refactor can't silently drop the JOIN or + // the alias and have the test still pass. + mock := setupTestDB(t) + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + peerID := "11111111-2222-3333-4444-555555555555" + mock.ExpectQuery( + `SELECT .+w\.name AS peer_name, w\.role AS peer_role FROM activity_logs LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id WHERE activity_logs\.workspace_id = .+`, + ). + WithArgs("ws-1", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "workspace_id", "activity_type", "source_id", "target_id", + "method", "summary", "request_body", "response_body", + "tool_trace", "duration_ms", "status", "error_detail", "created_at", + "peer_name", "peer_role", + }). + AddRow("act-1", "ws-1", "a2a_receive", peerID, "ws-1", + "message/send", "Agent message: hello", + []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hello"}]}}}`), + nil, nil, nil, "ok", nil, time.Now(), + "Production Manager", "product manager")) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil) + c.Request.Host = "platform.test" + c.Request.Header.Set("X-Forwarded-Proto", "https") + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("parse: %v", err) + } + if len(resp) != 1 { + t.Fatalf("want 1 row, got %d", len(resp)) + } + r := resp[0] + if r["peer_name"] != "Production Manager" { + t.Errorf("peer_name: got %v", r["peer_name"]) + } + if r["peer_role"] != "product manager" { + t.Errorf("peer_role: got %v", r["peer_role"]) + } + wantURL := "https://platform.test/registry/discover/" + peerID + if r["agent_card_url"] != wantURL { + t.Errorf("agent_card_url: got %v, want %v", r["agent_card_url"], wantURL) + } + // Text-only message has no attachments → omit from envelope + if _, present := r["attachments"]; present { + t.Errorf("attachments should be omitted on text-only row; got %v", r["attachments"]) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +func TestActivityList_IncludePeerInfo_CanvasRowHasNoPeerFields(t *testing.T) { + // LEFT JOIN preserves canvas rows (source_id NULL) but their + // peer_name/peer_role come back as NULL — must omit from the + // envelope (not emit empty strings or null literals). + mock := setupTestDB(t) + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + mock.ExpectQuery( + `LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`, + ). + WithArgs("ws-1", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "workspace_id", "activity_type", "source_id", "target_id", + "method", "summary", "request_body", "response_body", + "tool_trace", "duration_ms", "status", "error_detail", "created_at", + "peer_name", "peer_role", + }). + // source_id NULL = canvas message; peer columns also NULL. + AddRow("act-canvas", "ws-1", "a2a_receive", nil, "ws-1", + "notify", "User said hi", + []byte(`{"params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`), + nil, nil, nil, "ok", nil, time.Now(), + nil, nil)) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil) + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("parse: %v", err) + } + if len(resp) != 1 { + t.Fatalf("want 1 row, got %d", len(resp)) + } + r := resp[0] + for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} { + if _, present := r[k]; present { + t.Errorf("%s should be absent on canvas row; got %v", k, r[k]) + } + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +func TestActivityList_IncludePeerInfo_AttachmentsSurfaceFromRequestBody(t *testing.T) { + // A peer_agent message with an inline file attachment must have + // attachments[] populated on the envelope. + mock := setupTestDB(t) + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + peerID := "11111111-2222-3333-4444-555555555555" + mock.ExpectQuery(`LEFT JOIN workspaces`). + WithArgs("ws-1", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "workspace_id", "activity_type", "source_id", "target_id", + "method", "summary", "request_body", "response_body", + "tool_trace", "duration_ms", "status", "error_detail", "created_at", + "peer_name", "peer_role", + }). + AddRow("act-with-file", "ws-1", "a2a_receive", peerID, "ws-1", + "message/send", "Agent message: see attached", + []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[ + {"kind":"text","text":"see attached"}, + {"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}} + ]}}}`), + nil, nil, nil, "ok", nil, time.Now(), + "Code Reviewer", "code reviewer")) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil) + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("parse: %v", err) + } + r := resp[0] + atts, ok := r["attachments"].([]interface{}) + if !ok { + t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"]) + } + if len(atts) != 1 { + t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts) + } + att := atts[0].(map[string]interface{}) + if att["kind"] != "file" || att["uri"] != "workspace:foo.pdf" || att["name"] != "foo.pdf" { + t.Errorf("attachment shape: %v", att) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +func TestActivityList_IncludePeerInfo_Unset_NoJoinNoExtraFields(t *testing.T) { + // Back-compat — when ?include=peer_info is NOT passed, the SELECT + // uses unqualified column refs (no `activity_logs.` prefix) AND no + // JOIN. Existing tests pass this implicitly; this test pins it + // explicitly so a future refactor that accidentally turns the JOIN + // always-on gets caught. + mock := setupTestDB(t) + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + // Regex pinned: "FROM activity_logs WHERE workspace_id" — no JOIN + // keyword between FROM and WHERE; no `activity_logs.` qualifier on + // workspace_id. + mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`). + WithArgs("ws-1", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "workspace_id", "activity_type", "source_id", "target_id", + "method", "summary", "request_body", "response_body", + "tool_trace", "duration_ms", "status", "error_detail", "created_at", + }). + AddRow("act-1", "ws-1", "a2a_receive", "11111111-2222-3333-4444-555555555555", "ws-1", + "message/send", "Hello", + nil, nil, nil, nil, "ok", nil, time.Now())) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity", nil) + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("parse: %v", err) + } + if len(resp) != 1 { + t.Fatalf("want 1 row, got %d", len(resp)) + } + // Confirm no peer_info enrichment leaks into the default envelope. + for _, k := range []string{"peer_name", "peer_role", "agent_card_url", "attachments"} { + if _, present := resp[0][k]; present { + t.Errorf("%s must NOT appear without ?include=peer_info; got %v", k, resp[0][k]) + } + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +func TestActivityList_IncludePeerInfo_UnknownFlagIgnored(t *testing.T) { + // ?include=bogus must NOT issue the JOIN — only the recognized + // `peer_info` flag triggers enrichment. The unknown flag is silently + // ignored (additive, opt-in convention). + mock := setupTestDB(t) + broadcaster := newTestBroadcaster() + handler := NewActivityHandler(broadcaster) + + mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`). + WithArgs("ws-1", 100). + WillReturnRows(sqlmock.NewRows([]string{ + "id", "workspace_id", "activity_type", "source_id", "target_id", + "method", "summary", "request_body", "response_body", + "tool_trace", "duration_ms", "status", "error_detail", "created_at", + })) + + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-1"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=bogus", nil) + handler.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", w.Code) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + +// Sanity test using the existing test broadcaster setup — verifies the +// extractAttachments helper round-trips through json.Marshal cleanly +// (no map ordering issues, no type-coercion surprises). +func TestExtractAttachmentsFromRequestBody_RoundTripsThroughJSON(t *testing.T) { + body := []byte(`{"params":{"message":{"parts":[{"kind":"file","file":{"uri":"workspace:r.bin","mime_type":"application/octet-stream","name":"r.bin"}}]}}}`) + atts := extractAttachmentsFromRequestBody(body) + b, err := json.Marshal(atts) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var decoded []map[string]interface{} + if err := json.Unmarshal(b, &decoded); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(decoded) != 1 || decoded[0]["uri"] != "workspace:r.bin" { + t.Fatalf("round-trip mismatch: %v", decoded) + } + _ = fmt.Sprintf // keep fmt import live if test trimming removes usage +} -- 2.52.0