Merge pull request #2472 from Molecule-AI/feat/activity-peer-id-filter

feat(activity): add peer_id filter to /workspaces/:id/activity
This commit is contained in:
Hongming Wang 2026-05-02 00:52:32 +00:00 committed by GitHub
commit 645d687b0a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 145 additions and 0 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
)
type ActivityHandler struct {
@ -55,10 +56,25 @@ func (h *ActivityHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
activityType := c.Query("type")
source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL
peerID := c.Query("peer_id") // optional UUID — restrict to rows where this peer is sender OR target
limitStr := c.DefaultQuery("limit", "100")
sinceSecsStr := c.Query("since_secs")
sinceID := c.Query("since_id")
// Validate peer_id as a UUID at the trust boundary so a malformed
// caller (the agent or a downstream MCP tool) can't smuggle SQL
// fragments into the WHERE clause via the parameter, even though
// args are bound. UUID-shape rejection is also the cleanest 400
// signal for the wheel-side chat_history MCP tool — clearer than a
// generic "no rows" empty list when the agent passed an obviously
// wrong id.
if peerID != "" {
if _, err := uuid.Parse(peerID); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "peer_id must be a UUID"})
return
}
}
limit := 100
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
limit = n
@ -135,6 +151,22 @@ func (h *ActivityHandler) List(c *gin.Context) {
c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"})
return
}
if peerID != "" {
// Restrict to rows where this peer is either the sender (source_id)
// or the recipient (target_id) of an A2A turn. This is the
// "conversation history with peer X" view the wheel-side
// chat_history MCP tool surfaces — agent receives a peer_agent
// push, wants to see the prior 20 turns with that workspace
// without paging through every other peer's traffic.
//
// Bound as a single arg, matched twice — keeps argIdx accurate
// and avoids duplicate parameter binding (some drivers reject the
// same arg slot reused, ours is fine but the explicit form is
// clearer to read and matches the rest of the builder.)
query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx)
args = append(args, peerID)
argIdx++
}
if sinceSecs > 0 {
// Use a parameterized interval so the value is bound, not
// interpolated into the SQL string. `make_interval(secs => $N)`

View File

@ -167,6 +167,119 @@ func TestActivityList_SourceWithType(t *testing.T) {
}
}
// ---------- Activity List peer_id filter ----------
//
// peer_id surfaces the conversation history with one specific peer
// for the wheel-side chat_history MCP tool. The filter joins
// (source_id = $X OR target_id = $X) so both inbound (where this
// peer was the sender) and outbound (where this peer was the
// recipient) turns appear in the same view, ordered by created_at.
const testPeerUUID = "11111111-2222-3333-4444-555555555555"
func TestActivityList_PeerIDFilter(t *testing.T) {
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
// peer_id binds twice in the query (source_id OR target_id) but is
// added to args once — sqlmock matches positional args, so the
// binding shape is what matters.
mock.ExpectQuery(
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND \(source_id = .+ OR target_id = .+\)`,
).
WithArgs("ws-1", testPeerUUID, 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
}))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest(
"GET", "/workspaces/ws-1/activity?peer_id="+testPeerUUID, nil,
)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_PeerIDComposesWithType(t *testing.T) {
// peer_id + type + source must compose into a single AND-chain so
// the wheel can fetch e.g. "all peer_agent inbound from peer X" in
// one round-trip. Pin both args + arg order so a future refactor
// of the builder can't silently rearrange placeholders.
mock := setupTestDB(t)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
mock.ExpectQuery(
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
).
WithArgs("ws-1", "a2a_receive", testPeerUUID, 100).
WillReturnRows(sqlmock.NewRows([]string{
"id", "workspace_id", "activity_type", "source_id", "target_id",
"method", "summary", "request_body", "response_body",
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
}))
gin.SetMode(gin.TestMode)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest(
"GET",
"/workspaces/ws-1/activity?type=a2a_receive&source=agent&peer_id="+testPeerUUID,
nil,
)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}
func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
// Trust-boundary check: a malformed peer_id must 400 before any
// query is built. Defends against caller bugs (typoed UUID,
// leading whitespace) and against any future code path that might
// otherwise interpolate the value into the URL or another query.
gin.SetMode(gin.TestMode)
broadcaster := newTestBroadcaster()
handler := NewActivityHandler(broadcaster)
for _, bad := range []string{
"not-a-uuid",
"%27%20OR%201%3D1%20--", // URL-encoded ' OR 1=1 --
"11111111-2222-3333-4444", // truncated
"11111111-2222-3333-4444-555555555555-extra", // overlong
"11111111-2222-3333-4444-55555555555G", // non-hex
} {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
c.Request = httptest.NewRequest(
"GET", "/workspaces/ws-1/activity?peer_id="+bad, nil,
)
handler.List(c)
if w.Code != http.StatusBadRequest {
t.Errorf("peer_id=%q: expected 400, got %d (%s)", bad, w.Code, w.Body.String())
}
}
}
// ---------- Activity type allowlist (#125: memory_write added) ----------
func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {