feat(activity): ?include=peer_info enrichment � peer_name/role/agent_card_url + attachments #1654

Merged
plugin-dev merged 1 commits from fix/activity-feed-peer-info-enrichment into main 2026-05-21 21:41:19 +00:00
2 changed files with 679 additions and 17 deletions
+232 -17
View File
@@ -67,7 +67,128 @@ func NewActivityHandler(b *events.Broadcaster) *ActivityHandler {
return &ActivityHandler{broadcaster: b}
}
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=
// extractAttachmentsFromRequestBody walks a JSON-RPC a2a inbound body to
// surface attachments (file/image/audio parts) as a flat `attachments[]`
// projection so callers don't have to drill into
// `request_body.params.message.parts[]` themselves.
//
// Shape of an a2a inbound request_body that carries attachments:
//
// {"jsonrpc":"2.0","method":"message/send","params":{
// "message":{"parts":[
// {"kind":"text", "text":"hi"},
// {"kind":"file", "file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}},
// {"kind":"image","file":{"uri":"workspace:bar.png","mime_type":"image/png","name":"bar.png"}},
// ]}}}
//
// Returns nil (omit-from-JSON) when the body has no attachments — the
// `?include=peer_info` envelope projects this as an array iff non-empty.
//
// Defensive on every step: any missing key / wrong-shape value returns
// nil instead of panicking. The activity_logs row could carry literally
// any JSON in request_body (legacy formats, future formats); we only
// commit to the documented a2a-sdk v1 message-part shape and silently
// skip anything else.
func extractAttachmentsFromRequestBody(raw []byte) []map[string]interface{} {
if len(raw) == 0 {
return nil
}
var body map[string]interface{}
if err := json.Unmarshal(raw, &body); err != nil {
return nil
}
params, ok := body["params"].(map[string]interface{})
if !ok {
return nil
}
message, ok := params["message"].(map[string]interface{})
if !ok {
return nil
}
parts, ok := message["parts"].([]interface{})
if !ok {
return nil
}
out := make([]map[string]interface{}, 0)
for _, p := range parts {
part, ok := p.(map[string]interface{})
if !ok {
continue
}
// a2a-sdk v1 uses "kind"; older v0 callers sent "type". Accept
// both for the discriminator — same defensive read pattern as
// the runtime-side extract_text helper.
kind, _ := part["kind"].(string)
if kind == "" {
kind, _ = part["type"].(string)
}
if kind != "file" && kind != "image" && kind != "audio" {
continue
}
// The file sub-object holds uri/mime_type/name. The a2a-sdk v1
// shape nests under "file"; some legacy payloads inlined the
// fields onto the part itself. Support both.
var fileObj map[string]interface{}
if f, ok := part["file"].(map[string]interface{}); ok {
fileObj = f
} else {
fileObj = part
}
uri, _ := fileObj["uri"].(string)
mimeType, _ := fileObj["mime_type"].(string)
name, _ := fileObj["name"].(string)
// At minimum we need either a uri or a name to be useful.
// Empty-part entries are skipped (they're a malformed inbound
// — surface nothing rather than emit a no-info placeholder).
if uri == "" && name == "" {
continue
}
att := map[string]interface{}{"kind": kind}
if uri != "" {
att["uri"] = uri
}
if mimeType != "" {
att["mime_type"] = mimeType
}
if name != "" {
att["name"] = name
}
out = append(out, att)
}
if len(out) == 0 {
return nil
}
return out
}
// includeFlagSet returns true iff `flag` appears in the comma-separated
// `?include=` query value. Whitespace around entries is tolerated.
// Empty `include` returns false (existing back-compat shape).
//
// The comma-separable form lets future fields ("attachments_only",
// "tool_trace_expanded", etc.) slot in without further URL-param creep.
func includeFlagSet(includeQuery, flag string) bool {
if includeQuery == "" || flag == "" {
return false
}
for _, raw := range strings.Split(includeQuery, ",") {
if strings.TrimSpace(raw) == flag {
return true
}
}
return false
}
// List handles GET /workspaces/:id/activity?type=&source=&limit=&since_secs=&since_id=&include=
//
// The `include` query param is comma-separable; today the only flag is
// `peer_info`, which enriches a2a_receive rows with `peer_name`,
// `peer_role`, `agent_card_url`, and an `attachments[]` projection (see
// extractAttachmentsFromRequestBody). It's additive + opt-in — existing
// callers that don't pass `?include=peer_info` see the unchanged shape.
// Surface for the layered enrichment that lets Claude Code channel
// pushes carry full sender identity instead of bare UUIDs (sibling
// repos: molecule-ai-workspace-runtime + molecule-mcp-claude-channel).
//
// since_secs filters to activity_logs.created_at >= NOW() - INTERVAL '$N seconds'.
// Optional, additive — callers that don't pass it get today's behavior (the
@@ -102,6 +223,8 @@ func (h *ActivityHandler) List(c *gin.Context) {
sinceSecsStr := c.Query("since_secs")
sinceID := c.Query("since_id")
beforeTSStr := c.Query("before_ts") // optional RFC3339 — return rows strictly older than this timestamp
include := c.Query("include") // comma-separated; today's only flag is "peer_info"
includePeerInfo := includeFlagSet(include, "peer_info")
// Validate peer_id as a UUID at the trust boundary so a malformed
// caller (the agent or a downstream MCP tool) can't smuggle SQL
@@ -192,22 +315,60 @@ func (h *ActivityHandler) List(c *gin.Context) {
usingCursor = true
}
// Build query with optional filters
query := `SELECT id, workspace_id, activity_type, source_id, target_id, method,
summary, request_body, response_body, tool_trace, duration_ms, status, error_detail, created_at
FROM activity_logs WHERE workspace_id = $1`
// Build query with optional filters. When ?include=peer_info is set,
// LEFT JOIN workspaces ON activity_logs.source_id = w.id so we can
// surface w.name + w.role on the row. LEFT (not INNER) is required
// for two reasons:
// 1. Canvas rows have source_id IS NULL — those must still appear
// in the result set (with NULL peer_name/peer_role).
// 2. A peer workspace may have been deleted since the row was
// written (no FK constraint on activity_logs.source_id) —
// LEFT JOIN preserves the activity row with NULL peer fields
// rather than silently dropping the row.
//
// agent_card_url is NOT pulled from the workspaces table; it's
// computed server-side from externalPlatformURL + source_id at
// projection time (mirrors molecule-ai-workspace-runtime
// a2a_client._agent_card_url_for which constructs
// {PLATFORM_URL}/registry/discover/{peer_id}).
//
// Column qualification (`activity_logs.<col>`) is added ONLY when
// the JOIN is present — disambiguates `id` / `created_at` which
// exist in both tables. When the JOIN is absent, unqualified
// column references preserve the exact wire-shape existing callers
// + existing test fixtures expect (back-compat).
actCol := ""
if includePeerInfo {
actCol = "activity_logs."
}
selectClause := `SELECT ` + actCol + `id, ` + actCol + `workspace_id, ` + actCol + `activity_type, ` +
actCol + `source_id, ` + actCol + `target_id, ` + actCol + `method, ` +
actCol + `summary, ` + actCol + `request_body, ` + actCol + `response_body, ` +
actCol + `tool_trace, ` + actCol + `duration_ms, ` + actCol + `status, ` +
actCol + `error_detail, ` + actCol + `created_at`
fromClause := ` FROM activity_logs`
if includePeerInfo {
selectClause += `, w.name AS peer_name, w.role AS peer_role`
fromClause += ` LEFT JOIN workspaces w ON w.id = activity_logs.source_id`
}
query := selectClause + fromClause + ` WHERE ` + actCol + `workspace_id = $1`
args := []interface{}{workspaceID}
argIdx := 2
// WHERE/ORDER column refs use the same `actCol` qualifier prefix
// computed above — empty string when no JOIN (back-compat with
// existing wire shape + sqlmock-regex test fixtures), or
// `activity_logs.` when LEFT JOIN'd (disambiguates `id` /
// `created_at` between the two tables).
if activityType != "" {
query += fmt.Sprintf(" AND activity_type = $%d", argIdx)
query += fmt.Sprintf(" AND "+actCol+"activity_type = $%d", argIdx)
args = append(args, activityType)
argIdx++
}
if source == "canvas" {
query += " AND source_id IS NULL"
query += " AND " + actCol + "source_id IS NULL"
} else if source == "agent" {
query += " AND source_id IS NOT NULL"
query += " AND " + actCol + "source_id IS NOT NULL"
} else if source != "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"})
return
@@ -224,7 +385,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
// and avoids duplicate parameter binding (some drivers reject the
// same arg slot reused, ours is fine but the explicit form is
// clearer to read and matches the rest of the builder.)
query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx)
query += fmt.Sprintf(" AND ("+actCol+"source_id = $%d OR "+actCol+"target_id = $%d)", argIdx, argIdx)
args = append(args, peerID)
argIdx++
}
@@ -232,7 +393,7 @@ func (h *ActivityHandler) List(c *gin.Context) {
// Strictly older — never replay a row with the exact same
// timestamp, mirrors the `created_at > cursorTime` shape
// `since_id` uses for forward paging.
query += fmt.Sprintf(" AND created_at < $%d", argIdx)
query += fmt.Sprintf(" AND "+actCol+"created_at < $%d", argIdx)
args = append(args, beforeTS)
argIdx++
}
@@ -241,13 +402,13 @@ func (h *ActivityHandler) List(c *gin.Context) {
// interpolated into the SQL string. `make_interval(secs => $N)`
// avoids the lib/pq quirk where INTERVAL '$N seconds' won't
// substitute a placeholder inside the literal.
query += fmt.Sprintf(" AND created_at >= NOW() - make_interval(secs => $%d)", argIdx)
query += fmt.Sprintf(" AND "+actCol+"created_at >= NOW() - make_interval(secs => $%d)", argIdx)
args = append(args, sinceSecs)
argIdx++
}
if usingCursor {
// Strictly after — never replay the cursor row itself.
query += fmt.Sprintf(" AND created_at > $%d", argIdx)
query += fmt.Sprintf(" AND "+actCol+"created_at > $%d", argIdx)
args = append(args, cursorTime)
argIdx++
}
@@ -257,9 +418,9 @@ func (h *ActivityHandler) List(c *gin.Context) {
// since_id) keeps DESC — that's the canvas/UI shape and changing it
// would surprise existing callers.
if usingCursor {
query += fmt.Sprintf(" ORDER BY created_at ASC LIMIT $%d", argIdx)
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at ASC LIMIT $%d", argIdx)
} else {
query += fmt.Sprintf(" ORDER BY created_at DESC LIMIT $%d", argIdx)
query += fmt.Sprintf(" ORDER BY "+actCol+"created_at DESC LIMIT $%d", argIdx)
}
args = append(args, limit)
@@ -272,6 +433,14 @@ func (h *ActivityHandler) List(c *gin.Context) {
}
defer rows.Close()
// agent_card_url base computed once per request so we don't pay the
// header-read cost per row. Only meaningful when includePeerInfo is
// set; the empty string here is harmless when the flag is off.
var platformBase string
if includePeerInfo {
platformBase = externalPlatformURL(c)
}
activities := make([]map[string]interface{}, 0)
for rows.Next() {
var id, wsID, actType, status string
@@ -279,10 +448,23 @@ func (h *ActivityHandler) List(c *gin.Context) {
var reqBody, respBody, toolTrace []byte
var durationMs *int
var createdAt time.Time
// LEFT JOIN'd peer columns — pointer-string so a NULL row
// (canvas message OR deleted peer workspace) decodes as nil
// rather than empty-string. Only scanned when includePeerInfo
// is set (matched against the SELECT clause above).
var peerName, peerRole *string
if err := rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt); err != nil {
log.Printf("Activity scan error: %v", err)
var scanErr error
if includePeerInfo {
scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt,
&peerName, &peerRole)
} else {
scanErr = rows.Scan(&id, &wsID, &actType, &sourceID, &targetID, &method,
&summary, &reqBody, &respBody, &toolTrace, &durationMs, &status, &errorDetail, &createdAt)
}
if scanErr != nil {
log.Printf("Activity scan error: %v", scanErr)
continue
}
@@ -308,6 +490,39 @@ func (h *ActivityHandler) List(c *gin.Context) {
if toolTrace != nil {
entry["tool_trace"] = json.RawMessage(toolTrace)
}
// peer_info enrichment (per ?include=peer_info). Only emit the
// new fields when the flag is set — back-compat for callers
// that don't request it.
if includePeerInfo {
// peer_name / peer_role: emit only when present (canvas
// rows have source_id IS NULL → peer_name is NULL by JOIN;
// also a peer workspace may have been deleted since the
// row was written → same NULL outcome). Omit-when-absent
// matches the Layer 3 adaptor's "spread when present"
// pattern; canvas_user rows legitimately have no peer_*.
if peerName != nil && *peerName != "" {
entry["peer_name"] = *peerName
}
if peerRole != nil && *peerRole != "" {
entry["peer_role"] = *peerRole
}
// agent_card_url: constructed server-side from
// externalPlatformURL + source_id. Mirrors the runtime-
// side helper a2a_client._agent_card_url_for which builds
// {PLATFORM_URL}/registry/discover/{peer_id}. Only set
// when source_id is present + non-empty.
if sourceID != nil && *sourceID != "" && platformBase != "" {
entry["agent_card_url"] = platformBase + "/registry/discover/" + *sourceID
}
// attachments: flatten file/image/audio parts from the
// request_body. nil when none — only project when
// non-empty so the omit-when-absent rule holds.
if atts := extractAttachmentsFromRequestBody(reqBody); len(atts) > 0 {
entry["attachments"] = atts
}
}
activities = append(activities, entry)
}
if err := rows.Err(); err != nil {
@@ -0,0 +1,447 @@
package handlers
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/gin-gonic/gin"
)
// Tests for the `?include=peer_info` activity-feed enrichment.
//
// The enrichment is additive + opt-in. When the flag is absent, the
// existing tests (TestActivityList_SourceCanvas, etc.) prove the wire
// shape is unchanged. These tests prove:
// - When the flag IS set, the LEFT JOIN is issued and the SELECT
// adds w.name + w.role.
// - peer_name / peer_role surface from the joined row.
// - agent_card_url is composed server-side from
// externalPlatformURL + source_id and appears for non-canvas rows
// (source_id present).
// - attachments[] is projected from request_body.params.message.parts
// for file/image/audio parts.
// - Canvas rows (source_id NULL) do NOT get peer_name / peer_role /
// agent_card_url, but DO still appear in the result set (LEFT JOIN
// preserves them with NULL peer fields).
// - The `include` query param is comma-separable and only recognizes
// known flags.
// ---------- includeFlagSet helper unit tests ----------
func TestIncludeFlagSet(t *testing.T) {
cases := []struct {
query string
flag string
want bool
}{
{"", "peer_info", false},
{"peer_info", "peer_info", true},
{"peer_info,attachments", "peer_info", true},
{"attachments,peer_info", "peer_info", true},
{"attachments , peer_info ", "peer_info", true},
{"peer_infos", "peer_info", false},
{"peerinfo", "peer_info", false},
{"peer_info", "", false},
{",,", "peer_info", false},
}
for _, tc := range cases {
got := includeFlagSet(tc.query, tc.flag)
if got != tc.want {
t.Errorf("includeFlagSet(%q, %q) = %v, want %v", tc.query, tc.flag, got, tc.want)
}
}
}
// ---------- extractAttachmentsFromRequestBody unit tests ----------
func TestExtractAttachmentsFromRequestBody_Empty(t *testing.T) {
if got := extractAttachmentsFromRequestBody(nil); got != nil {
t.Errorf("nil body: want nil, got %v", got)
}
if got := extractAttachmentsFromRequestBody([]byte("")); got != nil {
t.Errorf("empty body: want nil, got %v", got)
}
if got := extractAttachmentsFromRequestBody([]byte("not json")); got != nil {
t.Errorf("non-json body: want nil, got %v", got)
}
}
func TestExtractAttachmentsFromRequestBody_NoAttachments(t *testing.T) {
// Text-only message: no file/image/audio parts → nil
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`)
if got := extractAttachmentsFromRequestBody(body); got != nil {
t.Errorf("text-only: want nil, got %v", got)
}
}
func TestExtractAttachmentsFromRequestBody_FileKindV1(t *testing.T) {
// a2a-sdk v1 shape: kind=file, file:{uri,mime_type,name}
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"kind":"text","text":"see attached"},
{"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
]}}}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["kind"] != "file" {
t.Errorf("kind: want file, got %v", atts[0]["kind"])
}
if atts[0]["uri"] != "workspace:foo.pdf" {
t.Errorf("uri mismatch: %v", atts[0]["uri"])
}
if atts[0]["mime_type"] != "application/pdf" {
t.Errorf("mime_type mismatch: %v", atts[0]["mime_type"])
}
if atts[0]["name"] != "foo.pdf" {
t.Errorf("name mismatch: %v", atts[0]["name"])
}
}
func TestExtractAttachmentsFromRequestBody_ImageAndAudio(t *testing.T) {
// Mixed image + audio parts; both surface
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"kind":"image","file":{"uri":"workspace:a.png","mime_type":"image/png","name":"a.png"}},
{"kind":"audio","file":{"uri":"workspace:b.mp3","mime_type":"audio/mpeg","name":"b.mp3"}}
]}}}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 2 {
t.Fatalf("want 2 attachments, got %d", len(atts))
}
if atts[0]["kind"] != "image" || atts[1]["kind"] != "audio" {
t.Errorf("kind order: got %v / %v", atts[0]["kind"], atts[1]["kind"])
}
}
func TestExtractAttachmentsFromRequestBody_LegacyV0TypeDiscriminator(t *testing.T) {
// Legacy v0 shape: type=file (not kind), inlined fields (no nested .file)
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"type":"file","uri":"workspace:legacy.txt","mime_type":"text/plain","name":"legacy.txt"}
]}}}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d", len(atts))
}
if atts[0]["kind"] != "file" || atts[0]["uri"] != "workspace:legacy.txt" || atts[0]["name"] != "legacy.txt" {
t.Errorf("v0 part not surfaced: %v", atts[0])
}
}
func TestExtractAttachmentsFromRequestBody_SkipsEmptyParts(t *testing.T) {
// A "file" part with no uri AND no name is malformed — skip rather
// than emit a no-info entry.
body := []byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"kind":"file","file":{}},
{"kind":"file","file":{"name":"only-name.bin"}}
]}}}`)
atts := extractAttachmentsFromRequestBody(body)
if len(atts) != 1 {
t.Fatalf("want 1 attachment (the named one), got %d", len(atts))
}
if atts[0]["name"] != "only-name.bin" {
t.Errorf("expected only-name.bin, got %v", atts[0])
}
}
func TestExtractAttachmentsFromRequestBody_MalformedShape(t *testing.T) {
// Various malformed shapes return nil (defensive)
for _, b := range []string{
`{}`,
`{"params":{}}`,
`{"params":{"message":{}}}`,
`{"params":{"message":{"parts":"not-a-list"}}}`,
`{"params":{"message":{"parts":[null,42,"string"]}}}`,
} {
if got := extractAttachmentsFromRequestBody([]byte(b)); got != nil {
t.Errorf("body %q: want nil, got %v", b, got)
}
}
}
// ---------- Activity List ?include=peer_info handler tests ----------
func TestActivityList_IncludePeerInfo_IssuesLeftJoin(t *testing.T) {
// When ?include=peer_info is set, the query must:
// 1. SELECT include w.name + w.role aliased as peer_name/peer_role
// 2. FROM contains LEFT JOIN workspaces w ON w.id = activity_logs.source_id
// 3. WHERE uses qualified activity_logs.workspace_id (disambiguates
// from workspaces.id post-JOIN)
//
// Pin all three so a future refactor can't silently drop the JOIN or
// the alias and have the test still pass.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
peerID := "11111111-2222-3333-4444-555555555555"
mock.ExpectQuery(
`SELECT .+w\.name AS peer_name, w\.role AS peer_role FROM activity_logs LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id WHERE activity_logs\.workspace_id = .+`,
).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
"peer_name", "peer_role",
}).
AddRow("act-1", "ws-1", "a2a_receive", peerID, "ws-1",
"message/send", "Agent message: hello",
[]byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[{"kind":"text","text":"hello"}]}}}`),
nil, nil, nil, "ok", nil, time.Now(),
"Production Manager", "product manager"))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
c.Request.Host = "platform.test"
c.Request.Header.Set("X-Forwarded-Proto", "https")
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse: %v", err)
}
if len(resp) != 1 {
t.Fatalf("want 1 row, got %d", len(resp))
}
r := resp[0]
if r["peer_name"] != "Production Manager" {
t.Errorf("peer_name: got %v", r["peer_name"])
}
if r["peer_role"] != "product manager" {
t.Errorf("peer_role: got %v", r["peer_role"])
}
wantURL := "https://platform.test/registry/discover/" + peerID
if r["agent_card_url"] != wantURL {
t.Errorf("agent_card_url: got %v, want %v", r["agent_card_url"], wantURL)
}
// Text-only message has no attachments → omit from envelope
if _, present := r["attachments"]; present {
t.Errorf("attachments should be omitted on text-only row; got %v", r["attachments"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_IncludePeerInfo_CanvasRowHasNoPeerFields(t *testing.T) {
// LEFT JOIN preserves canvas rows (source_id NULL) but their
// peer_name/peer_role come back as NULL — must omit from the
// envelope (not emit empty strings or null literals).
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(
`LEFT JOIN workspaces w ON w\.id = activity_logs\.source_id`,
).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
"peer_name", "peer_role",
}).
// source_id NULL = canvas message; peer columns also NULL.
AddRow("act-canvas", "ws-1", "a2a_receive", nil, "ws-1",
"notify", "User said hi",
[]byte(`{"params":{"message":{"parts":[{"kind":"text","text":"hi"}]}}}`),
nil, nil, nil, "ok", nil, time.Now(),
nil, nil))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse: %v", err)
}
if len(resp) != 1 {
t.Fatalf("want 1 row, got %d", len(resp))
}
r := resp[0]
for _, k := range []string{"peer_name", "peer_role", "agent_card_url"} {
if _, present := r[k]; present {
t.Errorf("%s should be absent on canvas row; got %v", k, r[k])
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_IncludePeerInfo_AttachmentsSurfaceFromRequestBody(t *testing.T) {
// A peer_agent message with an inline file attachment must have
// attachments[] populated on the envelope.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
peerID := "11111111-2222-3333-4444-555555555555"
mock.ExpectQuery(`LEFT JOIN workspaces`).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
"peer_name", "peer_role",
}).
AddRow("act-with-file", "ws-1", "a2a_receive", peerID, "ws-1",
"message/send", "Agent message: see attached",
[]byte(`{"jsonrpc":"2.0","method":"message/send","params":{"message":{"parts":[
{"kind":"text","text":"see attached"},
{"kind":"file","file":{"uri":"workspace:foo.pdf","mime_type":"application/pdf","name":"foo.pdf"}}
]}}}`),
nil, nil, nil, "ok", nil, time.Now(),
"Code Reviewer", "code reviewer"))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=peer_info", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse: %v", err)
}
r := resp[0]
atts, ok := r["attachments"].([]interface{})
if !ok {
t.Fatalf("attachments missing or wrong type: %T %v", r["attachments"], r["attachments"])
}
if len(atts) != 1 {
t.Fatalf("want 1 attachment, got %d: %v", len(atts), atts)
}
att := atts[0].(map[string]interface{})
if att["kind"] != "file" || att["uri"] != "workspace:foo.pdf" || att["name"] != "foo.pdf" {
t.Errorf("attachment shape: %v", att)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_IncludePeerInfo_Unset_NoJoinNoExtraFields(t *testing.T) {
// Back-compat — when ?include=peer_info is NOT passed, the SELECT
// uses unqualified column refs (no `activity_logs.` prefix) AND no
// JOIN. Existing tests pass this implicitly; this test pins it
// explicitly so a future refactor that accidentally turns the JOIN
// always-on gets caught.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
// Regex pinned: "FROM activity_logs WHERE workspace_id" — no JOIN
// keyword between FROM and WHERE; no `activity_logs.` qualifier on
// workspace_id.
mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
}).
AddRow("act-1", "ws-1", "a2a_receive", "11111111-2222-3333-4444-555555555555", "ws-1",
"message/send", "Hello",
nil, nil, nil, nil, "ok", nil, time.Now()))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("parse: %v", err)
}
if len(resp) != 1 {
t.Fatalf("want 1 row, got %d", len(resp))
}
// Confirm no peer_info enrichment leaks into the default envelope.
for _, k := range []string{"peer_name", "peer_role", "agent_card_url", "attachments"} {
if _, present := resp[0][k]; present {
t.Errorf("%s must NOT appear without ?include=peer_info; got %v", k, resp[0][k])
}
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_IncludePeerInfo_UnknownFlagIgnored(t *testing.T) {
// ?include=bogus must NOT issue the JOIN — only the recognized
// `peer_info` flag triggers enrichment. The unknown flag is silently
// ignored (additive, opt-in convention).
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(`SELECT id, workspace_id,.+ FROM activity_logs WHERE workspace_id = .+`).
WithArgs("ws-1", 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
}))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-1/activity?include=bogus", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d", w.Code)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
// Sanity test using the existing test broadcaster setup — verifies the
// extractAttachments helper round-trips through json.Marshal cleanly
// (no map ordering issues, no type-coercion surprises).
func TestExtractAttachmentsFromRequestBody_RoundTripsThroughJSON(t *testing.T) {
body := []byte(`{"params":{"message":{"parts":[{"kind":"file","file":{"uri":"workspace:r.bin","mime_type":"application/octet-stream","name":"r.bin"}}]}}}`)
atts := extractAttachmentsFromRequestBody(body)
b, err := json.Marshal(atts)
if err != nil {
t.Fatalf("marshal: %v", err)
}
var decoded []map[string]interface{}
if err := json.Unmarshal(b, &decoded); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if len(decoded) != 1 || decoded[0]["uri"] != "workspace:r.bin" {
t.Fatalf("round-trip mismatch: %v", decoded)
}
_ = fmt.Sprintf // keep fmt import live if test trimming removes usage
}