molecule-core/workspace-server/internal/plugins/drift_sweeper.go
Molecule AI Core Platform Lead 9e3d420363
All checks were successful
Secret scan / Scan diff for credential-shaped strings (pull_request) Successful in 6s
sop-tier-check / tier-check (pull_request) Successful in 4s
[core-lead-agent] fix(core#228): cascade fixes for PluginResolver — make main compile
PR #256 introduced PluginResolver to break the SourceResolver redeclaration
deadlock, but missed three downstream call-sites that left main uncompilable:

1. plugins/drift_sweeper.go: PluginResolver.Resolve was declared returning
   PluginResolver (recursive). *Registry.Resolve returns the production
   SourceResolver from source.go, so *Registry didn't satisfy PluginResolver.
   Fix: Resolve returns SourceResolver. Add compile-time assertion that
   *Registry satisfies PluginResolver so any future signature drift fails
   the build instead of router wiring.

2. plugins/drift_sweeper_test.go: stubResolver was still declared with the
   old SourceResolver shape AND asserted against SourceResolver — the
   assertion failed because stubResolver lacks Scheme()/Fetch(). Fix: stub
   is a PluginResolver; assertion targets PluginResolver. Drop the unused
   "database/sql" import that fails go vet.

3. router/router.go:
   - The 70f84823 reorder moved the plgh init block above its dockerCli
     dependency (line 538 used; line 594 declared). Moved the dockerCli
     declaration up so it's available where used; replaced the orphaned
     declaration in the terminal block with a comment.
   - Setup's pluginResolver param was typed plugins.SourceResolver — wrong
     for *plugins.Registry (Registry is not a per-scheme resolver). Retyped
     to plugins.PluginResolver, which *Registry actually satisfies.
   - Removed the broken `plgh.WithSourceResolver(pluginResolver)` call —
     WithSourceResolver expects a per-scheme SourceResolver, not a
     PluginResolver/registry. plgh has its own internal default registry
     (github+local) from NewPluginsHandler, so dropping the call is
     functionally a no-op vs the broken state. Kept the param so the
     drift sweeper (main.go) can share scheme enumeration when needed.

4. go.sum: add the content hash entry for go.moleculesai.app/plugin/
   gh-identity/pluginloader (only the /go.mod hash was present, breaking
   `go build ./cmd/server`).

Verified locally:
  go build ./...           ✓
  go vet ./...             ✓ (only pre-existing org_external append warning)
  go test ./internal/plugins/...  ✓
  go test ./internal/router/...   ✓

6 pre-existing handler test failures (TestExecuteDelegation_*,
TestHandleDiagnose_*) are orthogonal — they did not run before because the
package didn't compile. Out of scope for this fix; tracking separately.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 09:46:35 +00:00

331 lines
13 KiB
Go

package plugins
// drift_sweeper.go — periodic drift detection for the plugin version-subscription
// model (core#113 / #123).
//
// How it works
// ─────────────
// Every DriftSweepInterval the sweeper:
// 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
// AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
// 2. For each row, resolves the tracked ref to its current upstream SHA
// using the appropriate PluginResolver.
// 3. If the resolved SHA differs from installed_sha → drift detected.
// 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
// a re-drift while a row is still pending is a no-op).
//
// Thread-safety
// ─────────────
// The sweeper holds no mutable state between ticks. Each tick runs a fresh
// goroutine spawned by the ticker; the parent goroutine is cancelled when
// the passed context is cancelled. This matches the pattern used by
// pendinguploads/sweeper.go and registry/orphan_sweeper.go.
//
// Gitea compatibility
// ───────────────────
// Gitea's REST API is a GitHub-API-compatible surface, so the GithubResolver
// with BaseURL pointing at a Gitea instance works for Gitea-hosted plugin
// sources too. The source_raw in workspace_plugins stores the full spec
// (e.g. "github://owner/repo#tag:v1.0.0") which the resolver parses.
// For "local://" sources the resolver has no SHA concept, so those rows
// are skipped (local plugins have no upstream to drift against).
//
// Resource cost
// ─────────────
// Each tick runs O(N) resolves where N is the count of tracked plugins.
// Each resolve does a --depth=1 git fetch, bounded by the network round-trip
// to GitHub/Gitea. With 1000 tracked plugins and 1h interval, worst case is
// ~1,000 network calls per hour. The per-row timeout (ResolveRefDeadline)
// prevents a slow/hanging fetch from blocking the entire sweep cycle.
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)
// DriftSweepInterval is the cadence between drift-sweep cycles.
// 1 hour is a reasonable balance: fast enough to surface new tag releases
// within a reasonable window, sparse enough to not hammer GitHub's API with
// 1000s of concurrent requests across a large deployment.
const DriftSweepInterval = 1 * time.Hour
// ResolveRefDeadline bounds the git fetch for a single plugin. A
// --depth=1 clone of any reasonable plugin repo should complete well
// within 30s on a healthy connection; 60s is the conservative ceiling
// that handles Gitea instances on high-latency links.
const ResolveRefDeadline = 60 * time.Second
// PluginResolver is the registry-level abstraction the sweeper consumes:
// pick a per-scheme SourceResolver for a parsed Source, and enumerate the
// registered schemes so we can strip the prefix from a stored source_raw.
//
// Resolve returns the production SourceResolver from source.go (NOT another
// PluginResolver) — that's the actual shape of *Registry.Resolve, and the
// sweeper only needs the per-scheme resolver's identity, not its Fetch.
//
// Named PluginResolver (not SourceResolver) to avoid redeclaring the
// per-scheme SourceResolver interface defined in source.go (core#228 fix).
// Satisfied by *Registry from source.go via Resolve + Schemes.
type PluginResolver interface {
Resolve(source Source) (SourceResolver, error)
Schemes() []string
}
// Compile-time assertion: *Registry satisfies PluginResolver. Catches any
// future drift in Registry.Resolve / Schemes signatures at build time.
var _ PluginResolver = (*Registry)(nil)
// StartPluginDriftSweeper runs the drift-detection loop until ctx is cancelled.
// Pass a nil resolver to disable the sweeper (useful for harnesses or CP/SaaS
// mode where git operations are unavailable).
//
// Registers itself via atexits in cmd/server/main.go so the process
// shuts down cleanly on SIGTERM.
func StartPluginDriftSweeper(ctx context.Context, resolver PluginResolver) {
if resolver == nil {
log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
return
}
log.Printf("Plugin drift sweeper started — interval %s", DriftSweepInterval)
ticker := time.NewTicker(DriftSweepInterval)
defer ticker.Stop()
// Run once on startup so we detect drift immediately rather than waiting
// for the first tick.
sweepDriftOnce(ctx, resolver)
for {
select {
case <-ctx.Done():
log.Println("Plugin drift sweeper: shutdown")
return
case <-ticker.C:
// ctx.Err() guard: the ticker may fire just as ctx is cancelled
// (MPMC channel race). Skip the sweep so we don't start a
// ResolveRef cycle after shutdown that would pollute the next
// test's baseline.
if ctx.Err() != nil {
continue
}
sweepDriftOnce(ctx, resolver)
}
}
}
// sweepDriftOnce runs one full drift-detection cycle.
// Errors are non-fatal — each row is handled independently so a single
// slow row doesn't block the rest of the sweep.
func sweepDriftOnce(parent context.Context, resolver PluginResolver) {
ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
defer cancel()
rows, err := db.DB.QueryContext(ctx, `
SELECT wp.id, wp.workspace_id, wp.plugin_name, wp.source_raw,
wp.tracked_ref, wp.installed_sha
FROM workspace_plugins wp
WHERE wp.tracked_ref != 'none'
AND wp.installed_sha IS NOT NULL
`)
if err != nil {
log.Printf("Plugin drift sweeper: SELECT failed: %v", err)
return
}
defer rows.Close()
for rows.Next() {
var row struct {
id string
workspaceID string
pluginName string
sourceRaw string
trackedRef string
installedSHA string
}
if scanErr := rows.Scan(&row.id, &row.workspaceID, &row.pluginName,
&row.sourceRaw, &row.trackedRef, &row.installedSHA); scanErr != nil {
log.Printf("Plugin drift sweeper: row scan failed: %v", scanErr)
continue
}
latestSHA, resolveErr := resolveLatestSHA(ctx, resolver, row.sourceRaw, row.trackedRef)
if resolveErr != nil {
// Log and skip — don't queue drift if we couldn't resolve.
// Transient network errors self-heal on the next cycle.
log.Printf("Plugin drift sweeper: resolve %s@%s failed: %v — skipping",
row.pluginName, row.trackedRef, resolveErr)
continue
}
if latestSHA == row.installedSHA {
continue // no drift
}
log.Printf("Plugin drift sweeper: drift detected for %s (workspace=%s): "+
"installed=%s upstream=%s", row.pluginName, row.workspaceID,
row.installedSHA[:8], latestSHA[:8])
if queueErr := queueDriftEntry(ctx, row.workspaceID, row.pluginName,
row.trackedRef, row.installedSHA, latestSHA); queueErr != nil {
log.Printf("Plugin drift sweeper: queue drift for %s failed: %v",
row.pluginName, queueErr)
}
}
if iterErr := rows.Err(); iterErr != nil {
log.Printf("Plugin drift sweeper: rows iteration failed: %v", iterErr)
}
}
// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
// Handles both github:// and local:// sources; local sources are skipped
// (no meaningful upstream to drift against).
func resolveLatestSHA(ctx context.Context, resolver PluginResolver, sourceRaw, trackedRef string) (string, error) {
// Strip the scheme prefix to get the raw spec.
// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
spec := sourceRaw
for _, scheme := range resolver.Schemes() {
if strings.HasPrefix(spec, scheme+"://") {
spec = strings.TrimPrefix(spec, scheme+"://")
break
}
}
// Parse the ref from the tracked_ref field (e.g. "tag:v1.0.0").
// Prepend it as a # suffix so the resolver can fetch the right ref.
var refSuffix string
switch {
case strings.HasPrefix(trackedRef, "tag:"):
refSuffix = "#" + trackedRef
case strings.HasPrefix(trackedRef, "sha:"):
refSuffix = "#" + trackedRef
default:
// Bare ref (shouldn't happen per validateTrackedRef, but be safe).
refSuffix = "#" + trackedRef
}
// If spec already has a # fragment, replace it with the tracked ref.
// (In practice source_raw always has one, but handle both cases.)
if strings.Contains(spec, "#") {
spec = strings.SplitN(spec, "#", 2)[0] + refSuffix
} else {
spec = spec + refSuffix
}
// Use the github resolver directly — it handles the fetch + rev-parse.
gh := NewGithubResolver()
resolvedSHA, err := gh.ResolveRef(ctx, spec)
if err != nil {
return "", fmt.Errorf("resolve %s: %w", spec, err)
}
return resolvedSHA, nil
}
// queueDriftEntry inserts a pending drift entry into plugin_update_queue.
// ON CONFLICT (workspace_id, plugin_name) WHERE status = 'pending' DO NOTHING
// makes this idempotent — re-drift while a row is already pending is a no-op.
// Uses the partial unique index plugin_update_queue_pending_unique as the
// inference target; the WHERE clause ensures we only dedup pending rows.
func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
_, err := db.DB.ExecContext(ctx, `
INSERT INTO plugin_update_queue
(workspace_id, plugin_name, tracked_ref, current_sha, latest_sha)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (workspace_id, plugin_name) DO NOTHING
`, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
return err
}
// ─────────────────────────────────────────────────────────────────────────────
// Test helpers
// ─────────────────────────────────────────────────────────────────────────────
// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
func SweepDriftOnceForTest(parent context.Context, resolver PluginResolver) {
sweepDriftOnce(parent, resolver)
}
// QueueDriftEntryForTest exposes queueDriftEntry for package-level testing.
func QueueDriftEntryForTest(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
return queueDriftEntry(ctx, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
}
// PluginUpdateQueueRow is the Go struct mirroring a plugin_update_queue row.
// Exported for tests and for the admin handler to consume.
type PluginUpdateQueueRow struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
PluginName string `json:"plugin_name"`
TrackedRef string `json:"tracked_ref"`
CurrentSHA string `json:"current_sha"`
LatestSHA string `json:"latest_sha"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// ListPendingUpdates returns all pending drift entries, newest first.
func ListPendingUpdates(ctx context.Context) ([]PluginUpdateQueueRow, error) {
rows, err := db.DB.QueryContext(ctx, `
SELECT id, workspace_id, plugin_name, tracked_ref,
current_sha, latest_sha, status, created_at
FROM plugin_update_queue
WHERE status = 'pending'
ORDER BY created_at DESC
`)
if err != nil {
return nil, fmt.Errorf("list pending updates: %w", err)
}
defer rows.Close()
var result []PluginUpdateQueueRow
for rows.Next() {
var r PluginUpdateQueueRow
if scanErr := rows.Scan(&r.ID, &r.WorkspaceID, &r.PluginName,
&r.TrackedRef, &r.CurrentSHA, &r.LatestSHA, &r.Status, &r.CreatedAt); scanErr != nil {
return nil, fmt.Errorf("scan row: %w", scanErr)
}
result = append(result, r)
}
return result, rows.Err()
}
// ApplyDriftUpdate marks a queue entry as applied (or already-applied idempotently)
// and returns the workspace_id and plugin_name so the caller can trigger a restart.
func ApplyDriftUpdate(ctx context.Context, queueID string) (workspaceID, pluginName string, err error) {
var row struct {
WorkspaceID string
PluginName string
Status sql.NullString
}
err = db.DB.QueryRowContext(ctx, `
SELECT workspace_id, plugin_name, status
FROM plugin_update_queue
WHERE id = $1
`, queueID).Scan(&row.WorkspaceID, &row.PluginName, &row.Status)
if err == sql.ErrNoRows {
return "", "", fmt.Errorf("queue entry %s not found", queueID)
}
if err != nil {
return "", "", fmt.Errorf("query queue entry: %w", err)
}
if row.Status.Valid && row.Status.String == "applied" {
// Idempotent — already applied.
return row.WorkspaceID, row.PluginName, nil
}
_, execErr := db.DB.ExecContext(ctx, `
UPDATE plugin_update_queue
SET status = 'applied'
WHERE id = $1
AND status = 'pending'
`, queueID)
if execErr != nil {
return "", "", fmt.Errorf("update status: %w", execErr)
}
return row.WorkspaceID, row.PluginName, nil
}