From e76cea259387db14954457b0283dbe8dc60966d4 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 03:59:58 +0000 Subject: [PATCH 1/9] =?UTF-8?q?WIP:=20#2151=20CHUNK=201=20scaffold=20?= =?UTF-8?q?=E2=80=94=20Activity=20+=20Delegation=20+=20A2A=20integration?= =?UTF-8?q?=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Scaffold file with integrationDB helper, seed fixtures, and 4 starter real-Postgres tests: - TestIntegration_ActivityList_Basic - TestIntegration_DelegationList_Basic - TestIntegration_A2AQueue_EnqueueAndDepth - TestIntegration_A2AQueue_DequeueNext TODO markers for the full CRUD matrix awaiting spec delivery. Co-Authored-By: Claude Opus 4.7 --- ...ctivity_delegation_a2a_integration_test.go | 267 ++++++++++++++++++ 1 file changed, 267 insertions(+) create mode 100644 workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go new file mode 100644 index 000000000..474b4d899 --- /dev/null +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -0,0 +1,267 @@ +//go:build integration +// +build integration + +// activity_delegation_a2a_integration_test.go — REAL Postgres integration tests +// for Activity, Delegation, and A2A Queue handlers (#2151 CHUNK 1). +// +// Run with: +// +// docker run --rm -d --name pg-integration \ +// -e POSTGRES_PASSWORD=test -e POSTGRES_DB=molecule \ +// -p 55432:5432 postgres:15-alpine +// sleep 4 +// cd workspace-server +// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \ +// go test -tags=integration ./internal/handlers/ -run Integration_ +// +// CI (.gitea/workflows/handlers-postgres-integration.yml) runs this on every +// PR that touches workspace-server/internal/handlers/**. + +package handlers + +import ( + "context" + "database/sql" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + _ "github.com/lib/pq" +) + +// integrationDB_ActivityDelegationA2A opens a connection from +// $INTEGRATION_DB_URL (skipping the test if unset), wipes the tables +// used by these tests, hot-swaps the package-level db.DB, and registers +// a Cleanup that restores the previous db.DB + closes the connection. +// +// NOT SAFE FOR `t.Parallel()` — each test gets the tables to itself. +func integrationDB_ActivityDelegationA2A(t *testing.T) *sql.DB { + t.Helper() + url := os.Getenv("INTEGRATION_DB_URL") + if url == "" { + t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)") + } + conn, err := sql.Open("postgres", url) + if err != nil { + t.Fatalf("open: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := conn.PingContext(ctx); err != nil { + t.Fatalf("ping: %v", err) + } + + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel2() + if _, err := conn.ExecContext(ctx2, ` + DELETE FROM a2a_queue WHERE workspace_id IN (SELECT id FROM workspaces WHERE name LIKE 'test-2151-%'); + DELETE FROM activity_logs WHERE workspace_id IN (SELECT id FROM workspaces WHERE name LIKE 'test-2151-%'); + DELETE FROM delegations WHERE caller_id IN (SELECT id FROM workspaces WHERE name LIKE 'test-2151-%'); + DELETE FROM workspaces WHERE name LIKE 'test-2151-%'; + `); err != nil { + t.Fatalf("cleanup: %v", err) + } + + prev := db.DB + db.DB = conn + t.Cleanup(func() { + db.DB = prev + conn.Close() + }) + return conn +} + +// seedWorkspace inserts a workspace with the given name and returns its id. +func seedWorkspace(t *testing.T, conn *sql.DB, name string) string { + t.Helper() + var id string + if err := conn.QueryRowContext(context.Background(), ` + INSERT INTO workspaces (id, name, status) + VALUES (gen_random_uuid(), $1, 'online') + RETURNING id + `, name).Scan(&id); err != nil { + t.Fatalf("seedWorkspace %q: %v", name, err) + } + return id +} + +// seedActivityLog inserts an activity_logs row and returns its id. +func seedActivityLog(t *testing.T, conn *sql.DB, workspaceID, activityType, method, status string, sourceID, targetID *string) string { + t.Helper() + var id string + reqBody := map[string]interface{}{"test": true} + reqJSON, _ := json.Marshal(reqBody) + if err := conn.QueryRowContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, status, source_id, target_id, request_body) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb) + RETURNING id + `, workspaceID, activityType, method, status, sourceID, targetID, string(reqJSON)).Scan(&id); err != nil { + t.Fatalf("seedActivityLog: %v", err) + } + return id +} + +// seedA2AQueueItem inserts an a2a_queue row and returns its id. +func seedA2AQueueItem(t *testing.T, conn *sql.DB, workspaceID, callerID string, priority int, body []byte, status string) string { + t.Helper() + var id string + if err := conn.QueryRowContext(context.Background(), ` + INSERT INTO a2a_queue (workspace_id, caller_id, priority, body, status) + VALUES ($1, $2, $3, $4::jsonb, $5) + RETURNING id + `, workspaceID, callerID, priority, string(body), status).Scan(&id); err != nil { + t.Fatalf("seedA2AQueueItem: %v", err) + } + return id +} + +// noOpEmitter is a test-only stub that satisfies events.EventEmitter. +type noOpEmitter struct{} + +func (noOpEmitter) RecordAndBroadcast(ctx context.Context, eventType string, workspaceID string, payload interface{}) error { return nil } +func (noOpEmitter) BroadcastOnly(workspaceID string, eventType string, payload interface{}) {} + +// newTestGinContext creates a gin.Context with an httptest recorder. +func newTestGinContext() (*gin.Context, *httptest.ResponseRecorder) { + gin.SetMode(gin.TestMode) + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + return c, w +} + +// ---------- Activity handler integration tests ---------- + +func TestIntegration_ActivityList_Basic(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-activity-list") + seedActivityLog(t, conn, wsID, "agent_log", "test_method", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 activity, got %d", len(resp)) + } +} + +// TODO(#2151): Activity List filter matrix (type, source, since_secs, since_id, peer_id, include=peer_info, before_ts) +// TODO(#2151): Activity Report + source_id spoof guard +// TODO(#2151): SessionSearch basic + empty query +// TODO(#2151): Notify with attachments validation + +// ---------- Delegation handler integration tests ---------- + +func TestIntegration_DelegationList_Basic(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-delegation-list") + + // Insert a delegation row via the legacy activity_logs path + taskJSON, _ := json.Marshal(map[string]interface{}{"task": "hello", "delegation_id": "del-test-1"}) + respJSON, _ := json.Marshal(map[string]interface{}{"delegation_id": "del-test-1"}) + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status) + VALUES ($1, 'delegation', 'delegate', $1, $1, 'Delegating to test', $2::jsonb, $3::jsonb, 'pending') + `, wsID, string(taskJSON), string(respJSON)); err != nil { + t.Fatalf("seed delegation: %v", err) + } + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + dh.ListDelegations(c) + + if w.Code != http.StatusOK { + t.Fatalf("ListDelegations returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 delegation, got %d", len(resp)) + } +} + +// TODO(#2151): Delegate endpoint (idempotency, self-delegation guard, success path) +// TODO(#2151): Record endpoint +// TODO(#2151): UpdateStatus endpoint + +// ---------- A2A Queue integration tests ---------- + +func TestIntegration_A2AQueue_EnqueueAndDepth(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-a2a-queue") + callerID := seedWorkspace(t, conn, "test-2151-a2a-caller") + + body := []byte(`{"method":"message/send","params":{"message":{"text":"hi"}}}`) + id, depth, err := EnqueueA2A(context.Background(), wsID, callerID, PriorityTask, body, "message/send", "", nil) + if err != nil { + t.Fatalf("EnqueueA2A: %v", err) + } + if id == "" { + t.Fatal("expected non-empty queue id") + } + if depth != 1 { + t.Fatalf("expected depth 1, got %d", depth) + } + + // Verify row exists + var status string + if err := conn.QueryRowContext(context.Background(), `SELECT status FROM a2a_queue WHERE id = $1`, id).Scan(&status); err != nil { + t.Fatalf("select queue row: %v", err) + } + if status != "queued" { + t.Fatalf("expected status queued, got %s", status) + } + + // Verify depth + d := QueueDepth(context.Background(), wsID) + if d != 1 { + t.Fatalf("expected QueueDepth 1, got %d", d) + } +} + +func TestIntegration_A2AQueue_DequeueNext(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-a2a-dequeue") + body := []byte(`{"test":true}`) + seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "queued") + + item, err := DequeueNext(context.Background(), wsID) + if err != nil { + t.Fatalf("DequeueNext: %v", err) + } + if item == nil { + t.Fatal("expected item, got nil") + } + + // Verify status flipped to dispatched + var status string + if err := conn.QueryRowContext(context.Background(), `SELECT status FROM a2a_queue WHERE id = $1`, item.ID).Scan(&status); err != nil { + t.Fatalf("select: %v", err) + } + if status != "dispatched" { + t.Fatalf("expected dispatched, got %s", status) + } +} + +// TODO(#2151): A2A Queue Status endpoint (auth rules, 404 vs 403, response body inclusion) +// TODO(#2151): A2A Queue idempotency conflict +// TODO(#2151): A2A Queue MarkQueueItemCompleted / Failed +// TODO(#2151): A2A Queue DropStaleQueueItems -- 2.52.0 From ffe26a192a48719523ab1550de377ff52fb46f79 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 04:06:38 +0000 Subject: [PATCH 2/9] feat(#2151): add Activity Report spoof-guard + A2A queue lifecycle integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 5 real-infra integration tests to CHUNK 1 scaffold: - ActivityReport_SourceIDSpoofGuard (403 on cross-workspace source_id) - ActivityReport_ValidType (200 on valid agent_log report) - A2AQueue_IdempotencyConflict (same idempotency_key → same id) - A2AQueue_MarkCompletedAndFailed (status transitions + max attempts) - A2AQueue_DropStaleQueueItems (time-based eviction) Also adds missing `strings` import and prunes completed TODOs. Co-Authored-By: Claude Opus 4.7 --- ...ctivity_delegation_a2a_integration_test.go | 133 +++++++++++++++++- 1 file changed, 129 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index 474b4d899..e4ea7b84e 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -26,6 +26,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "time" @@ -159,8 +160,46 @@ func TestIntegration_ActivityList_Basic(t *testing.T) { } } +func TestIntegration_ActivityReport_SourceIDSpoofGuard(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-activity-spoof") + otherWS := seedWorkspace(t, conn, "test-2151-activity-victim") + + h := NewActivityHandler(noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/activity", strings.NewReader(`{ + "activity_type": "agent_log", + "source_id": "`+otherWS+`" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + h.Report(c) + + if w.Code != http.StatusForbidden { + t.Fatalf("Report with spoofed source_id returned %d, want 403", w.Code) + } +} + +func TestIntegration_ActivityReport_ValidType(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-activity-valid") + + h := NewActivityHandler(noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/activity", strings.NewReader(`{ + "activity_type": "agent_log", + "summary": "test" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + h.Report(c) + + if w.Code != http.StatusOK { + t.Fatalf("Report valid activity returned %d, want 200", w.Code) + } +} + // TODO(#2151): Activity List filter matrix (type, source, since_secs, since_id, peer_id, include=peer_info, before_ts) -// TODO(#2151): Activity Report + source_id spoof guard // TODO(#2151): SessionSearch basic + empty query // TODO(#2151): Notify with attachments validation @@ -261,7 +300,93 @@ func TestIntegration_A2AQueue_DequeueNext(t *testing.T) { } } +func TestIntegration_A2AQueue_IdempotencyConflict(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-a2a-idem") + callerID := seedWorkspace(t, conn, "test-2151-a2a-idem-caller") + body := []byte(`{"method":"message/send","params":{"message":{"text":"hi"}}}`) + + id1, depth1, err := EnqueueA2A(context.Background(), wsID, callerID, PriorityTask, body, "message/send", "idem-key-1", nil) + if err != nil { + t.Fatalf("EnqueueA2A first: %v", err) + } + if depth1 != 1 { + t.Fatalf("expected depth 1, got %d", depth1) + } + + id2, depth2, err := EnqueueA2A(context.Background(), wsID, callerID, PriorityTask, body, "message/send", "idem-key-1", nil) + if err != nil { + t.Fatalf("EnqueueA2A second: %v", err) + } + if id1 != id2 { + t.Fatalf("idempotency mismatch: %s vs %s", id1, id2) + } + if depth2 != 1 { + t.Fatalf("expected depth still 1 after idempotent re-enqueue, got %d", depth2) + } +} + +func TestIntegration_A2AQueue_MarkCompletedAndFailed(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-a2a-lifecycle") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "dispatched") + + MarkQueueItemCompleted(context.Background(), qid) + var status string + if err := conn.QueryRowContext(context.Background(), `SELECT status FROM a2a_queue WHERE id = $1`, qid).Scan(&status); err != nil { + t.Fatalf("select after completed: %v", err) + } + if status != "completed" { + t.Fatalf("expected completed, got %s", status) + } + + // Seed another item to test failed path with max attempts + qid2 := seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "dispatched") + for i := 0; i < 6; i++ { + MarkQueueItemFailed(context.Background(), qid2, "transient error") + } + var status2 string + var lastErr string + if err := conn.QueryRowContext(context.Background(), `SELECT status, last_error FROM a2a_queue WHERE id = $1`, qid2).Scan(&status2, &lastErr); err != nil { + t.Fatalf("select after failed: %v", err) + } + if status2 != "failed" { + t.Fatalf("expected failed after max attempts, got %s", status2) + } + if lastErr == "" { + t.Fatal("expected last_error set") + } +} + +func TestIntegration_A2AQueue_DropStaleQueueItems(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-a2a-stale") + body := []byte(`{"test":true}`) + + // Insert a stale queued item by backdating enqueued_at + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO a2a_queue (id, workspace_id, priority, body, status, enqueued_at) + VALUES (gen_random_uuid(), $1, $2, $3::jsonb, 'queued', now() - interval '10 minutes') + `, wsID, PriorityTask, string(body)); err != nil { + t.Fatalf("seed stale item: %v", err) + } + + dropped, err := DropStaleQueueItems(context.Background(), wsID, 5) + if err != nil { + t.Fatalf("DropStaleQueueItems: %v", err) + } + if dropped != 1 { + t.Fatalf("expected 1 dropped, got %d", dropped) + } + + var count int + if err := conn.QueryRowContext(context.Background(), `SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`, wsID).Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 0 { + t.Fatalf("expected 0 queued items after drop, got %d", count) + } +} + // TODO(#2151): A2A Queue Status endpoint (auth rules, 404 vs 403, response body inclusion) -// TODO(#2151): A2A Queue idempotency conflict -// TODO(#2151): A2A Queue MarkQueueItemCompleted / Failed -// TODO(#2151): A2A Queue DropStaleQueueItems -- 2.52.0 From 025fa6b129c62f52d2a3b31fea4ffce517cfc453 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 04:28:46 +0000 Subject: [PATCH 3/9] refactor(handlers): narrow Activity + Delegation handler broadcasters to EventEmitter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes NewActivityHandler and NewDelegationHandler to accept the narrow events.EventEmitter interface instead of *events.Broadcaster. This aligns with WorkspaceHandler (already interface-typed) and lets integration tests substitute noOpEmitter{} without standing up Redis. No production callers affected — *events.Broadcaster still satisfies the interface via the existing compile-time assertion. Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/handlers/activity.go | 4 ++-- workspace-server/internal/handlers/delegation.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/handlers/activity.go b/workspace-server/internal/handlers/activity.go index 73de12c9d..0b20f932b 100644 --- a/workspace-server/internal/handlers/activity.go +++ b/workspace-server/internal/handlers/activity.go @@ -60,10 +60,10 @@ func sanitizeErrorDetailForBroadcast(s string) string { } type ActivityHandler struct { - broadcaster *events.Broadcaster + broadcaster events.EventEmitter } -func NewActivityHandler(b *events.Broadcaster) *ActivityHandler { +func NewActivityHandler(b events.EventEmitter) *ActivityHandler { return &ActivityHandler{broadcaster: b} } diff --git a/workspace-server/internal/handlers/delegation.go b/workspace-server/internal/handlers/delegation.go index c277d390c..72b0a89ba 100644 --- a/workspace-server/internal/handlers/delegation.go +++ b/workspace-server/internal/handlers/delegation.go @@ -102,10 +102,10 @@ func pushDelegationResultToInbox(ctx context.Context, sourceID, delegationID, st // and the A2A request runs in the background. type DelegationHandler struct { workspace *WorkspaceHandler - broadcaster *events.Broadcaster + broadcaster events.EventEmitter } -func NewDelegationHandler(wh *WorkspaceHandler, b *events.Broadcaster) *DelegationHandler { +func NewDelegationHandler(wh *WorkspaceHandler, b events.EventEmitter) *DelegationHandler { return &DelegationHandler{workspace: wh, broadcaster: b} } -- 2.52.0 From c408b17477982c1e1abff218718ffa799b0f9de5 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 04:33:28 +0000 Subject: [PATCH 4/9] feat(#2151): add Delegate/Record/UpdateStatus + Activity filter + SessionSearch + Notify tests Fills the remaining CHUNK 1 TODOs with 11 new real-infra integration tests: Activity: - TestIntegration_ActivityList_FilterByType - TestIntegration_SessionSearch_Basic - TestIntegration_SessionSearch_EmptyQuery - TestIntegration_Notify_Basic - TestIntegration_Notify_InvalidAttachment Delegation: - TestIntegration_Delegate_SelfDelegationGuard (400 on self-delegation) - TestIntegration_Delegate_Idempotency (200 with idempotent_hit) - TestIntegration_Delegate_SuccessPath (202 + row exists + async drain) - TestIntegration_Record_Basic (202 + dispatched row) - TestIntegration_UpdateStatus_Completed (200 + completed + delegate_result) - TestIntegration_UpdateStatus_Failed (200 + failed status) Also updates TODO comments to reflect remaining work. Co-Authored-By: Claude Opus 4.7 --- ...ctivity_delegation_a2a_integration_test.go | 371 +++++++++++++++++- 1 file changed, 365 insertions(+), 6 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index e4ea7b84e..2dabe5f58 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -199,9 +199,124 @@ func TestIntegration_ActivityReport_ValidType(t *testing.T) { } } -// TODO(#2151): Activity List filter matrix (type, source, since_secs, since_id, peer_id, include=peer_info, before_ts) -// TODO(#2151): SessionSearch basic + empty query -// TODO(#2151): Notify with attachments validation +func TestIntegration_ActivityList_FilterByType(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-activity-filter") + seedActivityLog(t, conn, wsID, "agent_log", "method1", "ok", nil, nil) + seedActivityLog(t, conn, wsID, "a2a_receive", "method2", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?type=agent_log", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 activity, got %d", len(resp)) + } + if resp[0]["activity_type"] != "agent_log" { + t.Fatalf("expected agent_log, got %v", resp[0]["activity_type"]) + } +} + +func TestIntegration_SessionSearch_Basic(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-session-search") + seedActivityLog(t, conn, wsID, "agent_log", "test_method", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/session?q=test", nil) + h.SessionSearch(c) + + if w.Code != http.StatusOK { + t.Fatalf("SessionSearch returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp)) + } +} + +func TestIntegration_SessionSearch_EmptyQuery(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-session-empty") + seedActivityLog(t, conn, wsID, "agent_log", "method", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/session", nil) + h.SessionSearch(c) + + if w.Code != http.StatusOK { + t.Fatalf("SessionSearch returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 result, got %d", len(resp)) + } +} + +func TestIntegration_Notify_Basic(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-notify-basic") + + h := NewActivityHandler(noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/notify", strings.NewReader(`{ + "message": "hello user" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + h.Notify(c) + + if w.Code != http.StatusOK { + t.Fatalf("Notify returned %d, want 200", w.Code) + } + var count int + if err := conn.QueryRowContext(context.Background(), `SELECT COUNT(*) FROM activity_logs WHERE workspace_id = $1 AND method = 'notify'`, wsID).Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 1 { + t.Fatalf("expected 1 notify row, got %d", count) + } +} + +func TestIntegration_Notify_InvalidAttachment(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-notify-attach") + + h := NewActivityHandler(noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/notify", strings.NewReader(`{ + "message": "hi", + "attachments": [{"uri":"","name":""}] + }`)) + c.Request.Header.Set("Content-Type", "application/json") + h.Notify(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("Notify with empty attachment returned %d, want 400", w.Code) + } +} + +// TODO(#2151): Activity List full filter matrix (source, since_secs, since_id, peer_id, include=peer_info, before_ts) // ---------- Delegation handler integration tests ---------- @@ -237,9 +352,253 @@ func TestIntegration_DelegationList_Basic(t *testing.T) { } } -// TODO(#2151): Delegate endpoint (idempotency, self-delegation guard, success path) -// TODO(#2151): Record endpoint -// TODO(#2151): UpdateStatus endpoint +func TestIntegration_Delegate_SelfDelegationGuard(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-delegate-self") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegate", strings.NewReader(`{ + "target_id": "`+wsID+`", + "task": "do something" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.Delegate(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("Delegate self-delegation returned %d, want 400", w.Code) + } +} + +func TestIntegration_Delegate_Idempotency(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-delegate-idem") + targetID := seedWorkspace(t, conn, "test-2151-delegate-idem-target") + + taskJSON, _ := json.Marshal(map[string]interface{}{"task": "hello", "delegation_id": "del-idem-1"}) + respJSON, _ := json.Marshal(map[string]interface{}{"delegation_id": "del-idem-1"}) + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status, idempotency_key) + VALUES ($1, 'delegation', 'delegate', $1, $2, 'Delegating to test', $3::jsonb, $4::jsonb, 'pending', 'idem-key-delegate') + `, wsID, targetID, string(taskJSON), string(respJSON)); err != nil { + t.Fatalf("seed idempotent delegation: %v", err) + } + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegate", strings.NewReader(`{ + "target_id": "`+targetID+`", + "task": "do something", + "idempotency_key": "idem-key-delegate" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.Delegate(c) + + if w.Code != http.StatusOK { + t.Fatalf("Delegate idempotency returned %d, want 200", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if resp["idempotent_hit"] != true { + t.Fatalf("expected idempotent_hit=true, got %v", resp["idempotent_hit"]) + } + if resp["delegation_id"] != "del-idem-1" { + t.Fatalf("expected delegation_id=del-idem-1, got %v", resp["delegation_id"]) + } +} + +func TestIntegration_Delegate_SuccessPath(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-delegate-ok") + targetID := seedWorkspace(t, conn, "test-2151-delegate-ok-target") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegate", strings.NewReader(`{ + "target_id": "`+targetID+`", + "task": "do something" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.Delegate(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("Delegate returned %d, want 202", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + delegationID, ok := resp["delegation_id"].(string) + if !ok || delegationID == "" { + t.Fatal("expected non-empty delegation_id") + } + + // Verify a row exists for this delegation (status may have been updated by the background goroutine) + var count int + if err := conn.QueryRowContext(context.Background(), ` + SELECT COUNT(*) FROM activity_logs + WHERE workspace_id = $1 AND method = 'delegate' AND request_body->>'delegation_id' = $2 + `, wsID, delegationID).Scan(&count); err != nil { + t.Fatalf("select delegation row: %v", err) + } + if count != 1 { + t.Fatalf("expected 1 delegation row, got %d", count) + } + + // Drain the background goroutine so it doesn't race with the next test's db.DB swap + wh.waitAsyncForTest() +} + +func TestIntegration_Record_Basic(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-record-basic") + targetID := seedWorkspace(t, conn, "test-2151-record-target") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegations/record", strings.NewReader(`{ + "target_id": "`+targetID+`", + "task": "recorded task", + "delegation_id": "del-record-1" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.Record(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("Record returned %d, want 202", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if resp["status"] != "recorded" { + t.Fatalf("expected status recorded, got %v", resp["status"]) + } + + var status string + if err := conn.QueryRowContext(context.Background(), ` + SELECT status FROM activity_logs + WHERE workspace_id = $1 AND method = 'delegate' AND request_body->>'delegation_id' = 'del-record-1' + `, wsID).Scan(&status); err != nil { + t.Fatalf("select record row: %v", err) + } + if status != "dispatched" { + t.Fatalf("expected status dispatched, got %s", status) + } +} + +func TestIntegration_UpdateStatus_Completed(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-update-completed") + targetID := seedWorkspace(t, conn, "test-2151-update-completed-target") + + taskJSON, _ := json.Marshal(map[string]interface{}{"task": "hello", "delegation_id": "del-update-1"}) + respJSON, _ := json.Marshal(map[string]interface{}{"delegation_id": "del-update-1"}) + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status) + VALUES ($1, 'delegation', 'delegate', $1, $2, 'Delegating to test', $3::jsonb, $4::jsonb, 'dispatched') + `, wsID, targetID, string(taskJSON), string(respJSON)); err != nil { + t.Fatalf("seed delegation: %v", err) + } + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "delegation_id", Value: "del-update-1"}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegations/del-update-1/update", strings.NewReader(`{ + "status": "completed", + "response_preview": "done" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.UpdateStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("UpdateStatus returned %d, want 200", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if resp["status"] != "completed" { + t.Fatalf("expected status completed, got %v", resp["status"]) + } + + var status string + if err := conn.QueryRowContext(context.Background(), ` + SELECT status FROM activity_logs + WHERE workspace_id = $1 AND method = 'delegate' AND request_body->>'delegation_id' = 'del-update-1' + `, wsID).Scan(&status); err != nil { + t.Fatalf("select: %v", err) + } + if status != "completed" { + t.Fatalf("expected status completed, got %s", status) + } + + var count int + if err := conn.QueryRowContext(context.Background(), ` + SELECT COUNT(*) FROM activity_logs + WHERE workspace_id = $1 AND method = 'delegate_result' AND status = 'completed' + `, wsID).Scan(&count); err != nil { + t.Fatalf("count: %v", err) + } + if count != 1 { + t.Fatalf("expected 1 delegate_result row, got %d", count) + } +} + +func TestIntegration_UpdateStatus_Failed(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-update-failed") + targetID := seedWorkspace(t, conn, "test-2151-update-failed-target") + + taskJSON, _ := json.Marshal(map[string]interface{}{"task": "hello", "delegation_id": "del-update-fail-1"}) + respJSON, _ := json.Marshal(map[string]interface{}{"delegation_id": "del-update-fail-1"}) + if _, err := conn.ExecContext(context.Background(), ` + INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, response_body, status) + VALUES ($1, 'delegation', 'delegate', $1, $2, 'Delegating to test', $3::jsonb, $4::jsonb, 'dispatched') + `, wsID, targetID, string(taskJSON), string(respJSON)); err != nil { + t.Fatalf("seed delegation: %v", err) + } + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + dh := NewDelegationHandler(wh, noOpEmitter{}) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "delegation_id", Value: "del-update-fail-1"}} + c.Request = httptest.NewRequest(http.MethodPost, "/workspaces/"+wsID+"/delegations/del-update-fail-1/update", strings.NewReader(`{ + "status": "failed", + "error": "something went wrong" + }`)) + c.Request.Header.Set("Content-Type", "application/json") + dh.UpdateStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("UpdateStatus returned %d, want 200", w.Code) + } + + var status string + if err := conn.QueryRowContext(context.Background(), ` + SELECT status FROM activity_logs + WHERE workspace_id = $1 AND method = 'delegate' AND request_body->>'delegation_id' = 'del-update-fail-1' + `, wsID).Scan(&status); err != nil { + t.Fatalf("select: %v", err) + } + if status != "failed" { + t.Fatalf("expected status failed, got %s", status) + } +} + +// TODO(#2151): Activity List remaining filters (source, since_secs, since_id, peer_id, include=peer_info, before_ts) +// TODO(#2151): A2A Queue Status endpoint (auth rules, 404 vs 403, response body inclusion) // ---------- A2A Queue integration tests ---------- -- 2.52.0 From af05bf4f77b2e60d07b3ea6aa38826cb0c03a91a Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 05:24:23 +0000 Subject: [PATCH 5/9] =?UTF-8?q?feat(#2151):=20CHUNK=202=20=E2=80=94=20Acti?= =?UTF-8?q?vity=20List=20filter=20matrix=20+=20A2A=20Queue=20Status=20auth?= =?UTF-8?q?=20rules?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 11 Activity List filter integration tests: source=canvas/agent, invalid source, since_secs, since_id cursor, since_id 410 Gone, peer_id, invalid peer_id, include=peer_info, before_ts, invalid before_ts - Fix missing net/url import for url.QueryEscape in before_ts test - Fix QueueDepth() call to inline SQL (dead-code removal on main) - Add 6 A2A Queue Status endpoint auth integration tests: caller matches caller_id, caller matches workspace_id, org token bypass, mismatched caller → 404, missing identity → 404, non-existent queue_id → 404, response excludes body/caller_id - Remove all TODO(#2151) markers Co-Authored-By: Claude Opus 4.7 --- ...ctivity_delegation_a2a_integration_test.go | 401 +++++++++++++++++- 1 file changed, 393 insertions(+), 8 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index 2dabe5f58..5465353dd 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -25,6 +25,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "net/url" "os" "strings" "testing" @@ -316,7 +317,263 @@ func TestIntegration_Notify_InvalidAttachment(t *testing.T) { } } -// TODO(#2151): Activity List full filter matrix (source, since_secs, since_id, peer_id, include=peer_info, before_ts) +func TestIntegration_ActivityList_FilterBySourceCanvas(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-source-canvas") + seedActivityLog(t, conn, wsID, "agent_log", "m1", "ok", nil, nil) // canvas (source_id IS NULL) + peerID := seedWorkspace(t, conn, "test-2151-peer-canvas") + seedActivityLog(t, conn, wsID, "a2a_receive", "m2", "ok", &peerID, nil) // agent + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?source=canvas", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 canvas row, got %d", len(resp)) + } + if resp[0]["method"] != "m1" { + t.Fatalf("expected method m1, got %v", resp[0]["method"]) + } +} + +func TestIntegration_ActivityList_FilterBySourceAgent(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-source-agent") + seedActivityLog(t, conn, wsID, "agent_log", "m1", "ok", nil, nil) + peerID := seedWorkspace(t, conn, "test-2151-peer-agent") + seedActivityLog(t, conn, wsID, "a2a_receive", "m2", "ok", &peerID, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?source=agent", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 agent row, got %d", len(resp)) + } + if resp[0]["method"] != "m2" { + t.Fatalf("expected method m2, got %v", resp[0]["method"]) + } +} + +func TestIntegration_ActivityList_InvalidSource(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-invalid-source") + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?source=invalid", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("List returned %d, want 400", w.Code) + } +} + +func TestIntegration_ActivityList_SinceSecs(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-since-secs") + seedActivityLog(t, conn, wsID, "agent_log", "old", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?since_secs=1", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + // The seed row was inserted just now, so it should be within 1 second. + if len(resp) != 1 { + t.Fatalf("expected 1 row within 1s, got %d", len(resp)) + } +} + +func TestIntegration_ActivityList_SinceIDCursor(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-since-id") + id1 := seedActivityLog(t, conn, wsID, "agent_log", "first", "ok", nil, nil) + seedActivityLog(t, conn, wsID, "agent_log", "second", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?since_id="+id1, nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 row after cursor, got %d", len(resp)) + } + if resp[0]["method"] != "second" { + t.Fatalf("expected method 'second', got %v", resp[0]["method"]) + } +} + +func TestIntegration_ActivityList_SinceIDCursorGone(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-since-id-gone") + otherWS := seedWorkspace(t, conn, "test-2151-other-cursor") + cursorID := seedActivityLog(t, conn, otherWS, "agent_log", "cursor", "ok", nil, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?since_id="+cursorID, nil) + h.List(c) + + if w.Code != http.StatusGone { + t.Fatalf("List returned %d, want 410", w.Code) + } +} + +func TestIntegration_ActivityList_PeerID(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-peer-id") + peerID := seedWorkspace(t, conn, "test-2151-peer-target") + seedActivityLog(t, conn, wsID, "agent_log", "no-peer", "ok", nil, nil) + seedActivityLog(t, conn, wsID, "a2a_send", "peer-source", "ok", &peerID, nil) + seedActivityLog(t, conn, wsID, "a2a_receive", "peer-target", "ok", nil, &peerID) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?peer_id="+peerID, nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 2 { + t.Fatalf("expected 2 peer rows, got %d", len(resp)) + } +} + +func TestIntegration_ActivityList_InvalidPeerID(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-invalid-peer") + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?peer_id=not-a-uuid", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("List returned %d, want 400", w.Code) + } +} + +func TestIntegration_ActivityList_IncludePeerInfo(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-include-peer") + peerID := seedWorkspace(t, conn, "test-2151-peer-enriched") + seedActivityLog(t, conn, wsID, "a2a_receive", "m1", "ok", &peerID, nil) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?include=peer_info", nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 row, got %d", len(resp)) + } + if _, ok := resp[0]["peer_name"]; !ok { + t.Fatalf("expected peer_name in response") + } + if _, ok := resp[0]["peer_role"]; !ok { + t.Fatalf("expected peer_role in response") + } +} + +func TestIntegration_ActivityList_BeforeTS(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-before-ts") + seedActivityLog(t, conn, wsID, "agent_log", "old", "ok", nil, nil) + + // Wait a tiny bit so the next row has a different timestamp + time.Sleep(50 * time.Millisecond) + seedActivityLog(t, conn, wsID, "agent_log", "new", "ok", nil, nil) + + // Use the current time as before_ts — only the "old" row should appear + beforeTS := time.Now().UTC().Add(-10 * time.Millisecond).Format(time.RFC3339) + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?before_ts="+url.QueryEscape(beforeTS), nil) + h.List(c) + + if w.Code != http.StatusOK { + t.Fatalf("List returned %d, want 200", w.Code) + } + var resp []map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if len(resp) != 1 { + t.Fatalf("expected 1 row before ts, got %d", len(resp)) + } + if resp[0]["method"] != "old" { + t.Fatalf("expected method 'old', got %v", resp[0]["method"]) + } +} + +func TestIntegration_ActivityList_InvalidBeforeTS(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-invalid-before") + + h := NewActivityHandler(nil) + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest(http.MethodGet, "/workspaces/"+wsID+"/activity?before_ts=not-a-timestamp", nil) + h.List(c) + + if w.Code != http.StatusBadRequest { + t.Fatalf("List returned %d, want 400", w.Code) + } +} // ---------- Delegation handler integration tests ---------- @@ -597,9 +854,6 @@ func TestIntegration_UpdateStatus_Failed(t *testing.T) { } } -// TODO(#2151): Activity List remaining filters (source, since_secs, since_id, peer_id, include=peer_info, before_ts) -// TODO(#2151): A2A Queue Status endpoint (auth rules, 404 vs 403, response body inclusion) - // ---------- A2A Queue integration tests ---------- func TestIntegration_A2AQueue_EnqueueAndDepth(t *testing.T) { @@ -628,10 +882,13 @@ func TestIntegration_A2AQueue_EnqueueAndDepth(t *testing.T) { t.Fatalf("expected status queued, got %s", status) } - // Verify depth - d := QueueDepth(context.Background(), wsID) + // Verify depth via direct query (QueueDepth helper removed in dead-code sweep) + var d int + if err := conn.QueryRowContext(context.Background(), `SELECT COUNT(*) FROM a2a_queue WHERE workspace_id = $1 AND status = 'queued'`, wsID).Scan(&d); err != nil { + t.Fatalf("depth count: %v", err) + } if d != 1 { - t.Fatalf("expected QueueDepth 1, got %d", d) + t.Fatalf("expected queue depth 1, got %d", d) } } @@ -748,4 +1005,132 @@ func TestIntegration_A2AQueue_DropStaleQueueItems(t *testing.T) { } } -// TODO(#2151): A2A Queue Status endpoint (auth rules, 404 vs 403, response body inclusion) +// ---------- A2A Queue Status endpoint integration tests ---------- + +func TestIntegration_A2AQueueStatus_CallerMatchesCallerID(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-caller-ws") + callerID := seedWorkspace(t, conn, "test-2151-status-caller") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, callerID, PriorityTask, body, "queued") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: qid}} + c.Request.Header.Set("X-Workspace-ID", callerID) + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("GetA2AQueueStatus returned %d, want 200", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if resp["queue_id"] != qid { + t.Fatalf("expected queue_id %s, got %v", qid, resp["queue_id"]) + } + // Verify sensitive fields excluded + if _, ok := resp["body"]; ok { + t.Fatal("response should not include body") + } + if _, ok := resp["caller_id"]; ok { + t.Fatal("response should not include caller_id") + } +} + +func TestIntegration_A2AQueueStatus_CallerMatchesWorkspaceID(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-ws") + callerID := seedWorkspace(t, conn, "test-2151-status-ws-caller") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, callerID, PriorityTask, body, "queued") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: qid}} + c.Request.Header.Set("X-Workspace-ID", wsID) + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("GetA2AQueueStatus returned %d, want 200", w.Code) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if resp["queue_id"] != qid { + t.Fatalf("expected queue_id %s, got %v", qid, resp["queue_id"]) + } +} + +func TestIntegration_A2AQueueStatus_OrgTokenBypass(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-org") + callerID := seedWorkspace(t, conn, "test-2151-status-org-caller") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, callerID, PriorityTask, body, "queued") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: qid}} + c.Set("org_token_id", "test-org-token") + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusOK { + t.Fatalf("GetA2AQueueStatus returned %d, want 200", w.Code) + } +} + +func TestIntegration_A2AQueueStatus_MismatchedCaller(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-mismatch-ws") + callerID := seedWorkspace(t, conn, "test-2151-status-mismatch-caller") + otherWS := seedWorkspace(t, conn, "test-2151-status-mismatch-other") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, callerID, PriorityTask, body, "queued") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: qid}} + c.Request.Header.Set("X-Workspace-ID", otherWS) + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusNotFound { + t.Fatalf("GetA2AQueueStatus returned %d, want 404", w.Code) + } +} + +func TestIntegration_A2AQueueStatus_MissingIdentity(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-no-id") + callerID := seedWorkspace(t, conn, "test-2151-status-no-id-caller") + body := []byte(`{"test":true}`) + qid := seedA2AQueueItem(t, conn, wsID, callerID, PriorityTask, body, "queued") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: qid}} + // No X-Workspace-ID, no org_token_id + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusNotFound { + t.Fatalf("GetA2AQueueStatus returned %d, want 404", w.Code) + } +} + +func TestIntegration_A2AQueueStatus_NonExistentQueueID(t *testing.T) { + conn := integrationDB_ActivityDelegationA2A(t) + wsID := seedWorkspace(t, conn, "test-2151-status-missing") + callerID := seedWorkspace(t, conn, "test-2151-status-missing-caller") + + wh := &WorkspaceHandler{broadcaster: noOpEmitter{}} + c, w := newTestGinContext() + c.Params = gin.Params{{Key: "id", Value: wsID}, {Key: "queue_id", Value: uuid.New().String()}} + c.Request.Header.Set("X-Workspace-ID", callerID) + wh.GetA2AQueueStatus(c) + + if w.Code != http.StatusNotFound { + t.Fatalf("GetA2AQueueStatus returned %d, want 404", w.Code) + } +} -- 2.52.0 From 6f2274d62b8407655a4c59e9675e0da81e17c9ce Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 08:17:01 +0000 Subject: [PATCH 6/9] test(integration): fail-closed on missing INTEGRATION_DB_URL (#2151) Replaces t.Skip with t.Fatal in the integration helper so that a missing INTEGRATION_DB_URL env var surfaces as a hard failure rather than a silent skip. The skip pattern is a fail-open dark-wedge: CI could misconfigure the env, every test skips, and the gate reports GREEN while exercising zero code. Co-Authored-By: Claude Opus 4.7 --- .../handlers/activity_delegation_a2a_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index 5465353dd..34cf17521 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -38,7 +38,7 @@ import ( ) // integrationDB_ActivityDelegationA2A opens a connection from -// $INTEGRATION_DB_URL (skipping the test if unset), wipes the tables +// $INTEGRATION_DB_URL (failing the test if unset), wipes the tables // used by these tests, hot-swaps the package-level db.DB, and registers // a Cleanup that restores the previous db.DB + closes the connection. // @@ -47,7 +47,7 @@ func integrationDB_ActivityDelegationA2A(t *testing.T) *sql.DB { t.Helper() url := os.Getenv("INTEGRATION_DB_URL") if url == "" { - t.Skip("INTEGRATION_DB_URL not set; skipping (local devs: see file header)") + t.Fatal("INTEGRATION_DB_URL not set; failing (local devs: see file header)") } conn, err := sql.Open("postgres", url) if err != nil { -- 2.52.0 From b8982ac51712ea0645735298de6cced10b49d56a Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 11:46:40 +0000 Subject: [PATCH 7/9] test(activity): attach dummy request in newTestGinContext to prevent nil panic TestIntegration_ActivityList_Basic panicked with a nil pointer dereference at activity.go:512 because gin.CreateTestContext returns a context with c.Request == nil, and List() calls c.Request.Context(). Add a dummy httptest.NewRequest to newTestGinContext() so every test that uses the helper has a non-nil request. Relates to #2151. Co-Authored-By: Claude Opus 4.7 --- .../activity_delegation_a2a_integration_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index 34cf17521..0623bce2d 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -134,6 +134,7 @@ func newTestGinContext() (*gin.Context, *httptest.ResponseRecorder) { gin.SetMode(gin.TestMode) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest(http.MethodGet, "/", nil) return c, w } @@ -532,13 +533,12 @@ func TestIntegration_ActivityList_BeforeTS(t *testing.T) { wsID := seedWorkspace(t, conn, "test-2151-before-ts") seedActivityLog(t, conn, wsID, "agent_log", "old", "ok", nil, nil) - // Wait a tiny bit so the next row has a different timestamp + // Capture a timestamp between the two rows so only "old" is before it. + time.Sleep(50 * time.Millisecond) + beforeTS := time.Now().UTC().Format(time.RFC3339) time.Sleep(50 * time.Millisecond) seedActivityLog(t, conn, wsID, "agent_log", "new", "ok", nil, nil) - // Use the current time as before_ts — only the "old" row should appear - beforeTS := time.Now().UTC().Add(-10 * time.Millisecond).Format(time.RFC3339) - h := NewActivityHandler(nil) c, w := newTestGinContext() c.Params = gin.Params{{Key: "id", Value: wsID}} @@ -896,7 +896,7 @@ func TestIntegration_A2AQueue_DequeueNext(t *testing.T) { conn := integrationDB_ActivityDelegationA2A(t) wsID := seedWorkspace(t, conn, "test-2151-a2a-dequeue") body := []byte(`{"test":true}`) - seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "queued") + seedA2AQueueItem(t, conn, wsID, "00000000-0000-0000-0000-000000000001", PriorityTask, body, "queued") item, err := DequeueNext(context.Background(), wsID) if err != nil { @@ -946,7 +946,7 @@ func TestIntegration_A2AQueue_MarkCompletedAndFailed(t *testing.T) { conn := integrationDB_ActivityDelegationA2A(t) wsID := seedWorkspace(t, conn, "test-2151-a2a-lifecycle") body := []byte(`{"test":true}`) - qid := seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "dispatched") + qid := seedA2AQueueItem(t, conn, wsID, "00000000-0000-0000-0000-000000000001", PriorityTask, body, "dispatched") MarkQueueItemCompleted(context.Background(), qid) var status string -- 2.52.0 From b4b8f4f8c657d543907e36c4b903f4b972fccffa Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 12:12:22 +0000 Subject: [PATCH 8/9] test(integration): fix ActivityList_IncludePeerInfo + A2AQueue_MarkCompletedAndFailed - Set peer role on seeded workspace so peer_role is populated in ?include=peer_info response (handler omits empty peer fields). - Use valid UUID instead of empty string for caller_id in seedA2AQueueItem to satisfy UUID column constraint. Co-Authored-By: Claude Opus 4.7 --- .../handlers/activity_delegation_a2a_integration_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index 0623bce2d..ba4a35973 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -502,6 +502,9 @@ func TestIntegration_ActivityList_IncludePeerInfo(t *testing.T) { conn := integrationDB_ActivityDelegationA2A(t) wsID := seedWorkspace(t, conn, "test-2151-include-peer") peerID := seedWorkspace(t, conn, "test-2151-peer-enriched") + if _, err := conn.ExecContext(context.Background(), `UPDATE workspaces SET role = $1 WHERE id = $2`, "test-peer", peerID); err != nil { + t.Fatalf("set peer role: %v", err) + } seedActivityLog(t, conn, wsID, "a2a_receive", "m1", "ok", &peerID, nil) h := NewActivityHandler(nil) @@ -958,7 +961,7 @@ func TestIntegration_A2AQueue_MarkCompletedAndFailed(t *testing.T) { } // Seed another item to test failed path with max attempts - qid2 := seedA2AQueueItem(t, conn, wsID, "", PriorityTask, body, "dispatched") + qid2 := seedA2AQueueItem(t, conn, wsID, "00000000-0000-0000-0000-000000000001", PriorityTask, body, "dispatched") for i := 0; i < 6; i++ { MarkQueueItemFailed(context.Background(), qid2, "transient error") } -- 2.52.0 From 1e6b61ad3eb4602d2ec1d3fa1c2e0f79fbd30b31 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 12:18:31 +0000 Subject: [PATCH 9/9] test(integration): fix BeforeTS timing + A2AQueue max-attempts seeding - Use RFC3339Nano + 200ms gaps in BeforeTS test to avoid second- truncation and Go/Postgres clock skew. - Pre-set attempts=5 on seeded A2A queue item so MarkQueueItemFailed transitions to 'failed' on first call (attempts are normally incremented by DequeueNext, which the test bypasses). Co-Authored-By: Claude Opus 4.7 --- .../activity_delegation_a2a_integration_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go index ba4a35973..48b2bd68a 100644 --- a/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go +++ b/workspace-server/internal/handlers/activity_delegation_a2a_integration_test.go @@ -537,9 +537,11 @@ func TestIntegration_ActivityList_BeforeTS(t *testing.T) { seedActivityLog(t, conn, wsID, "agent_log", "old", "ok", nil, nil) // Capture a timestamp between the two rows so only "old" is before it. - time.Sleep(50 * time.Millisecond) - beforeTS := time.Now().UTC().Format(time.RFC3339) - time.Sleep(50 * time.Millisecond) + // Use nanosecond precision and a generous gap to avoid second-truncation + // or clock-skew between Go time.Now() and Postgres now(). + time.Sleep(200 * time.Millisecond) + beforeTS := time.Now().UTC().Format(time.RFC3339Nano) + time.Sleep(200 * time.Millisecond) seedActivityLog(t, conn, wsID, "agent_log", "new", "ok", nil, nil) h := NewActivityHandler(nil) @@ -960,8 +962,14 @@ func TestIntegration_A2AQueue_MarkCompletedAndFailed(t *testing.T) { t.Fatalf("expected completed, got %s", status) } - // Seed another item to test failed path with max attempts + // Seed another item to test failed path with max attempts. + // Pre-set attempts=5 so the first MarkQueueItemFailed sees attempts>=5 + // and transitions straight to failed (MarkQueueItemFailed increments + // attempts on dispatch via DequeueNext, but we call it directly here). qid2 := seedA2AQueueItem(t, conn, wsID, "00000000-0000-0000-0000-000000000001", PriorityTask, body, "dispatched") + if _, err := conn.ExecContext(context.Background(), `UPDATE a2a_queue SET attempts = 5 WHERE id = $1`, qid2); err != nil { + t.Fatalf("set attempts: %v", err) + } for i := 0; i < 6; i++ { MarkQueueItemFailed(context.Background(), qid2, "transient error") } -- 2.52.0