feat(activity): add peer_id filter to /workspaces/:id/activity
Surfaces the conversation history with one specific peer for the wheel-side chat_history MCP tool. The filter joins (source_id = $X OR target_id = $X) so both inbound (peer was sender) and outbound (peer was recipient) turns appear in the same view, ordered by created_at, and composes with existing type/source/ since_secs/since_id/limit filters. Validates peer_id as a UUID at the trust boundary so a malformed caller can't smuggle SQL fragments via the parameter — the args are bound but the explicit rejection gives the wheel a cleaner 400 signal than an empty list, and defends against any future code path that might interpolate the value into a URL or another query. Tests: 3 new branches (positive filter, composition with type+source, UUID-shape rejection across 5 malformed inputs). Mutation-verified: reverting activity.go fails all peer_id tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
e5a3b5282b
commit
c85fac4663
@ -15,6 +15,7 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type ActivityHandler struct {
|
||||
@ -55,10 +56,25 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
activityType := c.Query("type")
|
||||
source := c.Query("source") // "canvas" = source_id IS NULL, "agent" = source_id IS NOT NULL
|
||||
peerID := c.Query("peer_id") // optional UUID — restrict to rows where this peer is sender OR target
|
||||
limitStr := c.DefaultQuery("limit", "100")
|
||||
sinceSecsStr := c.Query("since_secs")
|
||||
sinceID := c.Query("since_id")
|
||||
|
||||
// Validate peer_id as a UUID at the trust boundary so a malformed
|
||||
// caller (the agent or a downstream MCP tool) can't smuggle SQL
|
||||
// fragments into the WHERE clause via the parameter, even though
|
||||
// args are bound. UUID-shape rejection is also the cleanest 400
|
||||
// signal for the wheel-side chat_history MCP tool — clearer than a
|
||||
// generic "no rows" empty list when the agent passed an obviously
|
||||
// wrong id.
|
||||
if peerID != "" {
|
||||
if _, err := uuid.Parse(peerID); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "peer_id must be a UUID"})
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
limit := 100
|
||||
if n, err := strconv.Atoi(limitStr); err == nil && n > 0 {
|
||||
limit = n
|
||||
@ -135,6 +151,22 @@ func (h *ActivityHandler) List(c *gin.Context) {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "source must be 'canvas' or 'agent'"})
|
||||
return
|
||||
}
|
||||
if peerID != "" {
|
||||
// Restrict to rows where this peer is either the sender (source_id)
|
||||
// or the recipient (target_id) of an A2A turn. This is the
|
||||
// "conversation history with peer X" view the wheel-side
|
||||
// chat_history MCP tool surfaces — agent receives a peer_agent
|
||||
// push, wants to see the prior 20 turns with that workspace
|
||||
// without paging through every other peer's traffic.
|
||||
//
|
||||
// Bound as a single arg, matched twice — keeps argIdx accurate
|
||||
// and avoids duplicate parameter binding (some drivers reject the
|
||||
// same arg slot reused, ours is fine but the explicit form is
|
||||
// clearer to read and matches the rest of the builder.)
|
||||
query += fmt.Sprintf(" AND (source_id = $%d OR target_id = $%d)", argIdx, argIdx)
|
||||
args = append(args, peerID)
|
||||
argIdx++
|
||||
}
|
||||
if sinceSecs > 0 {
|
||||
// Use a parameterized interval so the value is bound, not
|
||||
// interpolated into the SQL string. `make_interval(secs => $N)`
|
||||
|
||||
@ -167,6 +167,119 @@ func TestActivityList_SourceWithType(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity List peer_id filter ----------
|
||||
//
|
||||
// peer_id surfaces the conversation history with one specific peer
|
||||
// for the wheel-side chat_history MCP tool. The filter joins
|
||||
// (source_id = $X OR target_id = $X) so both inbound (where this
|
||||
// peer was the sender) and outbound (where this peer was the
|
||||
// recipient) turns appear in the same view, ordered by created_at.
|
||||
|
||||
const testPeerUUID = "11111111-2222-3333-4444-555555555555"
|
||||
|
||||
func TestActivityList_PeerIDFilter(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
// peer_id binds twice in the query (source_id OR target_id) but is
|
||||
// added to args once — sqlmock matches positional args, so the
|
||||
// binding shape is what matters.
|
||||
mock.ExpectQuery(
|
||||
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND \(source_id = .+ OR target_id = .+\)`,
|
||||
).
|
||||
WithArgs("ws-1", testPeerUUID, 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
}))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest(
|
||||
"GET", "/workspaces/ws-1/activity?peer_id="+testPeerUUID, nil,
|
||||
)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_PeerIDComposesWithType(t *testing.T) {
|
||||
// peer_id + type + source must compose into a single AND-chain so
|
||||
// the wheel can fetch e.g. "all peer_agent inbound from peer X" in
|
||||
// one round-trip. Pin both args + arg order so a future refactor
|
||||
// of the builder can't silently rearrange placeholders.
|
||||
mock := setupTestDB(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery(
|
||||
`SELECT .+ FROM activity_logs WHERE workspace_id = .+ AND activity_type = .+ AND source_id IS NOT NULL AND \(source_id = .+ OR target_id = .+\)`,
|
||||
).
|
||||
WithArgs("ws-1", "a2a_receive", testPeerUUID, 100).
|
||||
WillReturnRows(sqlmock.NewRows([]string{
|
||||
"id", "workspace_id", "activity_type", "source_id", "target_id",
|
||||
"method", "summary", "request_body", "response_body",
|
||||
"tool_trace", "duration_ms", "status", "error_detail", "created_at",
|
||||
}))
|
||||
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest(
|
||||
"GET",
|
||||
"/workspaces/ws-1/activity?type=a2a_receive&source=agent&peer_id="+testPeerUUID,
|
||||
nil,
|
||||
)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Fatalf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActivityList_PeerIDRejectsNonUUID(t *testing.T) {
|
||||
// Trust-boundary check: a malformed peer_id must 400 before any
|
||||
// query is built. Defends against caller bugs (typoed UUID,
|
||||
// leading whitespace) and against any future code path that might
|
||||
// otherwise interpolate the value into the URL or another query.
|
||||
gin.SetMode(gin.TestMode)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
for _, bad := range []string{
|
||||
"not-a-uuid",
|
||||
"%27%20OR%201%3D1%20--", // URL-encoded ' OR 1=1 --
|
||||
"11111111-2222-3333-4444", // truncated
|
||||
"11111111-2222-3333-4444-555555555555-extra", // overlong
|
||||
"11111111-2222-3333-4444-55555555555G", // non-hex
|
||||
} {
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-1"}}
|
||||
c.Request = httptest.NewRequest(
|
||||
"GET", "/workspaces/ws-1/activity?peer_id="+bad, nil,
|
||||
)
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("peer_id=%q: expected 400, got %d (%s)", bad, w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity type allowlist (#125: memory_write added) ----------
|
||||
|
||||
func TestActivityReport_AcceptsMemoryWriteType(t *testing.T) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user