Merge pull request #1070 from Molecule-AI/staging

chore: promote workspace-server tenant-auth fix to main
Hongming Wang authored 2026-04-20 08:42:08 -07:00; committed by GitHub
commit b955b97416
19 changed files with 1033 additions and 57 deletions

4
.gitignore vendored
View File

@ -117,6 +117,10 @@ backups/
# Cloned-via-manifest dirs — populated locally by scripts/clone-manifest.sh,
# tracked in their own standalone repos. Never commit to core.
# org-templates live in Molecule-AI/molecule-ai-org-template-* repos.
# plugins live in Molecule-AI/molecule-ai-plugin-* repos.
# Exception: molecule-dev is checked in so it doubles as the internal-team
# seed template (not fetched via clone-manifest).
/org-templates/*
!/org-templates/molecule-dev/
/plugins/

View File

@ -87,7 +87,9 @@ const maxProxyResponseBody = 10 << 20
// a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
// Timeouts are enforced per-request via context deadlines:
// canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap).
var a2aClient = &http.Client{}
var a2aClient = &http.Client{
Timeout: 35 * time.Minute, // safety net for requests with no context deadline; must exceed the 30 min agent-to-agent cap
}
type proxyA2AError struct {
Status int

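For reference, the per-request deadlines described in the comment above are typically layered onto a shared client rather than baked into it. The sketch below is not part of this diff; the helper name doWithDeadline and the package line are assumptions, and only the 5 min / 30 min figures from the comment are reused.

package handlers // assumed package; illustration only, not part of the diff

import (
	"context"
	"net/http"
	"time"
)

// doWithDeadline layers a per-request deadline onto a shared client such as
// a2aClient, so long-lived canvas (5 min) and agent-to-agent (30 min) calls
// stay bounded without relying on a client-level Timeout.
func doWithDeadline(client *http.Client, parent context.Context, req *http.Request, d time.Duration) (*http.Response, error) {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()
	return client.Do(req.WithContext(ctx))
}

A canvas proxy call would pass 5*time.Minute and an agent-to-agent call 30*time.Minute, which is why the client-level value only needs to act as a backstop.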
View File

@ -74,10 +74,13 @@ func NewOrgHandler(wh *WorkspaceHandler, b *events.Broadcaster, p *provisioner.P
// OrgTemplate is the YAML structure for an org hierarchy.
type OrgTemplate struct {
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Defaults OrgDefaults `yaml:"defaults" json:"defaults"`
Workspaces []OrgWorkspace `yaml:"workspaces" json:"workspaces"`
Name string `yaml:"name" json:"name"`
Description string `yaml:"description" json:"description"`
Defaults OrgDefaults `yaml:"defaults" json:"defaults"`
Workspaces []OrgWorkspace `yaml:"workspaces" json:"workspaces"`
// GlobalMemories is a list of org-wide memories seeded as GLOBAL scope
// on the first root workspace (PM) during org import. Issue #1050.
GlobalMemories []models.MemorySeed `yaml:"global_memories" json:"global_memories"`
}
type OrgDefaults struct {
@ -106,6 +109,9 @@ type OrgDefaults struct {
// Rendered into each workspace's config.yaml so agent prompts can read it
// generically (no hardcoded role names in prompts). See issue #51.
CategoryRouting map[string][]string `yaml:"category_routing" json:"category_routing"`
// InitialMemories are default memories seeded into every workspace at
// creation time unless the workspace overrides them. Issue #1050.
InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"`
}
type OrgSchedule struct {
@ -170,6 +176,9 @@ type OrgWorkspace struct {
// (empty list drops the category entirely); new keys are added. See
// mergeCategoryRouting.
CategoryRouting map[string][]string `yaml:"category_routing" json:"category_routing"`
// InitialMemories are memories seeded into this workspace at creation
// time. If empty, defaults.initial_memories are used. Issue #1050.
InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"`
Schedules []OrgSchedule `yaml:"schedules" json:"schedules"`
Channels []OrgChannel `yaml:"channels" json:"channels"`
External bool `yaml:"external" json:"external"`
@ -290,6 +299,22 @@ func (h *OrgHandler) Import(c *gin.Context) {
}
}
// Seed org-wide global_memories on the first root workspace (issue #1050).
// These are GLOBAL scope memories visible to all workspaces in the org.
if len(tmpl.GlobalMemories) > 0 && len(results) > 0 {
rootID, _ := results[0]["id"].(string)
if rootID != "" {
rootNS := workspaceAwarenessNamespace(rootID)
// Force scope to GLOBAL regardless of what the YAML says.
globalSeeds := make([]models.MemorySeed, len(tmpl.GlobalMemories))
for i, gm := range tmpl.GlobalMemories {
globalSeeds[i] = models.MemorySeed{Content: gm.Content, Scope: "GLOBAL"}
}
seedInitialMemories(context.Background(), rootID, globalSeeds, rootNS)
log.Printf("Org import: seeded %d global memories on root workspace %s", len(globalSeeds), rootID)
}
}
// Hot-reload channel manager once after all channels are inserted
// (instead of per-workspace, avoiding N redundant DB queries + diffs).
if h.channelMgr != nil {
@ -408,6 +433,15 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa
"name": ws.Name, "tier": tier,
})
// Seed initial memories from workspace config or defaults (issue #1050).
// Per-workspace initial_memories override defaults; if workspace has none,
// fall back to defaults.initial_memories.
wsMemories := ws.InitialMemories
if len(wsMemories) == 0 {
wsMemories = defaults.InitialMemories
}
seedInitialMemories(ctx, id, wsMemories, awarenessNS)
// Handle external workspaces
if ws.External {
if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', url = $1 WHERE id = $2`, ws.URL, id); err != nil {

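To make the precedence of the new memory fields concrete, the sketch below unmarshals a minimal org-template YAML and applies the same fallback as createWorkspaceTree. The YAML values, the local mirror types, and the use of gopkg.in/yaml.v3 are illustrative assumptions; the real structs are OrgTemplate, OrgDefaults, OrgWorkspace, and models.MemorySeed shown in this diff.

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Local mirrors of the template structs, trimmed to the memory-related fields.
type memorySeed struct {
	Content string `yaml:"content"`
	Scope   string `yaml:"scope"`
}

type orgTemplate struct {
	Name     string `yaml:"name"`
	Defaults struct {
		InitialMemories []memorySeed `yaml:"initial_memories"`
	} `yaml:"defaults"`
	GlobalMemories []memorySeed `yaml:"global_memories"`
	Workspaces     []struct {
		Name            string       `yaml:"name"`
		InitialMemories []memorySeed `yaml:"initial_memories"`
	} `yaml:"workspaces"`
}

const sample = `
name: demo-org
global_memories:
  - content: "All repos deploy via the staging branch."   # forced to GLOBAL on the first root workspace
defaults:
  initial_memories:
    - content: "Keep PRs small and reviewable."
      scope: LOCAL
workspaces:
  - name: pm          # no initial_memories, so it falls back to defaults.initial_memories
  - name: dev-lead
    initial_memories: # overrides the defaults entirely
      - content: "Own the release checklist."
        scope: TEAM
`

func main() {
	var t orgTemplate
	if err := yaml.Unmarshal([]byte(sample), &t); err != nil {
		panic(err)
	}
	for _, ws := range t.Workspaces {
		mems := ws.InitialMemories
		if len(mems) == 0 { // same fallback as createWorkspaceTree above
			mems = t.Defaults.InitialMemories
		}
		fmt.Printf("%s: %d memory seed(s)\n", ws.Name, len(mems))
	}
}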
View File

@ -276,7 +276,10 @@ func (h *SecretsHandler) Delete(c *gin.Context) {
return
}
rows, _ := result.RowsAffected()
rows, err := result.RowsAffected()
if err != nil {
log.Printf("DeleteSecret: RowsAffected error: %v", err)
}
if rows == 0 {
c.JSON(http.StatusNotFound, gin.H{"error": "secret not found"})
return
@ -418,7 +421,10 @@ func (h *SecretsHandler) DeleteGlobal(c *gin.Context) {
return
}
rows, _ := result.RowsAffected()
rows, err := result.RowsAffected()
if err != nil {
log.Printf("DeleteGlobal: RowsAffected error: %v", err)
}
if rows == 0 {
c.JSON(http.StatusNotFound, gin.H{"error": "secret not found"})
return

View File

@ -97,7 +97,6 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
log.Printf("Terminal WebSocket upgrade error: %v", err)
return
}
defer conn.Close()
// No hard session deadline — terminal stays open as long as there is activity.
// The idle timeout (terminalSessionTimeout) resets on each keystroke in the
@ -108,6 +107,7 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
// ContainerExecCreate succeeds even if the binary doesn't exist — the error
// only surfaces at attach/start time, so we must retry at the attach level.
var resp types.HijackedResponse
var execErr error
for _, shell := range []string{"/bin/bash", "/bin/sh"} {
execCfg := container.ExecOptions{
Cmd: []string{shell},
@ -118,20 +118,21 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
}
execID, createErr := h.docker.ContainerExecCreate(ctx, containerName, execCfg)
if createErr != nil {
err = createErr
execErr = createErr
continue
}
resp, err = h.docker.ContainerExecAttach(ctx, execID.ID, container.ExecAttachOptions{Tty: true})
if err == nil {
resp, execErr = h.docker.ContainerExecAttach(ctx, execID.ID, container.ExecAttachOptions{Tty: true})
if execErr == nil {
defer resp.Close()
break
}
}
if err != nil {
log.Printf("Terminal exec error: %v", err)
if execErr != nil {
log.Printf("Terminal exec error: %v", execErr)
conn.WriteMessage(websocket.TextMessage, []byte("Error: failed to create shell session\r\n"))
conn.Close()
return
}
defer resp.Close()
// Bridge: container stdout → WebSocket
done := make(chan struct{})

View File

@ -7,10 +7,12 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
@ -31,6 +33,15 @@ func NewWebhookHandlerWithWorkspace(workspaces *WorkspaceHandler) *WebhookHandle
}
}
// shortSHA returns the first 7 characters of a commit SHA, or the
// full value if it is shorter than 7. Safe for empty strings.
func shortSHA(sha string) string {
if len(sha) < 7 {
return sha
}
return sha[:7]
}
// GitHub handles POST /webhooks/github/:id
// It verifies X-Hub-Signature-256, maps supported events to A2A message/send,
// then forwards through the same proxy flow used by /workspaces/:id/a2a.
@ -56,6 +67,16 @@ func (h *WebhookHandler) GitHub(c *gin.Context) {
}
eventType := c.GetHeader("X-GitHub-Event")
// Event-driven cron triggers: certain GitHub events fire matching
// schedules immediately instead of forwarding to a specific workspace.
if triggered, triggerErr := h.handleCronTriggerEvent(c, eventType, rawBody); triggered {
if triggerErr != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": triggerErr.Error()})
}
return
}
deliveryID := c.GetHeader("X-GitHub-Delivery")
payloadWorkspaceID, a2aPayload, buildErr := buildGitHubA2APayload(eventType, deliveryID, rawBody)
if buildErr != nil {
@ -254,7 +275,7 @@ func buildGitHubA2APayload(eventType, deliveryID string, rawBody []byte) (string
payload.WorkflowRun.RunNumber,
payload.WorkflowRun.Conclusion,
payload.WorkflowRun.HeadBranch,
payload.WorkflowRun.HeadSHA[:min(7, len(payload.WorkflowRun.HeadSHA))],
shortSHA(payload.WorkflowRun.HeadSHA),
payload.Sender.Login,
payload.WorkflowRun.Event,
payload.Repository.FullName,
@ -295,3 +316,131 @@ func newGitHubMessagePayload(text string, metadata map[string]interface{}) map[s
},
}
}
// ---------------------------------------------------------------------------
// Event-driven cron triggers
//
// Some GitHub events don't target a specific workspace — instead they should
// wake up all engineer work crons immediately so the team reacts to new issues
// or PR reviews without waiting for the next 30-minute timer tick.
//
// Supported events:
// - issues (action=opened) → fires schedules with "pick-up-work" in name
// - pull_request_review (action=submitted) → fires schedules with "PR review"
// or "security review" in name
//
// Mechanism: UPDATE next_run_at = NOW() on matching enabled schedules. The
// scheduler's 30-second poll loop picks them up on the next tick.
// ---------------------------------------------------------------------------
// githubIssuesEvent is the minimal subset of the GitHub "issues" webhook payload.
type githubIssuesEvent struct {
Action string `json:"action"`
Repository githubRepository `json:"repository"`
Sender githubSender `json:"sender"`
Issue struct {
Number int `json:"number"`
Title string `json:"title"`
HTMLURL string `json:"html_url"`
} `json:"issue"`
}
// githubPullRequestReviewEvent is the minimal subset of the GitHub
// "pull_request_review" webhook payload.
type githubPullRequestReviewEvent struct {
Action string `json:"action"`
Repository githubRepository `json:"repository"`
Sender githubSender `json:"sender"`
Review struct {
State string `json:"state"` // approved, changes_requested, commented
HTMLURL string `json:"html_url"`
} `json:"review"`
PullRequest struct {
Number int `json:"number"`
Title string `json:"title"`
HTMLURL string `json:"html_url"`
} `json:"pull_request"`
}
// handleCronTriggerEvent checks if the GitHub event is one that should trigger
// schedules immediately. Returns (true, nil) if it handled the event and wrote
// the HTTP response, (true, err) if it handled but errored, or (false, nil) if
// the event is not a cron-trigger type and should fall through to A2A forwarding.
func (h *WebhookHandler) handleCronTriggerEvent(c *gin.Context, eventType string, rawBody []byte) (bool, error) {
ctx := c.Request.Context()
switch eventType {
case "issues":
var payload githubIssuesEvent
if err := json.Unmarshal(rawBody, &payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid issues payload"})
return true, nil
}
if payload.Action != "opened" {
c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only issues action=opened triggers crons"})
return true, nil
}
// Fire all enabled schedules whose name contains "pick-up-work" (case-insensitive).
result, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET next_run_at = now(), updated_at = now()
WHERE enabled = true
AND next_run_at IS NOT NULL
AND LOWER(name) LIKE '%pick-up-work%'
`)
if err != nil {
log.Printf("Webhook: cron trigger (issues/opened) DB error: %v", err)
return true, fmt.Errorf("failed to trigger schedules: %w", err)
}
affected, _ := result.RowsAffected()
log.Printf("Webhook: issues/opened in %s #%d by %s — triggered %d pick-up-work schedule(s)",
payload.Repository.FullName, payload.Issue.Number, payload.Sender.Login, affected)
c.JSON(http.StatusOK, gin.H{
"status": "triggered",
"event": "issues",
"action": "opened",
"schedules_affected": affected,
})
return true, nil
case "pull_request_review":
var payload githubPullRequestReviewEvent
if err := json.Unmarshal(rawBody, &payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid pull_request_review payload"})
return true, nil
}
if payload.Action != "submitted" {
c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only pull_request_review action=submitted triggers crons"})
return true, nil
}
// Fire all enabled schedules whose name contains "PR review" or "security review" (case-insensitive).
result, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET next_run_at = now(), updated_at = now()
WHERE enabled = true
AND next_run_at IS NOT NULL
AND (LOWER(name) LIKE '%pr review%' OR LOWER(name) LIKE '%security review%')
`)
if err != nil {
log.Printf("Webhook: cron trigger (pull_request_review/submitted) DB error: %v", err)
return true, fmt.Errorf("failed to trigger schedules: %w", err)
}
affected, _ := result.RowsAffected()
log.Printf("Webhook: pull_request_review/submitted in %s PR #%d by %s (state=%s) — triggered %d review schedule(s)",
payload.Repository.FullName, payload.PullRequest.Number, payload.Sender.Login, payload.Review.State, affected)
c.JSON(http.StatusOK, gin.H{
"status": "triggered",
"event": "pull_request_review",
"action": "submitted",
"schedules_affected": affected,
})
return true, nil
default:
return false, nil
}
}
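The 30-second poll loop mentioned in the comment block above is not part of this diff, so the sketch below only illustrates why setting next_run_at = now() is enough to fire a schedule on the next tick. The function name pollDueScheduleIDs and the selected column are assumptions; only the table and column names come from the UPDATE statements above.

package handlers // assumed package; sketch only, not part of this diff

import (
	"context"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// pollDueScheduleIDs returns enabled schedules whose next_run_at has arrived.
// A webhook that sets next_run_at = now() makes the row due on the very next
// poll, without touching the cron expression itself.
func pollDueScheduleIDs(ctx context.Context) ([]string, error) {
	rows, err := db.DB.QueryContext(ctx, `
		SELECT id
		FROM workspace_schedules
		WHERE enabled = true
		  AND next_run_at IS NOT NULL
		  AND next_run_at <= now()
	`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, rows.Err()
}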

View File

@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
@ -206,3 +207,147 @@ func TestGitHubWebhook_ValidPRReviewComment_Forwards(t *testing.T) {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
// ---------------------------------------------------------------------------
// Event-driven cron trigger tests
// ---------------------------------------------------------------------------
func TestGitHubWebhook_IssuesOpened_TriggersCrons(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "opened",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "alice"},
"issue": {"number": 42, "title": "New feature request", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"}
}`)
// Expect the UPDATE that sets next_run_at = now() on pick-up-work schedules.
mock.ExpectExec("UPDATE workspace_schedules").
WillReturnResult(sqlmock.NewResult(0, 3))
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "issues")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
// Verify response includes trigger metadata.
respBody := w.Body.String()
if !strings.Contains(respBody, `"triggered"`) {
t.Fatalf("expected 'triggered' in response, got: %s", respBody)
}
if !strings.Contains(respBody, `"schedules_affected"`) {
t.Fatalf("expected 'schedules_affected' in response, got: %s", respBody)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
func TestGitHubWebhook_IssuesClosed_Ignored(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "closed",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "alice"},
"issue": {"number": 42, "title": "Old issue", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"}
}`)
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "issues")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusAccepted {
t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String())
}
}
func TestGitHubWebhook_PRReviewSubmitted_TriggersCrons(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "submitted",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "bob"},
"review": {"state": "changes_requested", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"},
"pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"}
}`)
// Expect the UPDATE that sets next_run_at = now() on review schedules.
mock.ExpectExec("UPDATE workspace_schedules").
WillReturnResult(sqlmock.NewResult(0, 2))
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "pull_request_review")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
respBody := w.Body.String()
if !strings.Contains(respBody, `"triggered"`) {
t.Fatalf("expected 'triggered' in response, got: %s", respBody)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
func TestGitHubWebhook_PRReviewDismissed_Ignored(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "dismissed",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "bob"},
"review": {"state": "dismissed", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"},
"pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"}
}`)
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "pull_request_review")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusAccepted {
t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String())
}
}

View File

@ -209,6 +209,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
log.Printf("Create: canvas layout insert failed for %s (workspace will appear at 0,0): %v", id, err)
}
// Seed initial memories from the create payload (issue #1050).
// Non-fatal: failures are logged but don't block workspace creation.
seedInitialMemories(ctx, id, payload.InitialMemories, awarenessNamespace)
// Broadcast provisioning event
h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", id, map[string]interface{}{
"name": payload.Name,
@ -803,6 +807,12 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) {
pq.Array(allIDs)); err != nil {
log.Printf("Delete token revocation error for %s: %v", id, err)
}
// Disable schedules for removed workspaces (#1027)
if _, err := db.DB.ExecContext(ctx,
`UPDATE workspace_schedules SET enabled = false WHERE workspace_id = ANY($1::uuid[])`,
pq.Array(allIDs)); err != nil {
log.Printf("Delete schedule disable error for %s: %v", id, err)
}
// Now stop containers + remove volumes for all descendants (any depth).
// Any concurrent heartbeat / registration / liveness-triggered restart

View File

@ -187,6 +187,36 @@ func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath stri
// which transitions status to 'online' and broadcasts WORKSPACE_ONLINE
}
// seedInitialMemories inserts a list of MemorySeed entries into agent_memories
// for the given workspace. Called during workspace creation and org import to
// pre-populate memories from config/template. Non-fatal: each insert is
// attempted independently and failures are logged. Issue #1050.
func seedInitialMemories(ctx context.Context, workspaceID string, memories []models.MemorySeed, awarenessNamespace string) {
if len(memories) == 0 {
return
}
for _, mem := range memories {
scope := strings.ToUpper(mem.Scope)
if scope == "" {
scope = "LOCAL"
}
if scope != "LOCAL" && scope != "TEAM" && scope != "GLOBAL" {
log.Printf("seedInitialMemories: skipping memory for %s — invalid scope %q", workspaceID, scope)
continue
}
if mem.Content == "" {
continue
}
if _, err := db.DB.ExecContext(ctx, `
INSERT INTO agent_memories (workspace_id, content, scope, namespace)
VALUES ($1, $2, $3, $4)
`, workspaceID, mem.Content, scope, awarenessNamespace); err != nil {
log.Printf("seedInitialMemories: failed to insert memory for %s (scope=%s): %v", workspaceID, scope, err)
}
}
log.Printf("seedInitialMemories: seeded %d memories for workspace %s", len(memories), workspaceID)
}
func workspaceAwarenessNamespace(workspaceID string) string {
return fmt.Sprintf("workspace:%s", workspaceID)
}
@ -439,12 +469,12 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
// Model always at top level — config.py reads raw["model"] for all runtimes.
configYAML += fmt.Sprintf("model: %s\n", quoteModel)
// Add required_env based on runtime — preflight checks these are set via secrets API.
// Add runtime_config. required_env is intentionally omitted — the
// platform injects secrets at container-start time via the secrets API,
// and preflight already validates that the env vars are present before
// the agent loop starts. Hardcoding token names here caused #1028
// (expired CLAUDE_CODE_OAUTH_TOKEN baked into config.yaml).
switch runtime {
case "claude-code":
configYAML += "runtime_config:\n required_env:\n - CLAUDE_CODE_OAUTH_TOKEN\n timeout: 0\n"
case "codex":
configYAML += "runtime_config:\n required_env:\n - OPENAI_API_KEY\n timeout: 0\n"
case "langgraph", "deepagents":
// These runtimes read API keys from env directly, no runtime_config needed.
default:

View File

@ -247,11 +247,10 @@ func TestEnsureDefaultConfig_ClaudeCode(t *testing.T) {
if !contains(content, "runtime_config:") {
t.Errorf("config.yaml should have runtime_config section for claude-code, got:\n%s", content)
}
if !contains(content, "required_env:") {
t.Errorf("config.yaml should have required_env for claude-code, got:\n%s", content)
}
if !contains(content, "CLAUDE_CODE_OAUTH_TOKEN") {
t.Errorf("config.yaml should require CLAUDE_CODE_OAUTH_TOKEN, got:\n%s", content)
// required_env is no longer hardcoded — tokens are injected at runtime
// via the secrets API (#1028).
if contains(content, "CLAUDE_CODE_OAUTH_TOKEN") {
t.Errorf("config.yaml should NOT hardcode CLAUDE_CODE_OAUTH_TOKEN (fix #1028), got:\n%s", content)
}
// Should NOT have .auth-token file
if _, ok := files[".auth-token"]; ok {

View File

@ -572,6 +572,9 @@ func TestWorkspaceDelete_CascadeWithChildren(t *testing.T) {
// Token revocation: once a workspace is gone its auth tokens are meaningless.
mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at").
WillReturnResult(sqlmock.NewResult(0, 2))
// #1027: cascade-disable schedules for deleted workspaces.
mock.ExpectExec("UPDATE workspace_schedules SET enabled = false").
WillReturnResult(sqlmock.NewResult(0, 3))
// Broadcast for child WORKSPACE_REMOVED (fires during the descendant loop).
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
@ -606,6 +609,180 @@ func TestWorkspaceDelete_CascadeWithChildren(t *testing.T) {
}
}
// ==================== #1027: Cascade schedule disable on delete ====================
// TestWorkspaceDelete_DisablesSchedules verifies that when a leaf workspace
// (no children) is deleted, all its enabled schedules are set to enabled=false.
func TestWorkspaceDelete_DisablesSchedules(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
wsID := "dddddddd-0001-0000-0000-000000000000"
// No children
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
WithArgs(wsID).
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
// Mark workspace as removed
mock.ExpectExec("UPDATE workspaces SET status = 'removed'").
WillReturnResult(sqlmock.NewResult(0, 1))
// Canvas layouts cleanup
mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY").
WillReturnResult(sqlmock.NewResult(0, 0))
// Token revocation
mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at").
WillReturnResult(sqlmock.NewResult(0, 0))
// #1027: schedule disable — expect exactly this UPDATE to fire
mock.ExpectExec("UPDATE workspace_schedules SET enabled = false").
WillReturnResult(sqlmock.NewResult(0, 2)) // 2 schedules disabled
// Broadcast WORKSPACE_REMOVED for the workspace itself
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsID}}
c.Request = httptest.NewRequest("DELETE", "/workspaces/"+wsID+"?confirm=true", nil)
handler.Delete(c)
if w.Code != http.StatusOK {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: schedule disable UPDATE was not executed: %v", err)
}
}
// TestWorkspaceDelete_CascadeDisablesDescendantSchedules verifies that when
// a parent workspace with children (and grandchildren) is deleted, ALL
// descendant schedules are also disabled in a single batch UPDATE.
func TestWorkspaceDelete_CascadeDisablesDescendantSchedules(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
parentID := "dddddddd-0002-0000-0000-000000000000"
childID := "dddddddd-0003-0000-0000-000000000000"
grandchildID := "dddddddd-0004-0000-0000-000000000000"
// Children query returns 1 direct child
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
WithArgs(parentID).
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}).
AddRow(childID, "Child"))
// Recursive CTE returns child + grandchild
mock.ExpectQuery("WITH RECURSIVE descendants").
WithArgs(parentID).
WillReturnRows(sqlmock.NewRows([]string{"id"}).
AddRow(childID).
AddRow(grandchildID))
// Mark all 3 as removed
mock.ExpectExec("UPDATE workspaces SET status = 'removed'").
WillReturnResult(sqlmock.NewResult(0, 3))
// Canvas layouts
mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY").
WillReturnResult(sqlmock.NewResult(0, 0))
// Token revocation
mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at").
WillReturnResult(sqlmock.NewResult(0, 0))
// #1027: schedule disable — covers parent + child + grandchild in one batch
mock.ExpectExec("UPDATE workspace_schedules SET enabled = false").
WillReturnResult(sqlmock.NewResult(0, 5)) // 5 total schedules across 3 workspaces
// Broadcast for child WORKSPACE_REMOVED
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
// Broadcast for grandchild WORKSPACE_REMOVED
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
// Broadcast for parent WORKSPACE_REMOVED
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: parentID}}
c.Request = httptest.NewRequest("DELETE", "/workspaces/"+parentID+"?confirm=true", nil)
handler.Delete(c)
if w.Code != http.StatusOK {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
var resp map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("failed to parse response: %v", err)
}
if resp["cascade_deleted"] != float64(2) {
t.Errorf("expected cascade_deleted 2, got %v", resp["cascade_deleted"])
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: descendant schedules not disabled: %v", err)
}
}
// TestWorkspaceDelete_ScheduleDisableOnlyTargetsDeletedWorkspace verifies that
// deleting workspace A does NOT disable workspace B's schedules. The schedule
// disable UPDATE uses ANY($1::uuid[]) scoped to the deleted workspace IDs;
// this test pins that query to exactly one ordered invocation for the delete.
func TestWorkspaceDelete_ScheduleDisableOnlyTargetsDeletedWorkspace(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
wsA := "dddddddd-0005-0000-0000-000000000000"
// wsB is "dddddddd-0006-0000-0000-000000000000" — NOT part of the delete
// No children for workspace A
mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
WithArgs(wsA).
WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
// Mark only workspace A as removed
mock.ExpectExec("UPDATE workspaces SET status = 'removed'").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY").
WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at").
WillReturnResult(sqlmock.NewResult(0, 0))
// Schedule disable fires exactly once for this delete. sqlmock enforces
// ordered expectations, so an extra or missing UPDATE would fail
// ExpectationsWereMet; argument values are not asserted here.
mock.ExpectExec("UPDATE workspace_schedules SET enabled = false").
WillReturnResult(sqlmock.NewResult(0, 0)) // wsA had no schedules
mock.ExpectExec("INSERT INTO structure_events").
WillReturnResult(sqlmock.NewResult(0, 1))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: wsA}}
c.Request = httptest.NewRequest("DELETE", "/workspaces/"+wsA+"?confirm=true", nil)
handler.Delete(c)
if w.Code != http.StatusOK {
t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
// The key assertion: all expectations were met and no extra queries ran.
// A bug that issued a second, broader UPDATE (disabling schedules beyond
// the deleted workspace) would surface as an unexpected call here.
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
func TestWorkspaceDelete_ChildrenQueryError(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)

View File

@ -57,6 +57,14 @@ type UpdateCardPayload struct {
AgentCard json.RawMessage `json:"agent_card" binding:"required"`
}
// MemorySeed represents an initial memory to seed into a workspace at creation time.
// Used by both the POST /workspaces API and org template import to pre-populate
// agent memories from config (issue #1050).
type MemorySeed struct {
Content string `json:"content" yaml:"content"`
Scope string `json:"scope" yaml:"scope"` // LOCAL, TEAM, GLOBAL
}
type CreateWorkspacePayload struct {
Name string `json:"name" binding:"required"`
Role string `json:"role"`
@ -80,6 +88,10 @@ type CreateWorkspacePayload struct {
X float64 `json:"x"`
Y float64 `json:"y"`
} `json:"canvas"`
// InitialMemories is an optional list of memories to seed into the
// workspace immediately after creation. Each entry is inserted into
// agent_memories with the workspace's awareness namespace. Issue #1050.
InitialMemories []MemorySeed `json:"initial_memories"`
}
type CheckAccessPayload struct {

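For illustration, the snippet below prints the JSON a client might POST to the workspace-create endpoint using the new field. The local types mirror the json tags above and the concrete values are made up; the real types are MemorySeed and CreateWorkspacePayload from this file.

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed local mirrors of the payload types above (sketch only).
type memorySeed struct {
	Content string `json:"content"`
	Scope   string `json:"scope"` // LOCAL, TEAM, GLOBAL; empty is treated as LOCAL server-side
}

type createWorkspacePayload struct {
	Name            string       `json:"name"`
	Role            string       `json:"role"`
	InitialMemories []memorySeed `json:"initial_memories"`
}

func main() {
	p := createWorkspacePayload{
		Name: "dev-lead",
		Role: "engineer",
		InitialMemories: []memorySeed{
			{Content: "Deploys go through staging before main.", Scope: "TEAM"},
			{Content: "Prefer small, reviewable PRs."}, // empty scope, seeded as LOCAL
		},
	}
	body, _ := json.MarshalIndent(p, "", "  ")
	fmt.Println(string(body))
}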
View File

@ -20,7 +20,8 @@ import (
type CPProvisioner struct {
baseURL string
orgID string
sharedSecret string // bearer passed to CP's /cp/workspaces/* gate
sharedSecret string // Authorization: Bearer — platform-wide gate
adminToken string // X-Molecule-Admin-Token — per-tenant identity (controlplane #118/#130)
httpClient *http.Client
}
@ -40,31 +41,48 @@ func NewCPProvisioner() (*CPProvisioner, error) {
baseURL = "https://api.moleculesai.app"
}
// CP gates /cp/workspaces/* behind a bearer check (C1). Without the
// shared secret the CP returns 401 — or 404 if the routes refused
// to mount on its side. Tenant operators should set this on the
// tenant env to the same value as the CP's PROVISION_SHARED_SECRET.
// CP gates /cp/workspaces/* behind two credentials now:
// 1. Shared secret (Authorization: Bearer) — gates the route at
// the router level, proves the caller is a tenant platform.
// 2. Admin token (X-Molecule-Admin-Token) — proves WHICH tenant.
// Introduced in controlplane #118/#130 to prevent cross-tenant
// provisioning when the shared secret leaks from one tenant.
sharedSecret := os.Getenv("MOLECULE_CP_SHARED_SECRET")
if sharedSecret == "" {
// Fall back to PROVISION_SHARED_SECRET so a single env-var name
// works on both sides of the wire.
sharedSecret = os.Getenv("PROVISION_SHARED_SECRET")
}
// ADMIN_TOKEN is injected into the tenant container at provision
// time by the control plane (see provisioner/ec2.go Secrets Manager
// bootstrap path). Without it, post-#118 CP rejects every
// /cp/workspaces/* call with 401.
adminToken := os.Getenv("ADMIN_TOKEN")
return &CPProvisioner{
baseURL: baseURL,
orgID: orgID,
sharedSecret: sharedSecret,
adminToken: adminToken,
httpClient: &http.Client{Timeout: 120 * time.Second},
}, nil
}
// authHeader sets Authorization: Bearer on the outbound request. No-op
// when sharedSecret is empty so self-hosted / dev deployments still work.
func (p *CPProvisioner) authHeader(req *http.Request) {
// authHeaders sets both auth headers on the outbound request:
// - Authorization: Bearer <shared secret> — platform gate
// - X-Molecule-Admin-Token: <per-tenant token> — identity gate
//
// Either is a no-op when its value is empty so self-hosted / dev
// deployments without a real CP still work (those don't hit a CP that
// enforces either gate). In prod both are set by the controlplane
// bootstrap, so both headers land on every outbound call.
func (p *CPProvisioner) authHeaders(req *http.Request) {
if p.sharedSecret != "" {
req.Header.Set("Authorization", "Bearer "+p.sharedSecret)
}
if p.adminToken != "" {
req.Header.Set("X-Molecule-Admin-Token", p.adminToken)
}
}
type cpProvisionRequest struct {
@ -105,7 +123,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
return "", fmt.Errorf("cp provisioner: create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
p.authHeader(httpReq)
p.authHeaders(httpReq)
resp, err := p.httpClient.Do(httpReq)
if err != nil {
@ -140,7 +158,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string,
func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID)
req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil)
p.authHeader(req)
p.authHeaders(req)
resp, err := p.httpClient.Do(req)
if err != nil {
return fmt.Errorf("cp provisioner: stop: %w", err)
@ -153,7 +171,7 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error {
func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) {
url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID)
req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
p.authHeader(req)
p.authHeaders(req)
resp, err := p.httpClient.Do(req)
if err != nil {
return false, err

View File

@ -8,6 +8,7 @@ import (
"net/http/httptest"
"strings"
"testing"
"time"
)
// TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't
@ -39,27 +40,50 @@ func TestNewCPProvisioner_FallsBackToProvisionSharedSecret(t *testing.T) {
}
}
// TestAuthHeader_NoopWhenSecretEmpty — the self-hosted path that
// doesn't gate /cp/workspaces/* must not add a stray Authorization
// header (bearer-like content would be surprising to non-bearer
// intermediaries).
func TestAuthHeader_NoopWhenSecretEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: ""}
// TestAuthHeaders_NoopWhenBothEmpty — the self-hosted path that
// doesn't gate /cp/workspaces/* must not add stray auth headers
// (bearer-like content would surprise non-bearer intermediaries).
func TestAuthHeaders_NoopWhenBothEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: "", adminToken: ""}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeader(req)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "" {
t.Errorf("Authorization set to %q with empty secret; want unset", got)
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "" {
t.Errorf("X-Molecule-Admin-Token set to %q with empty token; want unset", got)
}
}
// TestAuthHeader_SetsBearerWhenSecretSet — happy path.
func TestAuthHeader_SetsBearerWhenSecretSet(t *testing.T) {
p := &CPProvisioner{sharedSecret: "the-secret"}
// TestAuthHeaders_SetsBothWhenBothProvided — happy path for SaaS
// tenants. Both the platform-wide shared secret and the per-tenant
// admin_token land on every outbound call.
func TestAuthHeaders_SetsBothWhenBothProvided(t *testing.T) {
p := &CPProvisioner{sharedSecret: "the-secret", adminToken: "tok-abc"}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeader(req)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "Bearer the-secret" {
t.Errorf("Authorization = %q, want %q", got, "Bearer the-secret")
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" {
t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got)
}
}
// TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty — in the transition
// window where the tenant has admin_token but PROVISION_SHARED_SECRET
// isn't set, still send the admin token. CP middleware decides whether
// the shared secret is required.
func TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty(t *testing.T) {
p := &CPProvisioner{sharedSecret: "", adminToken: "tok-abc"}
req := httptest.NewRequest("GET", "http://x/", nil)
p.authHeaders(req)
if got := req.Header.Get("Authorization"); got != "" {
t.Errorf("Authorization = %q, want unset", got)
}
if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" {
t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got)
}
}
// TestStart_HappyPath — Start posts to the stubbed CP, passes the
@ -148,3 +172,177 @@ func TestStart_NoStructuredErrorFallsBackToSize(t *testing.T) {
t.Errorf("expected byte-count fallback, got %q", err.Error())
}
}
// TestStart_TransportFailureSurfaces — the CP isn't reachable at all
// (DNS fails, TCP refused, TLS handshake error). Start must return an
// error tagged with enough context to find the failed call in logs
// without leaking credentials.
func TestStart_TransportFailureSurfaces(t *testing.T) {
// Nothing listens on port 1 locally, so connect attempts fail immediately.
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
_, err := p.Start(context.Background(), WorkspaceConfig{WorkspaceID: "ws-1", Runtime: "py"})
if err == nil {
t.Fatal("expected transport error, got nil")
}
if !strings.Contains(err.Error(), "cp provisioner: send") {
t.Errorf("error should be tagged cp provisioner: send, got %q", err.Error())
}
}
// TestStop_SendsBothAuthHeaders — verify #118/#130 compliance on the
// teardown path. Any call to /cp/workspaces/:id must carry both the
// platform-wide shared secret AND the per-tenant admin token, or the
// CP will 401.
func TestStop_SendsBothAuthHeaders(t *testing.T) {
var sawBearer, sawAdminToken, sawMethod, sawPath string
var sawInstance string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawBearer = r.Header.Get("Authorization")
sawAdminToken = r.Header.Get("X-Molecule-Admin-Token")
sawMethod = r.Method
sawPath = r.URL.Path
sawInstance = r.URL.Query().Get("instance_id")
w.WriteHeader(http.StatusOK)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
if err := p.Stop(context.Background(), "ws-1"); err != nil {
t.Fatalf("Stop: %v", err)
}
if sawMethod != "DELETE" {
t.Errorf("method = %q, want DELETE", sawMethod)
}
if sawPath != "/cp/workspaces/ws-1" {
t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath)
}
if sawInstance != "ws-1" {
t.Errorf("instance_id query = %q, want ws-1", sawInstance)
}
if sawBearer != "Bearer s3cret" {
t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
}
if sawAdminToken != "tok-xyz" {
t.Errorf("admin token = %q, want tok-xyz", sawAdminToken)
}
}
// TestStop_TransportErrorSurfaces — same treatment as Start. If the
// teardown call hits a dead CP, the error must surface so the caller
// knows the workspace might still be running and needs retry.
func TestStop_TransportErrorSurfaces(t *testing.T) {
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
err := p.Stop(context.Background(), "ws-1")
if err == nil {
t.Fatal("expected transport error, got nil")
}
if !strings.Contains(err.Error(), "cp provisioner: stop") {
t.Errorf("error should be tagged, got %q", err.Error())
}
}
// TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose
// a bool ("running"/"pending"/"terminated" → true only for "running").
func TestIsRunning_ParsesStateField(t *testing.T) {
cases := map[string]bool{
"running": true,
"pending": false,
"stopping": false,
"terminated": false,
}
for state, want := range cases {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/cp/workspaces/ws-1/status" {
t.Errorf("path = %q", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, `{"state":"`+state+`"}`)
}))
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
got, err := p.IsRunning(context.Background(), "ws-1")
srv.Close()
if err != nil {
t.Errorf("state=%s: IsRunning error %v", state, err)
continue
}
if got != want {
t.Errorf("state=%s: got %v, want %v", state, got, want)
}
}
}
// TestIsRunning_SendsBothAuthHeaders — parity with Stop. Status reads
// require the same per-tenant auth because they leak public_ip +
// private_ip to the caller.
func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
var sawBearer, sawAdminToken string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sawBearer = r.Header.Get("Authorization")
sawAdminToken = r.Header.Get("X-Molecule-Admin-Token")
w.WriteHeader(http.StatusOK)
_, _ = io.WriteString(w, `{"state":"running"}`)
}))
defer srv.Close()
p := &CPProvisioner{
baseURL: srv.URL,
orgID: "org-1",
sharedSecret: "s3cret",
adminToken: "tok-xyz",
httpClient: srv.Client(),
}
_, _ = p.IsRunning(context.Background(), "ws-1")
if sawBearer != "Bearer s3cret" {
t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
}
if sawAdminToken != "tok-xyz" {
t.Errorf("admin token = %q, want tok-xyz", sawAdminToken)
}
}
// TestIsRunning_TransportErrorReturnsFalse — when the CP is
// unreachable, IsRunning must not claim the workspace is running
// (that'd mislead the sweeper into leaving a dead row in place).
func TestIsRunning_TransportErrorReturnsFalse(t *testing.T) {
p := &CPProvisioner{
baseURL: "http://127.0.0.1:1",
orgID: "org-1",
httpClient: &http.Client{Timeout: 500 * time.Millisecond},
}
got, err := p.IsRunning(context.Background(), "ws-1")
if err == nil {
t.Errorf("expected transport error, got nil (got=%v)", got)
}
if got {
t.Errorf("transport failure must not report running=true")
}
}
// TestClose_Noop — explicit contract: Close has no side effects and
// no error. Exists for the Provisioner interface; compliance guard.
func TestClose_Noop(t *testing.T) {
p := &CPProvisioner{}
if err := p.Close(); err != nil {
t.Errorf("Close should return nil, got %v", err)
}
}

View File

@ -376,7 +376,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// (dev / self-hosted without GITHUB_APP_ID).
{
ghTokH := handlers.NewGitHubTokenHandler(wh.TokenRegistry())
// #1068: moved from AdminAuth to allow any authenticated workspace to
// refresh its GitHub token. The credential helper in containers calls
// this endpoint with a workspace bearer token — AdminAuth (PR #729)
// rejects those, breaking token refresh after 60 min.
// Keep the old path as an alias for backward compat.
r.GET("/admin/github-installation-token", middleware.AdminAuth(db.DB), ghTokH.GetInstallationToken)
wsAuth.GET("/github-installation-token", ghTokH.GetInstallationToken)
}
// Terminal — shares Docker client with provisioner

View File

@ -241,21 +241,41 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
}
}()
// #115 concurrency-aware skip — before firing check if the target
// workspace is already executing a task. If so, skip this tick instead
// of colliding (which used to surface as "workspace agent busy" errors
// and register as a hard fail). Advance next_run_at so the next cron
// slot gets a fresh chance; log a skipped cron_run row so history shows
// the gap instead of a silent miss. COALESCE guards against NULL.
// #969 concurrency-aware queue — when the target workspace is busy,
// defer the fire instead of skipping. Polls every 10s for up to 2 min
// waiting for the workspace to become idle. If still busy after 2 min,
// falls back to the original skip behavior.
//
// This replaces the #115 "skip when busy" pattern which caused crons
// to permanently miss when workspaces were perpetually busy from the
// Orchestrator pulse delegation chain (~30% message drop rate on Dev Lead).
var activeTasks int
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks); err == nil && activeTasks > 0 {
log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d), deferring up to 2 min",
sched.Name, short(sched.WorkspaceID, 12), activeTasks)
s.recordSkipped(ctx, sched, activeTasks)
return
// Poll every 10s for up to 2 minutes
waited := false
for i := 0; i < 12; i++ {
time.Sleep(10 * time.Second)
if err := db.DB.QueryRowContext(ctx,
`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
sched.WorkspaceID,
).Scan(&activeTasks); err != nil || activeTasks == 0 {
waited = true
break
}
}
if !waited && activeTasks > 0 {
log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d)",
sched.Name, short(sched.WorkspaceID, 12), activeTasks)
s.recordSkipped(ctx, sched, activeTasks)
return
}
log.Printf("Scheduler: '%s' workspace %s now idle after deferral, firing",
sched.Name, short(sched.WorkspaceID, 12))
}
fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)

View File

@ -45,6 +45,7 @@ from executor_helpers import (
CONFIG_MOUNT,
MEMORY_CONTENT_MAX_CHARS,
WORKSPACE_MOUNT,
auto_push_hook,
brief_summary,
commit_memory,
extract_message_text,
@ -473,6 +474,8 @@ class ClaudeSDKExecutor(AgentExecutor):
await commit_memory(
f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}"
)
# Auto-push unpushed commits and open PR (non-blocking, best-effort).
await auto_push_hook()
return response_text or _NO_RESPONSE_MSG

View File

@ -14,10 +14,12 @@ Provides:
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Any
@ -390,3 +392,156 @@ def sanitize_agent_error(
else:
tag = "unknown"
return f"Agent error ({tag}) — see workspace logs for details."
# ========================================================================
# Auto-push hook — push unpushed commits and open PR after task completion
# ========================================================================
# Git/gh wrappers at /usr/local/bin have GH_TOKEN baked in.
_GIT = "/usr/local/bin/git"
_GH = "/usr/local/bin/gh"
_PROTECTED_BRANCHES = frozenset({"staging", "main", "master"})
def _run_git(args: list[str], cwd: str, timeout: int = 30) -> subprocess.CompletedProcess:
"""Run a git/gh command with a bounded timeout. Does not raise on a non-zero exit; may raise TimeoutExpired, which callers handle."""
return subprocess.run(
args,
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout,
)
def _auto_push_and_pr_sync(cwd: str) -> None:
"""Synchronous implementation of the auto-push hook.
1. Check if we're in a git repo with unpushed commits on a feature branch.
2. Push the branch.
3. Open a PR against staging if one doesn't already exist.
Designed to be called from a background thread; never raises, logs all
errors. Uses the git/gh wrappers at /usr/local/bin/ which have GH_TOKEN
baked in.
"""
try:
# --- Guard: is this a git repo? ---
probe = _run_git([_GIT, "rev-parse", "--is-inside-work-tree"], cwd)
if probe.returncode != 0:
return
# --- Guard: get current branch ---
branch_result = _run_git(
[_GIT, "rev-parse", "--abbrev-ref", "HEAD"], cwd
)
if branch_result.returncode != 0:
return
branch = branch_result.stdout.strip()
if not branch or branch in _PROTECTED_BRANCHES or branch == "HEAD":
return
# --- Guard: any unpushed commits? ---
log_result = _run_git(
[_GIT, "log", "origin/staging..HEAD", "--oneline"], cwd
)
if log_result.returncode != 0 or not log_result.stdout.strip():
# No unpushed commits (or origin/staging doesn't exist).
return
unpushed_lines = log_result.stdout.strip().splitlines()
logger.info(
"auto-push: %d unpushed commit(s) on branch '%s', pushing...",
len(unpushed_lines),
branch,
)
# --- Push ---
push_result = _run_git(
[_GIT, "push", "origin", branch], cwd, timeout=60
)
if push_result.returncode != 0:
logger.warning(
"auto-push: git push failed (exit %d): %s",
push_result.returncode,
(push_result.stderr or push_result.stdout)[:500],
)
return
logger.info("auto-push: pushed branch '%s' successfully", branch)
# --- Check if PR already exists ---
pr_list = _run_git(
[_GH, "pr", "list", "--head", branch, "--json", "number"], cwd
)
if pr_list.returncode != 0:
logger.warning(
"auto-push: gh pr list failed (exit %d): %s",
pr_list.returncode,
(pr_list.stderr or pr_list.stdout)[:500],
)
return
existing_prs = json.loads(pr_list.stdout.strip() or "[]")
if existing_prs:
logger.info(
"auto-push: PR already exists for branch '%s' (#%s), skipping create",
branch,
existing_prs[0].get("number", "?"),
)
return
# --- Get the first (oldest) commit message on the branch for the PR title ---
# Note: `git log --reverse -1` would return the newest commit (commit limiting
# is applied before --reverse), so list all subjects and take the first line.
first_commit = _run_git(
[_GIT, "log", "origin/staging..HEAD", "--reverse", "--format=%s"],
cwd,
)
subjects = first_commit.stdout.strip().splitlines() if first_commit.returncode == 0 else []
pr_title = subjects[0] if subjects else branch
# Truncate to 256 chars (GitHub limit)
if len(pr_title) > 256:
pr_title = pr_title[:253] + "..."
# --- Create PR ---
pr_create = _run_git(
[
_GH, "pr", "create",
"--base", "staging",
"--title", pr_title,
"--body", "Auto-created by workspace agent",
],
cwd,
timeout=60,
)
if pr_create.returncode != 0:
logger.warning(
"auto-push: gh pr create failed (exit %d): %s",
pr_create.returncode,
(pr_create.stderr or pr_create.stdout)[:500],
)
else:
pr_url = pr_create.stdout.strip()
logger.info("auto-push: created PR %s", pr_url)
except subprocess.TimeoutExpired:
logger.warning("auto-push: command timed out, skipping")
except Exception:
logger.exception("auto-push: unexpected error (non-fatal)")
async def auto_push_hook(cwd: str | None = None) -> None:
"""Post-execution hook: push unpushed commits and open a PR.
Runs the git/gh subprocess work in a background thread via
asyncio.to_thread so it never blocks the agent's event loop.
Catches all exceptions; the agent must never crash because of this hook.
"""
if cwd is None:
cwd = WORKSPACE_MOUNT
if not os.path.isdir(cwd):
return
try:
await asyncio.to_thread(_auto_push_and_pr_sync, cwd)
except Exception:
logger.exception("auto_push_hook: failed (non-fatal)")

View File

@ -53,7 +53,14 @@ set -euo pipefail
PLATFORM_URL="${PLATFORM_URL:-http://platform:8080}"
CONFIGS_DIR="${CONFIGS_DIR:-/configs}"
TOKEN_FILE="${CONFIGS_DIR}/.auth_token"
ENDPOINT="${PLATFORM_URL}/admin/github-installation-token"
# #1068: use workspace-scoped path (WorkspaceAuth) instead of admin path
# (AdminAuth rejects workspace bearer tokens since PR #729).
WORKSPACE_ID="${WORKSPACE_ID:-}"
if [ -n "$WORKSPACE_ID" ]; then
ENDPOINT="${PLATFORM_URL}/workspaces/${WORKSPACE_ID}/github-installation-token"
else
ENDPOINT="${PLATFORM_URL}/admin/github-installation-token"
fi
# _fetch_token — internal helper; also callable directly from cron.
# Outputs the raw token string on success; exits non-zero on failure.