Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1c42488be7 |
@@ -63,31 +63,6 @@ func TestSessionSearchReturnsActivityAndMemory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSessionSearch_DBError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
handler := NewActivityHandler(broadcaster)
|
||||
|
||||
mock.ExpectQuery("WITH session_items AS").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-123/session-search?q=test", bytes.NewBufferString(""))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-123"}}
|
||||
|
||||
handler.SessionSearch(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB error, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet sqlmock expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------- Activity List source filter ----------
|
||||
|
||||
func TestActivityList_SourceCanvas(t *testing.T) {
|
||||
|
||||
@@ -602,33 +602,6 @@ func TestDelegationRecord_RejectsInvalidUUID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationRecord_DBInsertFails(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
broadcaster := newTestBroadcaster()
|
||||
wh := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
|
||||
h := NewDelegationHandler(wh, broadcaster)
|
||||
|
||||
mock.ExpectExec("INSERT INTO activity_logs").
|
||||
WillReturnError(fmt.Errorf("connection refused"))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "550e8400-e29b-41d4-a716-446655440000"}}
|
||||
body := `{"target_id":"550e8400-e29b-41d4-a716-446655440001","task":"hello","delegation_id":"del-xyz"}`
|
||||
c.Request = httptest.NewRequest("POST", "/delegations/record", bytes.NewBufferString(body))
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
h.Record(c)
|
||||
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Errorf("expected 500 on DB insert failure, got %d", w.Code)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unmet expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDelegationUpdateStatus_CompletedInsertsResultRow(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
|
||||
@@ -0,0 +1,135 @@
|
||||
package handlers
|
||||
|
||||
// platform_agent.go — installs the org-level platform agent as the org root.
|
||||
// (RFC docs/design/rfc-platform-agent.md)
|
||||
//
|
||||
// The platform agent IS the org root: an org is the subtree under the single
|
||||
// parent_id IS NULL row (org_scope.go), so making the concierge the user's
|
||||
// universal A2A peer means making it that root. Installing it therefore:
|
||||
//
|
||||
// 1. upserts the platform-agent row (kind='platform', parent_id NULL);
|
||||
// 2. re-parents the org's existing root(s) under it;
|
||||
// 3. moves the org-anchor references — org_api_tokens.org_id and
|
||||
// org_plugin_allowlist.org_id, both of which key off the root workspace id
|
||||
// (see migrations 035/036 + 026) — from each old root to the platform agent.
|
||||
//
|
||||
// All of that happens in ONE transaction so a tenant's auth tokens and plugin
|
||||
// allowlist never point at a stale anchor. The operation is idempotent: a second
|
||||
// call finds the platform agent already the sole root and does nothing.
|
||||
//
|
||||
// Routing (CanCommunicate/sameOrg in registry/access.go + org_scope.go) needs NO
|
||||
// change — once the platform agent is the root, the existing ancestor/descendant
|
||||
// rules already give it universal in-org reach and keep tenant isolation intact.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type installPlatformAgentPayload struct {
|
||||
// ID is the platform agent's workspace id (a deterministic uuidv5 the
|
||||
// control plane derives per org). Required.
|
||||
ID string `json:"id" binding:"required"`
|
||||
// Name is the display name; defaults to "Org Concierge" when omitted.
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
// InstallPlatformAgent handles POST /admin/org/platform-agent (AdminAuth).
|
||||
//
|
||||
// Idempotently installs the platform agent as the org root for THIS tenant. The
|
||||
// control plane calls it at org-provision time (new orgs) and during the
|
||||
// existing-org backfill rollout. Safe to call repeatedly.
|
||||
func InstallPlatformAgent(c *gin.Context) {
|
||||
var p installPlatformAgentPayload
|
||||
if err := c.ShouldBindJSON(&p); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"})
|
||||
return
|
||||
}
|
||||
name := p.Name
|
||||
if name == "" {
|
||||
name = "Org Concierge"
|
||||
}
|
||||
if err := installPlatformAgent(c.Request.Context(), db.DB, p.ID, name); err != nil {
|
||||
log.Printf("InstallPlatformAgent: %v (id=%s)", err, p.ID)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "install failed"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"status": "installed",
|
||||
"platform_agent_id": p.ID,
|
||||
"kind": models.KindPlatform,
|
||||
})
|
||||
}
|
||||
|
||||
// installPlatformAgent performs the idempotent, transactional install described
|
||||
// in the file header. Separated from the gin handler so integration tests can
|
||||
// exercise it directly against a real Postgres (the org-anchor migration cannot
|
||||
// be proven with sqlmock).
|
||||
func installPlatformAgent(ctx context.Context, database *sql.DB, platformID, name string) error {
|
||||
tx, err := database.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin: %w", err)
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }() // no-op after Commit
|
||||
|
||||
// 1. Ensure the platform-agent row exists as a kind='platform' root.
|
||||
// ON CONFLICT keeps it a platform root if it was pre-seeded; the row is
|
||||
// tier 0 and never billed/provisioned as an ordinary workspace EC2.
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, kind, tier, status, runtime, parent_id)
|
||||
VALUES ($1, $2, 'platform', 0, 'online', 'claude-code', NULL)
|
||||
ON CONFLICT (id) DO UPDATE SET kind = 'platform', parent_id = NULL
|
||||
`, platformID, name); err != nil {
|
||||
return fmt.Errorf("upsert platform agent: %w", err)
|
||||
}
|
||||
|
||||
// 2. Capture the org's other current roots (everything at parent_id IS NULL
|
||||
// except the platform agent itself). In a one-org tenant DB this is the
|
||||
// single team root; the query tolerates 0 (already installed) or N.
|
||||
rows, err := tx.QueryContext(ctx,
|
||||
`SELECT id FROM workspaces WHERE parent_id IS NULL AND id <> $1`, platformID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("select old roots: %w", err)
|
||||
}
|
||||
var oldRoots []string
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
rows.Close()
|
||||
return fmt.Errorf("scan old root: %w", err)
|
||||
}
|
||||
oldRoots = append(oldRoots, id)
|
||||
}
|
||||
rows.Close()
|
||||
if err := rows.Err(); err != nil {
|
||||
return fmt.Errorf("iterate old roots: %w", err)
|
||||
}
|
||||
|
||||
// 3 + 4. Re-parent each old root under the platform agent and move its
|
||||
// org-anchor references in the same transaction. A non-root old root
|
||||
// is kind='workspace', so it does not trip workspaces_platform_root_check.
|
||||
for _, root := range oldRoots {
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE workspaces SET parent_id = $1, updated_at = now() WHERE id = $2`,
|
||||
platformID, root); err != nil {
|
||||
return fmt.Errorf("re-parent %s: %w", root, err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE org_api_tokens SET org_id = $1 WHERE org_id = $2`, platformID, root); err != nil {
|
||||
return fmt.Errorf("migrate org_api_tokens for %s: %w", root, err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx,
|
||||
`UPDATE org_plugin_allowlist SET org_id = $1 WHERE org_id = $2`, platformID, root); err != nil {
|
||||
return fmt.Errorf("migrate org_plugin_allowlist for %s: %w", root, err)
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
@@ -0,0 +1,188 @@
|
||||
//go:build integration
|
||||
// +build integration
|
||||
|
||||
// platform_agent_integration_test.go — REAL Postgres gate for installPlatformAgent.
|
||||
//
|
||||
// Run with:
|
||||
//
|
||||
// INTEGRATION_DB_URL="postgres://postgres:test@localhost:55432/molecule?sslmode=disable" \
|
||||
// go test -tags=integration ./internal/handlers/ -run Integration_PlatformAgentInstall -v
|
||||
//
|
||||
// CI: handlers-postgres-integration workflow (handlers + migrations path filter).
|
||||
//
|
||||
// Why this is NOT a sqlmock test
|
||||
// ------------------------------
|
||||
// The install re-parents the org's existing root under the platform agent AND
|
||||
// moves the org-anchor references (org_api_tokens.org_id, org_plugin_allowlist.
|
||||
// org_id) from old root to platform agent, atomically. The whole point is the
|
||||
// post-transaction row state: orgRootID() must resolve every node to the platform
|
||||
// agent, sameOrg() must still hold, and the auth/allowlist anchors must point at
|
||||
// the new root. Only a real Postgres can prove that; sqlmock cannot.
|
||||
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func integrationDB_PlatformAgentInstall(t *testing.T) *sql.DB {
|
||||
t.Helper()
|
||||
url := requireIntegrationDBURL(t)
|
||||
conn, err := sql.Open("postgres", url)
|
||||
if err != nil {
|
||||
t.Fatalf("open: %v", err)
|
||||
}
|
||||
if err := conn.Ping(); err != nil {
|
||||
t.Fatalf("ping: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { conn.Close() })
|
||||
return conn
|
||||
}
|
||||
|
||||
// TestIntegration_PlatformAgentInstall_ReparentsRootAndMovesAnchors builds a
|
||||
// real org in Postgres:
|
||||
//
|
||||
// root (parent_id NULL, kind=workspace)
|
||||
// └── child
|
||||
// + an org_api_token anchored to root
|
||||
// + an org_plugin_allowlist entry anchored to root
|
||||
//
|
||||
// then installs the platform agent and asserts:
|
||||
// - the platform agent is the new sole root (kind=platform, parent_id NULL);
|
||||
// - the old root is re-parented under it; the child is untouched;
|
||||
// - both org-anchor references now point at the platform agent;
|
||||
// - a second install is a no-op (idempotent).
|
||||
func TestIntegration_PlatformAgentInstall_ReparentsRootAndMovesAnchors(t *testing.T) {
|
||||
conn := integrationDB_PlatformAgentInstall(t)
|
||||
ctx := context.Background()
|
||||
|
||||
tag := uuid.New().String()[:8]
|
||||
prefix := fmt.Sprintf("itest-pinstall-%s", tag)
|
||||
rootID := uuid.New().String()
|
||||
childID := uuid.New().String()
|
||||
platformID := uuid.New().String()
|
||||
|
||||
cleanup := func() {
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM org_plugin_allowlist WHERE plugin_name = $1`, prefix+"-plugin")
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM org_api_tokens WHERE prefix = $1`, tag)
|
||||
// child + old root (prefixed names) first, then the platform agent by id
|
||||
// (root.parent_id references it, so it must go last).
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE name LIKE $1`, prefix+"%")
|
||||
_, _ = conn.ExecContext(ctx, `DELETE FROM workspaces WHERE id = $1`, platformID)
|
||||
}
|
||||
t.Cleanup(cleanup)
|
||||
cleanup()
|
||||
|
||||
// Seed org tree.
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'online', NULL)`, rootID, prefix+"-root"); err != nil {
|
||||
t.Fatalf("seed root: %v", err)
|
||||
}
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO workspaces (id, name, tier, runtime, status, parent_id)
|
||||
VALUES ($1, $2, 2, 'claude-code', 'online', $3)`, childID, prefix+"-child", rootID); err != nil {
|
||||
t.Fatalf("seed child: %v", err)
|
||||
}
|
||||
// Org-anchor rows keyed to the OLD root.
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO org_api_tokens (token_hash, prefix, name, org_id)
|
||||
VALUES ($1, $2, $3, $4)`,
|
||||
[]byte("hash-"+tag), tag, prefix+"-tok", rootID); err != nil {
|
||||
t.Fatalf("seed org_api_token: %v", err)
|
||||
}
|
||||
if _, err := conn.ExecContext(ctx, `
|
||||
INSERT INTO org_plugin_allowlist (org_id, plugin_name, enabled_by)
|
||||
VALUES ($1, $2, $3)`, rootID, prefix+"-plugin", childID); err != nil {
|
||||
t.Fatalf("seed allowlist: %v", err)
|
||||
}
|
||||
|
||||
// Install.
|
||||
if err := installPlatformAgent(ctx, conn, platformID, "Org Concierge"); err != nil {
|
||||
t.Fatalf("install: %v", err)
|
||||
}
|
||||
|
||||
assertState := func(stage string) {
|
||||
// platform agent is a kind=platform root.
|
||||
var kind string
|
||||
var parent sql.NullString
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT kind, parent_id FROM workspaces WHERE id = $1`, platformID).Scan(&kind, &parent); err != nil {
|
||||
t.Fatalf("[%s] read platform agent: %v", stage, err)
|
||||
}
|
||||
if kind != "platform" || parent.Valid {
|
||||
t.Fatalf("[%s] platform agent kind=%q parent=%v, want platform/NULL", stage, kind, parent)
|
||||
}
|
||||
// old root re-parented under the platform agent.
|
||||
var rootParent sql.NullString
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT parent_id FROM workspaces WHERE id = $1`, rootID).Scan(&rootParent); err != nil {
|
||||
t.Fatalf("[%s] read old root: %v", stage, err)
|
||||
}
|
||||
if !rootParent.Valid || rootParent.String != platformID {
|
||||
t.Fatalf("[%s] old root parent=%v, want %s", stage, rootParent, platformID)
|
||||
}
|
||||
// child untouched.
|
||||
var childParent sql.NullString
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT parent_id FROM workspaces WHERE id = $1`, childID).Scan(&childParent); err != nil {
|
||||
t.Fatalf("[%s] read child: %v", stage, err)
|
||||
}
|
||||
if !childParent.Valid || childParent.String != rootID {
|
||||
t.Fatalf("[%s] child parent=%v, want %s (unchanged)", stage, childParent, rootID)
|
||||
}
|
||||
// org-anchor references moved to the platform agent.
|
||||
var tokOrg, alOrg string
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT org_id FROM org_api_tokens WHERE prefix = $1`, tag).Scan(&tokOrg); err != nil {
|
||||
t.Fatalf("[%s] read token org_id: %v", stage, err)
|
||||
}
|
||||
if tokOrg != platformID {
|
||||
t.Fatalf("[%s] org_api_tokens.org_id=%s, want %s", stage, tokOrg, platformID)
|
||||
}
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT org_id FROM org_plugin_allowlist WHERE plugin_name = $1`, prefix+"-plugin").Scan(&alOrg); err != nil {
|
||||
t.Fatalf("[%s] read allowlist org_id: %v", stage, err)
|
||||
}
|
||||
if alOrg != platformID {
|
||||
t.Fatalf("[%s] org_plugin_allowlist.org_id=%s, want %s", stage, alOrg, platformID)
|
||||
}
|
||||
// orgRootID + sameOrg now resolve everything to the platform agent.
|
||||
got, err := orgRootID(ctx, conn, childID)
|
||||
if err != nil {
|
||||
t.Fatalf("[%s] orgRootID(child): %v", stage, err)
|
||||
}
|
||||
if got != platformID {
|
||||
t.Fatalf("[%s] orgRootID(child)=%s, want %s", stage, got, platformID)
|
||||
}
|
||||
same, err := sameOrg(ctx, conn, childID, platformID)
|
||||
if err != nil || !same {
|
||||
t.Fatalf("[%s] sameOrg(child, platform)=%v err=%v, want true", stage, same, err)
|
||||
}
|
||||
}
|
||||
|
||||
assertState("first install")
|
||||
|
||||
// Idempotent: second install must not error or change state.
|
||||
if err := installPlatformAgent(ctx, conn, platformID, "Org Concierge"); err != nil {
|
||||
t.Fatalf("second install (idempotent): %v", err)
|
||||
}
|
||||
assertState("second install")
|
||||
|
||||
// Neither seeded team node is a root any more — the platform agent is.
|
||||
var nRoots int
|
||||
if err := conn.QueryRowContext(ctx,
|
||||
`SELECT count(*) FROM workspaces WHERE parent_id IS NULL AND id IN ($1, $2)`,
|
||||
rootID, childID).Scan(&nRoots); err != nil {
|
||||
t.Fatalf("count roots: %v", err)
|
||||
}
|
||||
if nRoots != 0 {
|
||||
t.Fatalf("team roots after install = %d, want 0 (old root re-parented under platform agent)", nRoots)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestInstallPlatformAgent_BadJSON rejects a payload missing the required id
|
||||
// before touching the DB (binding:"required" on ID).
|
||||
func TestInstallPlatformAgent_BadJSON(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Request = httptest.NewRequest("POST", "/admin/org/platform-agent",
|
||||
bytes.NewBufferString(`{"name":"Org Concierge"}`)) // no id
|
||||
c.Request.Header.Set("Content-Type", "application/json")
|
||||
|
||||
InstallPlatformAgent(c)
|
||||
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Errorf("missing id: expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
@@ -56,6 +56,11 @@ func TestINSERTworkspacesAllowlist(t *testing.T) {
|
||||
// workspace; UUID is server-generated. Caller intent IS to
|
||||
// create, so no idempotency check is needed.
|
||||
"workspace.go:Create": "single-workspace POST, server-generated UUID",
|
||||
// platform_agent.installPlatformAgent: single platform-agent row,
|
||||
// caller-supplied deterministic UUID; INSERT is idempotent via
|
||||
// ON CONFLICT (id) DO UPDATE and runs inside the install transaction.
|
||||
// Pinned by TestIntegration_PlatformAgentInstall_ReparentsRootAndMovesAnchors.
|
||||
"platform_agent.go:installPlatformAgent": "ON CONFLICT (id) DO UPDATE, single row in tx",
|
||||
}
|
||||
|
||||
actual := map[string]string{}
|
||||
|
||||
@@ -426,6 +426,12 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
adminTokH := handlers.NewAdminWorkspaceTokenHandler()
|
||||
r.POST("/admin/workspaces/:id/tokens", middleware.AdminAuth(db.DB), adminTokH.Create)
|
||||
|
||||
// Platform agent install — idempotently makes the org-level concierge
|
||||
// the org root (re-parents the existing root + moves org-anchor refs).
|
||||
// Called by the control plane at org provision + existing-org backfill.
|
||||
// (RFC docs/design/rfc-platform-agent.md)
|
||||
r.POST("/admin/org/platform-agent", middleware.AdminAuth(db.DB), handlers.InstallPlatformAgent)
|
||||
|
||||
// Memory
|
||||
memh := handlers.NewMemoryHandler()
|
||||
wsAuth.GET("/memory", memh.List)
|
||||
|
||||
Reference in New Issue
Block a user