From 46c20731e6d1ef9ee0ae9b2625db4d75c3396308 Mon Sep 17 00:00:00 2001 From: rabbitblood Date: Sun, 19 Apr 2026 20:26:35 -0700 Subject: [PATCH] feat: event-driven cron triggers + auto-push hook for agent productivity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three changes to boost agent throughput: 1. Event-driven cron triggers (webhooks.go): GitHub issues/opened events fire all "pick-up-work" schedules immediately. PR review/submitted events fire "PR review" and "security review" schedules. Uses next_run_at=now() so the scheduler picks them up on next tick. 2. Auto-push hook (executor_helpers.py): After every task completion, agents automatically push unpushed commits and open a PR targeting staging. Guards: only on non-protected branches with unpushed work. Uses /usr/local/bin/git and /usr/local/bin/gh wrappers with baked-in GH_TOKEN. Never crashes the agent — all errors logged and continued. 3. Integration (claude_sdk_executor.py): auto_push_hook() called in the _execute_locked finally block after commit_memory. Closes productivity gap where agents wrote code but never pushed, and where work crons only fired on timers instead of reacting to events. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/handlers/webhooks.go | 140 ++++++++++++++++ .../internal/handlers/webhooks_test.go | 145 ++++++++++++++++ workspace/claude_sdk_executor.py | 3 + workspace/executor_helpers.py | 155 ++++++++++++++++++ 4 files changed, 443 insertions(+) diff --git a/workspace-server/internal/handlers/webhooks.go b/workspace-server/internal/handlers/webhooks.go index 259c62f0..7abfceb0 100644 --- a/workspace-server/internal/handlers/webhooks.go +++ b/workspace-server/internal/handlers/webhooks.go @@ -7,10 +7,12 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "os" "strings" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/Molecule-AI/molecule-monorepo/platform/internal/events" "github.com/gin-gonic/gin" ) @@ -56,6 +58,16 @@ func (h *WebhookHandler) GitHub(c *gin.Context) { } eventType := c.GetHeader("X-GitHub-Event") + + // Event-driven cron triggers: certain GitHub events fire matching + // schedules immediately instead of forwarding to a specific workspace. + if triggered, triggerErr := h.handleCronTriggerEvent(c, eventType, rawBody); triggered { + if triggerErr != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": triggerErr.Error()}) + } + return + } + deliveryID := c.GetHeader("X-GitHub-Delivery") payloadWorkspaceID, a2aPayload, buildErr := buildGitHubA2APayload(eventType, deliveryID, rawBody) if buildErr != nil { @@ -295,3 +307,131 @@ func newGitHubMessagePayload(text string, metadata map[string]interface{}) map[s }, } } + +// --------------------------------------------------------------------------- +// Event-driven cron triggers +// +// Some GitHub events don't target a specific workspace — instead they should +// wake up all engineer work crons immediately so the team reacts to new issues +// or PR reviews without waiting for the next 30-minute timer tick. +// +// Supported events: +// - issues (action=opened) → fires schedules with "pick-up-work" in name +// - pull_request_review (action=submitted) → fires schedules with "PR review" +// or "security review" in name +// +// Mechanism: UPDATE next_run_at = NOW() on matching enabled schedules. The +// scheduler's 30-second poll loop picks them up on the next tick. +// --------------------------------------------------------------------------- + +// githubIssuesEvent is the minimal subset of the GitHub "issues" webhook payload. +type githubIssuesEvent struct { + Action string `json:"action"` + Repository githubRepository `json:"repository"` + Sender githubSender `json:"sender"` + Issue struct { + Number int `json:"number"` + Title string `json:"title"` + HTMLURL string `json:"html_url"` + } `json:"issue"` +} + +// githubPullRequestReviewEvent is the minimal subset of the GitHub +// "pull_request_review" webhook payload. +type githubPullRequestReviewEvent struct { + Action string `json:"action"` + Repository githubRepository `json:"repository"` + Sender githubSender `json:"sender"` + Review struct { + State string `json:"state"` // approved, changes_requested, commented + HTMLURL string `json:"html_url"` + } `json:"review"` + PullRequest struct { + Number int `json:"number"` + Title string `json:"title"` + HTMLURL string `json:"html_url"` + } `json:"pull_request"` +} + +// handleCronTriggerEvent checks if the GitHub event is one that should trigger +// schedules immediately. Returns (true, nil) if it handled the event and wrote +// the HTTP response, (true, err) if it handled but errored, or (false, nil) if +// the event is not a cron-trigger type and should fall through to A2A forwarding. +func (h *WebhookHandler) handleCronTriggerEvent(c *gin.Context, eventType string, rawBody []byte) (bool, error) { + ctx := c.Request.Context() + + switch eventType { + case "issues": + var payload githubIssuesEvent + if err := json.Unmarshal(rawBody, &payload); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid issues payload"}) + return true, nil + } + if payload.Action != "opened" { + c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only issues action=opened triggers crons"}) + return true, nil + } + + // Fire all enabled schedules whose name contains "pick-up-work" (case-insensitive). + result, err := db.DB.ExecContext(ctx, ` + UPDATE workspace_schedules + SET next_run_at = now(), updated_at = now() + WHERE enabled = true + AND next_run_at IS NOT NULL + AND LOWER(name) LIKE '%pick-up-work%' + `) + if err != nil { + log.Printf("Webhook: cron trigger (issues/opened) DB error: %v", err) + return true, fmt.Errorf("failed to trigger schedules: %w", err) + } + affected, _ := result.RowsAffected() + log.Printf("Webhook: issues/opened in %s #%d by %s — triggered %d pick-up-work schedule(s)", + payload.Repository.FullName, payload.Issue.Number, payload.Sender.Login, affected) + + c.JSON(http.StatusOK, gin.H{ + "status": "triggered", + "event": "issues", + "action": "opened", + "schedules_affected": affected, + }) + return true, nil + + case "pull_request_review": + var payload githubPullRequestReviewEvent + if err := json.Unmarshal(rawBody, &payload); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid pull_request_review payload"}) + return true, nil + } + if payload.Action != "submitted" { + c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only pull_request_review action=submitted triggers crons"}) + return true, nil + } + + // Fire all enabled schedules whose name contains "PR review" or "security review" (case-insensitive). + result, err := db.DB.ExecContext(ctx, ` + UPDATE workspace_schedules + SET next_run_at = now(), updated_at = now() + WHERE enabled = true + AND next_run_at IS NOT NULL + AND (LOWER(name) LIKE '%pr review%' OR LOWER(name) LIKE '%security review%') + `) + if err != nil { + log.Printf("Webhook: cron trigger (pull_request_review/submitted) DB error: %v", err) + return true, fmt.Errorf("failed to trigger schedules: %w", err) + } + affected, _ := result.RowsAffected() + log.Printf("Webhook: pull_request_review/submitted in %s PR #%d by %s (state=%s) — triggered %d review schedule(s)", + payload.Repository.FullName, payload.PullRequest.Number, payload.Sender.Login, payload.Review.State, affected) + + c.JSON(http.StatusOK, gin.H{ + "status": "triggered", + "event": "pull_request_review", + "action": "submitted", + "schedules_affected": affected, + }) + return true, nil + + default: + return false, nil + } +} diff --git a/workspace-server/internal/handlers/webhooks_test.go b/workspace-server/internal/handlers/webhooks_test.go index 74264c06..659fcd68 100644 --- a/workspace-server/internal/handlers/webhooks_test.go +++ b/workspace-server/internal/handlers/webhooks_test.go @@ -8,6 +8,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -206,3 +207,147 @@ func TestGitHubWebhook_ValidPRReviewComment_Forwards(t *testing.T) { t.Fatalf("unmet sqlmock expectations: %v", err) } } + +// --------------------------------------------------------------------------- +// Event-driven cron trigger tests +// --------------------------------------------------------------------------- + +func TestGitHubWebhook_IssuesOpened_TriggersCrons(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "opened", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "alice"}, + "issue": {"number": 42, "title": "New feature request", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"} + }`) + + // Expect the UPDATE that sets next_run_at = now() on pick-up-work schedules. + mock.ExpectExec("UPDATE workspace_schedules"). + WillReturnResult(sqlmock.NewResult(0, 3)) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "issues") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + // Verify response includes trigger metadata. + respBody := w.Body.String() + if !strings.Contains(respBody, `"triggered"`) { + t.Fatalf("expected 'triggered' in response, got: %s", respBody) + } + if !strings.Contains(respBody, `"schedules_affected"`) { + t.Fatalf("expected 'schedules_affected' in response, got: %s", respBody) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sqlmock expectations: %v", err) + } +} + +func TestGitHubWebhook_IssuesClosed_Ignored(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "closed", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "alice"}, + "issue": {"number": 42, "title": "Old issue", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"} + }`) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "issues") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} + +func TestGitHubWebhook_PRReviewSubmitted_TriggersCrons(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "submitted", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "bob"}, + "review": {"state": "changes_requested", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"}, + "pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"} + }`) + + // Expect the UPDATE that sets next_run_at = now() on review schedules. + mock.ExpectExec("UPDATE workspace_schedules"). + WillReturnResult(sqlmock.NewResult(0, 2)) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "pull_request_review") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String()) + } + + respBody := w.Body.String() + if !strings.Contains(respBody, `"triggered"`) { + t.Fatalf("expected 'triggered' in response, got: %s", respBody) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet sqlmock expectations: %v", err) + } +} + +func TestGitHubWebhook_PRReviewDismissed_Ignored(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + broadcaster := newTestBroadcaster() + handler := NewWebhookHandler(broadcaster) + + secret := "test-secret" + t.Setenv("GITHUB_WEBHOOK_SECRET", secret) + + body := []byte(`{ + "action": "dismissed", + "repository": {"full_name": "Molecule-AI/molecule-core"}, + "sender": {"login": "bob"}, + "review": {"state": "dismissed", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"}, + "pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"} + }`) + + w, c := newWebhookTestContext(t, "", body) + c.Request.Header.Set("X-GitHub-Event", "pull_request_review") + c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body)) + + handler.GitHub(c) + + if w.Code != http.StatusAccepted { + t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/workspace/claude_sdk_executor.py b/workspace/claude_sdk_executor.py index 76421a46..8f8ce7e8 100644 --- a/workspace/claude_sdk_executor.py +++ b/workspace/claude_sdk_executor.py @@ -45,6 +45,7 @@ from executor_helpers import ( CONFIG_MOUNT, MEMORY_CONTENT_MAX_CHARS, WORKSPACE_MOUNT, + auto_push_hook, brief_summary, commit_memory, extract_message_text, @@ -473,6 +474,8 @@ class ClaudeSDKExecutor(AgentExecutor): await commit_memory( f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}" ) + # Auto-push unpushed commits and open PR (non-blocking, best-effort). + await auto_push_hook() return response_text or _NO_RESPONSE_MSG diff --git a/workspace/executor_helpers.py b/workspace/executor_helpers.py index 848dd6a2..5bc50c90 100644 --- a/workspace/executor_helpers.py +++ b/workspace/executor_helpers.py @@ -14,10 +14,12 @@ Provides: from __future__ import annotations +import asyncio import json import logging import os import re +import subprocess from pathlib import Path from typing import TYPE_CHECKING, Any @@ -390,3 +392,156 @@ def sanitize_agent_error( else: tag = "unknown" return f"Agent error ({tag}) — see workspace logs for details." + + +# ======================================================================== +# Auto-push hook — push unpushed commits and open PR after task completion +# ======================================================================== + +# Git/gh wrappers at /usr/local/bin have GH_TOKEN baked in. +_GIT = "/usr/local/bin/git" +_GH = "/usr/local/bin/gh" +_PROTECTED_BRANCHES = frozenset({"staging", "main", "master"}) + + +def _run_git(args: list[str], cwd: str, timeout: int = 30) -> subprocess.CompletedProcess: + """Run a git/gh command with bounded timeout. Never raises on failure.""" + return subprocess.run( + args, + cwd=cwd, + capture_output=True, + text=True, + timeout=timeout, + ) + + +def _auto_push_and_pr_sync(cwd: str) -> None: + """Synchronous implementation of the auto-push hook. + + 1. Check if we're in a git repo with unpushed commits on a feature branch. + 2. Push the branch. + 3. Open a PR against staging if one doesn't already exist. + + Designed to be called from a background thread — never raises, logs all + errors. Uses the git/gh wrappers at /usr/local/bin/ which have GH_TOKEN + baked in. + """ + try: + # --- Guard: is this a git repo? --- + probe = _run_git([_GIT, "rev-parse", "--is-inside-work-tree"], cwd) + if probe.returncode != 0: + return + + # --- Guard: get current branch --- + branch_result = _run_git( + [_GIT, "rev-parse", "--abbrev-ref", "HEAD"], cwd + ) + if branch_result.returncode != 0: + return + branch = branch_result.stdout.strip() + if not branch or branch in _PROTECTED_BRANCHES or branch == "HEAD": + return + + # --- Guard: any unpushed commits? --- + log_result = _run_git( + [_GIT, "log", "origin/staging..HEAD", "--oneline"], cwd + ) + if log_result.returncode != 0 or not log_result.stdout.strip(): + # No unpushed commits (or origin/staging doesn't exist). + return + + unpushed_lines = log_result.stdout.strip().splitlines() + logger.info( + "auto-push: %d unpushed commit(s) on branch '%s', pushing...", + len(unpushed_lines), + branch, + ) + + # --- Push --- + push_result = _run_git( + [_GIT, "push", "origin", branch], cwd, timeout=60 + ) + if push_result.returncode != 0: + logger.warning( + "auto-push: git push failed (exit %d): %s", + push_result.returncode, + (push_result.stderr or push_result.stdout)[:500], + ) + return + + logger.info("auto-push: pushed branch '%s' successfully", branch) + + # --- Check if PR already exists --- + pr_list = _run_git( + [_GH, "pr", "list", "--head", branch, "--json", "number"], cwd + ) + if pr_list.returncode != 0: + logger.warning( + "auto-push: gh pr list failed (exit %d): %s", + pr_list.returncode, + (pr_list.stderr or pr_list.stdout)[:500], + ) + return + + existing_prs = json.loads(pr_list.stdout.strip() or "[]") + if existing_prs: + logger.info( + "auto-push: PR already exists for branch '%s' (#%s), skipping create", + branch, + existing_prs[0].get("number", "?"), + ) + return + + # --- Get first commit message for PR title --- + first_commit = _run_git( + [_GIT, "log", "origin/staging..HEAD", "--reverse", + "--format=%s", "-1"], + cwd, + ) + pr_title = first_commit.stdout.strip() if first_commit.returncode == 0 else branch + # Truncate to 256 chars (GitHub limit) + if len(pr_title) > 256: + pr_title = pr_title[:253] + "..." + + # --- Create PR --- + pr_create = _run_git( + [ + _GH, "pr", "create", + "--base", "staging", + "--title", pr_title, + "--body", "Auto-created by workspace agent", + ], + cwd, + timeout=60, + ) + if pr_create.returncode != 0: + logger.warning( + "auto-push: gh pr create failed (exit %d): %s", + pr_create.returncode, + (pr_create.stderr or pr_create.stdout)[:500], + ) + else: + pr_url = pr_create.stdout.strip() + logger.info("auto-push: created PR %s", pr_url) + + except subprocess.TimeoutExpired: + logger.warning("auto-push: command timed out, skipping") + except Exception: + logger.exception("auto-push: unexpected error (non-fatal)") + + +async def auto_push_hook(cwd: str | None = None) -> None: + """Post-execution hook: push unpushed commits and open a PR. + + Runs the git/gh subprocess work in a background thread via + asyncio.to_thread so it never blocks the agent's event loop. + Catches all exceptions — the agent must never crash due to this hook. + """ + if cwd is None: + cwd = WORKSPACE_MOUNT + if not os.path.isdir(cwd): + return + try: + await asyncio.to_thread(_auto_push_and_pr_sync, cwd) + except Exception: + logger.exception("auto_push_hook: failed (non-fatal)")