molecule-core/workspace-server/internal/handlers/plugins_tracking.go
Molecule AI Core-BE ada1008012
feat(plugins): plugin drift detector + queue + admin apply endpoint (#123)
## Summary

Adds the version-subscription drift detection and operator-apply workflow for
per-workspace plugin tracking (core#113).

## Components

**Migration** (`20260510000000_plugin_drift_queue`):
- Adds `installed_sha` column to `workspace_plugins` — records the commit SHA
  installed so the drift sweeper can compare against upstream.
- Creates `plugin_update_queue` table with status: pending | applied | dismissed.
- Adds partial unique index to prevent duplicate pending rows per
  (workspace_id, plugin_name).

**GithubResolver** (`github.go`):
- `LastFetchSHA` field + `LastSHA()` getter — populated by `Fetch` after a
  successful shallow clone (captured before `.git` is stripped). Used by the
  install pipeline to seed `installed_sha`.
- `ResolveRef(ctx, spec)` method — resolves a plugin spec to its full commit
  SHA using `git fetch --depth=1` followed by `git rev-parse`. Used by the drift
  sweeper to get the current upstream SHA for a tracked ref (`tag:vX.Y.Z`,
  `tag:latest`, `sha:…`, or a bare branch).
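
As a rough illustration of the `git fetch --depth=1` plus `git rev-parse` approach, the sketch below maps a tracked ref to a git ref and lists the commands one might run. The helper names `gitRefFor` and `resolveCommands`, the `refs/tags/` and `refs/heads/` mapping, and the `FETCH_HEAD^{commit}` peel are assumptions for illustration, not the actual `ResolveRef` implementation. `tag:latest` is deliberately left unresolved here because it would need a tag listing first.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// gitRefFor maps a tracked ref to the ref name handed to `git fetch`.
// Hypothetical helper: this mapping is an assumption, not the real ResolveRef logic.
func gitRefFor(tracked string) (string, error) {
	switch {
	case tracked == "" || tracked == "none":
		return "", errors.New("nothing to resolve for an untracked plugin")
	case tracked == "tag:latest":
		return "", errors.New("tag:latest needs a tag listing before it can be fetched")
	case strings.HasPrefix(tracked, "tag:") && len(tracked) > 4:
		return "refs/tags/" + strings.TrimPrefix(tracked, "tag:"), nil
	case strings.HasPrefix(tracked, "sha:") && len(tracked) > 4:
		return strings.TrimPrefix(tracked, "sha:"), nil
	case strings.ContainsAny(tracked, ": \t"):
		return "", fmt.Errorf("unrecognised tracked ref %q", tracked)
	default:
		// Anything else is treated as a bare branch name.
		return "refs/heads/" + tracked, nil
	}
}

// resolveCommands lists the git invocations for a shallow fetch of one ref
// into dir, followed by peeling the fetched object to a commit SHA.
func resolveCommands(dir, repoURL, ref string) [][]string {
	return [][]string{
		{"git", "init", "--quiet", dir},
		{"git", "-C", dir, "fetch", "--depth=1", repoURL, ref},
		{"git", "-C", dir, "rev-parse", "FETCH_HEAD^{commit}"},
	}
}

func main() {
	ref, err := gitRefFor("tag:v1.2.3")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// The URL and directory below are placeholders.
	for _, cmd := range resolveCommands("/tmp/resolve", "https://example.com/org/plugin.git", ref) {
		fmt.Println(strings.Join(cmd, " "))
	}
}
```

Peeling with `^{commit}` matters for annotated tags, where the fetched object is the tag itself rather than the commit it points to.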

**Drift sweeper** (`plugins/drift_sweeper.go`):
- Periodic sweep every 1h: SELECTs rows where `tracked_ref != 'none' AND
  installed_sha IS NOT NULL`, resolves upstream SHA, queues drift if different.
- `ListPendingUpdates()` — reads pending queue rows for the admin endpoint.
- `ApplyDriftUpdate()` — marks entry applied (idempotent).
- `ctx.Err()` guard on the ticker arm to avoid post-shutdown work.

**Install pipeline** (`plugins_install_pipeline.go`, `plugins_tracking.go`,
`plugins_install.go`):
- `stageResult.InstalledSHA` field — carries the SHA from Fetch to the DB.
- `recordWorkspacePluginInstall` now accepts and stores `installed_sha`.
- `deleteWorkspacePluginRow` — removes tracking row on uninstall so a stale
  SHA doesn't prevent the next install from creating a fresh row.
- Both Docker and EIC uninstall paths call `deleteWorkspacePluginRow`.
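
Because the sweeper only considers rows where `installed_sha IS NOT NULL`, installs without a SHA (for example `local://` sources) need to land as NULL rather than as an empty string. One Go-side way to guarantee that is sketched below; `nullableSHA` is a hypothetical helper and is not part of this change.

```go
package main

import (
	"database/sql"
	"fmt"
	"strings"
)

// nullableSHA converts an optional commit SHA into a sql.NullString so that
// an empty value is written as SQL NULL instead of ''. Hypothetical helper.
func nullableSHA(sha string) sql.NullString {
	sha = strings.TrimSpace(sha)
	return sql.NullString{String: sha, Valid: sha != ""}
}

func main() {
	for _, in := range []string{"", "0123abcd"} {
		v := nullableSHA(in)
		fmt.Printf("input=%q valid=%t\n", in, v.Valid)
	}
}
```

The returned value can be passed directly as a query argument to `ExecContext`, since `sql.NullString` implements `driver.Valuer`.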

**Admin endpoints** (`handlers/admin_plugin_drift.go`):
- `GET /admin/plugin-updates-pending` — list all pending drift entries.
- `POST /admin/plugin-updates/:id/apply` — re-installs plugin from source_raw
  (re-fetching the same tracked ref), records the new SHA, marks entry applied,
  triggers workspace restart. Idempotent (already-applied returns 200).

**Router wiring** (`router.go`, `cmd/server/main.go`):
- Plugin registry created in main.go and shared between PluginsHandler and drift
  sweeper.
- `router.Setup` accepts optional `pluginResolver` param.
- `PluginsHandler.Sources()` export for the sweeper wiring pattern.

## Tests

- `plugins/github_test.go` — `ResolveRef` coverage (invalid spec, git error,
  not-found mapping, no-panic for all ref shapes).
- `plugins/drift_sweeper_test.go` — `ResolveRef` happy path, stub resolver
  interface compliance.
- `handlers/admin_plugin_drift_test.go` — ListPending (empty, non-empty, DB
  error), Apply (not found, already applied, already dismissed, workspace_plugins
  missing).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 00:39:50 +00:00


package handlers

// plugins_tracking.go: workspace_plugins DB tracking for the
// version-subscription model (core#113).
//
// The base schema lives in migration
// 20260508160000_workspace_plugins_tracking.up.sql; the installed_sha
// column is added by migration 20260510000000_plugin_drift_queue.
// This file is the Go-side write surface used at install and uninstall
// time to maintain each plugin's tracking row. Drift detection, the update
// queue, and the apply endpoint live in plugins/drift_sweeper.go and
// handlers/admin_plugin_drift.go.

import (
	"context"
	"errors"
	"fmt"
	"strings"

	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
)

// trackedRefValues is the closed set of bare-string values the
// workspace_plugins.tracked_ref column accepts. Prefixed values
// ("tag:..." / "sha:...") are validated structurally below.
var trackedRefValues = map[string]bool{
	"none": true,
}

// validateTrackedRef returns the canonical form of a track string, or
// an error if the input is malformed. Empty input defaults to "none".
//
// Accepted shapes:
//
//	""                 defaults to "none"
//	"none"             no tracking
//	"tag:vX.Y.Z"       track a specific tag
//	"tag:latest"       track the latest tag, drift on every new tag
//	"sha:<full-sha>"   pinned to a commit SHA
func validateTrackedRef(s string) (string, error) {
	s = strings.TrimSpace(s)
	if s == "" {
		return "none", nil
	}
	if trackedRefValues[s] {
		return s, nil
	}
	if strings.HasPrefix(s, "tag:") && len(s) > 4 {
		return s, nil
	}
	if strings.HasPrefix(s, "sha:") && len(s) > 4 {
		return s, nil
	}
	return "", fmt.Errorf("invalid track value %q: expected 'none' | 'tag:vX.Y.Z' | 'tag:latest' | 'sha:<full>'", s)
}

// recordWorkspacePluginInstall upserts the workspace_plugins row for a
// plugin install. ON CONFLICT (workspace_id, plugin_name) DO UPDATE so
// reinstalling the same plugin name (with a possibly-different source or
// track value) updates the existing row rather than failing.
//
// installedSHA records the commit SHA that was installed; the drift sweeper
// uses it to detect when the upstream ref has moved. It may be empty (e.g.
// for local:// sources or pre-migration installs). An empty value is stored
// as NULL via NULLIF so the sweeper's "installed_sha IS NOT NULL" filter
// skips the row; without that, '' would be stored and the row would be swept.
func recordWorkspacePluginInstall(
	ctx context.Context, workspaceID, pluginName, sourceRaw, track, installedSHA string,
) error {
	if workspaceID == "" || pluginName == "" || sourceRaw == "" {
		return errors.New("recordWorkspacePluginInstall: missing required field")
	}
	canonicalTrack, err := validateTrackedRef(track)
	if err != nil {
		return err
	}
	_, err = db.DB.ExecContext(ctx, `
		INSERT INTO workspace_plugins (workspace_id, plugin_name, source_raw, tracked_ref, installed_sha)
		VALUES ($1, $2, $3, $4, NULLIF($5, ''))
		ON CONFLICT (workspace_id, plugin_name)
		DO UPDATE SET
			source_raw = EXCLUDED.source_raw,
			tracked_ref = EXCLUDED.tracked_ref,
			installed_sha = EXCLUDED.installed_sha,
			updated_at = NOW()
	`, workspaceID, pluginName, sourceRaw, canonicalTrack, installedSHA)
	return err
}

// deleteWorkspacePluginRow removes the workspace_plugins row for a workspace/plugin
// pair. Called by the uninstall path so the row doesn't persist with a stale
// installed_sha after the plugin has been removed from the container.
func deleteWorkspacePluginRow(ctx context.Context, workspaceID, pluginName string) error {
	_, err := db.DB.ExecContext(ctx, `
		DELETE FROM workspace_plugins WHERE workspace_id = $1 AND plugin_name = $2
	`, workspaceID, pluginName)
	return err
}