diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go
index 18dc8fb86..4ab8c157d 100644
--- a/workspace-server/internal/handlers/activity.go
+++ b/workspace-server/internal/handlers/activity.go
@@ -67,7 +67,128 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
}
-// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
+// extractAttachmentsFromRequestBody walks a JSON-RPC a2a inbound body to
+// surface attachments (file/image/audio parts) as a flat `attachments[]`
+// projection so callers don't have to drill into
+// `request_body.params.message.parts[]` themselves.
+//
+// Shape of an a2a inbound request_body that carries attachments:
+//
+// {"jsonrpc":"2.0","method":"message/send","params":{
+// "message":{"parts":[
+// {"kind":"text", "text":"hi"},
+// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}},
+// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}},
+// ]}}}
+//
+// Returns nil (omit-from-JSON) when the body has no attachments — the
+// `?include=peer_info` envelope projects this as an array iff non-empty.
+//
+// Defensive on every step: any missing key / wrong-shape value returns
+// nil instead of panicking. The activity_logs row could carry literally
+// any JSON in request_body (legacy formats, future formats); we only
+// commit to the documented a2a-sdk v1 message-part shape and silently
+// skip anything else.
+func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
+ if len(raw) == 0 {
+ return nil
+ }
+ var body map[string]interface{}
+ if err := json.Unmarshal(raw, &body); err != nil {
+ return nil
+ }
+ params, ok := body["params"].(map[string]interface{})
+ if !ok {
+ return nil
+ }
+ message, ok := params["message"].(map[string]interface{})
+ if !ok {
+ return nil
+ }
+ parts, ok := message["parts"].([]interface{})
+ if !ok {
+ return nil
+ }
+ out := make([]map[string]interface{}, 0)
+ for _, p := range parts {
+ part, ok := p.(map[string]interface{})
+ if !ok {
+ continue
+ }
+ // a2a-sdk v1 uses "kind"; older v0 callers sent "type". Accept
+ // both for the discriminator — same defensive read pattern as
+ // the runtime-side extract_text helper.
+ kind, _ := part["kind"].(string)
+ if kind == "" {
+ kind, _ = part["type"].(string)
+ }
+ if kind != "file" && kind != "image" && kind != "audio" {
+ continue
+ }
+ // The file sub-object holds uri/mime_type/name. The a2a-sdk v1
+ // shape nests under "file"; some legacy payloads inlined the
+ // fields onto the part itself. Support both.
+ var fileObj map[string]interface{}
+ if f, ok := part["file"].(map[string]interface{}); ok {
+ fileObj = f
+ } else {
+ fileObj = part
+ }
+ uri, _ := fileObj["uri"].(string)
+ mimeType, _ := fileObj["mime_type"].(string)
+ name, _ := fileObj["name"].(string)
+ // At minimum we need either a uri or a name to be useful.
+ // Empty-part entries are skipped (they're a malformed inbound
+ // — surface nothing rather than emit a no-info placeholder).
+ if uri == "" && name == "" {
+ continue
+ }
+ att := map[string]interface{}{"kind": kind}
+ if uri != "" {
+ att["uri"] = uri
+ }
+ if mimeType != "" {
+ att["mime_type"] = mimeType
+ }
+ if name != "" {
+ att["name"] = name
+ }
+ out = append(out, att)
+ }
+ if len(out) == 0 {
+ return nil
+ }
+ return out
+}
+
+// includeFlagSet returns true iff `flag` appears in the comma-separated
+// `?include=` query value. Whitespace around entries is tolerated.
+// Empty `include` returns false (existing back-compat shape).
+//
+// The comma-separable form lets future fields ("attachments_only",
+// "tool_trace_expanded", etc.) slot in without further URL-param creep.
+func includeFlagSet(includeQuery, flag string) bool {
+ if includeQuery == "" || flag == "" {
+ return false
+ }
+ for _, raw := range strings.Split(includeQuery, ",") {
+ if strings.TrimSpace(raw) == flag {
+ return true
+ }
+ }
+ return false
+}
+
+// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=&include=
+//
+// The `include` query param is comma-separable; today the only flag is
+// `peer_info`, which enriches a2a_receive rows with `peer_name`,
+// `peer_role`, `agent_card_url`, and an `attachments[]` projection (see
+// extractAttachmentsFromRequestBody). It's additive + opt-in — existing
+// callers that don't pass `?include=peer_info` see the unchanged shape.
+// Surface for the layered enrichment that lets Claude Code channel
+// pushes carry full sender identity instead of bare UUIDs (sibling
+// repos: molecule-ai-workspace-runtime + molecule-mcp-claude-channel).
//
// since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'.
// Optional, additive — callers that don't pass it get today's behavior (the
@@ -102,6 +223,8 @@ func (h *ActivityHandler) List(c *gin.Context) {
sinceSecsStr := c.Query("since_secs")
sinceID := c.Query("since_id")
beforeTSStr := c.Query("before_ts") // optional RFC3339 — return rows strictly older than this timestamp
+ include := c.Query("include") // comma-separated; today's only flag is "peer_info"
+ includePeerInfo := includeFlagSet(include, "peer_info")
// Validate peer_id as a UUID at the trust boundary so a malformed
// caller (the agent or a downstream MCP tool) can't smuggle SQL
@@ -192,22 +315,60 @@ func (h *ActivityHandler) List(c *gin.Context) {
usingCursor = true
}
- // Build query with optional filters
- query := `SELECT id, workspace_id, activity_type, source_id, target_id, method,
- summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at
- FROM activity_logs WHERE workspace_id = $1`
+ // Build query with optional filters. When ?include=peer_info is set,
+ // LEFT JOIN workspaces ON activity_logs.source_id = w.id so we can
+ // surface w.name + w.role on the row. LEFT (not INNER) is required
+ // for two reasons:
+ // 1. Canvas rows have source_id IS NULL — those must still appear
+ // in the result set (with NULL peer_name/peer_role).
+ // 2. A peer workspace may have been deleted since the row was
+ // written (no FK constraint on activity_logs.source_id) —
+ // LEFT JOIN preserves the activity row with NULL peer fields
+ // rather than silently dropping the row.
+ //
+ // agent_card_url is NOT pulled from the workspaces table; it's
+ // computed server-side from externalPlatformURL + source_id at
+ // projection time (mirrors molecule-ai-workspace-runtime
+ // a2a_client._agent_card_url_for which constructs
+ // {PLATFORM_URL}/registry/discover/{peer_id}).
+ //
+ // Column qualification (`activity_logs.
`) is added ONLY when
+ // the JOIN is present — disambiguates `id` / `created_at` which
+ // exist in both tables. When the JOIN is absent, unqualified
+ // column references preserve the exact wire-shape existing callers
+ // + existing test fixtures expect (back-compat).
+ actCol := ""
+ if includePeerInfo {
+ actCol = "activity_logs."
+ }
+ selectClause := `SELECT ` + actCol + `id, ` + actCol + `workspace_id, ` + actCol + `activity_type, ` +
+ actCol + `source_id, ` + actCol + `target_id, ` + actCol + `method, ` +
+ actCol + `summary, ` + actCol + `request_body, ` + actCol + `response_body, ` +
+ actCol + `tool_trace, ` + actCol + `duration_ms, ` + actCol + `status, ` +
+ actCol + `error_detail, ` + actCol + `created_at`
+ fromClause := ` FROM activity_logs`
+ if includePeerInfo {
+ selectClause += `, w.name AS peer_name, w.role AS peer_role`
+ fromClause += ` LEFT JOIN workspaces w ON w.id = activity_logs.source_id`
+ }
+ query := selectClause + fromClause + ` WHERE ` + actCol + `workspace_id = $1`
args := []interface{}{workspaceID}
argIdx := 2
+ // WHERE/ORDER column refs use the same `actCol` qualifier prefix
+ // computed above — empty string when no JOIN (back-compat with
+ // existing wire shape + sqlmock-regex test fixtures), or
+ // `activity_logs.` when LEFT JOIN'd (disambiguates `id` /
+ // `created_at` between the two tables).
if activityType != "" {
- query += fmt.Sprintf(" AND activity_type = $%d", argIdx)
+ query += fmt.Sprintf(" AND "+actCol+"activity_type = $%d", argIdx)
args = append(args, activityType)
argIdx++
}
if source == "canvas" {
- query += " AND source_id IS NULL"
+ query += " AND " + actCol + "source_id IS NULL"
} else if source == "agent" {
- query += " AND source_id IS NOT NULL"
+ query += " AND " + actCol + "source_id IS NOT NULL"
} else if source != "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"})
return
@@ -224,7 +385,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
// and avoids duplicate parameter binding (some drivers reject the
// same arg slot reused, ours is fine but the explicit form is
// clearer to read and matches the rest of the builder.)
- query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx)
+ query += fmt.Sprintf(" AND ("+actCol+"source_id = $%d OR "+actCol+"target_id = $%d)", argIdx, argIdx)
args = append(args, peerID)
argIdx++
}
@@ -232,7 +393,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
// Strictly older — never replay a row with the exact same
// timestamp, mirrors the `created_at > cursorTime` shape
// `since_id` uses for forward paging.
- query += fmt.Sprintf(" AND created_at < $%d", argIdx)
+ query += fmt.Sprintf(" AND "+actCol+"created_at < $%d", argIdx)
args = append(args, beforeTS)
argIdx++
}
@@ -241,13 +402,13 @@ func (h *ActivityHandler) List(c *gin.Context) {
// interpolated into the SQL string. `make_interval(secs => $N)`
// avoids the lib/pq quirk where INTERVAL '$N seconds' won't
// substitute a placeholder inside the literal.
- query += fmt.Sprintf(" AND created_at >= NOW() - make_interval(secs => $%d)", argIdx)
+ query += fmt.Sprintf(" AND "+actCol+"created_at >= NOW() - make_interval(secs => $%d)", argIdx)
args = append(args, sinceSecs)
argIdx++
}
if usingCursor {
// Strictly after — never replay the cursor row itself.
- query += fmt.Sprintf(" AND created_at > $%d", argIdx)
+ query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx)
args = append(args, cursorTime)
argIdx++
}
@@ -257,9 +418,9 @@ func (h *ActivityHandler) List(c *gin.Context) {
// since_id) keeps DESC — that's the canvas/UI shape and changing it
// would surprise existing callers.
if usingCursor {
- query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx)
+ query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx)
} else {
- query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx)
+ query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx)
}
args = append(args, limit)
@@ -272,6 +433,14 @@ func (h *ActivityHandler) List(c *gin.Context) {
}
defer rows.Close()
+ // agent_card_url base computed once per request so we don't pay the
+ // header-read cost per row. Only meaningful when includePeerInfo is
+ // set; the empty string here is harmless when the flag is off.
+ var platformBase string
+ if includePeerInfo {
+ platformBase = externalPlatformURL(c)
+ }
+
activities := make([]map[string]interface{}, 0)
for rows.Next() {
var id, wsID, actType, status string
@@ -279,10 +448,23 @@ func (h *ActivityHandler) List(c *gin.Context) {
var reqBody, respBody, toolTrace []byte
var durationMs *int
var createdAt time.Time
+ // LEFT JOIN'd peer columns — pointer-string so a NULL row
+ // (canvas message OR deleted peer workspace) decodes as nil
+ // rather than empty-string. Only scanned when includePeerInfo
+ // is set (matched against the SELECT clause above).
+ var peerName, peerRole *string
- if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
- &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil {
- log.Printf("Activity scan error: %v", err)
+ var scanErr error
+ if includePeerInfo {
+ scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
+ &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt,
+ &peerName, &peerRole)
+ } else {
+ scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
+ &summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt)
+ }
+ if scanErr != nil {
+ log.Printf("Activity scan error: %v", scanErr)
continue
}
@@ -308,6 +490,39 @@ func (h *ActivityHandler) List(c *gin.Context) {
if toolTrace != nil {
entry["tool_trace"] = json.RawMessage(toolTrace)
}
+
+ // peer_info enrichment (per ?include=peer_info). Only emit the
+ // new fields when the flag is set — back-compat for callers
+ // that don't request it.
+ if includePeerInfo {
+ // peer_name / peer_role: emit only when present (canvas
+ // rows have source_id IS NULL → peer_name is NULL by JOIN;
+ // also a peer workspace may have been deleted since the
+ // row was written → same NULL outcome). Omit-when-absent
+ // matches the Layer 3 adaptor's "spread when present"
+ // pattern; canvas_user rows legitimately have no peer_*.
+ if peerName != nil && *peerName != "" {
+ entry["peer_name"] = *peerName
+ }
+ if peerRole != nil && *peerRole != "" {
+ entry["peer_role"] = *peerRole
+ }
+ // agent_card_url: constructed server-side from
+ // externalPlatformURL + source_id. Mirrors the runtime-
+ // side helper a2a_client._agent_card_url_for which builds
+ // {PLATFORM_URL}/registry/discover/{peer_id}. Only set
+ // when source_id is present + non-empty.
+ if sourceID != nil && *sourceID != "" && platformBase != "" {
+ entry["agent_card_url"] = platformBase + "/registry/discover/" + *sourceID
+ }
+ // attachments: flatten file/image/audio parts from the
+ // request_body. nil when none — only project when
+ // non-empty so the omit-when-absent rule holds.
+ if atts := extractAttachmentsFromRequestBody(reqBody); len(atts) > 0 {
+ entry["attachments"] = atts
+ }
+ }
+
activities = append(activities, entry)
}
if err := rows.Err(); err != nil {
diff --git a/workspace-server/internal/handlers/activity_peer_info_test.go b/workspace-server/internal/handlers/activity_peer_info_test.go
new file mode 100644
index 000000000..646cfafe1
--- /dev/null
+++ b/workspace-server/internal/handlers/activity_peer_info_test.go
@@ -0,0 +1,447 @@
+package handlers
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/DATA-DOG/go-sqlmock"
+ "github.com/gin-gonic/gin"
+)
+
+// Tests for the `?include=peer_info` activity-feed enrichment.
+//
+// The enrichment is additive + opt-in. When the flag is absent, the
+// existing tests (TestActivityList_SourceCanvas, etc.) prove the wire
+// shape is unchanged. These tests prove:
+// - When the flag IS set, the LEFT JOIN is issued and the SELECT
+// adds w.name + w.role.
+// - peer_name / peer_role surface from the joined row.
+// - agent_card_url is composed server-side from
+// externalPlatformURL + source_id and appears for non-canvas rows
+// (source_id present).
+// - attachments[] is projected from request_body.params.message.parts
+// for file/image/audio parts.
+// - Canvas rows (source_id NULL) do NOT get peer_name / peer_role /
+// agent_card_url, but DO still appear in the result set (LEFT JOIN
+// preserves them with NULL peer fields).
+// - The `include` query param is comma-separable and only recognizes
+// known flags.
+
+// ---------- includeFlagSet helper unit tests ----------
+
+func TestIncludeFlagSet(t *testing.T) {
+ cases := []struct {
+ query string
+ flag string
+ want bool
+ }{
+ {"", "peer_info", false},
+ {"peer_info", "peer_info", true},
+ {"peer_info,attachments", "peer_info", true},
+ {"attachments,peer_info", "peer_info", true},
+ {"attachments , peer_info ", "peer_info", true},
+ {"peer_infos", "peer_info", false},
+ {"peerinfo", "peer_info", false},
+ {"peer_info", "", false},
+ {",,", "peer_info", false},
+ }
+ for _, tc := range cases {
+ got := includeFlagSet(tc.query, tc.flag)
+ if got != tc.want {
+ t.Errorf("includeFlagSet(%q, %q) = %v, want %v", tc.query, tc.flag, got, tc.want)
+ }
+ }
+}
+
+// ---------- extractAttachmentsFromRequestBody unit tests ----------
+
+func TestExtractAttachmentsFromRequestBody_Empty(t *testing.T) {
+ if got := extractAttachmentsFromRequestBody(nil); got != nil {
+ t.Errorf("nil body: want nil, got %v", got)
+ }
+ if got := extractAttachmentsFromRequestBody([]byte("")); got != nil {
+ t.Errorf("empty body: want nil, got %v", got)
+ }
+ if got := extractAttachmentsFromRequestBody([]byte("not json")); got != nil {
+ t.Errorf("non-json body: want nil, got %v", got)
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_NoAttachments(t *testing.T) {
+ // Text-only message: no file/image/audio parts → nil
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`)
+ if got := extractAttachmentsFromRequestBody(body); got != nil {
+ t.Errorf("text-only: want nil, got %v", got)
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_FileKindV1(t *testing.T) {
+ // a2a-sdk v1 shape: kind=file, file:{uri,mime_type,name}
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+ {"kind":"text","text":"see attached"},
+ {"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
+ ]}}}`)
+ atts := extractAttachmentsFromRequestBody(body)
+ if len(atts) != 1 {
+ t.Fatalf("want 1 attachment, got %d", len(atts))
+ }
+ if atts[0]["kind"] != "file" {
+ t.Errorf("kind: want file, got %v", atts[0]["kind"])
+ }
+ if atts[0]["uri"] != "workspace:foo.pdf" {
+ t.Errorf("uri mismatch: %v", atts[0]["uri"])
+ }
+ if atts[0]["mime_type"] != "application/pdf" {
+ t.Errorf("mime_type mismatch: %v", atts[0]["mime_type"])
+ }
+ if atts[0]["name"] != "foo.pdf" {
+ t.Errorf("name mismatch: %v", atts[0]["name"])
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) {
+ // Mixed image + audio parts; both surface
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+ {"kind":"image","file":{"uri":"workspace:a.png","mime_type":"image/png","name":"a.png"}},
+ {"kind":"audio","file":{"uri":"workspace:b.mp3","mime_type":"audio/mpeg","name":"b.mp3"}}
+ ]}}}`)
+ atts := extractAttachmentsFromRequestBody(body)
+ if len(atts) != 2 {
+ t.Fatalf("want 2 attachments, got %d", len(atts))
+ }
+ if atts[0]["kind"] != "image" || atts[1]["kind"] != "audio" {
+ t.Errorf("kind order: got %v / %v", atts[0]["kind"], atts[1]["kind"])
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) {
+ // Legacy v0 shape: type=file (not kind), inlined fields (no nested .file)
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+ {"type":"file","uri":"workspace:legacy.txt","mime_type":"text/plain","name":"legacy.txt"}
+ ]}}}`)
+ atts := extractAttachmentsFromRequestBody(body)
+ if len(atts) != 1 {
+ t.Fatalf("want 1 attachment, got %d", len(atts))
+ }
+ if atts[0]["kind"] != "file" || atts[0]["uri"] != "workspace:legacy.txt" || atts[0]["name"] != "legacy.txt" {
+ t.Errorf("v0 part not surfaced: %v", atts[0])
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_SkipsEmptyParts(t *testing.T) {
+ // A "file" part with no uri AND no name is malformed — skip rather
+ // than emit a no-info entry.
+ body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+ {"kind":"file","file":{}},
+ {"kind":"file","file":{"name":"only-name.bin"}}
+ ]}}}`)
+ atts := extractAttachmentsFromRequestBody(body)
+ if len(atts) != 1 {
+ t.Fatalf("want 1 attachment (the named one), got %d", len(atts))
+ }
+ if atts[0]["name"] != "only-name.bin" {
+ t.Errorf("expected only-name.bin, got %v", atts[0])
+ }
+}
+
+func TestExtractAttachmentsFromRequestBody_MalformedShape(t *testing.T) {
+ // Various malformed shapes return nil (defensive)
+ for _, b := range []string{
+ `{}`,
+ `{"params":{}}`,
+ `{"params":{"message":{}}}`,
+ `{"params":{"message":{"parts":"not-a-list"}}}`,
+ `{"params":{"message":{"parts":[null,42,"string"]}}}`,
+ } {
+ if got := extractAttachmentsFromRequestBody([]byte(b)); got != nil {
+ t.Errorf("body %q: want nil, got %v", b, got)
+ }
+ }
+}
+
+// ---------- Activity List ?include=peer_info handler tests ----------
+
+func TestActivityList_IncludePeerInfo_IssuesLeftJoin(t *testing.T) {
+ // When ?include=peer_info is set, the query must:
+ // 1. SELECT include w.name + w.role aliased as peer_name/peer_role
+ // 2. FROM contains LEFT JOIN workspaces w ON w.id = activity_logs.source_id
+ // 3. WHERE uses qualified activity_logs.workspace_id (disambiguates
+ // from workspaces.id post-JOIN)
+ //
+ // Pin all three so a future refactor can't silently drop the JOIN or
+ // the alias and have the test still pass.
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewActivityHandler(broadcaster)
+
+ peerID := "11111111-2222-3333-4444-555555555555"
+ mock.ExpectQuery(
+ `SELECT .+w\.name AS peer_name, w\.role AS peer_role FROM activity_logs LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id WHERE activity_logs\.workspace_id = .+`,
+ ).
+ WithArgs("ws-1", 100).
+ WillReturnRows(sqlmock.NewRows([]string{
+ "id", "workspace_id", "activity_type", "source_id", "target_id",
+ "method", "summary", "request_body", "response_body",
+ "tool_trace", "duration_ms", "status", "error_detail", "created_at",
+ "peer_name", "peer_role",
+ }).
+ AddRow("act-1", "ws-1", "a2a_receive", peerID, "ws-1",
+ "message/send", "Agent message: hello",
+ []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hello"}]}}}`),
+ nil, nil, nil, "ok", nil, time.Now(),
+ "Production Manager", "product manager"))
+
+ gin.SetMode(gin.TestMode)
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
+ c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
+ c.Request.Host = "platform.test"
+ c.Request.Header.Set("X-Forwarded-Proto", "https")
+ handler.List(c)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp []map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("parse: %v", err)
+ }
+ if len(resp) != 1 {
+ t.Fatalf("want 1 row, got %d", len(resp))
+ }
+ r := resp[0]
+ if r["peer_name"] != "Production Manager" {
+ t.Errorf("peer_name: got %v", r["peer_name"])
+ }
+ if r["peer_role"] != "product manager" {
+ t.Errorf("peer_role: got %v", r["peer_role"])
+ }
+ wantURL := "https://platform.test/registry/discover/" + peerID
+ if r["agent_card_url"] != wantURL {
+ t.Errorf("agent_card_url: got %v, want %v", r["agent_card_url"], wantURL)
+ }
+ // Text-only message has no attachments → omit from envelope
+ if _, present := r["attachments"]; present {
+ t.Errorf("attachments should be omitted on text-only row; got %v", r["attachments"])
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Fatalf("unmet expectations: %v", err)
+ }
+}
+
+func TestActivityList_IncludePeerInfo_CanvasRowHasNoPeerFields(t *testing.T) {
+ // LEFT JOIN preserves canvas rows (source_id NULL) but their
+ // peer_name/peer_role come back as NULL — must omit from the
+ // envelope (not emit empty strings or null literals).
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewActivityHandler(broadcaster)
+
+ mock.ExpectQuery(
+ `LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`,
+ ).
+ WithArgs("ws-1", 100).
+ WillReturnRows(sqlmock.NewRows([]string{
+ "id", "workspace_id", "activity_type", "source_id", "target_id",
+ "method", "summary", "request_body", "response_body",
+ "tool_trace", "duration_ms", "status", "error_detail", "created_at",
+ "peer_name", "peer_role",
+ }).
+ // source_id NULL = canvas message; peer columns also NULL.
+ AddRow("act-canvas", "ws-1", "a2a_receive", nil, "ws-1",
+ "notify", "User said hi",
+ []byte(`{"params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`),
+ nil, nil, nil, "ok", nil, time.Now(),
+ nil, nil))
+
+ gin.SetMode(gin.TestMode)
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
+ c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
+ handler.List(c)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp []map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("parse: %v", err)
+ }
+ if len(resp) != 1 {
+ t.Fatalf("want 1 row, got %d", len(resp))
+ }
+ r := resp[0]
+ for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} {
+ if _, present := r[k]; present {
+ t.Errorf("%s should be absent on canvas row; got %v", k, r[k])
+ }
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Fatalf("unmet expectations: %v", err)
+ }
+}
+
+func TestActivityList_IncludePeerInfo_AttachmentsSurfaceFromRequestBody(t *testing.T) {
+ // A peer_agent message with an inline file attachment must have
+ // attachments[] populated on the envelope.
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewActivityHandler(broadcaster)
+
+ peerID := "11111111-2222-3333-4444-555555555555"
+ mock.ExpectQuery(`LEFT JOIN workspaces`).
+ WithArgs("ws-1", 100).
+ WillReturnRows(sqlmock.NewRows([]string{
+ "id", "workspace_id", "activity_type", "source_id", "target_id",
+ "method", "summary", "request_body", "response_body",
+ "tool_trace", "duration_ms", "status", "error_detail", "created_at",
+ "peer_name", "peer_role",
+ }).
+ AddRow("act-with-file", "ws-1", "a2a_receive", peerID, "ws-1",
+ "message/send", "Agent message: see attached",
+ []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
+ {"kind":"text","text":"see attached"},
+ {"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
+ ]}}}`),
+ nil, nil, nil, "ok", nil, time.Now(),
+ "Code Reviewer", "code reviewer"))
+
+ gin.SetMode(gin.TestMode)
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
+ c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
+ handler.List(c)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp []map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("parse: %v", err)
+ }
+ r := resp[0]
+ atts, ok := r["attachments"].([]interface{})
+ if !ok {
+ t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"])
+ }
+ if len(atts) != 1 {
+ t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts)
+ }
+ att := atts[0].(map[string]interface{})
+ if att["kind"] != "file" || att["uri"] != "workspace:foo.pdf" || att["name"] != "foo.pdf" {
+ t.Errorf("attachment shape: %v", att)
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Fatalf("unmet expectations: %v", err)
+ }
+}
+
+func TestActivityList_IncludePeerInfo_Unset_NoJoinNoExtraFields(t *testing.T) {
+ // Back-compat — when ?include=peer_info is NOT passed, the SELECT
+ // uses unqualified column refs (no `activity_logs.` prefix) AND no
+ // JOIN. Existing tests pass this implicitly; this test pins it
+ // explicitly so a future refactor that accidentally turns the JOIN
+ // always-on gets caught.
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewActivityHandler(broadcaster)
+
+ // Regex pinned: "FROM activity_logs WHERE workspace_id" — no JOIN
+ // keyword between FROM and WHERE; no `activity_logs.` qualifier on
+ // workspace_id.
+ mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
+ WithArgs("ws-1", 100).
+ WillReturnRows(sqlmock.NewRows([]string{
+ "id", "workspace_id", "activity_type", "source_id", "target_id",
+ "method", "summary", "request_body", "response_body",
+ "tool_trace", "duration_ms", "status", "error_detail", "created_at",
+ }).
+ AddRow("act-1", "ws-1", "a2a_receive", "11111111-2222-3333-4444-555555555555", "ws-1",
+ "message/send", "Hello",
+ nil, nil, nil, nil, "ok", nil, time.Now()))
+
+ gin.SetMode(gin.TestMode)
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
+ c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity", nil)
+ handler.List(c)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
+ }
+ var resp []map[string]interface{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatalf("parse: %v", err)
+ }
+ if len(resp) != 1 {
+ t.Fatalf("want 1 row, got %d", len(resp))
+ }
+ // Confirm no peer_info enrichment leaks into the default envelope.
+ for _, k := range []string{"peer_name", "peer_role", "agent_card_url", "attachments"} {
+ if _, present := resp[0][k]; present {
+ t.Errorf("%s must NOT appear without ?include=peer_info; got %v", k, resp[0][k])
+ }
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Fatalf("unmet expectations: %v", err)
+ }
+}
+
+func TestActivityList_IncludePeerInfo_UnknownFlagIgnored(t *testing.T) {
+ // ?include=bogus must NOT issue the JOIN — only the recognized
+ // `peer_info` flag triggers enrichment. The unknown flag is silently
+ // ignored (additive, opt-in convention).
+ mock := setupTestDB(t)
+ broadcaster := newTestBroadcaster()
+ handler := NewActivityHandler(broadcaster)
+
+ mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
+ WithArgs("ws-1", 100).
+ WillReturnRows(sqlmock.NewRows([]string{
+ "id", "workspace_id", "activity_type", "source_id", "target_id",
+ "method", "summary", "request_body", "response_body",
+ "tool_trace", "duration_ms", "status", "error_detail", "created_at",
+ }))
+
+ gin.SetMode(gin.TestMode)
+ w := httptest.NewRecorder()
+ c, _ := gin.CreateTestContext(w)
+ c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
+ c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=bogus", nil)
+ handler.List(c)
+
+ if w.Code != http.StatusOK {
+ t.Fatalf("expected 200, got %d", w.Code)
+ }
+ if err := mock.ExpectationsWereMet(); err != nil {
+ t.Fatalf("unmet expectations: %v", err)
+ }
+}
+
+// Sanity test using the existing test broadcaster setup — verifies the
+// extractAttachments helper round-trips through json.Marshal cleanly
+// (no map ordering issues, no type-coercion surprises).
+func TestExtractAttachmentsFromRequestBody_RoundTripsThroughJSON(t *testing.T) {
+ body := []byte(`{"params":{"message":{"parts":[{"kind":"file","file":{"uri":"workspace:r.bin","mime_type":"application/octet-stream","name":"r.bin"}}]}}}`)
+ atts := extractAttachmentsFromRequestBody(body)
+ b, err := json.Marshal(atts)
+ if err != nil {
+ t.Fatalf("marshal: %v", err)
+ }
+ var decoded []map[string]interface{}
+ if err := json.Unmarshal(b, &decoded); err != nil {
+ t.Fatalf("unmarshal: %v", err)
+ }
+ if len(decoded) != 1 || decoded[0]["uri"] != "workspace:r.bin" {
+ t.Fatalf("round-trip mismatch: %v", decoded)
+ }
+ _ = fmt.Sprintf // keep fmt import live if test trimming removes usage
+}