diff --git a/.gitignore b/.gitignore
index cb1bae8c..98430d60 100644
--- a/.gitignore
+++ b/.gitignore
@@ -117,6 +117,10 @@ backups/
 # Cloned-via-manifest dirs — populated locally by scripts/clone-manifest.sh,
 # tracked in their own standalone repos. Never commit to core.
+# org-templates live in Molecule-AI/molecule-ai-org-template-* repos.
+# plugins live in Molecule-AI/molecule-ai-plugin-* repos.
+# Exception: molecule-dev is checked in so it doubles as the internal-team
+# seed template (not fetched via clone-manifest).
 /org-templates/*
 !/org-templates/molecule-dev/
 /plugins/
diff --git a/workspace-server/internal/handlers/a2a_proxy.go b/workspace-server/internal/handlers/a2a_proxy.go
index 4d57b6c9..4d4f23f2 100644
--- a/workspace-server/internal/handlers/a2a_proxy.go
+++ b/workspace-server/internal/handlers/a2a_proxy.go
@@ -87,7 +87,12 @@ const maxProxyResponseBody = 10 << 20
 // a2aClient is a shared HTTP client for proxying A2A requests to workspace agents.
-// No client-level timeout — timeouts are enforced per-request via context deadlines:
-// canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap).
-var a2aClient = &http.Client{}
+// Per-request timeouts are enforced via context deadlines:
+// canvas = 5 min (Rule 3), agent-to-agent = 30 min (DoS cap).
+// The client-level timeout is only a safety net for requests that arrive
+// without a deadline. http.Client.Timeout caps every request regardless of
+// context deadline, so it must stay above the longest per-request deadline —
+// a shorter value (e.g. 60s) would cut the canvas and agent-to-agent flows off.
+var a2aClient = &http.Client{
+	Timeout: 35 * time.Minute, // safety net above the 30-min agent-to-agent cap
+}
 
 type proxyA2AError struct {
 	Status int
diff --git a/workspace-server/internal/handlers/org.go b/workspace-server/internal/handlers/org.go
index f51c3321..727c731c 100644
--- a/workspace-server/internal/handlers/org.go
+++ b/workspace-server/internal/handlers/org.go
@@ -74,10 +74,13 @@ func NewOrgHandler(wh *WorkspaceHandler, b *events.Broadcaster, p *provisioner.P
 // OrgTemplate is the YAML structure for an org hierarchy.
 type OrgTemplate struct {
-	Name        string         `yaml:"name" json:"name"`
-	Description string         `yaml:"description" json:"description"`
-	Defaults    OrgDefaults    `yaml:"defaults" json:"defaults"`
-	Workspaces  []OrgWorkspace `yaml:"workspaces" json:"workspaces"`
+	Name        string         `yaml:"name" json:"name"`
+	Description string         `yaml:"description" json:"description"`
+	Defaults    OrgDefaults    `yaml:"defaults" json:"defaults"`
+	Workspaces  []OrgWorkspace `yaml:"workspaces" json:"workspaces"`
+	// GlobalMemories is a list of org-wide memories seeded as GLOBAL scope
+	// on the first root workspace (PM) during org import. Issue #1050.
+	GlobalMemories []models.MemorySeed `yaml:"global_memories" json:"global_memories"`
 }
 
 type OrgDefaults struct {
@@ -106,6 +109,9 @@ type OrgDefaults struct {
 	// Rendered into each workspace's config.yaml so agent prompts can read it
 	// generically (no hardcoded role names in prompts). See issue #51.
 	CategoryRouting map[string][]string `yaml:"category_routing" json:"category_routing"`
+	// InitialMemories are default memories seeded into every workspace at
+	// creation time unless the workspace overrides them. Issue #1050.
+	InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"`
 }
 
 type OrgSchedule struct {
@@ -170,6 +176,9 @@ type OrgWorkspace struct {
 	// (empty list drops the category entirely); new keys are added. See
 	// mergeCategoryRouting.
 	CategoryRouting map[string][]string `yaml:"category_routing" json:"category_routing"`
+	// InitialMemories are memories seeded into this workspace at creation
+	// time. If empty, defaults.initial_memories are used. Issue #1050.
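+	// A sketch of the template shape this field accepts (hypothetical
+	// content strings; the entry shape follows models.MemorySeed):
+	//
+	//	initial_memories:
+	//	  - content: "Deploys go out from staging, never from main."
+	//	    scope: TEAM
+	//	  - content: "Escalate infra incidents to the Ops workspace."
+	//	    scope: LOCAL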
+ InitialMemories []models.MemorySeed `yaml:"initial_memories" json:"initial_memories"` Schedules []OrgSchedule `yaml:"schedules" json:"schedules"` Channels []OrgChannel `yaml:"channels" json:"channels"` External bool `yaml:"external" json:"external"` @@ -290,6 +299,22 @@ func (h *OrgHandler) Import(c *gin.Context) { } } + // Seed org-wide global_memories on the first root workspace (issue #1050). + // These are GLOBAL scope memories visible to all workspaces in the org. + if len(tmpl.GlobalMemories) > 0 && len(results) > 0 { + rootID, _ := results[0]["id"].(string) + if rootID != "" { + rootNS := workspaceAwarenessNamespace(rootID) + // Force scope to GLOBAL regardless of what the YAML says. + globalSeeds := make([]models.MemorySeed, len(tmpl.GlobalMemories)) + for i, gm := range tmpl.GlobalMemories { + globalSeeds[i] = models.MemorySeed{Content: gm.Content, Scope: "GLOBAL"} + } + seedInitialMemories(context.Background(), rootID, globalSeeds, rootNS) + log.Printf("Org import: seeded %d global memories on root workspace %s", len(globalSeeds), rootID) + } + } + // Hot-reload channel manager once after all channels are inserted // (instead of per-workspace, avoiding N redundant DB queries + diffs). if h.channelMgr != nil { @@ -408,6 +433,15 @@ func (h *OrgHandler) createWorkspaceTree(ws OrgWorkspace, parentID *string, defa "name": ws.Name, "tier": tier, }) + // Seed initial memories from workspace config or defaults (issue #1050). + // Per-workspace initial_memories override defaults; if workspace has none, + // fall back to defaults.initial_memories. + wsMemories := ws.InitialMemories + if len(wsMemories) == 0 { + wsMemories = defaults.InitialMemories + } + seedInitialMemories(ctx, id, wsMemories, awarenessNS) + // Handle external workspaces if ws.External { if _, err := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = 'online', url = $1 WHERE id = $2`, ws.URL, id); err != nil { diff --git a/workspace-server/internal/handlers/secrets.go b/workspace-server/internal/handlers/secrets.go index 256d102c..dd7abe05 100644 --- a/workspace-server/internal/handlers/secrets.go +++ b/workspace-server/internal/handlers/secrets.go @@ -276,7 +276,10 @@ func (h *SecretsHandler) Delete(c *gin.Context) { return } - rows, _ := result.RowsAffected() + rows, err := result.RowsAffected() + if err != nil { + log.Printf("DeleteWorkspace: RowsAffected error: %v", err) + } if rows == 0 { c.JSON(http.StatusNotFound, gin.H{"error": "secret not found"}) return @@ -418,7 +421,10 @@ func (h *SecretsHandler) DeleteGlobal(c *gin.Context) { return } - rows, _ := result.RowsAffected() + rows, err := result.RowsAffected() + if err != nil { + log.Printf("DeleteGlobal: RowsAffected error: %v", err) + } if rows == 0 { c.JSON(http.StatusNotFound, gin.H{"error": "secret not found"}) return diff --git a/workspace-server/internal/handlers/terminal.go b/workspace-server/internal/handlers/terminal.go index 45f6dc60..14afef20 100644 --- a/workspace-server/internal/handlers/terminal.go +++ b/workspace-server/internal/handlers/terminal.go @@ -97,7 +97,6 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) { log.Printf("Terminal WebSocket upgrade error: %v", err) return } - defer conn.Close() // No hard session deadline — terminal stays open as long as there is activity. 
 // The idle timeout (terminalSessionTimeout) resets on each keystroke in the
@@ -108,6 +107,7 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
 	// ContainerExecCreate succeeds even if the binary doesn't exist — the error
 	// only surfaces at attach/start time, so we must retry at the attach level.
 	var resp types.HijackedResponse
+	var execErr error
 	for _, shell := range []string{"/bin/bash", "/bin/sh"} {
 		execCfg := container.ExecOptions{
 			Cmd: []string{shell},
@@ -118,20 +118,21 @@ func (h *TerminalHandler) HandleConnect(c *gin.Context) {
 		}
 		execID, createErr := h.docker.ContainerExecCreate(ctx, containerName, execCfg)
 		if createErr != nil {
-			err = createErr
+			execErr = createErr
 			continue
 		}
-		resp, err = h.docker.ContainerExecAttach(ctx, execID.ID, container.ExecAttachOptions{Tty: true})
-		if err == nil {
+		resp, execErr = h.docker.ContainerExecAttach(ctx, execID.ID, container.ExecAttachOptions{Tty: true})
+		if execErr == nil {
+			defer resp.Close()
 			break
 		}
 	}
-	if err != nil {
-		log.Printf("Terminal exec error: %v", err)
+	if execErr != nil {
+		log.Printf("Terminal exec error: %v", execErr)
 		conn.WriteMessage(websocket.TextMessage, []byte("Error: failed to create shell session\r\n"))
+		conn.Close()
 		return
 	}
-	defer resp.Close()
 
 	// Bridge: container stdout → WebSocket
 	done := make(chan struct{})
diff --git a/workspace-server/internal/handlers/webhooks.go b/workspace-server/internal/handlers/webhooks.go
index 259c62f0..78173d36 100644
--- a/workspace-server/internal/handlers/webhooks.go
+++ b/workspace-server/internal/handlers/webhooks.go
@@ -7,10 +7,12 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"log"
 	"net/http"
 	"os"
 	"strings"
 
+	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
 	"github.com/gin-gonic/gin"
 )
@@ -31,6 +33,15 @@ func NewWebhookHandlerWithWorkspace(workspaces *WorkspaceHandler) *WebhookHandle
 	}
 }
 
+// shortSHA returns the first seven characters of a commit SHA, or the
+// full value if it's shorter than that. Safe for empty strings.
+func shortSHA(sha string) string {
+	if len(sha) < 7 {
+		return sha
+	}
+	return sha[:7]
+}
+
 // GitHub handles POST /webhooks/github/:id
 // It verifies X-Hub-Signature-256, maps supported events to A2A message/send,
 // then forwards through the same proxy flow used by /workspaces/:id/a2a.
@@ -56,6 +67,16 @@ func (h *WebhookHandler) GitHub(c *gin.Context) {
 	}
 
 	eventType := c.GetHeader("X-GitHub-Event")
+
+	// Event-driven cron triggers: certain GitHub events fire matching
+	// schedules immediately instead of forwarding to a specific workspace.
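+	// This runs after the X-Hub-Signature-256 verification above, so only
+	// authenticated deliveries can fire schedules. Mechanically each trigger
+	// is a single UPDATE (sketch — the real statements live in
+	// handleCronTriggerEvent below):
+	//
+	//	UPDATE workspace_schedules
+	//	SET next_run_at = now(), updated_at = now()
+	//	WHERE enabled = true AND LOWER(name) LIKE '%pick-up-work%'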
+ if triggered, triggerErr := h.handleCronTriggerEvent(c, eventType, rawBody); triggered { + if triggerErr != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": triggerErr.Error()}) + } + return + } + deliveryID := c.GetHeader("X-GitHub-Delivery") payloadWorkspaceID, a2aPayload, buildErr := buildGitHubA2APayload(eventType, deliveryID, rawBody) if buildErr != nil { @@ -254,7 +275,7 @@ func buildGitHubA2APayload(eventType, deliveryID string, rawBody []byte) (string payload.WorkflowRun.RunNumber, payload.WorkflowRun.Conclusion, payload.WorkflowRun.HeadBranch, - payload.WorkflowRun.HeadSHA[:min(7, len(payload.WorkflowRun.HeadSHA))], + shortSHA(payload.WorkflowRun.HeadSHA), payload.Sender.Login, payload.WorkflowRun.Event, payload.Repository.FullName, @@ -295,3 +316,131 @@ func newGitHubMessagePayload(text string, metadata map[string]interface{}) map[s }, } } + +// --------------------------------------------------------------------------- +// Event-driven cron triggers +// +// Some GitHub events don't target a specific workspace — instead they should +// wake up all engineer work crons immediately so the team reacts to new issues +// or PR reviews without waiting for the next 30-minute timer tick. +// +// Supported events: +// - issues (action=opened) → fires schedules with "pick-up-work" in name +// - pull_request_review (action=submitted) → fires schedules with "PR review" +// or "security review" in name +// +// Mechanism: UPDATE next_run_at = NOW() on matching enabled schedules. The +// scheduler's 30-second poll loop picks them up on the next tick. +// --------------------------------------------------------------------------- + +// githubIssuesEvent is the minimal subset of the GitHub "issues" webhook payload. +type githubIssuesEvent struct { + Action string `json:"action"` + Repository githubRepository `json:"repository"` + Sender githubSender `json:"sender"` + Issue struct { + Number int `json:"number"` + Title string `json:"title"` + HTMLURL string `json:"html_url"` + } `json:"issue"` +} + +// githubPullRequestReviewEvent is the minimal subset of the GitHub +// "pull_request_review" webhook payload. +type githubPullRequestReviewEvent struct { + Action string `json:"action"` + Repository githubRepository `json:"repository"` + Sender githubSender `json:"sender"` + Review struct { + State string `json:"state"` // approved, changes_requested, commented + HTMLURL string `json:"html_url"` + } `json:"review"` + PullRequest struct { + Number int `json:"number"` + Title string `json:"title"` + HTMLURL string `json:"html_url"` + } `json:"pull_request"` +} + +// handleCronTriggerEvent checks if the GitHub event is one that should trigger +// schedules immediately. Returns (true, nil) if it handled the event and wrote +// the HTTP response, (true, err) if it handled but errored, or (false, nil) if +// the event is not a cron-trigger type and should fall through to A2A forwarding. 
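+//
+// Contract sketch, mirroring the call site in GitHub above:
+//
+//	triggered == false            → caller falls through to A2A forwarding
+//	triggered == true, err == nil → response already written (200/202/400)
+//	triggered == true, err != nil → caller writes the 500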
+func (h *WebhookHandler) handleCronTriggerEvent(c *gin.Context, eventType string, rawBody []byte) (bool, error) { + ctx := c.Request.Context() + + switch eventType { + case "issues": + var payload githubIssuesEvent + if err := json.Unmarshal(rawBody, &payload); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid issues payload"}) + return true, nil + } + if payload.Action != "opened" { + c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only issues action=opened triggers crons"}) + return true, nil + } + + // Fire all enabled schedules whose name contains "pick-up-work" (case-insensitive). + result, err := db.DB.ExecContext(ctx, ` + UPDATE workspace_schedules + SET next_run_at = now(), updated_at = now() + WHERE enabled = true + AND next_run_at IS NOT NULL + AND LOWER(name) LIKE '%pick-up-work%' + `) + if err != nil { + log.Printf("Webhook: cron trigger (issues/opened) DB error: %v", err) + return true, fmt.Errorf("failed to trigger schedules: %w", err) + } + affected, _ := result.RowsAffected() + log.Printf("Webhook: issues/opened in %s #%d by %s — triggered %d pick-up-work schedule(s)", + payload.Repository.FullName, payload.Issue.Number, payload.Sender.Login, affected) + + c.JSON(http.StatusOK, gin.H{ + "status": "triggered", + "event": "issues", + "action": "opened", + "schedules_affected": affected, + }) + return true, nil + + case "pull_request_review": + var payload githubPullRequestReviewEvent + if err := json.Unmarshal(rawBody, &payload); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid pull_request_review payload"}) + return true, nil + } + if payload.Action != "submitted" { + c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only pull_request_review action=submitted triggers crons"}) + return true, nil + } + + // Fire all enabled schedules whose name contains "PR review" or "security review" (case-insensitive). 
+ result, err := db.DB.ExecContext(ctx, ` + UPDATE workspace_schedules + SET next_run_at = now(), updated_at = now() + WHERE enabled = true + AND next_run_at IS NOT NULL + AND (LOWER(name) LIKE '%pr review%' OR LOWER(name) LIKE '%security review%') + `) + if err != nil { + log.Printf("Webhook: cron trigger (pull_request_review/submitted) DB error: %v", err) + return true, fmt.Errorf("failed to trigger schedules: %w", err) + } + affected, _ := result.RowsAffected() + log.Printf("Webhook: pull_request_review/submitted in %s PR #%d by %s (state=%s) — triggered %d review schedule(s)", + payload.Repository.FullName, payload.PullRequest.Number, payload.Sender.Login, payload.Review.State, affected) + + c.JSON(http.StatusOK, gin.H{ + "status": "triggered", + "event": "pull_request_review", + "action": "submitted", + "schedules_affected": affected, + }) + return true, nil + + default: + return false, nil + } +} diff --git a/workspace-server/internal/handlers/webhooks_test.go b/workspace-server/internal/handlers/webhooks_test.go index 74264c06..659fcd68 100644 --- a/workspace-server/internal/handlers/webhooks_test.go +++ b/workspace-server/internal/handlers/webhooks_test.go @@ -8,6 +8,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -206,3 +207,147 @@ func TestGitHubWebhook_ValidPRReviewComment_Forwards(t *testing.T) { t.Fatalf("unmet sqlmock expectations: %v", err) } } + +// --------------------------------------------------------------------------- +// Event-driven cron trigger tests +// --------------------------------------------------------------------------- + +func TestGitHubWebhook_IssuesOpened_TriggersCrons(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "opened", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "alice"}, + "issue": {"number": 42, "title": "New feature request", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"} + }`) + + // Expect the UPDATE that sets next_run_at = now() on pick-up-work schedules. + mock.ExpectExec("UPDATE workspace_schedules"). + WillReturnResult(sqlmock.NewResult(0, 3)) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "issues") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + // Verify response includes trigger metadata. 
+ respBody := w.Body.String() + if !strings.Contains(respBody, `"triggered"`) { + t.Fatalf("expected 'triggered' in response, got: %s", respBody) + } + if !strings.Contains(respBody, `"schedules_affected"`) { + t.Fatalf("expected 'schedules_affected' in response, got: %s", respBody) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sqlmock expectations: %v", err) + } +} + +func TestGitHubWebhook_IssuesClosed_Ignored(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "closed", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "alice"}, + "issue": {"number": 42, "title": "Old issue", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"} + }`) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "issues") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestGitHubWebhook_PRReviewSubmitted_TriggersCrons(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "submitted", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "bob"}, + "review": {"state": "changes_requested", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"}, + "pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"} + }`) + + // Expect the UPDATE that sets next_run_at = now() on review schedules. + mock.ExpectExec("UPDATE workspace_schedules"). 
+ WillReturnResult(sqlmock.NewResult(0, 2)) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "pull_request_review") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + respBody := w.Body.String() + if !strings.Contains(respBody, `"triggered"`) { + t.Fatalf("expected 'triggered' in response, got: %s", respBody) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sqlmock expectations: %v", err) + } +} + +func TestGitHubWebhook_PRReviewDismissed_Ignored(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "dismissed", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "bob"}, + "review": {"state": "dismissed", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"}, + "pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"} + }`) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "pull_request_review") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index ed5a0244..a11f0a9a 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -209,6 +209,10 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { log.Printf("Create: canvas layout insert failed for %s (workspace will appear at 0,0): %v", id, err) } + // Seed initial memories from the create payload (issue #1050). + // Non-fatal: failures are logged but don't block workspace creation. + seedInitialMemories(ctx, id, payload.InitialMemories, awarenessNamespace) + // Broadcast provisioning event h.broadcaster.RecordAndBroadcast(ctx, "WORKSPACE_PROVISIONING", id, map[string]interface{}{ "name": payload.Name, @@ -803,6 +807,12 @@ func (h *WorkspaceHandler) Delete(c *gin.Context) { pq.Array(allIDs)); err != nil { log.Printf("Delete token revocation error for %s: %v", id, err) } + // Disable schedules for removed workspaces (#1027) + if _, err := db.DB.ExecContext(ctx, + `UPDATE workspace_schedules SET enabled = false WHERE workspace_id = ANY($1::uuid[])`, + pq.Array(allIDs)); err != nil { + log.Printf("Delete schedule disable error for %s: %v", id, err) + } // Now stop containers + remove volumes for all descendants (any depth). 
 // Any concurrent heartbeat / registration / liveness-triggered restart
diff --git a/workspace-server/internal/handlers/workspace_provision.go b/workspace-server/internal/handlers/workspace_provision.go
index f023e240..92b9c55f 100644
--- a/workspace-server/internal/handlers/workspace_provision.go
+++ b/workspace-server/internal/handlers/workspace_provision.go
@@ -187,6 +187,39 @@ func (h *WorkspaceHandler) provisionWorkspaceOpts(workspaceID, templatePath stri
 	// which transitions status to 'online' and broadcasts WORKSPACE_ONLINE
 }
 
+// seedInitialMemories inserts a list of MemorySeed entries into agent_memories
+// for the given workspace. Called during workspace creation and org import to
+// pre-populate memories from config/template. Non-fatal: each insert is
+// attempted independently and failures are logged. Issue #1050.
+func seedInitialMemories(ctx context.Context, workspaceID string, memories []models.MemorySeed, awarenessNamespace string) {
+	if len(memories) == 0 {
+		return
+	}
+	seeded := 0
+	for _, mem := range memories {
+		scope := strings.ToUpper(mem.Scope)
+		if scope == "" {
+			scope = "LOCAL"
+		}
+		if scope != "LOCAL" && scope != "TEAM" && scope != "GLOBAL" {
+			log.Printf("seedInitialMemories: skipping memory for %s — invalid scope %q", workspaceID, scope)
+			continue
+		}
+		if mem.Content == "" {
+			continue
+		}
+		if _, err := db.DB.ExecContext(ctx, `
+			INSERT INTO agent_memories (workspace_id, content, scope, namespace)
+			VALUES ($1, $2, $3, $4)
+		`, workspaceID, mem.Content, scope, awarenessNamespace); err != nil {
+			log.Printf("seedInitialMemories: failed to insert memory for %s (scope=%s): %v", workspaceID, scope, err)
+			continue
+		}
+		seeded++
+	}
+	log.Printf("seedInitialMemories: seeded %d/%d memories for workspace %s", seeded, len(memories), workspaceID)
+}
+
 func workspaceAwarenessNamespace(workspaceID string) string {
 	return fmt.Sprintf("workspace:%s", workspaceID)
 }
@@ -439,12 +472,12 @@ func (h *WorkspaceHandler) ensureDefaultConfig(workspaceID string, payload model
 	// Model always at top level — config.py reads raw["model"] for all runtimes.
 	configYAML += fmt.Sprintf("model: %s\n", quoteModel)
 
-	// Add required_env based on runtime — preflight checks these are set via secrets API.
+	// Add runtime_config. required_env is intentionally omitted — the
+	// platform injects secrets at container-start time via the secrets API,
+	// and preflight already validates that the env vars are present before
+	// the agent loop starts. Hardcoding token names here caused #1028
+	// (expired CLAUDE_CODE_OAUTH_TOKEN baked into config.yaml).
 	switch runtime {
-	case "claude-code":
-		configYAML += "runtime_config:\n  required_env:\n    - CLAUDE_CODE_OAUTH_TOKEN\n  timeout: 0\n"
-	case "codex":
-		configYAML += "runtime_config:\n  required_env:\n    - OPENAI_API_KEY\n  timeout: 0\n"
 	case "langgraph", "deepagents":
 		// These runtimes read API keys from env directly, no runtime_config needed.
default: diff --git a/workspace-server/internal/handlers/workspace_provision_test.go b/workspace-server/internal/handlers/workspace_provision_test.go index 3dafa96f..9eda95df 100644 --- a/workspace-server/internal/handlers/workspace_provision_test.go +++ b/workspace-server/internal/handlers/workspace_provision_test.go @@ -247,11 +247,10 @@ func TestEnsureDefaultConfig_ClaudeCode(t *testing.T) { if !contains(content, "runtime_config:") { t.Errorf("config.yaml should have runtime_config section for claude-code, got:\n%s", content) } - if !contains(content, "required_env:") { - t.Errorf("config.yaml should have required_env for claude-code, got:\n%s", content) - } - if !contains(content, "CLAUDE_CODE_OAUTH_TOKEN") { - t.Errorf("config.yaml should require CLAUDE_CODE_OAUTH_TOKEN, got:\n%s", content) + // required_env is no longer hardcoded — tokens are injected at runtime + // via the secrets API (#1028). + if contains(content, "CLAUDE_CODE_OAUTH_TOKEN") { + t.Errorf("config.yaml should NOT hardcode CLAUDE_CODE_OAUTH_TOKEN (fix #1028), got:\n%s", content) } // Should NOT have .auth-token file if _, ok := files[".auth-token"]; ok { diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index 28dcbe3b..f8871f64 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -572,6 +572,9 @@ func TestWorkspaceDelete_CascadeWithChildren(t *testing.T) { // Token revocation: once a workspace is gone its auth tokens are meaningless. mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at"). WillReturnResult(sqlmock.NewResult(0, 2)) + // #1027: cascade-disable schedules for deleted workspaces. + mock.ExpectExec("UPDATE workspace_schedules SET enabled = false"). + WillReturnResult(sqlmock.NewResult(0, 3)) // Broadcast for child WORKSPACE_REMOVED (fires during the descendant loop). mock.ExpectExec("INSERT INTO structure_events"). WillReturnResult(sqlmock.NewResult(0, 1)) @@ -606,6 +609,180 @@ func TestWorkspaceDelete_CascadeWithChildren(t *testing.T) { } } +// ==================== #1027: Cascade schedule disable on delete ==================== + +// TestWorkspaceDelete_DisablesSchedules verifies that when a leaf workspace +// (no children) is deleted, all its enabled schedules are set to enabled=false. +func TestWorkspaceDelete_DisablesSchedules(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + wsID := "dddddddd-0001-0000-0000-000000000000" + + // No children + mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). + WithArgs(wsID). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"})) + + // Mark workspace as removed + mock.ExpectExec("UPDATE workspaces SET status = 'removed'"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // Canvas layouts cleanup + mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY"). + WillReturnResult(sqlmock.NewResult(0, 0)) + // Token revocation + mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at"). + WillReturnResult(sqlmock.NewResult(0, 0)) + // #1027: schedule disable — expect exactly this UPDATE to fire + mock.ExpectExec("UPDATE workspace_schedules SET enabled = false"). + WillReturnResult(sqlmock.NewResult(0, 2)) // 2 schedules disabled + // Broadcast WORKSPACE_REMOVED for the workspace itself + mock.ExpectExec("INSERT INTO structure_events"). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: wsID}} + c.Request = httptest.NewRequest("DELETE", "/workspaces/"+wsID+"?confirm=true", nil) + + handler.Delete(c) + + if w.Code != http.StatusOK { + t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: schedule disable UPDATE was not executed: %v", err) + } +} + +// TestWorkspaceDelete_CascadeDisablesDescendantSchedules verifies that when +// a parent workspace with children (and grandchildren) is deleted, ALL +// descendant schedules are also disabled in a single batch UPDATE. +func TestWorkspaceDelete_CascadeDisablesDescendantSchedules(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir()) + + parentID := "dddddddd-0002-0000-0000-000000000000" + childID := "dddddddd-0003-0000-0000-000000000000" + grandchildID := "dddddddd-0004-0000-0000-000000000000" + + // Children query returns 1 direct child + mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id"). + WithArgs(parentID). + WillReturnRows(sqlmock.NewRows([]string{"id", "name"}). + AddRow(childID, "Child")) + + // Recursive CTE returns child + grandchild + mock.ExpectQuery("WITH RECURSIVE descendants"). + WithArgs(parentID). + WillReturnRows(sqlmock.NewRows([]string{"id"}). + AddRow(childID). + AddRow(grandchildID)) + + // Mark all 3 as removed + mock.ExpectExec("UPDATE workspaces SET status = 'removed'"). + WillReturnResult(sqlmock.NewResult(0, 3)) + // Canvas layouts + mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY"). + WillReturnResult(sqlmock.NewResult(0, 0)) + // Token revocation + mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at"). + WillReturnResult(sqlmock.NewResult(0, 0)) + // #1027: schedule disable — covers parent + child + grandchild in one batch + mock.ExpectExec("UPDATE workspace_schedules SET enabled = false"). + WillReturnResult(sqlmock.NewResult(0, 5)) // 5 total schedules across 3 workspaces + // Broadcast for child WORKSPACE_REMOVED + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // Broadcast for grandchild WORKSPACE_REMOVED + mock.ExpectExec("INSERT INTO structure_events"). + WillReturnResult(sqlmock.NewResult(0, 1)) + // Broadcast for parent WORKSPACE_REMOVED + mock.ExpectExec("INSERT INTO structure_events"). 
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: parentID}}
+	c.Request = httptest.NewRequest("DELETE", "/workspaces/"+parentID+"?confirm=true", nil)
+
+	handler.Delete(c)
+
+	if w.Code != http.StatusOK {
+		t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
+	}
+
+	var resp map[string]interface{}
+	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+		t.Fatalf("failed to parse response: %v", err)
+	}
+	if resp["cascade_deleted"] != float64(2) {
+		t.Errorf("expected cascade_deleted 2, got %v", resp["cascade_deleted"])
+	}
+
+	if err := mock.ExpectationsWereMet(); err != nil {
+		t.Errorf("unmet sqlmock expectations: descendant schedules not disabled: %v", err)
+	}
+}
+
+// TestWorkspaceDelete_ScheduleDisableOnlyTargetsDeletedWorkspace verifies that
+// deleting workspace A does NOT disable workspace B's schedules. The schedule
+// disable UPDATE uses ANY($1::uuid[]) scoped to the deleted workspace IDs, and
+// exactly one such UPDATE may fire during this delete.
+func TestWorkspaceDelete_ScheduleDisableOnlyTargetsDeletedWorkspace(t *testing.T) {
+	mock := setupTestDB(t)
+	setupTestRedis(t)
+	broadcaster := newTestBroadcaster()
+	handler := NewWorkspaceHandler(broadcaster, nil, "http://localhost:8080", t.TempDir())
+
+	wsA := "dddddddd-0005-0000-0000-000000000000"
+	// wsB is "dddddddd-0006-0000-0000-000000000000" — NOT part of the delete
+
+	// No children for workspace A
+	mock.ExpectQuery("SELECT id, name FROM workspaces WHERE parent_id").
+		WithArgs(wsA).
+		WillReturnRows(sqlmock.NewRows([]string{"id", "name"}))
+
+	// Mark only workspace A as removed
+	mock.ExpectExec("UPDATE workspaces SET status = 'removed'").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+	mock.ExpectExec("DELETE FROM canvas_layouts WHERE workspace_id = ANY").
+		WillReturnResult(sqlmock.NewResult(0, 0))
+	mock.ExpectExec("UPDATE workspace_auth_tokens SET revoked_at").
+		WillReturnResult(sqlmock.NewResult(0, 0))
+	// Schedule disable fires exactly once for this delete. sqlmock enforces
+	// query text and ordering here — it does not compare bind args unless
+	// .WithArgs is used, so the guard is "no extra/missing UPDATE", not IDs.
+	mock.ExpectExec("UPDATE workspace_schedules SET enabled = false").
+		WillReturnResult(sqlmock.NewResult(0, 0)) // wsA had no schedules
+	mock.ExpectExec("INSERT INTO structure_events").
+		WillReturnResult(sqlmock.NewResult(0, 1))
+
+	w := httptest.NewRecorder()
+	c, _ := gin.CreateTestContext(w)
+	c.Params = gin.Params{{Key: "id", Value: wsA}}
+	c.Request = httptest.NewRequest("DELETE", "/workspaces/"+wsA+"?confirm=true", nil)
+
+	handler.Delete(c)
+
+	if w.Code != http.StatusOK {
+		t.Errorf("expected status 200, got %d: %s", w.Code, w.Body.String())
+	}
+
+	// The key assertion: all expectations were met and no extra queries ran.
+	// A bug that issued an additional, broader schedule-disable UPDATE would
+	// surface as an unexpected extra exec; pinning the exact bound IDs would
+	// additionally require .WithArgs(pq.Array(...)) on the expectation.
+ if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("unmet sqlmock expectations: %v", err) + } +} + func TestWorkspaceDelete_ChildrenQueryError(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) diff --git a/workspace-server/internal/models/workspace.go b/workspace-server/internal/models/workspace.go index 9e01fbbe..ff8ad0be 100644 --- a/workspace-server/internal/models/workspace.go +++ b/workspace-server/internal/models/workspace.go @@ -57,6 +57,14 @@ type UpdateCardPayload struct { AgentCard json.RawMessage `json:"agent_card" binding:"required"` } +// MemorySeed represents an initial memory to seed into a workspace at creation time. +// Used by both the POST /workspaces API and org template import to pre-populate +// agent memories from config (issue #1050). +type MemorySeed struct { + Content string `json:"content" yaml:"content"` + Scope string `json:"scope" yaml:"scope"` // LOCAL, TEAM, GLOBAL +} + type CreateWorkspacePayload struct { Name string `json:"name" binding:"required"` Role string `json:"role"` @@ -80,6 +88,10 @@ type CreateWorkspacePayload struct { X float64 `json:"x"` Y float64 `json:"y"` } `json:"canvas"` + // InitialMemories is an optional list of memories to seed into the + // workspace immediately after creation. Each entry is inserted into + // agent_memories with the workspace's awareness namespace. Issue #1050. + InitialMemories []MemorySeed `json:"initial_memories"` } type CheckAccessPayload struct { diff --git a/workspace-server/internal/provisioner/cp_provisioner.go b/workspace-server/internal/provisioner/cp_provisioner.go index 68ef50e4..6167098b 100644 --- a/workspace-server/internal/provisioner/cp_provisioner.go +++ b/workspace-server/internal/provisioner/cp_provisioner.go @@ -20,7 +20,8 @@ import ( type CPProvisioner struct { baseURL string orgID string - sharedSecret string // bearer passed to CP's /cp/workspaces/* gate + sharedSecret string // Authorization: Bearer — platform-wide gate + adminToken string // X-Molecule-Admin-Token — per-tenant identity (controlplane #118/#130) httpClient *http.Client } @@ -40,31 +41,48 @@ func NewCPProvisioner() (*CPProvisioner, error) { baseURL = "https://api.moleculesai.app" } - // CP gates /cp/workspaces/* behind a bearer check (C1). Without the - // shared secret the CP returns 401 — or 404 if the routes refused - // to mount on its side. Tenant operators should set this on the - // tenant env to the same value as the CP's PROVISION_SHARED_SECRET. + // CP gates /cp/workspaces/* behind two credentials now: + // 1. Shared secret (Authorization: Bearer) — gates the route at + // the router level, proves the caller is a tenant platform. + // 2. Admin token (X-Molecule-Admin-Token) — proves WHICH tenant. + // Introduced in controlplane #118/#130 to prevent cross-tenant + // provisioning when the shared secret leaks from one tenant. sharedSecret := os.Getenv("MOLECULE_CP_SHARED_SECRET") if sharedSecret == "" { // Fall back to PROVISION_SHARED_SECRET so a single env-var name // works on both sides of the wire. sharedSecret = os.Getenv("PROVISION_SHARED_SECRET") } + // ADMIN_TOKEN is injected into the tenant container at provision + // time by the control plane (see provisioner/ec2.go Secrets Manager + // bootstrap path). Without it, post-#118 CP rejects every + // /cp/workspaces/* call with 401. 
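+	// A possible extra guard (not part of this change): log loudly when the
+	// token is missing so a mis-provisioned tenant is diagnosable from
+	// startup logs, e.g.
+	//
+	//	if os.Getenv("ADMIN_TOKEN") == "" {
+	//		log.Printf("cp provisioner: ADMIN_TOKEN unset — CP calls will 401")
+	//	}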
+ adminToken := os.Getenv("ADMIN_TOKEN") return &CPProvisioner{ baseURL: baseURL, orgID: orgID, sharedSecret: sharedSecret, + adminToken: adminToken, httpClient: &http.Client{Timeout: 120 * time.Second}, }, nil } -// authHeader sets Authorization: Bearer on the outbound request. No-op -// when sharedSecret is empty so self-hosted / dev deployments still work. -func (p *CPProvisioner) authHeader(req *http.Request) { +// authHeaders sets both auth headers on the outbound request: +// - Authorization: Bearer — platform gate +// - X-Molecule-Admin-Token: — identity gate +// +// Either is a no-op when its value is empty so self-hosted / dev +// deployments without a real CP still work (those don't hit a CP that +// enforces either gate). In prod both are set by the controlplane +// bootstrap, so both headers land on every outbound call. +func (p *CPProvisioner) authHeaders(req *http.Request) { if p.sharedSecret != "" { req.Header.Set("Authorization", "Bearer "+p.sharedSecret) } + if p.adminToken != "" { + req.Header.Set("X-Molecule-Admin-Token", p.adminToken) + } } type cpProvisionRequest struct { @@ -105,7 +123,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, return "", fmt.Errorf("cp provisioner: create request: %w", err) } httpReq.Header.Set("Content-Type", "application/json") - p.authHeader(httpReq) + p.authHeaders(httpReq) resp, err := p.httpClient.Do(httpReq) if err != nil { @@ -140,7 +158,7 @@ func (p *CPProvisioner) Start(ctx context.Context, cfg WorkspaceConfig) (string, func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { url := fmt.Sprintf("%s/cp/workspaces/%s?instance_id=%s", p.baseURL, workspaceID, workspaceID) req, _ := http.NewRequestWithContext(ctx, "DELETE", url, nil) - p.authHeader(req) + p.authHeaders(req) resp, err := p.httpClient.Do(req) if err != nil { return fmt.Errorf("cp provisioner: stop: %w", err) @@ -153,7 +171,7 @@ func (p *CPProvisioner) Stop(ctx context.Context, workspaceID string) error { func (p *CPProvisioner) IsRunning(ctx context.Context, workspaceID string) (bool, error) { url := fmt.Sprintf("%s/cp/workspaces/%s/status?instance_id=%s", p.baseURL, workspaceID, workspaceID) req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) - p.authHeader(req) + p.authHeaders(req) resp, err := p.httpClient.Do(req) if err != nil { return false, err diff --git a/workspace-server/internal/provisioner/cp_provisioner_test.go b/workspace-server/internal/provisioner/cp_provisioner_test.go index ce49a352..1fddcbb2 100644 --- a/workspace-server/internal/provisioner/cp_provisioner_test.go +++ b/workspace-server/internal/provisioner/cp_provisioner_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" ) // TestNewCPProvisioner_RequiresOrgID — self-hosted deployments don't @@ -39,27 +40,50 @@ func TestNewCPProvisioner_FallsBackToProvisionSharedSecret(t *testing.T) { } } -// TestAuthHeader_NoopWhenSecretEmpty — the self-hosted path that -// doesn't gate /cp/workspaces/* must not add a stray Authorization -// header (bearer-like content would be surprising to non-bearer -// intermediaries). -func TestAuthHeader_NoopWhenSecretEmpty(t *testing.T) { - p := &CPProvisioner{sharedSecret: ""} +// TestAuthHeaders_NoopWhenBothEmpty — the self-hosted path that +// doesn't gate /cp/workspaces/* must not add stray auth headers +// (bearer-like content would surprise non-bearer intermediaries). 
+func TestAuthHeaders_NoopWhenBothEmpty(t *testing.T) { + p := &CPProvisioner{sharedSecret: "", adminToken: ""} req := httptest.NewRequest("GET", "http://x/", nil) - p.authHeader(req) + p.authHeaders(req) if got := req.Header.Get("Authorization"); got != "" { t.Errorf("Authorization set to %q with empty secret; want unset", got) } + if got := req.Header.Get("X-Molecule-Admin-Token"); got != "" { + t.Errorf("X-Molecule-Admin-Token set to %q with empty token; want unset", got) + } } -// TestAuthHeader_SetsBearerWhenSecretSet — happy path. -func TestAuthHeader_SetsBearerWhenSecretSet(t *testing.T) { - p := &CPProvisioner{sharedSecret: "the-secret"} +// TestAuthHeaders_SetsBothWhenBothProvided — happy path for SaaS +// tenants. Both the platform-wide shared secret and the per-tenant +// admin_token land on every outbound call. +func TestAuthHeaders_SetsBothWhenBothProvided(t *testing.T) { + p := &CPProvisioner{sharedSecret: "the-secret", adminToken: "tok-abc"} req := httptest.NewRequest("GET", "http://x/", nil) - p.authHeader(req) + p.authHeaders(req) if got := req.Header.Get("Authorization"); got != "Bearer the-secret" { t.Errorf("Authorization = %q, want %q", got, "Bearer the-secret") } + if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" { + t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got) + } +} + +// TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty — in the transition +// window where the tenant has admin_token but PROVISION_SHARED_SECRET +// isn't set, still send the admin token. CP middleware decides whether +// the shared secret is required. +func TestAuthHeaders_OnlyAdminTokenWhenSecretEmpty(t *testing.T) { + p := &CPProvisioner{sharedSecret: "", adminToken: "tok-abc"} + req := httptest.NewRequest("GET", "http://x/", nil) + p.authHeaders(req) + if got := req.Header.Get("Authorization"); got != "" { + t.Errorf("Authorization = %q, want unset", got) + } + if got := req.Header.Get("X-Molecule-Admin-Token"); got != "tok-abc" { + t.Errorf("X-Molecule-Admin-Token = %q, want tok-abc", got) + } } // TestStart_HappyPath — Start posts to the stubbed CP, passes the @@ -148,3 +172,177 @@ func TestStart_NoStructuredErrorFallsBackToSize(t *testing.T) { t.Errorf("expected byte-count fallback, got %q", err.Error()) } } + +// TestStart_TransportFailureSurfaces — the CP isn't reachable at all +// (DNS fails, TCP refused, TLS handshake error). Start must return an +// error tagged with enough context to find the failed call in logs +// without leaking credentials. +func TestStart_TransportFailureSurfaces(t *testing.T) { + // Port 1 is reserved by IANA; connect attempts fail immediately. + p := &CPProvisioner{ + baseURL: "http://127.0.0.1:1", + orgID: "org-1", + httpClient: &http.Client{Timeout: 500 * time.Millisecond}, + } + _, err := p.Start(context.Background(), WorkspaceConfig{WorkspaceID: "ws-1", Runtime: "py"}) + if err == nil { + t.Fatal("expected transport error, got nil") + } + if !strings.Contains(err.Error(), "cp provisioner: send") { + t.Errorf("error should be tagged cp provisioner: send, got %q", err.Error()) + } +} + +// TestStop_SendsBothAuthHeaders — verify #118/#130 compliance on the +// teardown path. Any call to /cp/workspaces/:id must carry both the +// platform-wide shared secret AND the per-tenant admin token, or the +// CP will 401. 
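+//
+// Wire-level shape the stub below asserts (hypothetical values):
+//
+//	DELETE /cp/workspaces/ws-1?instance_id=ws-1
+//	Authorization: Bearer s3cret
+//	X-Molecule-Admin-Token: tok-xyz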
+func TestStop_SendsBothAuthHeaders(t *testing.T) { + var sawBearer, sawAdminToken, sawMethod, sawPath string + var sawInstance string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sawBearer = r.Header.Get("Authorization") + sawAdminToken = r.Header.Get("X-Molecule-Admin-Token") + sawMethod = r.Method + sawPath = r.URL.Path + sawInstance = r.URL.Query().Get("instance_id") + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + p := &CPProvisioner{ + baseURL: srv.URL, + orgID: "org-1", + sharedSecret: "s3cret", + adminToken: "tok-xyz", + httpClient: srv.Client(), + } + if err := p.Stop(context.Background(), "ws-1"); err != nil { + t.Fatalf("Stop: %v", err) + } + if sawMethod != "DELETE" { + t.Errorf("method = %q, want DELETE", sawMethod) + } + if sawPath != "/cp/workspaces/ws-1" { + t.Errorf("path = %q, want /cp/workspaces/ws-1", sawPath) + } + if sawInstance != "ws-1" { + t.Errorf("instance_id query = %q, want ws-1", sawInstance) + } + if sawBearer != "Bearer s3cret" { + t.Errorf("bearer = %q, want Bearer s3cret", sawBearer) + } + if sawAdminToken != "tok-xyz" { + t.Errorf("admin token = %q, want tok-xyz", sawAdminToken) + } +} + +// TestStop_TransportErrorSurfaces — same treatment as Start. If the +// teardown call hits a dead CP, the error must surface so the caller +// knows the workspace might still be running and needs retry. +func TestStop_TransportErrorSurfaces(t *testing.T) { + p := &CPProvisioner{ + baseURL: "http://127.0.0.1:1", + orgID: "org-1", + httpClient: &http.Client{Timeout: 500 * time.Millisecond}, + } + err := p.Stop(context.Background(), "ws-1") + if err == nil { + t.Fatal("expected transport error, got nil") + } + if !strings.Contains(err.Error(), "cp provisioner: stop") { + t.Errorf("error should be tagged, got %q", err.Error()) + } +} + +// TestIsRunning_ParsesStateField — CP returns the EC2 state, we expose +// a bool ("running"/"pending"/"terminated" → true only for "running"). +func TestIsRunning_ParsesStateField(t *testing.T) { + cases := map[string]bool{ + "running": true, + "pending": false, + "stopping": false, + "terminated": false, + } + for state, want := range cases { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/cp/workspaces/ws-1/status" { + t.Errorf("path = %q", r.URL.Path) + } + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, `{"state":"`+state+`"}`) + })) + p := &CPProvisioner{ + baseURL: srv.URL, + orgID: "org-1", + sharedSecret: "s3cret", + adminToken: "tok-xyz", + httpClient: srv.Client(), + } + got, err := p.IsRunning(context.Background(), "ws-1") + srv.Close() + if err != nil { + t.Errorf("state=%s: IsRunning error %v", state, err) + continue + } + if got != want { + t.Errorf("state=%s: got %v, want %v", state, got, want) + } + } +} + +// TestIsRunning_SendsBothAuthHeaders — parity with Stop. Status reads +// require the same per-tenant auth because they leak public_ip + +// private_ip to the caller. 
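+//
+// Wire-level shape (hypothetical values), parallel to the Stop test above:
+//
+//	GET /cp/workspaces/ws-1/status?instance_id=ws-1
+//	Authorization: Bearer s3cret
+//	X-Molecule-Admin-Token: tok-xyz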
+func TestIsRunning_SendsBothAuthHeaders(t *testing.T) {
+	var sawBearer, sawAdminToken string
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		sawBearer = r.Header.Get("Authorization")
+		sawAdminToken = r.Header.Get("X-Molecule-Admin-Token")
+		w.WriteHeader(http.StatusOK)
+		_, _ = io.WriteString(w, `{"state":"running"}`)
+	}))
+	defer srv.Close()
+
+	p := &CPProvisioner{
+		baseURL:      srv.URL,
+		orgID:        "org-1",
+		sharedSecret: "s3cret",
+		adminToken:   "tok-xyz",
+		httpClient:   srv.Client(),
+	}
+	_, _ = p.IsRunning(context.Background(), "ws-1")
+	if sawBearer != "Bearer s3cret" {
+		t.Errorf("bearer = %q, want Bearer s3cret", sawBearer)
+	}
+	if sawAdminToken != "tok-xyz" {
+		t.Errorf("admin token = %q, want tok-xyz", sawAdminToken)
+	}
+}
+
+// TestIsRunning_TransportErrorReturnsFalse — when the CP is
+// unreachable, IsRunning must not claim the workspace is running
+// (that'd mislead the sweeper into leaving a dead row in place).
+func TestIsRunning_TransportErrorReturnsFalse(t *testing.T) {
+	p := &CPProvisioner{
+		baseURL:    "http://127.0.0.1:1",
+		orgID:      "org-1",
+		httpClient: &http.Client{Timeout: 500 * time.Millisecond},
+	}
+	got, err := p.IsRunning(context.Background(), "ws-1")
+	if err == nil {
+		t.Errorf("expected transport error, got nil (got=%v)", got)
+	}
+	if got {
+		t.Errorf("transport failure must not report running=true")
+	}
+}
+
+// TestClose_Noop — explicit contract: Close has no side effects and
+// no error. Exists for the Provisioner interface; compliance guard.
+func TestClose_Noop(t *testing.T) {
+	p := &CPProvisioner{}
+	if err := p.Close(); err != nil {
+		t.Errorf("Close should return nil, got %v", err)
+	}
+}
diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go
index 4b6e8aeb..c636bf87 100644
--- a/workspace-server/internal/router/router.go
+++ b/workspace-server/internal/router/router.go
@@ -376,7 +376,13 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 	// (dev / self-hosted without GITHUB_APP_ID).
 	{
 		ghTokH := handlers.NewGitHubTokenHandler(wh.TokenRegistry())
+		// #1068: exposed under WorkspaceAuth in addition to AdminAuth so any
+		// authenticated workspace can refresh its GitHub token. The credential
+		// helper in containers calls this endpoint with a workspace bearer
+		// token — AdminAuth (PR #729) rejects those, breaking token refresh
+		// after 60 min. The admin path stays as an alias for backward compat.
 		r.GET("/admin/github-installation-token", middleware.AdminAuth(db.DB), ghTokH.GetInstallationToken)
+		wsAuth.GET("/github-installation-token", ghTokH.GetInstallationToken)
 	}
 
 	// Terminal — shares Docker client with provisioner
diff --git a/workspace-server/internal/scheduler/scheduler.go b/workspace-server/internal/scheduler/scheduler.go
index 0831796e..fef8d606 100644
--- a/workspace-server/internal/scheduler/scheduler.go
+++ b/workspace-server/internal/scheduler/scheduler.go
@@ -241,21 +241,47 @@ func (s *Scheduler) fireSchedule(ctx context.Context, sched scheduleRow) {
 		}
 	}()
 
-	// #115 concurrency-aware skip — before firing check if the target
-	// workspace is already executing a task. If so, skip this tick instead
-	// of colliding (which used to surface as "workspace agent busy" errors
-	// and register as a hard fail). advance next_run_at so the next cron
-	// slot gets a fresh chance; log a skipped cron_run row so history shows
-	// the gap instead of a silent miss. COALESCE guards against NULL.
+	// #969 concurrency-aware queue — when the target workspace is busy,
+	// defer the fire instead of skipping. Polls every 10s for up to 2 min
+	// waiting for the workspace to become idle. If still busy after 2 min,
+	// falls back to the original skip behavior.
+	//
+	// This replaces the #115 "skip when busy" pattern which caused crons
+	// to permanently miss when workspaces were perpetually busy from the
+	// Orchestrator pulse delegation chain (~30% message drop rate on Dev Lead).
 	var activeTasks int
 	if err := db.DB.QueryRowContext(ctx,
 		`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
 		sched.WorkspaceID,
 	).Scan(&activeTasks); err == nil && activeTasks > 0 {
-		log.Printf("Scheduler: skipping '%s' on busy workspace %s (active_tasks=%d)",
+		log.Printf("Scheduler: '%s' workspace %s busy (active_tasks=%d), deferring up to 2 min",
 			sched.Name, short(sched.WorkspaceID, 12), activeTasks)
-		s.recordSkipped(ctx, sched, activeTasks)
-		return
+		// Poll every 10s for up to 2 minutes, honoring ctx cancellation so a
+		// shutting-down scheduler doesn't sit out the full wait.
+		becameIdle := false
+		for i := 0; i < 12; i++ {
+			select {
+			case <-ctx.Done():
+				s.recordSkipped(ctx, sched, activeTasks)
+				return
+			case <-time.After(10 * time.Second):
+			}
+			if err := db.DB.QueryRowContext(ctx,
+				`SELECT COALESCE(active_tasks, 0) FROM workspaces WHERE id = $1`,
+				sched.WorkspaceID,
+			).Scan(&activeTasks); err != nil || activeTasks == 0 {
+				becameIdle = true
+				break
+			}
+		}
+		if !becameIdle && activeTasks > 0 {
+			log.Printf("Scheduler: skipping '%s' on busy workspace %s after 2 min wait (active_tasks=%d)",
+				sched.Name, short(sched.WorkspaceID, 12), activeTasks)
+			s.recordSkipped(ctx, sched, activeTasks)
+			return
+		}
+		log.Printf("Scheduler: '%s' workspace %s now idle after deferral, firing",
+			sched.Name, short(sched.WorkspaceID, 12))
 	}
 
 	fireCtx, cancel := context.WithTimeout(ctx, fireTimeout)
diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py
index 76421a46..8f8ce7e8 100644
--- a/workspace/claude_sdk_executor.py
+++ b/workspace/claude_sdk_executor.py
@@ -45,6 +45,7 @@ from executor_helpers import (
     CONFIG_MOUNT,
     MEMORY_CONTENT_MAX_CHARS,
     WORKSPACE_MOUNT,
+    auto_push_hook,
     brief_summary,
     commit_memory,
     extract_message_text,
@@ -473,6 +474,8 @@ class ClaudeSDKExecutor(AgentExecutor):
             await commit_memory(
                 f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}"
             )
+            # Auto-push unpushed commits and open PR (non-blocking, best-effort).
+            await auto_push_hook()
 
         return response_text or _NO_RESPONSE_MSG
 
diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py
index 848dd6a2..5bc50c90 100644
--- a/workspace/executor_helpers.py
+++ b/workspace/executor_helpers.py
@@ -14,10 +14,12 @@ Provides:
 
 from __future__ import annotations
 
+import asyncio
 import json
 import logging
 import os
 import re
+import subprocess
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
@@ -390,3 +392,166 @@ def sanitize_agent_error(
     else:
         tag = "unknown"
     return f"Agent error ({tag}) — see workspace logs for details."
+
+
+# ========================================================================
+# Auto-push hook — push unpushed commits and open PR after task completion
+# ========================================================================
+
+# Git/gh wrappers at /usr/local/bin have GH_TOKEN baked in.
+_GIT = "/usr/local/bin/git"
+_GH = "/usr/local/bin/gh"
+_PROTECTED_BRANCHES = frozenset({"staging", "main", "master"})
+
+
+def _run_git(args: list[str], cwd: str, timeout: int = 30) -> subprocess.CompletedProcess:
+    """Run a git/gh command with bounded timeout.
+    Does not raise on a nonzero exit status; it may raise
+    subprocess.TimeoutExpired, which callers catch.
+    """
+    return subprocess.run(
+        args,
+        cwd=cwd,
+        capture_output=True,
+        text=True,
+        timeout=timeout,
+    )
+
+
+def _auto_push_and_pr_sync(cwd: str) -> None:
+    """Synchronous implementation of the auto-push hook.
+
+    1. Check if we're in a git repo with unpushed commits on a feature branch.
+    2. Push the branch.
+    3. Open a PR against staging if one doesn't already exist.
+
+    Designed to be called from a background thread — never raises, logs all
+    errors. Uses the git/gh wrappers at /usr/local/bin/ which have GH_TOKEN
+    baked in.
+    """
+    try:
+        # --- Guard: is this a git repo? ---
+        probe = _run_git([_GIT, "rev-parse", "--is-inside-work-tree"], cwd)
+        if probe.returncode != 0:
+            return
+
+        # --- Guard: get current branch ---
+        branch_result = _run_git(
+            [_GIT, "rev-parse", "--abbrev-ref", "HEAD"], cwd
+        )
+        if branch_result.returncode != 0:
+            return
+        branch = branch_result.stdout.strip()
+        if not branch or branch in _PROTECTED_BRANCHES or branch == "HEAD":
+            return
+
+        # --- Guard: any unpushed commits? ---
+        log_result = _run_git(
+            [_GIT, "log", "origin/staging..HEAD", "--oneline"], cwd
+        )
+        if log_result.returncode != 0 or not log_result.stdout.strip():
+            # No unpushed commits (or origin/staging doesn't exist).
+            return
+
+        unpushed_lines = log_result.stdout.strip().splitlines()
+        logger.info(
+            "auto-push: %d unpushed commit(s) on branch '%s', pushing...",
+            len(unpushed_lines),
+            branch,
+        )
+
+        # --- Push ---
+        push_result = _run_git(
+            [_GIT, "push", "origin", branch], cwd, timeout=60
+        )
+        if push_result.returncode != 0:
+            logger.warning(
+                "auto-push: git push failed (exit %d): %s",
+                push_result.returncode,
+                (push_result.stderr or push_result.stdout)[:500],
+            )
+            return
+
+        logger.info("auto-push: pushed branch '%s' successfully", branch)
+
+        # --- Check if PR already exists ---
+        pr_list = _run_git(
+            [_GH, "pr", "list", "--head", branch, "--json", "number"], cwd
+        )
+        if pr_list.returncode != 0:
+            logger.warning(
+                "auto-push: gh pr list failed (exit %d): %s",
+                pr_list.returncode,
+                (pr_list.stderr or pr_list.stdout)[:500],
+            )
+            return
+
+        existing_prs = json.loads(pr_list.stdout.strip() or "[]")
+        if existing_prs:
+            logger.info(
+                "auto-push: PR already exists for branch '%s' (#%s), skipping create",
+                branch,
+                existing_prs[0].get("number", "?"),
+            )
+            return
+
+        # --- Get the oldest unpushed commit message for the PR title ---
+        # Note: `git log --reverse -1` would yield the NEWEST commit — the
+        # -<n> limit is applied before --reverse. List subjects oldest-first
+        # and take the first line instead.
+        first_commit = _run_git(
+            [_GIT, "log", "origin/staging..HEAD", "--reverse",
+             "--format=%s"],
+            cwd,
+        )
+        subjects = (
+            first_commit.stdout.strip().splitlines()
+            if first_commit.returncode == 0 else []
+        )
+        pr_title = subjects[0] if subjects else branch
+        # Truncate to 256 chars (GitHub limit)
+        if len(pr_title) > 256:
+            pr_title = pr_title[:253] + "..."
+
+        # --- Create PR ---
+        pr_create = _run_git(
+            [
+                _GH, "pr", "create",
+                "--base", "staging",
+                "--title", pr_title,
+                "--body", "Auto-created by workspace agent",
+            ],
+            cwd,
+            timeout=60,
+        )
+        if pr_create.returncode != 0:
+            logger.warning(
+                "auto-push: gh pr create failed (exit %d): %s",
+                pr_create.returncode,
+                (pr_create.stderr or pr_create.stdout)[:500],
+            )
+        else:
+            pr_url = pr_create.stdout.strip()
+            logger.info("auto-push: created PR %s", pr_url)
+
+    except subprocess.TimeoutExpired:
+        logger.warning("auto-push: command timed out, skipping")
+    except Exception:
+        logger.exception("auto-push: unexpected error (non-fatal)")
+
+
+async def auto_push_hook(cwd: str | None = None) -> None:
+    """Post-execution hook: push unpushed commits and open a PR.
+ + Runs the git/gh subprocess work in a background thread via + asyncio.to_thread so it never blocks the agent's event loop. + Catches all exceptions — the agent must never crash due to this hook. + """ + if cwd is None: + cwd = WORKSPACE_MOUNT + if not os.path.isdir(cwd): + return + try: + await asyncio.to_thread(_auto_push_and_pr_sync, cwd) + except Exception: + logger.exception("auto_push_hook: failed (non-fatal)") diff --git a/workspace/scripts/molecule-git-token-helper.sh b/workspace/scripts/molecule-git-token-helper.sh index e2a519a4..4b7a8cca 100755 --- a/workspace/scripts/molecule-git-token-helper.sh +++ b/workspace/scripts/molecule-git-token-helper.sh @@ -53,7 +53,14 @@ set -euo pipefail PLATFORM_URL="${PLATFORM_URL:-http://platform:8080}" CONFIGS_DIR="${CONFIGS_DIR:-/configs}" TOKEN_FILE="${CONFIGS_DIR}/.auth_token" -ENDPOINT="${PLATFORM_URL}/admin/github-installation-token" +# #1068: use workspace-scoped path (WorkspaceAuth) instead of admin path +# (AdminAuth rejects workspace bearer tokens since PR #729). +WORKSPACE_ID="${WORKSPACE_ID:-}" +if [ -n "$WORKSPACE_ID" ]; then + ENDPOINT="${PLATFORM_URL}/workspaces/${WORKSPACE_ID}/github-installation-token" +else + ENDPOINT="${PLATFORM_URL}/admin/github-installation-token" +fi # _fetch_token — internal helper; also callable directly from cron. # Outputs the raw token string on success; exits non-zero on failure.
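+# A sketch of the fetch these names imply (hypothetical — the real
+# implementation lives in the function body, not shown in this hunk):
+#   curl -fsS -H "Authorization: Bearer $(cat "$TOKEN_FILE")" "$ENDPOINT"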