feat: event-driven cron triggers + auto-push hook for agent productivity

Three changes to boost agent throughput:

1. Event-driven cron triggers (webhooks.go): GitHub issues/opened events
   fire all "pick-up-work" schedules immediately. PR review/submitted
   events fire "PR review" and "security review" schedules. Uses
   next_run_at=now() so the scheduler picks them up on next tick.

2. Auto-push hook (executor_helpers.py): After every task completion,
   agents automatically push unpushed commits and open a PR targeting
   staging. Guards: only on non-protected branches with unpushed work.
   Uses /usr/local/bin/git and /usr/local/bin/gh wrappers with baked-in
   GH_TOKEN. Never crashes the agent — all errors logged and continued.

3. Integration (claude_sdk_executor.py): auto_push_hook() called in the
   _execute_locked finally block after commit_memory.

Closes productivity gap where agents wrote code but never pushed,
and where work crons only fired on timers instead of reacting to events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rabbitblood 2026-04-19 20:26:35 -07:00
parent ccaa0a6b8a
commit 46c20731e6
4 changed files with 443 additions and 0 deletions

View File

@ -7,10 +7,12 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/events"
"github.com/gin-gonic/gin"
)
@ -56,6 +58,16 @@ func (h *WebhookHandler) GitHub(c *gin.Context) {
}
eventType := c.GetHeader("X-GitHub-Event")
// Event-driven cron triggers: certain GitHub events fire matching
// schedules immediately instead of forwarding to a specific workspace.
if triggered, triggerErr := h.handleCronTriggerEvent(c, eventType, rawBody); triggered {
if triggerErr != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": triggerErr.Error()})
}
return
}
deliveryID := c.GetHeader("X-GitHub-Delivery")
payloadWorkspaceID, a2aPayload, buildErr := buildGitHubA2APayload(eventType, deliveryID, rawBody)
if buildErr != nil {
@ -295,3 +307,131 @@ func newGitHubMessagePayload(text string, metadata map[string]interface{}) map[s
},
}
}
// ---------------------------------------------------------------------------
// Event-driven cron triggers
//
// Some GitHub events don't target a specific workspace — instead they should
// wake up all engineer work crons immediately so the team reacts to new issues
// or PR reviews without waiting for the next 30-minute timer tick.
//
// Supported events:
// - issues (action=opened) → fires schedules with "pick-up-work" in name
// - pull_request_review (action=submitted) → fires schedules with "PR review"
// or "security review" in name
//
// Mechanism: UPDATE next_run_at = NOW() on matching enabled schedules. The
// scheduler's 30-second poll loop picks them up on the next tick.
// ---------------------------------------------------------------------------
// githubIssuesEvent is the minimal subset of the GitHub "issues" webhook payload.
type githubIssuesEvent struct {
Action string `json:"action"`
Repository githubRepository `json:"repository"`
Sender githubSender `json:"sender"`
Issue struct {
Number int `json:"number"`
Title string `json:"title"`
HTMLURL string `json:"html_url"`
} `json:"issue"`
}
// githubPullRequestReviewEvent is the minimal subset of the GitHub
// "pull_request_review" webhook payload.
type githubPullRequestReviewEvent struct {
Action string `json:"action"`
Repository githubRepository `json:"repository"`
Sender githubSender `json:"sender"`
Review struct {
State string `json:"state"` // approved, changes_requested, commented
HTMLURL string `json:"html_url"`
} `json:"review"`
PullRequest struct {
Number int `json:"number"`
Title string `json:"title"`
HTMLURL string `json:"html_url"`
} `json:"pull_request"`
}
// handleCronTriggerEvent checks if the GitHub event is one that should trigger
// schedules immediately. Returns (true, nil) if it handled the event and wrote
// the HTTP response, (true, err) if it handled but errored, or (false, nil) if
// the event is not a cron-trigger type and should fall through to A2A forwarding.
func (h *WebhookHandler) handleCronTriggerEvent(c *gin.Context, eventType string, rawBody []byte) (bool, error) {
ctx := c.Request.Context()
switch eventType {
case "issues":
var payload githubIssuesEvent
if err := json.Unmarshal(rawBody, &payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid issues payload"})
return true, nil
}
if payload.Action != "opened" {
c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only issues action=opened triggers crons"})
return true, nil
}
// Fire all enabled schedules whose name contains "pick-up-work" (case-insensitive).
result, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET next_run_at = now(), updated_at = now()
WHERE enabled = true
AND next_run_at IS NOT NULL
AND LOWER(name) LIKE '%pick-up-work%'
`)
if err != nil {
log.Printf("Webhook: cron trigger (issues/opened) DB error: %v", err)
return true, fmt.Errorf("failed to trigger schedules: %w", err)
}
affected, _ := result.RowsAffected()
log.Printf("Webhook: issues/opened in %s #%d by %s — triggered %d pick-up-work schedule(s)",
payload.Repository.FullName, payload.Issue.Number, payload.Sender.Login, affected)
c.JSON(http.StatusOK, gin.H{
"status": "triggered",
"event": "issues",
"action": "opened",
"schedules_affected": affected,
})
return true, nil
case "pull_request_review":
var payload githubPullRequestReviewEvent
if err := json.Unmarshal(rawBody, &payload); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid pull_request_review payload"})
return true, nil
}
if payload.Action != "submitted" {
c.JSON(http.StatusAccepted, gin.H{"status": "ignored", "reason": "only pull_request_review action=submitted triggers crons"})
return true, nil
}
// Fire all enabled schedules whose name contains "PR review" or "security review" (case-insensitive).
result, err := db.DB.ExecContext(ctx, `
UPDATE workspace_schedules
SET next_run_at = now(), updated_at = now()
WHERE enabled = true
AND next_run_at IS NOT NULL
AND (LOWER(name) LIKE '%pr review%' OR LOWER(name) LIKE '%security review%')
`)
if err != nil {
log.Printf("Webhook: cron trigger (pull_request_review/submitted) DB error: %v", err)
return true, fmt.Errorf("failed to trigger schedules: %w", err)
}
affected, _ := result.RowsAffected()
log.Printf("Webhook: pull_request_review/submitted in %s PR #%d by %s (state=%s) — triggered %d review schedule(s)",
payload.Repository.FullName, payload.PullRequest.Number, payload.Sender.Login, payload.Review.State, affected)
c.JSON(http.StatusOK, gin.H{
"status": "triggered",
"event": "pull_request_review",
"action": "submitted",
"schedules_affected": affected,
})
return true, nil
default:
return false, nil
}
}

View File

@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
@ -206,3 +207,147 @@ func TestGitHubWebhook_ValidPRReviewComment_Forwards(t *testing.T) {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
// ---------------------------------------------------------------------------
// Event-driven cron trigger tests
// ---------------------------------------------------------------------------
func TestGitHubWebhook_IssuesOpened_TriggersCrons(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "opened",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "alice"},
"issue": {"number": 42, "title": "New feature request", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"}
}`)
// Expect the UPDATE that sets next_run_at = now() on pick-up-work schedules.
mock.ExpectExec("UPDATE workspace_schedules").
WillReturnResult(sqlmock.NewResult(0, 3))
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "issues")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
// Verify response includes trigger metadata.
respBody := w.Body.String()
if !strings.Contains(respBody, `"triggered"`) {
t.Fatalf("expected 'triggered' in response, got: %s", respBody)
}
if !strings.Contains(respBody, `"schedules_affected"`) {
t.Fatalf("expected 'schedules_affected' in response, got: %s", respBody)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
func TestGitHubWebhook_IssuesClosed_Ignored(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "closed",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "alice"},
"issue": {"number": 42, "title": "Old issue", "html_url": "https://github.com/Molecule-AI/molecule-core/issues/42"}
}`)
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "issues")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusAccepted {
t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String())
}
}
func TestGitHubWebhook_PRReviewSubmitted_TriggersCrons(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "submitted",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "bob"},
"review": {"state": "changes_requested", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"},
"pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"}
}`)
// Expect the UPDATE that sets next_run_at = now() on review schedules.
mock.ExpectExec("UPDATE workspace_schedules").
WillReturnResult(sqlmock.NewResult(0, 2))
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "pull_request_review")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d: %s", w.Code, w.Body.String())
}
respBody := w.Body.String()
if !strings.Contains(respBody, `"triggered"`) {
t.Fatalf("expected 'triggered' in response, got: %s", respBody)
}
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet sqlmock expectations: %v", err)
}
}
func TestGitHubWebhook_PRReviewDismissed_Ignored(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
broadcaster := newTestBroadcaster()
handler := NewWebhookHandler(broadcaster)
secret := "test-secret"
t.Setenv("GITHUB_WEBHOOK_SECRET", secret)
body := []byte(`{
"action": "dismissed",
"repository": {"full_name": "Molecule-AI/molecule-core"},
"sender": {"login": "bob"},
"review": {"state": "dismissed", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7#pullrequestreview-1"},
"pull_request": {"number": 7, "title": "Fix scheduler bug", "html_url": "https://github.com/Molecule-AI/molecule-core/pull/7"}
}`)
w, c := newWebhookTestContext(t, "", body)
c.Request.Header.Set("X-GitHub-Event", "pull_request_review")
c.Request.Header.Set("X-Hub-Signature-256", githubSignature(secret, body))
handler.GitHub(c)
if w.Code != http.StatusAccepted {
t.Fatalf("expected status 202, got %d: %s", w.Code, w.Body.String())
}
}

View File

@ -45,6 +45,7 @@ from executor_helpers import (
CONFIG_MOUNT,
MEMORY_CONTENT_MAX_CHARS,
WORKSPACE_MOUNT,
auto_push_hook,
brief_summary,
commit_memory,
extract_message_text,
@ -473,6 +474,8 @@ class ClaudeSDKExecutor(AgentExecutor):
await commit_memory(
f"Conversation: {original_input[:MEMORY_CONTENT_MAX_CHARS]}"
)
# Auto-push unpushed commits and open PR (non-blocking, best-effort).
await auto_push_hook()
return response_text or _NO_RESPONSE_MSG

View File

@ -14,10 +14,12 @@ Provides:
from __future__ import annotations
import asyncio
import json
import logging
import os
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING, Any
@ -390,3 +392,156 @@ def sanitize_agent_error(
else:
tag = "unknown"
return f"Agent error ({tag}) — see workspace logs for details."
# ========================================================================
# Auto-push hook — push unpushed commits and open PR after task completion
# ========================================================================
# Git/gh wrappers at /usr/local/bin have GH_TOKEN baked in.
_GIT = "/usr/local/bin/git"
_GH = "/usr/local/bin/gh"
_PROTECTED_BRANCHES = frozenset({"staging", "main", "master"})
def _run_git(args: list[str], cwd: str, timeout: int = 30) -> subprocess.CompletedProcess:
"""Run a git/gh command with bounded timeout. Never raises on failure."""
return subprocess.run(
args,
cwd=cwd,
capture_output=True,
text=True,
timeout=timeout,
)
def _auto_push_and_pr_sync(cwd: str) -> None:
"""Synchronous implementation of the auto-push hook.
1. Check if we're in a git repo with unpushed commits on a feature branch.
2. Push the branch.
3. Open a PR against staging if one doesn't already exist.
Designed to be called from a background thread never raises, logs all
errors. Uses the git/gh wrappers at /usr/local/bin/ which have GH_TOKEN
baked in.
"""
try:
# --- Guard: is this a git repo? ---
probe = _run_git([_GIT, "rev-parse", "--is-inside-work-tree"], cwd)
if probe.returncode != 0:
return
# --- Guard: get current branch ---
branch_result = _run_git(
[_GIT, "rev-parse", "--abbrev-ref", "HEAD"], cwd
)
if branch_result.returncode != 0:
return
branch = branch_result.stdout.strip()
if not branch or branch in _PROTECTED_BRANCHES or branch == "HEAD":
return
# --- Guard: any unpushed commits? ---
log_result = _run_git(
[_GIT, "log", "origin/staging..HEAD", "--oneline"], cwd
)
if log_result.returncode != 0 or not log_result.stdout.strip():
# No unpushed commits (or origin/staging doesn't exist).
return
unpushed_lines = log_result.stdout.strip().splitlines()
logger.info(
"auto-push: %d unpushed commit(s) on branch '%s', pushing...",
len(unpushed_lines),
branch,
)
# --- Push ---
push_result = _run_git(
[_GIT, "push", "origin", branch], cwd, timeout=60
)
if push_result.returncode != 0:
logger.warning(
"auto-push: git push failed (exit %d): %s",
push_result.returncode,
(push_result.stderr or push_result.stdout)[:500],
)
return
logger.info("auto-push: pushed branch '%s' successfully", branch)
# --- Check if PR already exists ---
pr_list = _run_git(
[_GH, "pr", "list", "--head", branch, "--json", "number"], cwd
)
if pr_list.returncode != 0:
logger.warning(
"auto-push: gh pr list failed (exit %d): %s",
pr_list.returncode,
(pr_list.stderr or pr_list.stdout)[:500],
)
return
existing_prs = json.loads(pr_list.stdout.strip() or "[]")
if existing_prs:
logger.info(
"auto-push: PR already exists for branch '%s' (#%s), skipping create",
branch,
existing_prs[0].get("number", "?"),
)
return
# --- Get first commit message for PR title ---
first_commit = _run_git(
[_GIT, "log", "origin/staging..HEAD", "--reverse",
"--format=%s", "-1"],
cwd,
)
pr_title = first_commit.stdout.strip() if first_commit.returncode == 0 else branch
# Truncate to 256 chars (GitHub limit)
if len(pr_title) > 256:
pr_title = pr_title[:253] + "..."
# --- Create PR ---
pr_create = _run_git(
[
_GH, "pr", "create",
"--base", "staging",
"--title", pr_title,
"--body", "Auto-created by workspace agent",
],
cwd,
timeout=60,
)
if pr_create.returncode != 0:
logger.warning(
"auto-push: gh pr create failed (exit %d): %s",
pr_create.returncode,
(pr_create.stderr or pr_create.stdout)[:500],
)
else:
pr_url = pr_create.stdout.strip()
logger.info("auto-push: created PR %s", pr_url)
except subprocess.TimeoutExpired:
logger.warning("auto-push: command timed out, skipping")
except Exception:
logger.exception("auto-push: unexpected error (non-fatal)")
async def auto_push_hook(cwd: str | None = None) -> None:
"""Post-execution hook: push unpushed commits and open a PR.
Runs the git/gh subprocess work in a background thread via
asyncio.to_thread so it never blocks the agent's event loop.
Catches all exceptions the agent must never crash due to this hook.
"""
if cwd is None:
cwd = WORKSPACE_MOUNT
if not os.path.isdir(cwd):
return
try:
await asyncio.to_thread(_auto_push_and_pr_sync, cwd)
except Exception:
logger.exception("auto_push_hook: failed (non-fatal)")