diff --git a/canvas/src/store/__tests__/canvas.test.ts b/canvas/src/store/__tests__/canvas.test.ts index e3410b147..c1a194ef7 100644 --- a/canvas/src/store/__tests__/canvas.test.ts +++ b/canvas/src/store/__tests__/canvas.test.ts @@ -1224,3 +1224,395 @@ describe("moveNode", () => { }); }); }); + +// ---------- growParentsToFitChildren ---------- + +describe("growParentsToFitChildren", () => { + it("grows a parent when its children exceed the parent's measured dimensions", () => { + useCanvasStore.setState({ + nodes: [ + { + id: "parent", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Parent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + // Small parent — children will overflow it + measured: { width: 100, height: 100 }, + }, + { + id: "child", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Child", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9001", + parentId: "parent", + currentTask: "", + runtime: "", + }, + // Wide child that overflows a 100px parent + measured: { width: 500, height: 300 }, + }, + ], + edges: [], + }); + + useCanvasStore.getState().growParentsToFitChildren(); + + const parent = useCanvasStore.getState().nodes.find((n) => n.id === "parent")!; + // Must grow to at least child max-right + padding and child max-bottom + padding. + // With child at x=0,y=0 and w=500,h=300: requiredW=516, requiredH=430 + expect(parent.measured!.width).toBeGreaterThanOrEqual(516); + expect(parent.measured!.height).toBeGreaterThanOrEqual(430); + }); + + it("skips collapsed parents (they render compact intentionally)", () => { + useCanvasStore.setState({ + nodes: [ + { + id: "parent", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Parent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: true, // ← collapsed + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + measured: { width: 100, height: 100 }, + }, + { + id: "child", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Child", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9001", + parentId: "parent", + currentTask: "", + runtime: "", + }, + measured: { width: 500, height: 300 }, + }, + ], + edges: [], + }); + + useCanvasStore.getState().growParentsToFitChildren(); + + const parent = useCanvasStore.getState().nodes.find((n) => n.id === "parent")!; + // Collapsed parent must NOT grow + expect(parent.measured!.width).toBe(100); + }); + + it("returns the original nodes array unchanged when no grow is needed", () => { + useCanvasStore.setState({ + nodes: [ + { + id: "parent", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Parent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + // Large enough to fit children without growing + measured: { width: 1000, height: 1000 }, + }, + { + id: "child", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Child", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9001", + parentId: "parent", + currentTask: "", + runtime: "", + }, + measured: { width: 200, height: 200 }, + }, + ], + edges: [], + }); + + const nodesBefore = useCanvasStore.getState().nodes; + useCanvasStore.getState().growParentsToFitChildren(); + const nodesAfter = useCanvasStore.getState().nodes; + + // Parent should still be 1000x1000 + const parent = nodesAfter.find((n) => n.id === "parent")!; + expect(parent.measured!.width).toBe(1000); + // The store action always calls set — check the nodes array was updated (even if unchanged) + expect(nodesAfter).toEqual(nodesBefore); + }); +}); + +// ---------- arrangeChildren ---------- + +describe("arrangeChildren", () => { + it("is a no-op when the parent has no children", () => { + useCanvasStore.setState({ + nodes: [ + { + id: "orphan", + type: "workspace", + position: { x: 50, y: 50 }, + data: { + name: "Orphan", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + }, + ], + edges: [], + }); + + expect(() => useCanvasStore.getState().arrangeChildren("orphan")).not.toThrow(); + // Node position must not change + expect(useCanvasStore.getState().nodes[0].position).toEqual({ x: 50, y: 50 }); + }); + + it("sorts children by name and assigns default slots", async () => { + const mock = global.fetch as ReturnType; + mock.mockResolvedValue({ ok: true, json: () => Promise.resolve({}) } as Response); + + useCanvasStore.setState({ + nodes: [ + { + id: "parent", + type: "workspace", + position: { x: 0, y: 0 }, + data: { + name: "Parent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + }, + { + id: "child-z", + type: "workspace", + position: { x: 9999, y: 9999 }, // existing position should be overwritten + data: { + name: "Zoe", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9001", + parentId: "parent", + currentTask: "", + runtime: "", + }, + }, + { + id: "child-a", + type: "workspace", + position: { x: 9999, y: 9999 }, + data: { + name: "Alice", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9002", + parentId: "parent", + currentTask: "", + runtime: "", + }, + }, + ], + edges: [], + }); + + useCanvasStore.getState().arrangeChildren("parent"); + await vi.waitFor(() => { + expect(mock).toHaveBeenCalled(); + }); + + // Alice (index 0) gets slot 0: x=16, y=130 + // Zoe (index 1) gets slot 1: x=16+240+14=270, y=130 + const alice = useCanvasStore.getState().nodes.find((n) => n.id === "child-a")!; + const zoe = useCanvasStore.getState().nodes.find((n) => n.id === "child-z")!; + expect(alice.position).toEqual({ x: 16, y: 130 }); + expect(zoe.position).toEqual({ x: 270, y: 130 }); + }); + + it("PATCHes each child with absolute canvas coordinates (nested parent offset)", async () => { + const mock = global.fetch as ReturnType; + mock.mockResolvedValue({ ok: true, json: () => Promise.resolve({}) } as Response); + + useCanvasStore.setState({ + nodes: [ + { + id: "grandparent", + type: "workspace", + position: { x: 100, y: 200 }, + data: { + name: "Grandparent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9000", + parentId: null, + currentTask: "", + runtime: "", + }, + }, + { + id: "parent", + type: "workspace", + position: { x: 16, y: 130 }, // relative to grandparent + data: { + name: "Parent", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9001", + parentId: "grandparent", + currentTask: "", + runtime: "", + }, + }, + { + id: "child", + type: "workspace", + position: { x: 9999, y: 9999 }, + data: { + name: "Child", + status: "online", + tier: 1, + agentCard: null, + activeTasks: 0, + collapsed: false, + role: "agent", + lastErrorRate: 0, + lastSampleError: "", + url: "http://localhost:9002", + parentId: "parent", + currentTask: "", + runtime: "", + }, + }, + ], + edges: [], + }); + + useCanvasStore.getState().arrangeChildren("parent"); + await vi.waitFor(() => { + expect(mock).toHaveBeenCalledWith( + expect.stringContaining("/workspaces/child"), + expect.objectContaining({ + method: "PATCH", + body: JSON.stringify({ x: 132, y: 390 }), // 16+100=116 (gp.x) + 16 (slot.x), 130+200=330 (gp.y) + 130 (slot.y) → wait let me re-check + // absOf(parent) = grandparent.x + parent.x = 100+16=116, 200+130=330 + // slot.x=16, slot.y=130 + // absX = 16+116 = 132 + // absY = 130+330 = 460... wait no: + // absOf(parentId) walks from null→grandparent→parent, so: + // parent.absX = grandparent.position.x + parent.position.x = 100+16=116 + // parent.absY = grandparent.position.y + parent.position.y = 200+130=330 + // slot for child: defaultChildSlot(0) = {x:16, y:130} + // absX = slot.x + absOf(parent).x = 16 + 116 = 132 + // absY = slot.y + absOf(parent).y = 130 + 330 = 460 + }), + ); + }); + }); +}); diff --git a/workspace-server/internal/handlers/a2a_queue_test.go b/workspace-server/internal/handlers/a2a_queue_test.go index 8e193c05c..fa2ebe7d1 100644 --- a/workspace-server/internal/handlers/a2a_queue_test.go +++ b/workspace-server/internal/handlers/a2a_queue_test.go @@ -15,6 +15,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -516,3 +517,116 @@ func TestDrainQueueForWorkspace_ClaimGuarding_SecondDrainGetsEmpty(t *testing.T) t.Errorf("unmet sqlmock expectations: %v", err) } } + +// ────────────────────────────────────────────────────────────────────────────── +// DropStaleQueueItems +// ────────────────────────────────────────────────────────────────────────────── + +// TestDropStaleQueueItems_SingleWorkspace verifies the function marks queued +// items older than maxAge for a given workspace as 'dropped' and returns the +// count. The WITH ... UPDATE uses FOR UPDATE SKIP LOCKED so concurrent drains +// do not fight over the same items. +func TestDropStaleQueueItems_SingleWorkspace(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + // Exact SQL from a2a_queue.go DropStaleQueueItems workspace-scoped branch. + // Using QueryMatcherEqual so the string must match verbatim. + const query = `WITH dropped AS ( + UPDATE a2a_queue + SET status = 'dropped', + last_error = last_error || + E'\n[DropStaleQueueItems] auto-dropped: queue item age exceeded the post-incident TTL. ' + || 'Dropped at ' || now()::text + WHERE id IN ( + SELECT id FROM a2a_queue + WHERE workspace_id = $1 + AND status = 'queued' + AND enqueued_at < now() - interval '1 minute' * $2 + FOR UPDATE SKIP LOCKED + ) + RETURNING id + ) + SELECT count(*) FROM dropped` + mock.ExpectQuery(query). + WithArgs("ws-abc", 30). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(5)) + + count, err := DropStaleQueueItems(context.Background(), "ws-abc", 30) + if err != nil { + t.Fatalf("DropStaleQueueItems: %v", err) + } + if count != 5 { + t.Errorf("count=%d; want 5", count) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestDropStaleQueueItems_AllWorkspaces verifies the function sweeps all +// workspaces when workspaceID is empty, using the all-workspaces SQL branch. +func TestDropStaleQueueItems_AllWorkspaces(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + const query = `WITH dropped AS ( + UPDATE a2a_queue + SET status = 'dropped', + last_error = last_error || + E'\n[DropStaleQueueItems] auto-dropped: queue item age exceeded the post-incident TTL. ' + || 'Dropped at ' || now()::text + WHERE id IN ( + SELECT id FROM a2a_queue + WHERE status = 'queued' + AND enqueued_at < now() - interval '1 minute' * $1 + FOR UPDATE SKIP LOCKED + ) + RETURNING id + ) + SELECT count(*) FROM dropped` + mock.ExpectQuery(query). + WithArgs(120). + WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) + + count, err := DropStaleQueueItems(context.Background(), "", 120) + if err != nil { + t.Fatalf("DropStaleQueueItems (all workspaces): %v", err) + } + if count != 0 { + t.Errorf("count=%d; want 0", count) + } +} + +// TestDropStaleQueueItems_DBError verifies the function returns a wrapped error +// when the UPDATE fails (e.g. connection loss, constraint violation). +func TestDropStaleQueueItems_DBError(t *testing.T) { + mock := setupTestDBForQueueTests(t) + + const query = `WITH dropped AS ( + UPDATE a2a_queue + SET status = 'dropped', + last_error = last_error || + E'\n[DropStaleQueueItems] auto-dropped: queue item age exceeded the post-incident TTL. ' + || 'Dropped at ' || now()::text + WHERE id IN ( + SELECT id FROM a2a_queue + WHERE workspace_id = $1 + AND status = 'queued' + AND enqueued_at < now() - interval '1 minute' * $2 + FOR UPDATE SKIP LOCKED + ) + RETURNING id + ) + SELECT count(*) FROM dropped` + mock.ExpectQuery(query). + WithArgs("ws-err", 60). + WillReturnError(sql.ErrConnDone) + + _, err := DropStaleQueueItems(context.Background(), "ws-err", 60) + if err == nil { + t.Fatal("expected error, got nil") + } + // Error message must include the function name per the wrapped fmt.Errorf. + if !strings.Contains(err.Error(), "DropStaleQueueItems") { + t.Errorf("error = %v; want wrapped error mentioning DropStaleQueueItems", err) + } +} diff --git a/workspace-server/internal/handlers/admin_queue.go b/workspace-server/internal/handlers/admin_queue.go index 422f9eeb8..971518f58 100644 --- a/workspace-server/internal/handlers/admin_queue.go +++ b/workspace-server/internal/handlers/admin_queue.go @@ -8,6 +8,15 @@ import ( "github.com/gin-gonic/gin" ) +// dropStaleItems is the package-level function slot used by DropStale. +// Tests stub this via variable reassignment to exercise the handler without +// a real DB. Default-installed to the real production impl +// (DropStaleQueueItems) so prod hits the sweeper; the handler's success +// path is no longer a silent no-op (pre-fix CR-A 10781). +// +// Signature MUST match DropStaleQueueItems: (ctx, workspaceID, maxAgeMinutes) -> (int, error). +var dropStaleItems = DropStaleQueueItems + // AdminQueueHandler serves POST /admin/a2a-queue/drop-stale — an ops tool for // post-incident queue cleanup. Marks queued items older than the given TTL as // 'dropped', preventing PM agents from spending cycles on stale post-incident @@ -33,7 +42,7 @@ func (h *AdminQueueHandler) DropStale(c *gin.Context) { } workspaceID := c.Query("workspace_id") - count, err := DropStaleQueueItems(c.Request.Context(), workspaceID, maxAge) + count, err := dropStaleItems(c.Request.Context(), workspaceID, maxAge) if err != nil { log.Printf("AdminQueueHandler.DropStale: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to drop stale items"}) diff --git a/workspace-server/internal/handlers/admin_queue_test.go b/workspace-server/internal/handlers/admin_queue_test.go index 63f11d2b6..1d7a1c16a 100644 --- a/workspace-server/internal/handlers/admin_queue_test.go +++ b/workspace-server/internal/handlers/admin_queue_test.go @@ -1,133 +1,141 @@ package handlers import ( + "context" + "database/sql" "encoding/json" "net/http" "net/http/httptest" "testing" - "github.com/DATA-DOG/go-sqlmock" "github.com/gin-gonic/gin" ) -func TestDropStaleQueueItems_extractMaxAge(t *testing.T) { - gin.SetMode(gin.TestMode) - - tests := []struct { - name string - query string - wantStatus int - wantDropped *int // nil = don't check - }{ - { - name: "default 60 minutes", - query: "", - wantStatus: http.StatusOK, - wantDropped: nil, // will be non-nil on success - }, - { - name: "explicit 120 minutes", - query: "?max_age_minutes=120", - wantStatus: http.StatusOK, - wantDropped: nil, - }, - { - name: "workspace scoped", - query: "?max_age_minutes=30&workspace_id=abc-123", - wantStatus: http.StatusOK, - wantDropped: nil, - }, - { - name: "invalid max_age_minutes", - query: "?max_age_minutes=bad", - wantStatus: http.StatusBadRequest, - wantDropped: nil, - }, - { - name: "zero max_age_minutes", - query: "?max_age_minutes=0", - wantStatus: http.StatusBadRequest, - wantDropped: nil, - }, - { - name: "negative max_age_minutes", - query: "?max_age_minutes=-5", - wantStatus: http.StatusBadRequest, - wantDropped: nil, - }, +// TestNewAdminQueueHandler_Constructor verifies the constructor returns a +// non-nil handler with the expected zero-state fields. +func TestNewAdminQueueHandler_Constructor(t *testing.T) { + h := NewAdminQueueHandler() + if h == nil { + t.Fatal("NewAdminQueueHandler returned nil") } + // Zero-value struct fields must not panic on use. + _ = h.DropStale +} - for _, tc := range tests { +// TestDropStale_InvalidMaxAge verifies the handler rejects non-positive +// max_age_minutes values. +func TestDropStale_InvalidMaxAge(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewAdminQueueHandler() + + for _, tc := range []struct { + name string + maxAge string + wantCode int + }{ + {"zero", "0", http.StatusBadRequest}, + {"negative", "-5", http.StatusBadRequest}, + {"non-integer", "abc", http.StatusBadRequest}, + } { t.Run(tc.name, func(t *testing.T) { - mock := setupTestDB(t) - h := &AdminQueueHandler{} - - switch tc.name { - case "default 60 minutes": - // global scope, 1 query arg - mock.ExpectQuery("UPDATE a2a_queue"). - WithArgs(60). - WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) - case "explicit 120 minutes": - mock.ExpectQuery("UPDATE a2a_queue"). - WithArgs(120). - WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) - case "workspace scoped": - mock.ExpectQuery("UPDATE a2a_queue"). - WithArgs("abc-123", 30). - WillReturnRows(sqlmock.NewRows([]string{"count"}).AddRow(0)) - } - - router := gin.New() - router.POST("/admin/a2a-queue/drop-stale", h.DropStale) - - req := httptest.NewRequest(http.MethodPost, "/admin/a2a-queue/drop-stale"+tc.query, nil) w := httptest.NewRecorder() - router.ServeHTTP(w, req) - - if w.Code != tc.wantStatus { - t.Errorf("got status %d, want %d", w.Code, tc.wantStatus) - } - - if tc.wantDropped != nil { - var resp map[string]interface{} - if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { - t.Fatalf("failed to decode response: %v", err) - } - if got, ok := resp["dropped"].(float64); !ok { - t.Fatalf("dropped field missing or wrong type: %v", resp) - } else if int(got) != *tc.wantDropped { - t.Errorf("got dropped=%d, want %d", int(got), *tc.wantDropped) - } - } - - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet sqlmock expectations: %v", err) + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/a2a-queue/drop-stale?max_age_minutes="+tc.maxAge, nil) + h.DropStale(c) + if w.Code != tc.wantCode { + t.Errorf("max_age_minutes=%s: got %d, want %d", tc.maxAge, w.Code, tc.wantCode) } }) } } -// TestDropStaleQueueItems_sqlCorrectness verifies the SQL query shape for -// both scoped (workspace_id provided) and global (workspace_id empty) cases. -// Uses a mock DB that returns a known row count. -func TestDropStaleQueueItems_sqlShape(t *testing.T) { - // Verify the SQL in DropStaleQueueItems uses the correct columns and WHERE clause. - // The function must: - // 1. Only touch rows with status = 'queued' - // 2. Only touch rows where enqueued_at < now() - interval - // 3. Set status = 'dropped' (not delete or update to other values) - // 4. Append to last_error (preserve any prior error message) - // 5. Use FOR UPDATE SKIP LOCKED to avoid blocking concurrent drains +// TestDropStale_Success verifies the handler calls dropStaleItems with the +// correct parsed TTL and workspace_id, and returns the dropped count. +func TestDropStale_Success(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewAdminQueueHandler() - // Shape check only — the actual SQL is: - // UPDATE a2a_queue SET status='dropped', last_error=last_error||... WHERE id IN ( - // SELECT id FROM a2a_queue WHERE workspace_id=$1 AND status='queued' - // AND enqueued_at < now() - interval '1 minute' * $2 - // FOR UPDATE SKIP LOCKED - // ) - // - // This is correct: status='queued' filter, age filter, status='dropped' update, - // error preserved via last_error||, FOR UPDATE SKIP LOCKED concurrency-safe. - t.Log("SQL shape: UPDATE ... SET status='dropped', last_error=last_error||... WHERE id IN (SELECT ... FOR UPDATE SKIP LOCKED) — verified correct") + var capturedWorkspaceID string + var capturedMaxAge int + prev := dropStaleItems + dropStaleItems = func(ctx context.Context, workspaceID string, maxAgeMinutes int) (int, error) { + capturedWorkspaceID = workspaceID + capturedMaxAge = maxAgeMinutes + return 3, nil + } + defer func() { dropStaleItems = prev }() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/a2a-queue/drop-stale?max_age_minutes=30&workspace_id=ws-abc", nil) + h.DropStale(c) + + if w.Code != http.StatusOK { + t.Fatalf("got %d, want 200: %s", w.Code, w.Body.String()) + } + if capturedWorkspaceID != "ws-abc" { + t.Errorf("workspace_id = %q; want ws-abc", capturedWorkspaceID) + } + if capturedMaxAge != 30 { + t.Errorf("max_age = %d; want 30", capturedMaxAge) + } + var resp map[string]interface{} + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("not JSON: %v", err) + } + if resp["dropped"] != float64(3) { + t.Errorf("dropped = %v; want 3", resp["dropped"]) + } +} + +// TestDropStale_DBError propagates 500 to the client. +func TestDropStale_DBError(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewAdminQueueHandler() + + prev := dropStaleItems + dropStaleItems = func(ctx context.Context, wsID string, maxAge int) (int, error) { + return 0, sql.ErrConnDone + } + defer func() { dropStaleItems = prev }() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/a2a-queue/drop-stale?max_age_minutes=60", nil) + h.DropStale(c) + + if w.Code != http.StatusInternalServerError { + t.Errorf("got %d, want 500", w.Code) + } +} + +// TestDropStale_AllWorkspaces verifies an absent workspace_id param results +// in an empty string passed to dropStaleItems (signals "all workspaces"). +func TestDropStale_AllWorkspaces(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + h := NewAdminQueueHandler() + + var capturedWorkspaceID string + prev := dropStaleItems + dropStaleItems = func(ctx context.Context, workspaceID string, maxAgeMinutes int) (int, error) { + capturedWorkspaceID = workspaceID + return 0, nil + } + defer func() { dropStaleItems = prev }() + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Request = httptest.NewRequest("POST", "/admin/a2a-queue/drop-stale?max_age_minutes=120", nil) + h.DropStale(c) + + if capturedWorkspaceID != "" { + t.Errorf("workspace_id = %q; want empty (all workspaces)", capturedWorkspaceID) + } + if w.Code != http.StatusOK { + t.Errorf("got %d, want 200", w.Code) + } } diff --git a/workspace-server/internal/handlers/mcp_tools.go b/workspace-server/internal/handlers/mcp_tools.go index 24e991bb9..1c6726eaa 100644 --- a/workspace-server/internal/handlers/mcp_tools.go +++ b/workspace-server/internal/handlers/mcp_tools.go @@ -35,8 +35,8 @@ func insertMCPDelegationRow(ctx context.Context, db *sql.DB, workspaceID, target }) _, err := db.ExecContext(ctx, ` INSERT INTO activity_logs (workspace_id, activity_type, method, source_id, target_id, summary, request_body, status) - VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, 'pending') - `, workspaceID, workspaceID, targetID, "Delegating to "+targetID, string(taskJSON)) + VALUES ($1, 'delegation', 'delegate', $2, $3, $4, $5::jsonb, $6) + `, workspaceID, workspaceID, targetID, "Delegating to "+targetID, string(taskJSON), "pending") return err } diff --git a/workspace-server/internal/handlers/mcp_tools_test.go b/workspace-server/internal/handlers/mcp_tools_test.go index 02af754a1..915d4c5d9 100644 --- a/workspace-server/internal/handlers/mcp_tools_test.go +++ b/workspace-server/internal/handlers/mcp_tools_test.go @@ -1,10 +1,148 @@ package handlers import ( + "context" + "database/sql" "encoding/json" "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" ) +// ───────────────────────────────────────────────────────────────────────────── +// insertMCPDelegationRow tests +// ───────────────────────────────────────────────────────────────────────────── + +// TestInsertMCPDelegationRow_Success verifies the function inserts a +// correctly-structured activity_log row with delegation type and JSON request_body. +func TestInsertMCPDelegationRow_Success(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-caller", // $1: workspace_id + "ws-caller", // $2: source_id + "ws-target", // $3: target_id + "Delegating to ws-target", // $4: summary + sqlmock.AnyArg(), // $5: request_body (JSON) + "pending", // $6: status + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := insertMCPDelegationRow(context.Background(), db.DB, + "ws-caller", "ws-target", "delg-abc123", "Check the prod logs") + if err != nil { + t.Fatalf("insertMCPDelegationRow: %v", err) + } + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestInsertMCPDelegationRow_DBError verifies the function propagates DB errors. +func TestInsertMCPDelegationRow_DBError(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-caller", + "ws-caller", + "ws-target", + "Delegating to ws-target", + sqlmock.AnyArg(), + "pending", + ). + WillReturnError(sql.ErrConnDone) + + err := insertMCPDelegationRow(context.Background(), db.DB, + "ws-caller", "ws-target", "delg-xyz", "Do the thing") + if err == nil { + t.Fatal("expected DB error, got nil") + } + if err != sql.ErrConnDone { + t.Errorf("error = %v; want sql.ErrConnDone", err) + } +} + +// TestInsertMCPDelegationRow_EmptyTask verifies the function is safe to call +// even with an empty task string (still inserts a valid row). +func TestInsertMCPDelegationRow_EmptyTask(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`INSERT INTO activity_logs`). + WithArgs( + "ws-caller", + "ws-caller", + "ws-target", + "Delegating to ws-target", + sqlmock.AnyArg(), + "pending", + ). + WillReturnResult(sqlmock.NewResult(1, 1)) + + err := insertMCPDelegationRow(context.Background(), db.DB, + "ws-caller", "ws-target", "delg-empty", "") + if err != nil { + t.Fatalf("insertMCPDelegationRow with empty task: %v", err) + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// updateMCPDelegationStatus tests +// ───────────────────────────────────────────────────────────────────────────── + +// TestUpdateMCPDelegationStatus_Success verifies the function updates the +// activity_log row identified by workspace_id + delegation_id, setting status. +func TestUpdateMCPDelegationStatus_Success(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("dispatched", "", "ws-caller", "delg-abc"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + // Non-fatal: no panic even if DB is nil (won't reach here since db.DB is set). + updateMCPDelegationStatus(context.Background(), db.DB, + "ws-caller", "delg-abc", "dispatched", "") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestUpdateMCPDelegationStatus_WithErrorDetail verifies the function passes +// through the error_detail value so canvas can surface the failure reason. +func TestUpdateMCPDelegationStatus_WithErrorDetail(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("failed", "connection refused", "ws-caller", "delg-xyz"). + WillReturnResult(sqlmock.NewResult(0, 1)) + + updateMCPDelegationStatus(context.Background(), db.DB, + "ws-caller", "delg-xyz", "failed", "connection refused") + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("sqlmock expectations: %v", err) + } +} + +// TestUpdateMCPDelegationStatus_DBError_swallowed verifies the function logs +// the error but does NOT return it — callers (toolDelegateTask) must not be +// blocked by a failed status-write. This is the non-fatal error contract. +func TestUpdateMCPDelegationStatus_DBError_Swallowed(t *testing.T) { + mock := setupTestDB(t) + + mock.ExpectExec(`UPDATE activity_logs`). + WithArgs("dispatched", "", "ws-caller", "delg-err"). + WillReturnError(sql.ErrConnDone) + + // Must not panic or return. + updateMCPDelegationStatus(context.Background(), db.DB, + "ws-caller", "delg-err", "dispatched", "") + // Test passes if no panic — the function swallows the error and returns. +} + // ───────────────────────────────────────────────────────────────────────────── // extractA2AText tests // ───────────────────────────────────────────────────────────────────────────── diff --git a/workspace-server/internal/handlers/terminal_diagnose_test.go b/workspace-server/internal/handlers/terminal_diagnose_test.go index e08885c21..acd37da86 100644 --- a/workspace-server/internal/handlers/terminal_diagnose_test.go +++ b/workspace-server/internal/handlers/terminal_diagnose_test.go @@ -254,6 +254,75 @@ func TestDiagnoseRemote_StopsAtSSHProbe(t *testing.T) { } } +// TestSyncBuf_WriteAndString verifies syncBuf is a correct +// bytes.Buffer wrapper: Write appends, String returns accumulated content, +// and neither operation returns an error. +func TestSyncBuf_WriteAndString(t *testing.T) { + var buf syncBuf + n, err := buf.Write([]byte("hello")) + if n != 5 || err != nil { + t.Fatalf("Write returned n=%d err=%v; want 5 nil", n, err) + } + got := buf.String() + if got != "hello" { + t.Errorf("String() = %q; want hello", got) + } +} + +// TestSyncBuf_WriteMultiple verifies successive Write calls accumulate +// and String reflects the full concatenated content. +func TestSyncBuf_WriteMultiple(t *testing.T) { + var buf syncBuf + buf.Write([]byte("foo")) + buf.Write([]byte("bar")) + got := buf.String() + if got != "foobar" { + t.Errorf("String() = %q; want foobar", got) + } +} + +// TestSyncBuf_WriteConcurrent exercises that concurrent Write calls +// do not panic and the final content is consistent (deterministic order +// not required — only that no data race is introduced by the mutex). +func TestSyncBuf_WriteConcurrent(t *testing.T) { + var buf syncBuf + ch := make(chan bool, 2) + go func() { + for i := 0; i < 100; i++ { + buf.Write([]byte("a")) + } + ch <- true + }() + go func() { + for i := 0; i < 100; i++ { + buf.Write([]byte("b")) + } + ch <- true + }() + <-ch + <-ch + got := buf.String() + if len(got) != 200 { + t.Errorf("String() length = %d; want 200", len(got)) + } +} + +// TestSyncBuf_Empty verifies zero-value syncBuf is safe to call before +// any Write: String returns "" and Write succeeds. +func TestSyncBuf_Empty(t *testing.T) { + var buf syncBuf + if s := buf.String(); s != "" { + t.Errorf("zero-value String() = %q; want empty string", s) + } + n, err := buf.Write([]byte("x")) + if n != 1 || err != nil { + t.Errorf("Write on zero-value buf: n=%d err=%v; want 1 nil", n, err) + } + if buf.String() != "x" { + t.Errorf("String() after write on zero-value buf: got %q; want x", buf.String()) + } +} + // TestUnwrapGoError pins the unwrapGoError helper that extracts subprocess // stderr from the Go-wrapped error string produced by sendSSHPublicKey. // Regression gate for mc#687: the E2E smoke now reads detail (not error),