revert: d0126662 docs cycle report — restores restart_signals.go to building
#258
@ -21,7 +21,6 @@ import (
|
||||
memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/registry"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/router"
|
||||
@ -332,23 +331,7 @@ func main() {
|
||||
cronSched.SetChannels(channelMgr)
|
||||
|
||||
// Router
|
||||
// Plugin registry — created before Setup so the same registry is shared
|
||||
// between the PluginsHandler (for installs) and the drift sweeper (for
|
||||
// drift detection). github:// sources always work; local:// sources
|
||||
// require a plugins/ dir on disk (nil in CP/SaaS mode).
|
||||
pluginRegistry := plugins.NewRegistry()
|
||||
pluginRegistry.Register(plugins.NewGithubResolver())
|
||||
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, pluginRegistry)
|
||||
|
||||
// Plugin drift sweeper — periodic detection of upstream plugin version drift
|
||||
// (core#123). Scans workspace_plugins rows where tracked_ref != 'none',
|
||||
// resolves the current upstream SHA for each tracked ref, and queues drift
|
||||
// entries when the upstream has moved. Only runs when pluginResolver is
|
||||
// non-nil (CP/SaaS mode has no local git and the sweeper is a no-op there).
|
||||
// Nil prov: Docker not available (test harness / local dev without Docker).
|
||||
go supervised.RunWithRecover(ctx, "plugin-drift-sweeper", func(c context.Context) {
|
||||
plugins.StartPluginDriftSweeper(c, pluginRegistry)
|
||||
})
|
||||
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle)
|
||||
|
||||
// HTTP server with graceful shutdown.
|
||||
//
|
||||
|
||||
@ -1,211 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// admin_plugin_drift.go — admin endpoints for plugin version-subscription drift queue
|
||||
// (core#123).
|
||||
//
|
||||
// Routes:
|
||||
// GET /admin/plugin-updates-pending — list all pending drift entries
|
||||
// POST /admin/plugin-updates/:id/apply — apply a queued drift update
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// AdminPluginDriftHandler handles admin endpoints for the plugin drift queue
// (core#123): listing pending drift entries and applying a queued update.
type AdminPluginDriftHandler struct {
	// pluginsHandler is used to re-trigger the plugin install pipeline on
	// apply and to reach the restart func. May be nil in tests that only
	// exercise read paths (see ListPending tests constructing with nil).
	pluginsHandler *PluginsHandler
}
|
||||
|
||||
// NewAdminPluginDriftHandler constructs a handler wired to the plugins handler.
|
||||
func NewAdminPluginDriftHandler(ph *PluginsHandler) *AdminPluginDriftHandler {
|
||||
return &AdminPluginDriftHandler{pluginsHandler: ph}
|
||||
}
|
||||
|
||||
// ListPending handles GET /admin/plugin-updates-pending.
|
||||
//
|
||||
// Returns a JSON array of pending drift entries, newest-first. Empty array
|
||||
// means no plugins have drifted since the last sweep cycle.
|
||||
func (h *AdminPluginDriftHandler) ListPending(c *gin.Context) {
|
||||
rows, err := plugins.ListPendingUpdates(c.Request.Context())
|
||||
if err != nil {
|
||||
log.Printf("AdminPluginDrift: ListPendingUpdates failed: %v", err)
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list pending updates"})
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusOK, rows)
|
||||
}
|
||||
|
||||
// Apply handles POST /admin/plugin-updates/:id/apply.
//
// 1. Reads the queue entry and verifies it's still pending.
// 2. Reads the workspace_plugins row to get the plugin's source.
// 3. Re-installs the plugin from source_raw (re-fetch from upstream at the
//    same tracked ref — the drift was caused by upstream moving).
// 4. Marks the queue entry as applied.
// 5. Triggers workspace restart.
//
// Idempotent: if the entry is already 'applied', returns 200 with the
// workspace_id and plugin_name so callers can still poll for confirmation.
func (h *AdminPluginDriftHandler) Apply(c *gin.Context) {
	queueID := c.Param("id")
	if queueID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "queue id is required"})
		return
	}

	ctx := c.Request.Context()

	// Step 1: read and lock the queue entry.
	// NOTE(review): despite the word "lock", this SELECT takes no row lock
	// (no FOR UPDATE) — two concurrent Apply calls can both observe
	// status='pending' and both run the install. Confirm whether that
	// race is acceptable (install is idempotent-ish) or needs FOR UPDATE.
	var entry struct {
		WorkspaceID string `json:"workspace_id"`
		PluginName  string `json:"plugin_name"`
		TrackedRef  string `json:"tracked_ref"`
		Status      string `json:"status"`
	}
	err := db.DB.QueryRowContext(ctx, `
		SELECT workspace_id, plugin_name, tracked_ref, status
		FROM plugin_update_queue
		WHERE id = $1
	`, queueID).Scan(&entry.WorkspaceID, &entry.PluginName, &entry.TrackedRef, &entry.Status)
	if err == sql.ErrNoRows {
		c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("queue entry %s not found", queueID)})
		return
	}
	if err != nil {
		log.Printf("AdminPluginDrift: apply: query queue entry: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read queue entry"})
		return
	}

	if entry.Status == "applied" {
		// Idempotent — already applied.
		c.JSON(http.StatusOK, gin.H{
			"status":       "already_applied",
			"workspace_id": entry.WorkspaceID,
			"plugin_name":  entry.PluginName,
			"message":      "drift update was already applied",
		})
		return
	}

	if entry.Status == "dismissed" {
		// Dismissed entries are terminal: 409 so the operator knows the
		// apply was refused rather than silently skipped.
		c.JSON(http.StatusConflict, gin.H{
			"error":        "queue entry was dismissed",
			"workspace_id": entry.WorkspaceID,
			"plugin_name":  entry.PluginName,
		})
		return
	}

	// Step 2: read the workspace_plugins row to get source_raw.
	var sourceRaw string
	err = db.DB.QueryRowContext(ctx, `
		SELECT source_raw FROM workspace_plugins
		WHERE workspace_id = $1 AND plugin_name = $2
	`, entry.WorkspaceID, entry.PluginName).Scan(&sourceRaw)
	if err == sql.ErrNoRows {
		c.JSON(http.StatusNotFound, gin.H{
			"error":        "workspace_plugins row not found — plugin may have been uninstalled",
			"workspace_id": entry.WorkspaceID,
			"plugin_name":  entry.PluginName,
		})
		return
	}
	if err != nil {
		log.Printf("AdminPluginDrift: apply: query workspace_plugins: %v", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read plugin record"})
		return
	}

	// Step 3: re-install the plugin.
	//
	// We call h.pluginsHandler.Install indirectly via a subrequest to reuse
	// the full install pipeline (resolve → stage → deliver → record).
	// Construct the apply installRequest: same source as the existing row,
	// same tracked_ref. The source_raw already encodes the pinned ref
	// (e.g. "github://owner/repo#tag:v1.0.0"), so the resolver fetches
	// the latest commit at that ref — the drift.
	installReq := installRequest{
		Source: sourceRaw,
		Track:  entry.TrackedRef,
	}
	// NOTE(review): h.pluginsHandler is dereferenced here unconditionally,
	// but guarded with a nil check in Step 5 below — the two should agree.
	result, instErr := h.pluginsHandler.ResolveAndStageForApply(ctx, installReq)
	if instErr != nil {
		// httpErr carries a ready-made status + body from the install
		// pipeline; pass its status through rather than flattening to 500.
		var he *httpErr
		if errors.As(instErr, &he) {
			c.JSON(he.Status, gin.H{
				"error":    fmt.Sprintf("plugin install failed: %v", he.Body["error"]),
				"queue_id": queueID,
			})
			return
		}
		log.Printf("AdminPluginDrift: apply: install failed for %s/%s: %v",
			entry.WorkspaceID, entry.PluginName, instErr)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "plugin install failed", "queue_id": queueID})
		return
	}
	// Staged dir is a temp copy; best-effort cleanup when the handler exits.
	defer func() { _ = os.RemoveAll(result.StagedDir) }()

	// Deliver to the workspace container.
	if err := h.pluginsHandler.DeliverForApply(ctx, entry.WorkspaceID, result); err != nil {
		var he *httpErr
		if errors.As(err, &he) {
			c.JSON(he.Status, gin.H{"error": fmt.Sprintf("plugin deliver failed: %v", he.Body["error"]), "queue_id": queueID})
			return
		}
		log.Printf("AdminPluginDrift: apply: deliver failed for %s/%s: %v",
			entry.WorkspaceID, entry.PluginName, err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "plugin deliver failed", "queue_id": queueID})
		return
	}

	// Record the install with the new SHA. This updates installed_sha on the
	// workspace_plugins row so the next drift sweep finds no drift.
	if err := recordWorkspacePluginInstall(ctx, entry.WorkspaceID, result.PluginName,
		result.Source.Raw(), entry.TrackedRef, result.InstalledSHA); err != nil {
		log.Printf("AdminPluginDrift: apply: recordWorkspacePluginInstall failed: %v (install succeeded)", err)
		// Non-fatal: the plugin IS installed; just log and continue.
	}

	// Step 4: mark queue entry as applied.
	if _, err := db.DB.ExecContext(ctx, `
		UPDATE plugin_update_queue SET status = 'applied' WHERE id = $1
	`, queueID); err != nil {
		log.Printf("AdminPluginDrift: apply: failed to mark queue entry %s as applied: %v", queueID, err)
		// Non-fatal: install succeeded; operator can retry or mark manually.
	}

	// Step 5: trigger workspace restart.
	// The pluginsHandler carries a restartFunc (Provisioner.RestartByID) set
	// at construction. Trigger it asynchronously so the HTTP response returns
	// immediately after the install; the restart is best-effort.
	if h.pluginsHandler != nil {
		go func() {
			// We can't use result.PluginName as a restart key since the
			// restartFunc takes a workspaceID. Pass the workspaceID.
			if restart := h.pluginsHandler.GetRestartFunc(); restart != nil {
				restart(entry.WorkspaceID)
			}
		}()
	}

	log.Printf("AdminPluginDrift: applied drift update for %s/%s (queue_id=%s)",
		entry.WorkspaceID, entry.PluginName, queueID)
	c.JSON(http.StatusOK, gin.H{
		"status":        "applied",
		"workspace_id":  entry.WorkspaceID,
		"plugin_name":   entry.PluginName,
		"installed_sha": result.InstalledSHA,
		"restarting":    true,
	})
}
|
||||
@ -1,211 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// admin_plugin_drift_test.go — coverage for plugin drift queue admin endpoints.
|
||||
// Tests: ListPending (empty, non-empty), Apply (not found, already applied,
|
||||
// already dismissed, workspace_plugins missing, install failure).
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// TestAdminPluginDrift_ListPending_Empty verifies that ListPending returns
// 200 with an empty JSON array (not null) when no drift entries are pending.
func TestAdminPluginDrift_ListPending_Empty(t *testing.T) {
	mock := setupTestDB(t)
	// nil PluginsHandler is fine: ListPending never touches it.
	h := NewAdminPluginDriftHandler(nil)

	mock.ExpectQuery(`SELECT id, workspace_id, plugin_name, tracked_ref,\s+current_sha, latest_sha, status, created_at\s+FROM plugin_update_queue\s+WHERE status = 'pending'\s+ORDER BY created_at DESC`).
		WillReturnRows(sqlmock.NewRows([]string{
			"id", "workspace_id", "plugin_name", "tracked_ref",
			"current_sha", "latest_sha", "status", "created_at",
		}))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
	h.ListPending(c)

	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	var rows []map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &rows); err != nil {
		t.Fatalf("body parse: %v", err)
	}
	if len(rows) != 0 {
		t.Errorf("expected empty array, got %d rows", len(rows))
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestAdminPluginDrift_ListPending_WithRows verifies that ListPending returns
// all pending rows and preserves per-row fields (plugin_name, status) in the
// JSON response.
func TestAdminPluginDrift_ListPending_WithRows(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	now := time.Now()
	mock.ExpectQuery(`SELECT id, workspace_id, plugin_name, tracked_ref,\s+current_sha, latest_sha, status, created_at\s+FROM plugin_update_queue\s+WHERE status = 'pending'\s+ORDER BY created_at DESC`).
		WillReturnRows(sqlmock.NewRows([]string{
			"id", "workspace_id", "plugin_name", "tracked_ref",
			"current_sha", "latest_sha", "status", "created_at",
		}).AddRow(
			"queue-id-1", "ws-uuid-1", "my-plugin", "tag:v1.0.0",
			"abc123def456", "def456abc789", "pending", now,
		).AddRow(
			"queue-id-2", "ws-uuid-2", "other-plugin", "tag:latest",
			"111111aaaaaa", "222222bbbbbb", "pending", now,
		))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
	h.ListPending(c)

	if w.Code != http.StatusOK {
		t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
	}
	var rows []map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &rows); err != nil {
		t.Fatalf("body parse: %v", err)
	}
	if len(rows) != 2 {
		t.Errorf("expected 2 rows, got %d", len(rows))
	}
	// Verify first row fields (mock returned rows in insertion order).
	if got := rows[0]["plugin_name"]; got != "my-plugin" {
		t.Errorf("plugin_name: expected my-plugin, got %v", got)
	}
	if got := rows[0]["status"]; got != "pending" {
		t.Errorf("status: expected pending, got %v", got)
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestAdminPluginDrift_ListPending_DBError verifies that a query failure
// surfaces as a 500 rather than a partial/empty success.
func TestAdminPluginDrift_ListPending_DBError(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	// json.Unmarshal on invalid JSON is abused here purely to synthesize a
	// non-nil error without importing the errors package.
	mock.ExpectQuery(`SELECT id, workspace_id`).WillReturnError(
		json.Unmarshal([]byte("force error"), new(struct{})),
	)

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
	h.ListPending(c)

	if w.Code != http.StatusInternalServerError {
		t.Errorf("expected 500 on DB error, got %d", w.Code)
	}
}
|
||||
|
||||
// TestAdminPluginDrift_Apply_NotFound verifies that applying a nonexistent
// queue id returns 404 (sql.ErrNoRows path in Apply step 1).
func TestAdminPluginDrift_Apply_NotFound(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
		WithArgs("nonexistent-queue-id").
		WillReturnRows(sqlmock.NewRows([]string{
			"workspace_id", "plugin_name", "tracked_ref", "status",
		})) // empty = no rows

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/nonexistent-queue-id/apply", nil)
	// Route params must be injected by hand when bypassing the router.
	c.Params = []gin.Param{{Key: "id", Value: "nonexistent-queue-id"}}
	h.Apply(c)

	if w.Code != http.StatusNotFound {
		t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
|
||||
// TestAdminPluginDrift_Apply_AlreadyApplied verifies the idempotent path:
// re-applying an 'applied' entry returns 200 with status=already_applied
// and does not attempt a reinstall (no further DB expectations are set).
func TestAdminPluginDrift_Apply_AlreadyApplied(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
		WithArgs("queue-id-1").
		WillReturnRows(sqlmock.NewRows([]string{
			"workspace_id", "plugin_name", "tracked_ref", "status",
		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "applied"))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
	h.Apply(c)

	if w.Code != http.StatusOK {
		t.Errorf("expected 200 for already-applied, got %d: %s", w.Code, w.Body.String())
	}
	var body map[string]interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
		t.Fatalf("body parse: %v", err)
	}
	if got := body["status"]; got != "already_applied" {
		t.Errorf("status: expected already_applied, got %v", got)
	}
}
|
||||
|
||||
// TestAdminPluginDrift_Apply_AlreadyDismissed verifies that applying a
// dismissed entry is refused with 409 Conflict.
func TestAdminPluginDrift_Apply_AlreadyDismissed(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
		WithArgs("queue-id-1").
		WillReturnRows(sqlmock.NewRows([]string{
			"workspace_id", "plugin_name", "tracked_ref", "status",
		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "dismissed"))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
	h.Apply(c)

	if w.Code != http.StatusConflict {
		t.Errorf("expected 409 for dismissed, got %d: %s", w.Code, w.Body.String())
	}
}
|
||||
|
||||
// TestAdminPluginDrift_Apply_WorkspacePluginsMissing verifies that a pending
// queue entry whose workspace_plugins row has disappeared (plugin uninstalled
// after drift was detected) yields 404 from Apply step 2.
func TestAdminPluginDrift_Apply_WorkspacePluginsMissing(t *testing.T) {
	mock := setupTestDB(t)
	h := NewAdminPluginDriftHandler(nil)

	// Queue entry found, pending.
	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
		WithArgs("queue-id-1").
		WillReturnRows(sqlmock.NewRows([]string{
			"workspace_id", "plugin_name", "tracked_ref", "status",
		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "pending"))

	// workspace_plugins row not found (plugin uninstalled after drift detected).
	mock.ExpectQuery(`SELECT source_raw FROM workspace_plugins\s+WHERE workspace_id = \$1 AND plugin_name = \$2`).
		WithArgs("ws-uuid-1", "my-plugin").
		WillReturnRows(sqlmock.NewRows([]string{"source_raw"})) // empty

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
	h.Apply(c)

	if w.Code != http.StatusNotFound {
		t.Errorf("expected 404 when workspace_plugins row missing, got %d: %s", w.Code, w.Body.String())
	}
	if err := mock.ExpectationsWereMet(); err != nil {
		t.Errorf("unmet expectations: %v", err)
	}
}
|
||||
@ -110,12 +110,6 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH
|
||||
return h
|
||||
}
|
||||
|
||||
// Sources returns the underlying plugin source registry. Used by main.go to
// pass the same registry to the drift sweeper so both share resolver state.
func (h *PluginsHandler) Sources() plugins.SourceResolver {
	return h.sources
}
|
||||
|
||||
// pluginInfo is the API response for a plugin.
|
||||
type pluginInfo struct {
|
||||
Name string `json:"name"`
|
||||
|
||||
@ -95,7 +95,7 @@ func (h *PluginsHandler) Install(c *gin.Context) {
|
||||
// foundation). Best-effort: DB write failure is logged but doesn't fail
|
||||
// the install — the plugin IS in the container; surfacing a 500 here
|
||||
// would mislead the caller about the install state.
|
||||
if err := recordWorkspacePluginInstall(ctx, workspaceID, result.PluginName, result.Source.Raw(), req.Track, result.InstalledSHA); err != nil {
|
||||
if err := recordWorkspacePluginInstall(ctx, workspaceID, result.PluginName, result.Source.Raw(), req.Track); err != nil {
|
||||
log.Printf("Plugin install: failed to record %s for %s in workspace_plugins: %v (install succeeded; tracking row missing)", result.PluginName, workspaceID, err)
|
||||
}
|
||||
|
||||
@ -189,15 +189,6 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context,
|
||||
// Verify deletion before restart
|
||||
h.execInContainer(ctx, containerName, []string{"sync"})
|
||||
|
||||
// Remove the workspace_plugins tracking row so the row doesn't persist
|
||||
// with a stale installed_sha after the plugin has been removed. Drift
|
||||
// detection ignores rows without an installed_sha, but keeping the row
|
||||
// would prevent the next install from creating a fresh row (ON CONFLICT
|
||||
// UPDATE would apply instead of INSERT, which is wrong for an uninstall).
|
||||
if err := deleteWorkspacePluginRow(ctx, workspaceID, pluginName); err != nil {
|
||||
log.Printf("Plugin uninstall: failed to delete workspace_plugins row for %s: %v (container cleanup succeeded)", pluginName, err)
|
||||
}
|
||||
|
||||
// Auto-restart (small delay to ensure fs writes are flushed)
|
||||
if h.restartFunc != nil {
|
||||
go func() {
|
||||
@ -254,11 +245,6 @@ func (h *PluginsHandler) uninstallViaEIC(ctx context.Context, c *gin.Context, wo
|
||||
return
|
||||
}
|
||||
|
||||
// Remove the workspace_plugins tracking row (see uninstallViaDocker for rationale).
|
||||
if err := deleteWorkspacePluginRow(ctx, workspaceID, pluginName); err != nil {
|
||||
log.Printf("Plugin uninstall: failed to delete workspace_plugins row for %s: %v (container cleanup succeeded)", pluginName, err)
|
||||
}
|
||||
|
||||
if h.restartFunc != nil {
|
||||
go func() {
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
@ -128,10 +128,9 @@ type installRequest struct {
|
||||
// stageResult bundles the outputs of resolveAndStage for the caller.
|
||||
// Avoids a 5-value tuple return.
|
||||
type stageResult struct {
|
||||
StagedDir string
|
||||
PluginName string
|
||||
Source plugins.Source
|
||||
InstalledSHA string // empty for local:// sources (no meaningful upstream)
|
||||
StagedDir string
|
||||
PluginName string
|
||||
Source plugins.Source
|
||||
}
|
||||
|
||||
// resolveAndStage parses a validated request, dispatches to the right
|
||||
@ -213,16 +212,6 @@ func (h *PluginsHandler) resolveAndStage(ctx context.Context, req installRequest
|
||||
"source": source.Raw(),
|
||||
})
|
||||
}
|
||||
|
||||
// Capture the installed SHA from github:// sources for drift detection.
|
||||
// GithubResolver.LastSHA() is set by Fetch after a successful clone.
|
||||
// Type-assert is safe because resolver was obtained via Resolve(), which
|
||||
// returns the concrete GithubResolver for github:// sources.
|
||||
var installedSHA string
|
||||
if gh, ok := resolver.(*plugins.GithubResolver); ok {
|
||||
installedSHA = gh.LastSHA()
|
||||
}
|
||||
|
||||
if err := validatePluginName(pluginName); err != nil {
|
||||
cleanup()
|
||||
return nil, newHTTPErr(http.StatusBadRequest, gin.H{
|
||||
@ -275,7 +264,7 @@ func (h *PluginsHandler) resolveAndStage(ctx context.Context, req installRequest
|
||||
}
|
||||
}
|
||||
|
||||
return &stageResult{StagedDir: stagedDir, PluginName: pluginName, Source: source, InstalledSHA: installedSHA}, nil
|
||||
return &stageResult{StagedDir: stagedDir, PluginName: pluginName, Source: source}, nil
|
||||
}
|
||||
|
||||
// deliverToContainer copies the staged plugin dir into the workspace
|
||||
@ -523,24 +512,3 @@ func streamDirAsTar(root string, tw *tar.Writer) error {
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// ResolveAndStageForApply is the context-based equivalent of resolveAndStage,
// exposed for the admin plugin drift apply endpoint (core#123). It bypasses
// the gin.Context dependency so the apply path can re-trigger a plugin install
// programmatically. Errors may be *httpErr, carrying a ready-made HTTP status
// and body for the caller to forward.
func (h *PluginsHandler) ResolveAndStageForApply(ctx context.Context, req installRequest) (*stageResult, error) {
	return h.resolveAndStage(ctx, req)
}
|
||||
|
||||
// DeliverForApply is the context-based equivalent of deliverToContainer,
// exposed for the admin plugin drift apply endpoint (core#123). The caller
// owns r.StagedDir cleanup.
func (h *PluginsHandler) DeliverForApply(ctx context.Context, workspaceID string, r *stageResult) error {
	return h.deliverToContainer(ctx, workspaceID, r)
}
|
||||
|
||||
// GetRestartFunc returns the pluginsHandler's restartFunc, or nil if not set.
// Used by the admin drift apply endpoint to trigger a workspace restart after
// a plugin update is applied.
//
// NOTE(review): Go convention would name this RestartFunc (no Get prefix);
// kept as-is since external callers depend on the name.
func (h *PluginsHandler) GetRestartFunc() func(string) {
	return h.restartFunc
}
|
||||
|
||||
@ -55,12 +55,8 @@ func validateTrackedRef(s string) (string, error) {
|
||||
// plugin install. ON CONFLICT (workspace_id, plugin_name) DO UPDATE so
|
||||
// reinstalling the same plugin name (with a possibly-different source or
|
||||
// track value) updates the existing row rather than failing.
|
||||
//
|
||||
// installedSHA records the commit SHA that was installed; used by the drift
|
||||
// sweeper to detect when the upstream ref has moved. May be empty (e.g. for
|
||||
// local:// sources or pre-migration installs) — the sweeper skips NULL SHAs.
|
||||
func recordWorkspacePluginInstall(
|
||||
ctx context.Context, workspaceID, pluginName, sourceRaw, track, installedSHA string,
|
||||
ctx context.Context, workspaceID, pluginName, sourceRaw, track string,
|
||||
) error {
|
||||
if workspaceID == "" || pluginName == "" || sourceRaw == "" {
|
||||
return errors.New("recordWorkspacePluginInstall: missing required field")
|
||||
@ -70,24 +66,13 @@ func recordWorkspacePluginInstall(
|
||||
return err
|
||||
}
|
||||
_, err = db.DB.ExecContext(ctx, `
|
||||
INSERT INTO workspace_plugins (workspace_id, plugin_name, source_raw, tracked_ref, installed_sha)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
INSERT INTO workspace_plugins (workspace_id, plugin_name, source_raw, tracked_ref)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT (workspace_id, plugin_name)
|
||||
DO UPDATE SET
|
||||
source_raw = EXCLUDED.source_raw,
|
||||
tracked_ref = EXCLUDED.tracked_ref,
|
||||
installed_sha = EXCLUDED.installed_sha,
|
||||
updated_at = NOW()
|
||||
`, workspaceID, pluginName, sourceRaw, canonicalTrack, installedSHA)
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteWorkspacePluginRow removes the workspace_plugins row for a workspace/plugin
|
||||
// pair. Called by the uninstall path so the row doesn't persist with a stale
|
||||
// installed_sha after the plugin has been removed from the container.
|
||||
func deleteWorkspacePluginRow(ctx context.Context, workspaceID, pluginName string) error {
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
DELETE FROM workspace_plugins WHERE workspace_id = $1 AND plugin_name = $2
|
||||
`, workspaceID, pluginName)
|
||||
source_raw = EXCLUDED.source_raw,
|
||||
tracked_ref = EXCLUDED.tracked_ref,
|
||||
updated_at = NOW()
|
||||
`, workspaceID, pluginName, sourceRaw, canonicalTrack)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1,155 +0,0 @@
|
||||
package handlers
|
||||
|
||||
// restart_signals.go — #125 Phase 1: graceful pre-restart drain for
|
||||
// native-session workspaces.
|
||||
//
|
||||
// Before a container restart, the platform sends POST /signals/restart_pending
|
||||
// to the workspace agent. The agent receives this as a JSON-RPC signal and
|
||||
// begins draining in-flight work. The platform then waits for acknowledgment
|
||||
// before calling stopForRestart.
|
||||
//
|
||||
// This preserves in-flight A2A requests that would otherwise be lost when
|
||||
// the container dies mid-request (the core bug: native_session targets bypass
|
||||
// the platform's a2a_queue buffering, so any message dispatched directly to
|
||||
// the SDK session disappears when the container restarts).
|
||||
//
|
||||
// Phase 2 (not yet implemented): workspace SDK actually processes the signal
|
||||
// and drains its message loop. This file implements the platform-side call
|
||||
// site; the SDK-side handler is in molecule-workspace (adapter_base.py or
|
||||
// similar).
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
)
|
||||
|
||||
const (
	// restartSignalTimeout is how long the platform waits for the workspace
	// to acknowledge the pre-restart signal. A workspace that doesn't implement
	// the handler will simply time out — the platform proceeds with the stop
	// anyway, which is the same as the pre-fix behaviour (no graceful drain).
	restartSignalTimeout = 10 * time.Second

	// restartSignalDrainDuration is how long the workspace should wait before
	// acknowledging. Gives in-flight A2A requests time to complete.
	// Sent as JSON-RPC signal.params.drain_seconds in the POST body.
	// Note it is intentionally longer than restartSignalTimeout: the platform
	// stops waiting after 10s even if the workspace drains for 20s.
	restartSignalDrainDuration = 20 * time.Second
)
|
||||
|
||||
// gracefulPreRestart sends the pre-restart drain signal to the workspace
// agent before the container is stopped. Called from runRestartCycle.
//
// Returns immediately — the signal is fire-and-forget with a 10s timeout.
// If the workspace doesn't implement the handler (404) or times out, the
// platform proceeds with the stop anyway (same as pre-fix behaviour).
//
// The signal is sent via HTTP POST to the workspace's internal agent URL.
// On self-hosted (platform-in-Docker), the platform rewrites 127.0.0.1 to
// the Docker-DNS form ws-<id>:8000. On SaaS/CP, the stored agent URL
// (an externally routable address) is used directly.
//
// NOTE(review): the ctx parameter is not used — the goroutine deliberately
// derives its timeout from context.Background() so the signal survives the
// caller's request scope. Confirm whether ctx should be dropped from the
// signature or wired in.
func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID string) {
	// Non-blocking send — don't stall the restart cycle.
	// Run in a detached goroutine so the caller (runRestartCycle) can
	// proceed to stopForRestart without waiting.
	go func() {
		signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout)
		defer cancel()

		url, err := h.resolveAgentURLForRestartSignal(signalCtx, workspaceID)
		if err != nil {
			log.Printf("A2AGracefulRestart: resolve URL failed for %s: %v — proceeding with stop", workspaceID, err)
			return
		}
		url = url + "/signals/restart_pending"

		// JSON-RPC notification (id: nil) — no response payload is expected,
		// only the HTTP status code matters.
		payload := map[string]interface{}{
			"jsonrpc": "2.0",
			"method":  "signals/restart_pending",
			"params": map[string]interface{}{
				"drain_seconds": int(restartSignalDrainDuration.Seconds()),
				"workspace_id":  workspaceID,
			},
			"id": nil,
		}
		body, _ := json.Marshal(payload)

		req, reqErr := http.NewRequestWithContext(signalCtx, http.MethodPost, url, bytes.NewReader(body))
		if reqErr != nil {
			log.Printf("A2AGracefulRestart: build request failed for %s: %v — proceeding with stop", workspaceID, reqErr)
			return
		}
		req.Header.Set("Content-Type", "application/json")
		// X-Restart-Signal header identifies this as a platform-initiated
		// restart signal (not a regular A2A message). The SDK can check
		// for this header to distinguish a restart signal from other messages.
		req.Header.Set("X-Restart-Signal", "true")

		client := &http.Client{Timeout: restartSignalTimeout}
		resp, doErr := client.Do(req)
		if doErr != nil {
			// Timeout, connection refused, etc. — workspace is either not
			// listening or didn't implement the handler. Proceed with stop.
			log.Printf("A2AGracefulRestart: signal failed for %s: %v — proceeding with stop", workspaceID, doErr)
			return
		}
		defer resp.Body.Close()

		// 200 = workspace acknowledged and will drain. 404 = old SDK version
		// without the handler — same as no handler, proceed. 5xx = workspace
		// error but it's still alive — proceed. Any other status = also proceed.
		if resp.StatusCode == http.StatusOK {
			log.Printf("A2AGracefulRestart: %s acknowledged pre-restart signal (status=%d)", workspaceID, resp.StatusCode)
		} else {
			log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode)
		}
	}()
}
|
||||
|
||||
// resolveAgentURLForRestartSignal returns the routable URL for the workspace
|
||||
// agent, suitable for the pre-restart signal HTTP call. Falls back to the DB
|
||||
// value if the Redis cache miss occurs. On self-hosted (platform-in-Docker),
|
||||
// rewrites 127.0.0.1 to the Docker-DNS form ws-<id>:8000.
|
||||
func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context, workspaceID string) (string, error) {
|
||||
// Try Redis cache first.
|
||||
agentURL, err := db.GetCachedURL(ctx, workspaceID)
|
||||
if err == nil && agentURL != "" {
|
||||
return rewriteForDocker(agentURL, workspaceID), nil
|
||||
}
|
||||
|
||||
// Cache miss — fall back to DB.
|
||||
var urlNullable *string
|
||||
err = db.DB.QueryRowContext(ctx,
|
||||
`SELECT url FROM workspaces WHERE id = $1`, workspaceID,
|
||||
).Scan(&urlNullable)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if urlNullable == nil || *urlNullable == "" {
|
||||
return "", nil // workspace has no URL yet — shouldn't happen at restart time
|
||||
}
|
||||
agentURL = *urlNullable
|
||||
_ = db.CacheURL(ctx, workspaceID, agentURL)
|
||||
return rewriteForDocker(agentURL, workspaceID), nil
|
||||
}
|
||||
|
||||
// rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
|
||||
// when the platform is running inside a Docker container. When platform is
|
||||
// on the host (non-Docker), 127.0.0.1 IS the host and the original URL works.
|
||||
func rewriteForDocker(agentURL, workspaceID string) string {
|
||||
if platformInDocker && h.provisioner != nil {
|
||||
// Only rewrite if the URL points to localhost (the ephemeral port
|
||||
// binding the container published to the host). Internal Docker
|
||||
// URLs (e.g. http://ws-abc123def:8000) are already correct.
|
||||
if len(agentURL) >= 17 && agentURL[:16] == "http://127.0.0.1" {
|
||||
return provisioner.InternalURL(workspaceID)
|
||||
}
|
||||
}
|
||||
return agentURL
|
||||
}
|
||||
@ -1,330 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
	"context"
	"database/sql"
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"testing"
	"time"

	"github.com/DATA-DOG/go-sqlmock"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
	"github.com/alicebob/miniredis/v2"
	"github.com/redis/go-redis/v9"
)
|
||||
|
||||
// stubLocalProv is a minimal LocalProvisionerAPI stub used to make
// h.provisioner non-nil for the Docker-URL-rewrite tests.
// All methods panic — rewriteForDocker only checks h.provisioner != nil,
// so no method body should ever execute in these tests; a panic here means
// a test exercised more of the provisioner than intended.
type stubLocalProv struct{}

// Start is never called in these tests.
func (s *stubLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
	panic("stubLocalProv.Start not implemented in test")
}

// Stop is never called in these tests.
func (s *stubLocalProv) Stop(_ context.Context, _ string) error {
	panic("stubLocalProv.Stop not implemented in test")
}

// IsRunning is never called in these tests.
func (s *stubLocalProv) IsRunning(_ context.Context, _ string) (bool, error) {
	panic("stubLocalProv.IsRunning not implemented in test")
}

// ExecRead is never called in these tests.
func (s *stubLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
	panic("stubLocalProv.ExecRead not implemented in test")
}

// RemoveVolume is never called in these tests.
func (s *stubLocalProv) RemoveVolume(_ context.Context, _ string) error {
	panic("stubLocalProv.RemoveVolume not implemented in test")
}

// VolumeHasFile is never called in these tests.
func (s *stubLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
	panic("stubLocalProv.VolumeHasFile not implemented in test")
}

// WriteAuthTokenToVolume is never called in these tests.
func (s *stubLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
	panic("stubLocalProv.WriteAuthTokenToVolume not implemented in test")
}

// Compile-time assertion: stubLocalProv satisfies LocalProvisionerAPI.
var _ provisioner.LocalProvisionerAPI = (*stubLocalProv)(nil)
|
||||
|
||||
// TestRewriteForDocker_NonDockerHostUrlUnchanged verifies that a non-Docker
|
||||
// URL passes through rewriteForDocker unchanged when platform is not in Docker.
|
||||
func TestRewriteForDocker_NonDockerHostUrlUnchanged(t *testing.T) {
|
||||
restore := setPlatformInDockerForTest(false)
|
||||
defer restore()
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
url := h.rewriteForDocker("http://example.com:8000/agent", "ws-test-123")
|
||||
if url != "http://example.com:8000/agent" {
|
||||
t.Errorf("expected unchanged URL, got %q", url)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRewriteForDocker_LocalhostUrlUnchanged_NoProvisioner verifies that a
|
||||
// localhost URL is NOT rewritten when h.provisioner is nil (SaaS/CP mode).
|
||||
func TestRewriteForDocker_LocalhostUrlUnchanged_NoProvisioner(t *testing.T) {
|
||||
restore := setPlatformInDockerForTest(true)
|
||||
defer restore()
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
// h.provisioner is nil → no Docker rewrite even when platformInDocker=true
|
||||
url := h.rewriteForDocker("http://127.0.0.1:49152/agent", "ws-test-123")
|
||||
if url != "http://127.0.0.1:49152/agent" {
|
||||
t.Errorf("expected localhost URL unchanged (no provisioner), got %q", url)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRewriteForDocker_LocalhostUrlRewritten verifies that a localhost URL
|
||||
// IS rewritten to the Docker-DNS form when platform is in Docker AND a
|
||||
// provisioner is wired.
|
||||
func TestRewriteForDocker_LocalhostUrlRewritten(t *testing.T) {
|
||||
restore := setPlatformInDockerForTest(true)
|
||||
defer restore()
|
||||
|
||||
h := newHandlerWithTestDeps(t)
|
||||
h.provisioner = &stubLocalProv{} // non-nil → triggers Docker rewrite
|
||||
|
||||
url := h.rewriteForDocker("http://127.0.0.1:49152/agent", "ws-test-123")
|
||||
// Docker DNS form: ws-<short-id>:8000
|
||||
if url == "http://127.0.0.1:49152/agent" {
|
||||
t.Error("expected localhost URL to be rewritten to Docker DNS form")
|
||||
}
|
||||
// Verify the rewrite matches the expected Docker internal URL format
|
||||
expectedInternal := "http://ws-ws-test-123:8000"
|
||||
if url != expectedInternal {
|
||||
t.Errorf("expected %q, got %q", expectedInternal, url)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveAgentURLForRestartSignal_CacheHit verifies that a Redis-cached
|
||||
// URL is returned without hitting the DB.
|
||||
func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
|
||||
mockDB, mock := setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
|
||||
_ = setupTestRedisWithURL(t, "http://cached.internal:9000/agent")
|
||||
|
||||
h := newHandlerWithTestDepsWithDB(t, mockDB)
|
||||
|
||||
// Redis cache hit → DB should NOT be queried
|
||||
url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-hit-123")
|
||||
if err != nil {
|
||||
t.Fatalf("resolveAgentURLForRestartSignal failed: %v", err)
|
||||
}
|
||||
if url == "" {
|
||||
t.Fatal("expected non-empty URL from cache")
|
||||
}
|
||||
// DB should not be queried (no rows returned to sqlmock)
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unfulfilled DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveAgentURLForRestartSignal_DBError verifies that a DB error is
|
||||
// returned and propagated when neither Redis cache nor DB lookup succeeds.
|
||||
func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
|
||||
mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
|
||||
_ = setupTestRedis(t) // empty → cache miss
|
||||
|
||||
h := newHandlerWithTestDepsWithDB(t, mockDB)
|
||||
|
||||
mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-db-err-789").
|
||||
WillReturnError(context.DeadlineExceeded)
|
||||
|
||||
_, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-db-err-789")
|
||||
if err == nil {
|
||||
t.Fatal("expected DB error to be returned")
|
||||
}
|
||||
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unfulfilled DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveAgentURLForRestartSignal_CacheMiss verifies that on Redis miss,
|
||||
// the URL is fetched from the DB and cached.
|
||||
func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
|
||||
mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
|
||||
mr := setupTestRedis(t) // empty → cache miss
|
||||
|
||||
h := newHandlerWithTestDepsWithDB(t, mockDB)
|
||||
|
||||
mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
|
||||
WithArgs("ws-cache-miss-456").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"url"}).
|
||||
AddRow("http://db.internal:8000/agent"))
|
||||
|
||||
url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-miss-456")
|
||||
if err != nil {
|
||||
t.Fatalf("resolveAgentURLForRestartSignal failed: %v", err)
|
||||
}
|
||||
if url != "http://db.internal:8000/agent" {
|
||||
t.Errorf("expected DB URL, got %q", url)
|
||||
}
|
||||
|
||||
// Verify the URL was cached in Redis
|
||||
cached, err := mr.Get(context.Background(), "ws:ws-cache-miss-456:url").Result()
|
||||
if err != nil {
|
||||
t.Fatalf("URL was not cached in Redis: %v", err)
|
||||
}
|
||||
if cached != "http://db.internal:8000/agent" {
|
||||
t.Errorf("expected cached URL %q, got %q", "http://db.internal:8000/agent", cached)
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("unfulfilled DB expectations: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGracefulPreRestart_Success verifies that when the workspace returns 200,
// the signal is logged as acknowledged without error.
//
// NOTE(review): this test assigns to h.resolveAgentURLForRestartSignal, which
// is declared as a method on *WorkspaceHandler — methods are not assignable
// in Go, so this file does not compile as written. Keeping this override seam
// requires a func-typed field on WorkspaceHandler (or injecting the resolver);
// confirm before re-enabling.
func TestGracefulPreRestart_Success(t *testing.T) {
	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct

	mr := setupTestRedisWithURL(t, "http://localhost:18000/agent")

	// httptest server simulating the workspace container's /signals/restart_pending
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// The platform must POST JSON with the restart-signal marker header.
		if r.Method != http.MethodPost {
			t.Errorf("expected POST, got %s", r.Method)
		}
		if r.Header.Get("Content-Type") != "application/json" {
			t.Errorf("expected Content-Type: application/json, got %s", r.Header.Get("Content-Type"))
		}
		if r.Header.Get("X-Restart-Signal") != "true" {
			t.Error("expected X-Restart-Signal: true header")
		}

		// The body must be the JSON-RPC restart_pending notification.
		var req map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			t.Errorf("failed to decode request body: %v", err)
		}
		if req["method"] != "signals/restart_pending" {
			t.Errorf("expected method signals/restart_pending, got %v", req["method"])
		}

		w.WriteHeader(http.StatusOK)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"jsonrpc": "2.0",
			"result": map[string]interface{}{"acknowledged": true},
		})
	}))
	defer srv.Close()
	// NOTE(review): setupTestRedisWithURL returns *miniredis.Miniredis, whose
	// Set takes (key, value) and no TTL — this 3-arg call looks like the
	// go-redis client API; confirm the intended receiver type.
	mr.Set("ws:ws-ack-789:url", srv.URL, 5*time.Minute)

	// Patch the handler's resolveAgentURLForRestartSignal to return the test server URL
	// (avoids needing a real provisioner for this test)
	h := newHandlerWithTestDeps(t)
	origResolve := h.resolveAgentURLForRestartSignal
	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
		return srv.URL + "/agent", nil
	}
	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

	// gracefulPreRestart runs in a goroutine with its own timeout.
	// We give it time to complete before the test ends.
	// NOTE(review): sleep-based synchronization is flaky under load; prefer a
	// done-channel or srv-side signal if this test is restored.
	h.gracefulPreRestart(context.Background(), "ws-ack-789")
	time.Sleep(200 * time.Millisecond)
}
|
||||
|
||||
// TestGracefulPreRestart_NotImplemented verifies that when the workspace returns
// 404 (old SDK version), the platform proceeds gracefully (log + no error).
//
// NOTE(review): assigns to a method (h.resolveAgentURLForRestartSignal) —
// methods are not assignable in Go, so this does not compile; a func-typed
// field on WorkspaceHandler is needed to keep this seam.
func TestGracefulPreRestart_NotImplemented(t *testing.T) {
	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct

	mr := setupTestRedisWithURL(t, "http://localhost:18001/agent")

	// Workspace stub that only ever answers 404 — simulates an SDK build that
	// predates the /signals/restart_pending handler.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNotFound)
	}))
	defer srv.Close()
	mr.Set("ws:ws-noimpl-999:url", srv.URL, 5*time.Minute)

	h := newHandlerWithTestDeps(t)
	origResolve := h.resolveAgentURLForRestartSignal
	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
		return srv.URL + "/agent", nil
	}
	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

	// Fire the signal, then give the detached goroutine time to finish.
	h.gracefulPreRestart(context.Background(), "ws-noimpl-999")
	time.Sleep(200 * time.Millisecond)
	// No panic or error expected — graceful degradation
}
|
||||
|
||||
// TestGracefulPreRestart_ConnectionRefused verifies that when the workspace
// is unreachable, the platform proceeds gracefully without error.
//
// NOTE(review): assigns to a method (h.resolveAgentURLForRestartSignal) —
// methods are not assignable in Go, so this does not compile; a func-typed
// field on WorkspaceHandler is needed to keep this seam.
func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
	_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct

	mr := setupTestRedisWithURL(t, "http://localhost:19999/agent") // nothing listening on 19999
	mr.Set("ws:ws-unreachable-000:url", "http://localhost:19999/agent", 5*time.Minute)

	h := newHandlerWithTestDeps(t)
	origResolve := h.resolveAgentURLForRestartSignal
	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
		return "http://localhost:19999/agent", nil
	}
	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

	// Fire the signal, then give the detached goroutine time to finish.
	h.gracefulPreRestart(context.Background(), "ws-unreachable-000")
	time.Sleep(200 * time.Millisecond)
	// No panic or error expected — proceeds with stop as documented
}
|
||||
|
||||
// TestGracefulPreRestart_URLResolutionError verifies that when URL resolution
// fails, the platform proceeds gracefully without blocking the restart.
//
// NOTE(review): assigns to a method (h.resolveAgentURLForRestartSignal) —
// methods are not assignable in Go, so this does not compile; a func-typed
// field on WorkspaceHandler is needed to keep this seam.
func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
	_ = setupTestDB(t)
	_ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal

	h := newHandlerWithTestDeps(t)

	// Override resolveAgentURLForRestartSignal to return an error
	origResolve := h.resolveAgentURLForRestartSignal
	h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
		return "", context.DeadlineExceeded
	}
	defer func() { h.resolveAgentURLForRestartSignal = origResolve }()

	// Fire the signal, then give the detached goroutine time to finish.
	h.gracefulPreRestart(context.Background(), "ws-url-err-111")
	time.Sleep(200 * time.Millisecond)
	// No panic or error expected — proceeds with stop as documented
}
|
||||
|
||||
// ─── helpers ─────────────────────────────────────────────────────────────────
|
||||
|
||||
// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
// provisioner is nil so rewriteForDocker returns URL unchanged.
// The stub broadcaster, platform URL, and a per-test temp configs dir are the
// minimum dependencies NewWorkspaceHandler requires in these tests.
func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
}
|
||||
|
||||
// newHandlerWithTestDepsWithDB creates a WorkspaceHandler with a specific mock DB.
// Use this when you need to control the DB mock expectations.
func newHandlerWithTestDepsWithDB(t *testing.T, mockDB *sql.DB) *WorkspaceHandler {
	// We need to temporarily replace db.DB with our mock; t.Cleanup restores
	// the original handle so later tests see the real connection again.
	origDB := db.DB
	db.DB = mockDB
	t.Cleanup(func() { db.DB = origDB })

	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
}
|
||||
|
||||
// setupTestRedisWithURL is like setupTestRedis but pre-populates a workspace URL.
|
||||
func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
|
||||
mr, err := miniredis.Run()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to start miniredis: %v", err)
|
||||
}
|
||||
db.RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()})
|
||||
// Pre-populate a URL for the test workspace IDs used in these tests
|
||||
for _, wsID := range []string{"ws-cache-hit-123", "ws-cache-miss-456", "ws-ack-789", "ws-noimpl-999", "ws-unreachable-000"} {
|
||||
if err := db.CacheURL(context.Background(), wsID, url); err != nil {
|
||||
t.Fatalf("failed to cache URL for %s: %v", wsID, err)
|
||||
}
|
||||
}
|
||||
t.Cleanup(func() { mr.Close() })
|
||||
return mr
|
||||
}
|
||||
|
||||
// rewriteForDocker is a thin method wrapper over the package-level
// rewriteForDocker in restart_signals.go so tests can call it through a
// handler value.
// NOTE(review): the package-level function has no access to h.provisioner —
// if these tests rely on provisioner-dependent rewrite behavior, the logic
// must live on the handler instead; confirm against restart_signals.go.
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
	return rewriteForDocker(agentURL, workspaceID)
}
|
||||
@ -564,18 +564,6 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
|
||||
|
||||
log.Printf("Auto-restart: restarting %s (%s) runtime=%q (was: %s)", wsName, workspaceID, dbRuntime, status)
|
||||
|
||||
// #125 Phase 1: send pre-restart drain signal to the workspace agent.
|
||||
// For native_session targets, A2A messages go directly to the SDK session
|
||||
// and bypass the platform's a2a_queue buffering. If the container dies
|
||||
// mid-request, those messages are lost. The pre-restart signal gives the
|
||||
// SDK a chance to drain in-flight work before the container stops.
|
||||
//
|
||||
// Fire-and-forget: gracefulPreRestart runs in a detached goroutine with its
|
||||
// own 10s timeout. If the workspace doesn't implement the handler (404) or
|
||||
// times out, we proceed with the stop anyway — identical to the pre-fix
|
||||
// behaviour.
|
||||
h.gracefulPreRestart(ctx, workspaceID)
|
||||
|
||||
h.stopForRestart(ctx, workspaceID)
|
||||
|
||||
db.DB.ExecContext(ctx,
|
||||
|
||||
@ -1,317 +0,0 @@
|
||||
package plugins
|
||||
|
||||
// drift_sweeper.go — periodic drift detection for the plugin version-subscription
|
||||
// model (core#113 / #123).
|
||||
//
|
||||
// How it works
|
||||
// ─────────────
|
||||
// Every DriftSweepInterval the sweeper:
|
||||
// 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
|
||||
// AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
|
||||
// 2. For each row, resolves the tracked ref to its current upstream SHA
|
||||
// using the appropriate SourceResolver.
|
||||
// 3. If the resolved SHA differs from installed_sha → drift detected.
|
||||
// 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
|
||||
// a re-drift while a row is still pending is a no-op).
|
||||
//
|
||||
// Thread-safety
|
||||
// ─────────────
|
||||
// The sweeper holds no mutable state between ticks. Each tick runs a fresh
|
||||
// goroutine spawned by the ticker; the parent goroutine is cancelled when
|
||||
// the passed context is cancelled. This matches the pattern used by
|
||||
// pendinguploads/sweeper.go and registry/orphan_sweeper.go.
|
||||
//
|
||||
// Gitea compatibility
|
||||
// ───────────────────
|
||||
// Gitea's REST API is a GitHub-API-compatible surface, so the GithubResolver
|
||||
// with BaseURL pointing at a Gitea instance works for Gitea-hosted plugin
|
||||
// sources too. The source_raw in workspace_plugins stores the full spec
|
||||
// (e.g. "github://owner/repo#tag:v1.0.0") which the resolver parses.
|
||||
// For "local://" sources the resolver has no SHA concept, so those rows
|
||||
// are skipped (local plugins have no upstream to drift against).
|
||||
//
|
||||
// Resource cost
|
||||
// ─────────────
|
||||
// Each tick runs O(N) resolves where N is the count of tracked plugins.
|
||||
// Each resolve does a --depth=1 git fetch, bounded by the network round-trip
|
||||
// to GitHub/Gitea. With 1000 tracked plugins and 1h interval, worst case is
|
||||
// ~1,000 network calls per hour. The per-row timeout (ResolveRefDeadline)
|
||||
// prevents a slow/hanging fetch from blocking the entire sweep cycle.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
)
|
||||
|
||||
// DriftSweepInterval is the cadence between drift-sweep cycles.
// 1 hour is a reasonable balance: fast enough to surface new tag releases
// within a reasonable window, sparse enough to not hammer GitHub's API with
// 1000s of concurrent requests across a large deployment.
const DriftSweepInterval = 1 * time.Hour

// ResolveRefDeadline bounds the git fetch for a single plugin. A
// --depth=1 clone of any reasonable plugin repo should complete well
// within 30s on a healthy connection; 60s is the conservative ceiling
// that handles Gitea instances on high-latency links.
// NOTE(review): as of this revision nothing in this file applies this
// deadline — verify resolveLatestSHA actually wraps its context with it,
// otherwise the per-row timeout promised by the file header is a no-op.
const ResolveRefDeadline = 60 * time.Second
|
||||
|
||||
// SourceResolver resolves plugin sources to installable directories.
// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
// NOTE(review): Resolve returning another SourceResolver (rather than a
// resolved artifact/path) is an unusual shape — confirm against the
// Registry implementation before building on this interface.
type SourceResolver interface {
	Resolve(source Source) (SourceResolver, error)
	Schemes() []string
}
|
||||
|
||||
// StartPluginDriftSweeper runs the drift-detection loop until ctx is cancelled.
|
||||
// Pass a nil resolver to disable the sweeper (useful for harnesses or CP/SaaS
|
||||
// mode where git operations are unavailable).
|
||||
//
|
||||
// Registers itself via atexits in cmd/server/main.go so the process
|
||||
// shuts down cleanly on SIGTERM.
|
||||
func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
|
||||
if resolver == nil {
|
||||
log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
|
||||
return
|
||||
}
|
||||
log.Printf("Plugin drift sweeper started — interval %s", DriftSweepInterval)
|
||||
ticker := time.NewTicker(DriftSweepInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run once on startup so we detect drift immediately rather than waiting
|
||||
// for the first tick.
|
||||
sweepDriftOnce(ctx, resolver)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("Plugin drift sweeper: shutdown")
|
||||
return
|
||||
case <-ticker.C:
|
||||
// ctx.Err() guard: the ticker may fire just as ctx is cancelled
|
||||
// (MPMC channel race). Skip the sweep so we don't start a
|
||||
// ResolveRef cycle after shutdown that would pollute the next
|
||||
// test's baseline.
|
||||
if ctx.Err() != nil {
|
||||
continue
|
||||
}
|
||||
sweepDriftOnce(ctx, resolver)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sweepDriftOnce runs one full drift-detection cycle.
|
||||
// Errors are non-fatal — each row is handled independently so a single
|
||||
// slow row doesn't block the rest of the sweep.
|
||||
func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
|
||||
ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT wp.id, wp.workspace_id, wp.plugin_name, wp.source_raw,
|
||||
wp.tracked_ref, wp.installed_sha
|
||||
FROM workspace_plugins wp
|
||||
WHERE wp.tracked_ref != 'none'
|
||||
AND wp.installed_sha IS NOT NULL
|
||||
`)
|
||||
if err != nil {
|
||||
log.Printf("Plugin drift sweeper: SELECT failed: %v", err)
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var row struct {
|
||||
id string
|
||||
workspaceID string
|
||||
pluginName string
|
||||
sourceRaw string
|
||||
trackedRef string
|
||||
installedSHA string
|
||||
}
|
||||
if scanErr := rows.Scan(&row.id, &row.workspaceID, &row.pluginName,
|
||||
&row.sourceRaw, &row.trackedRef, &row.installedSHA); scanErr != nil {
|
||||
log.Printf("Plugin drift sweeper: row scan failed: %v", scanErr)
|
||||
continue
|
||||
}
|
||||
|
||||
latestSHA, resolveErr := resolveLatestSHA(ctx, resolver, row.sourceRaw, row.trackedRef)
|
||||
if resolveErr != nil {
|
||||
// Log and skip — don't queue drift if we couldn't resolve.
|
||||
// Transient network errors self-heal on the next cycle.
|
||||
log.Printf("Plugin drift sweeper: resolve %s@%s failed: %v — skipping",
|
||||
row.pluginName, row.trackedRef, resolveErr)
|
||||
continue
|
||||
}
|
||||
|
||||
if latestSHA == row.installedSHA {
|
||||
continue // no drift
|
||||
}
|
||||
|
||||
log.Printf("Plugin drift sweeper: drift detected for %s (workspace=%s): "+
|
||||
"installed=%s upstream=%s", row.pluginName, row.workspaceID,
|
||||
row.installedSHA[:8], latestSHA[:8])
|
||||
|
||||
if queueErr := queueDriftEntry(ctx, row.workspaceID, row.pluginName,
|
||||
row.trackedRef, row.installedSHA, latestSHA); queueErr != nil {
|
||||
log.Printf("Plugin drift sweeper: queue drift for %s failed: %v",
|
||||
row.pluginName, queueErr)
|
||||
}
|
||||
}
|
||||
if iterErr := rows.Err(); iterErr != nil {
|
||||
log.Printf("Plugin drift sweeper: rows iteration failed: %v", iterErr)
|
||||
}
|
||||
}
|
||||
|
||||
// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
|
||||
// Handles both github:// and local:// sources; local sources are skipped
|
||||
// (no meaningful upstream to drift against).
|
||||
func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
|
||||
// Strip the scheme prefix to get the raw spec.
|
||||
// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
|
||||
spec := sourceRaw
|
||||
for _, scheme := range resolver.Schemes() {
|
||||
if strings.HasPrefix(spec, scheme+"://") {
|
||||
spec = strings.TrimPrefix(spec, scheme+"://")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Parse the ref from the tracked_ref field (e.g. "tag:v1.0.0").
|
||||
// Prepend it as a # suffix so the resolver can fetch the right ref.
|
||||
var refSuffix string
|
||||
switch {
|
||||
case strings.HasPrefix(trackedRef, "tag:"):
|
||||
refSuffix = "#" + trackedRef
|
||||
case strings.HasPrefix(trackedRef, "sha:"):
|
||||
refSuffix = "#" + trackedRef
|
||||
default:
|
||||
// Bare ref (shouldn't happen per validateTrackedRef, but be safe).
|
||||
refSuffix = "#" + trackedRef
|
||||
}
|
||||
|
||||
// If spec already has a # fragment, replace it with the tracked ref.
|
||||
// (In practice source_raw always has one, but handle both cases.)
|
||||
if strings.Contains(spec, "#") {
|
||||
spec = strings.SplitN(spec, "#", 2)[0] + refSuffix
|
||||
} else {
|
||||
spec = spec + refSuffix
|
||||
}
|
||||
|
||||
// Use the github resolver directly — it handles the fetch + rev-parse.
|
||||
gh := NewGithubResolver()
|
||||
resolvedSHA, err := gh.ResolveRef(ctx, spec)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("resolve %s: %w", spec, err)
|
||||
}
|
||||
return resolvedSHA, nil
|
||||
}
|
||||
|
||||
// queueDriftEntry inserts a pending drift entry into plugin_update_queue.
|
||||
// ON CONFLICT (workspace_id, plugin_name) WHERE status = 'pending' DO NOTHING
|
||||
// makes this idempotent — re-drift while a row is already pending is a no-op.
|
||||
// Uses the partial unique index plugin_update_queue_pending_unique as the
|
||||
// inference target; the WHERE clause ensures we only dedup pending rows.
|
||||
func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
|
||||
_, err := db.DB.ExecContext(ctx, `
|
||||
INSERT INTO plugin_update_queue
|
||||
(workspace_id, plugin_name, tracked_ref, current_sha, latest_sha)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (workspace_id, plugin_name) DO NOTHING
|
||||
`, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
|
||||
return err
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
// Test helpers
|
||||
// ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
// It runs exactly one drift-detection cycle against the given resolver.
func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
	sweepDriftOnce(parent, resolver)
}
|
||||
|
||||
// QueueDriftEntryForTest exposes queueDriftEntry for package-level testing.
// It forwards all arguments unchanged and returns the insert error, if any.
func QueueDriftEntryForTest(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
	return queueDriftEntry(ctx, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
}
|
||||
|
||||
// PluginUpdateQueueRow is the Go struct mirroring a plugin_update_queue row.
// Exported for tests and for the admin handler to consume.
type PluginUpdateQueueRow struct {
	ID          string    `json:"id"`           // queue row primary key
	WorkspaceID string    `json:"workspace_id"` // workspace owning the drifted plugin
	PluginName  string    `json:"plugin_name"`  // plugin whose upstream moved
	TrackedRef  string    `json:"tracked_ref"`  // subscribed ref, e.g. "tag:v1.0.0"
	CurrentSHA  string    `json:"current_sha"`  // SHA installed in the workspace
	LatestSHA   string    `json:"latest_sha"`   // SHA currently at the tracked ref upstream
	Status      string    `json:"status"`       // e.g. 'pending' / 'applied'
	CreatedAt   time.Time `json:"created_at"`   // when the drift entry was queued
}
|
||||
|
||||
// ListPendingUpdates returns all pending drift entries, newest first.
|
||||
func ListPendingUpdates(ctx context.Context) ([]PluginUpdateQueueRow, error) {
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT id, workspace_id, plugin_name, tracked_ref,
|
||||
current_sha, latest_sha, status, created_at
|
||||
FROM plugin_update_queue
|
||||
WHERE status = 'pending'
|
||||
ORDER BY created_at DESC
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list pending updates: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var result []PluginUpdateQueueRow
|
||||
for rows.Next() {
|
||||
var r PluginUpdateQueueRow
|
||||
if scanErr := rows.Scan(&r.ID, &r.WorkspaceID, &r.PluginName,
|
||||
&r.TrackedRef, &r.CurrentSHA, &r.LatestSHA, &r.Status, &r.CreatedAt); scanErr != nil {
|
||||
return nil, fmt.Errorf("scan row: %w", scanErr)
|
||||
}
|
||||
result = append(result, r)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
||||
// ApplyDriftUpdate marks a queue entry as applied (or already-applied idempotently)
|
||||
// and returns the workspace_id and plugin_name so the caller can trigger a restart.
|
||||
func ApplyDriftUpdate(ctx context.Context, queueID string) (workspaceID, pluginName string, err error) {
|
||||
var row struct {
|
||||
WorkspaceID string
|
||||
PluginName string
|
||||
Status sql.NullString
|
||||
}
|
||||
err = db.DB.QueryRowContext(ctx, `
|
||||
SELECT workspace_id, plugin_name, status
|
||||
FROM plugin_update_queue
|
||||
WHERE id = $1
|
||||
`, queueID).Scan(&row.WorkspaceID, &row.PluginName, &row.Status)
|
||||
if err == sql.ErrNoRows {
|
||||
return "", "", fmt.Errorf("queue entry %s not found", queueID)
|
||||
}
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("query queue entry: %w", err)
|
||||
}
|
||||
|
||||
if row.Status.Valid && row.Status.String == "applied" {
|
||||
// Idempotent — already applied.
|
||||
return row.WorkspaceID, row.PluginName, nil
|
||||
}
|
||||
|
||||
_, execErr := db.DB.ExecContext(ctx, `
|
||||
UPDATE plugin_update_queue
|
||||
SET status = 'applied'
|
||||
WHERE id = $1
|
||||
AND status = 'pending'
|
||||
`, queueID)
|
||||
if execErr != nil {
|
||||
return "", "", fmt.Errorf("update status: %w", execErr)
|
||||
}
|
||||
return row.WorkspaceID, row.PluginName, nil
|
||||
}
|
||||
@ -1,163 +0,0 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// stubResolver is a SourceResolver that always returns a stub github resolver.
// Resolve ignores its source argument entirely and hands back a fresh
// NewGithubResolver() on every call — sufficient for tests that only need
// the interface satisfied.
type stubResolver struct {
	schemes []string // reported verbatim by Schemes; not consulted by Resolve
}

// Resolve ignores source and returns a default github resolver; never errors.
func (s *stubResolver) Resolve(source Source) (SourceResolver, error) {
	return NewGithubResolver(), nil
}

// Schemes returns the configured scheme list unchanged.
func (s *stubResolver) Schemes() []string { return s.schemes }
|
||||
|
||||
func TestResolveRef_RejectsBareSpec(t *testing.T) {
|
||||
r := NewGithubResolver()
|
||||
_, err := r.ResolveRef(context.Background(), "org/repo")
|
||||
if err == nil {
|
||||
t.Error("bare spec (no ref) should be rejected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveRef_RejectsInvalidSpec(t *testing.T) {
|
||||
r := NewGithubResolver()
|
||||
for _, spec := range []string{"", "single-segment", "a/b/c"} {
|
||||
t.Run(spec, func(t *testing.T) {
|
||||
_, err := r.ResolveRef(context.Background(), spec)
|
||||
if err == nil {
|
||||
t.Errorf("spec %q should be rejected", spec)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveRef_PropagatesGitError(t *testing.T) {
|
||||
r := &GithubResolver{
|
||||
GitRunner: func(ctx context.Context, dir string, args ...string) error {
|
||||
return errors.New("simulated network failure")
|
||||
},
|
||||
}
|
||||
_, err := r.ResolveRef(context.Background(), "org/repo#v1.0.0")
|
||||
if err == nil {
|
||||
t.Error("expected error from git runner")
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveRef_MapsNotFoundToErrPluginNotFound(t *testing.T) {
|
||||
r := &GithubResolver{
|
||||
GitRunner: func(ctx context.Context, dir string, args ...string) error {
|
||||
return errors.New("remote: Repository not found")
|
||||
},
|
||||
}
|
||||
_, err := r.ResolveRef(context.Background(), "org/repo#v1.0.0")
|
||||
if !errors.Is(err, ErrPluginNotFound) {
|
||||
t.Errorf("expected ErrPluginNotFound, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// stubGitForResolveRef creates a stub that handles fetch + rev-parse for ResolveRef.
|
||||
func stubGitForResolveRef(t *testing.T, sha string) func(ctx context.Context, dir string, args ...string) error {
|
||||
return func(ctx context.Context, dir string, args ...string) error {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
if len(args) < 1 {
|
||||
return errors.New("no args")
|
||||
}
|
||||
switch args[0] {
|
||||
case "fetch":
|
||||
// mkdir for clone target
|
||||
_ = dir
|
||||
return nil
|
||||
case "rev-parse":
|
||||
// rev-parse success — write SHA to a file so rev-parse can "read" it
|
||||
return nil
|
||||
case "describe":
|
||||
// git describe for latest tag
|
||||
return nil
|
||||
}
|
||||
return errors.New("unexpected git command: " + args[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveRef_SucceedsForTagRef(t *testing.T) {
|
||||
// This test verifies the happy path: fetch + rev-parse succeed.
|
||||
// We stub all git commands to succeed, then verify LastFetchSHA is populated.
|
||||
calls := make(map[string]bool)
|
||||
r := &GithubResolver{
|
||||
GitRunner: func(ctx context.Context, dir string, args ...string) error {
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
calls[args[0]] = true
|
||||
return nil
|
||||
},
|
||||
}
|
||||
_, err := r.ResolveRef(context.Background(), "org/repo#tag:v1.0.0")
|
||||
// Without a real git binary, we can't fully test success — but we can
|
||||
// verify the argument routing doesn't panic and returns expected errors.
|
||||
if err != nil && !errors.Is(err, ErrPluginNotFound) {
|
||||
// Expect ErrPluginNotFound when git is not available (no real git binary)
|
||||
// The important thing is it doesn't panic.
|
||||
}
|
||||
if !calls["fetch"] && !calls["rev-parse"] {
|
||||
// At least one git command should have been called
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolveRef_DoesNotPanic verifies that ResolveRef handles all ref shapes
|
||||
// without panicking on nil dereference or similar.
|
||||
func TestResolveRef_DoesNotPanic(t *testing.T) {
|
||||
r := NewGithubResolver()
|
||||
refs := []string{
|
||||
"org/repo#tag:v1.0.0",
|
||||
"org/repo#tag:latest",
|
||||
"org/repo#sha:abc123def456",
|
||||
"org/repo#main",
|
||||
}
|
||||
for _, ref := range refs {
|
||||
t.Run(ref, func(t *testing.T) {
|
||||
// Without real git, just verify no panic
|
||||
_, _ = r.ResolveRef(context.Background(), ref)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestQueueDriftEntry_HandlesNilDB is a basic sanity that the SQL doesn't
// explode. Real integration requires a test DB; here we verify the function
// signature and error paths.
// (The original doc comment named a nonexistent TestQueueDriftEntry_Integration;
// fixed to match the function.)
func TestQueueDriftEntry_HandlesNilDB(t *testing.T) {
	// queueDriftEntry is internal; test via SweepDriftOnce which uses it.
	// When db.DB is nil, the SELECT in sweepDriftOnce will fail with a
	// nil pointer panic — but that's correct behaviour (DB must be wired).
	// The sweeper logs and skips on error, so nil DB gracefully degrades.
	// NOTE(review): the body is intentionally empty, so this test always
	// passes and documents intent only; a real check needs a wired test DB.
}
|
||||
|
||||
// TestPluginUpdateQueueRow_Struct covers the struct field names.
|
||||
func TestPluginUpdateQueueRow_Struct(t *testing.T) {
|
||||
row := PluginUpdateQueueRow{
|
||||
ID: "test-id",
|
||||
WorkspaceID: "test-workspace",
|
||||
PluginName: "test-plugin",
|
||||
TrackedRef: "tag:v1.0.0",
|
||||
CurrentSHA: "abc123",
|
||||
LatestSHA: "def456",
|
||||
Status: "pending",
|
||||
}
|
||||
if row.Status != "pending" {
|
||||
t.Errorf("expected status pending, got %s", row.Status)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSourceResolverInterface_StubResolver verifies that a stub resolver
// satisfies the SourceResolver interface.
// The compile-time assertion below is the whole test: if *stubResolver stops
// implementing SourceResolver, this file no longer builds.
func TestSourceResolverInterface_StubResolver(t *testing.T) {
	var _ SourceResolver = (*stubResolver)(nil)
}
|
||||
@ -30,11 +30,6 @@ type GithubResolver struct {
|
||||
// BaseURL defaults to https://github.com. Tests point it at a local
|
||||
// file:// bare repo.
|
||||
BaseURL string
|
||||
|
||||
// LastFetchSHA is set by Fetch after a successful clone. It holds the
|
||||
// commit SHA that was checked out. callers can retrieve it via LastSHA().
|
||||
// Only valid after a successful Fetch call; reset on each Fetch.
|
||||
LastFetchSHA string
|
||||
}
|
||||
|
||||
// NewGithubResolver constructs a resolver with sensible defaults.
|
||||
@ -45,10 +40,6 @@ func NewGithubResolver() *GithubResolver {
|
||||
}
|
||||
}
|
||||
|
||||
// LastSHA returns the SHA of the last successful Fetch call, or "" if
// Fetch has not been called or the last call failed.
// NOTE(review): this is a plain unsynchronized field read — confirm callers
// never invoke Fetch and LastSHA concurrently on the same resolver.
func (r *GithubResolver) LastSHA() string { return r.LastFetchSHA }
|
||||
|
||||
// Scheme returns the constant scheme identifier "github".
func (r *GithubResolver) Scheme() string { return "github" }
|
||||
|
||||
@ -123,15 +114,6 @@ func (r *GithubResolver) Fetch(ctx context.Context, spec string, dst string) (st
|
||||
return "", fmt.Errorf("github resolver: clone %s failed: %w", url, err)
|
||||
}
|
||||
|
||||
// Capture the SHA before we strip .git. This is the commit that will
|
||||
// be installed, used by the drift detector to seed installed_sha so
|
||||
// subsequent cycles can detect drift.
|
||||
// runGit captures output; errors are non-fatal — an unknown SHA just
|
||||
// means drift detection can't work for this row, which is acceptable.
|
||||
if shaOut, shaErr := runGitOneLine(ctx, cloneTarget, "rev-parse", "--verify", "HEAD"); shaErr == nil {
|
||||
r.LastFetchSHA = strings.TrimSpace(shaOut)
|
||||
}
|
||||
|
||||
// Strip .git so the plugin dir doesn't become a nested repo in the
|
||||
// workspace container's filesystem.
|
||||
if err := os.RemoveAll(filepath.Join(cloneTarget, ".git")); err != nil {
|
||||
@ -146,24 +128,6 @@ func (r *GithubResolver) Fetch(ctx context.Context, spec string, dst string) (st
|
||||
return repo, nil
|
||||
}
|
||||
|
||||
// runGitOneLine runs git with args in dir and returns the raw combined
// stdout+stderr of the command. Despite the name, the output is NOT trimmed
// here — callers are expected to strings.TrimSpace the result themselves.
// On failure it returns "" and an error that embeds the command output
// (caller decides whether to treat it as fatal).
func runGitOneLine(ctx context.Context, dir string, args ...string) (string, error) {
	cmd := exec.CommandContext(ctx, "git", args...)
	cmd.Dir = dir
	// Build the child environment explicitly: when HOME is unset, point it
	// at the working dir — presumably so git has a usable HOME; confirm.
	childEnv := os.Environ()
	if os.Getenv("HOME") == "" && dir != "" {
		childEnv = append(childEnv, "HOME="+dir)
	}
	// Force the C locale so git's output and error text are stable.
	childEnv = append(childEnv, "LANG=C", "LC_ALL=C")
	cmd.Env = childEnv
	out, err := cmd.CombinedOutput()
	if err != nil {
		return "", fmt.Errorf("git %v: %w (output: %s)", args, err, string(out))
	}
	return string(out), nil
}
|
||||
|
||||
// defaultGitRunner shells out to the system `git`. `dir` is the working
|
||||
// directory for the command (nil/empty means current process cwd).
|
||||
func defaultGitRunner(ctx context.Context, dir string, args ...string) error {
|
||||
@ -190,116 +154,3 @@ func defaultGitRunner(ctx context.Context, dir string, args ...string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResolveRef resolves a plugin spec to a full commit SHA.
//
// Used by the drift sweeper to compare the SHA installed in a workspace
// against the current upstream SHA for the tracked ref.
//
// Spec shapes:
//   - "owner/repo#tag:v1.0.0" → fetch the tag, return its commit SHA
//   - "owner/repo#tag:latest" → fetch tags, find the latest tag, return its SHA
//   - "owner/repo#sha:abc123" → fetch and verify the commit, return the full SHA
//     (the code fetches rather than returning the spec value as-is)
//   - "owner/repo#main" → fetch the branch, return its tip SHA
//
// Returns ErrPluginNotFound if the ref does not exist upstream.
//
// NOTE(review): only the fetch stage goes through r.GitRunner; the
// rev-parse/describe stages below shell out to the real `git` binary
// directly, so injected test runners cannot stub them.
func (r *GithubResolver) ResolveRef(ctx context.Context, spec string) (string, error) {
	spec = strings.TrimSpace(spec)
	m := repoRE.FindStringSubmatch(spec)
	if m == nil {
		return "", fmt.Errorf("github resolver: spec %q must be <owner>/<repo>[#<ref>]", spec)
	}
	owner, repo, ref := m[1], m[2], m[3]
	if ref == "" {
		// A bare owner/repo names nothing to resolve — callers must always
		// provide a tag, SHA, or branch.
		return "", fmt.Errorf("github resolver: ResolveRef requires a ref (got bare %q)", spec)
	}

	base := r.BaseURL
	if base == "" {
		base = "https://github.com"
	}
	url := fmt.Sprintf("%s/%s/%s.git", base, owner, repo)

	// Clone shallowly into a temp dir, then resolve the SHA.
	// --depth=1 keeps the network cost bounded regardless of repo size.
	// NOTE(review): no `git init` is visible here; this assumes the runner
	// (defaultGitRunner by default, body not shown in this view) can fetch
	// into a bare temp dir — confirm against its implementation.
	workDir, err := os.MkdirTemp("", "molecule-resolve-ref-*")
	if err != nil {
		return "", fmt.Errorf("github resolver: tempdir: %w", err)
	}
	defer os.RemoveAll(workDir)

	runner := r.GitRunner
	if runner == nil {
		runner = defaultGitRunner
	}

	// Build ref to fetch: for "tag:latest" we fetch all tags; for
	// "tag:vX.Y.Z" we fetch that specific tag; for bare refs we fetch
	// the branch/commit directly.
	fetchArgs := []string{"fetch", "--depth=1"}
	switch {
	case strings.HasPrefix(ref, "tag:"):
		tagName := strings.TrimPrefix(ref, "tag:")
		if tagName == "latest" {
			// Fetch all tags so we can find the latest one.
			fetchArgs = []string{"fetch", "--tags", "--deepen=1", "--", url}
		} else {
			fetchArgs = append(fetchArgs, "--", url, "tag", tagName)
		}
	case strings.HasPrefix(ref, "sha:"):
		// Already a SHA; just fetch it directly.
		sha := strings.TrimPrefix(ref, "sha:")
		fetchArgs = append(fetchArgs, "--", url, sha)
	default:
		// Branch or other named ref.
		fetchArgs = append(fetchArgs, "--", url, ref)
	}

	if err := runner(ctx, workDir, fetchArgs...); err != nil {
		// Map "doesn't exist upstream" onto the sentinel so callers can
		// distinguish not-found from transient git/network failures.
		msg := strings.ToLower(err.Error())
		if strings.Contains(msg, "repository not found") ||
			strings.Contains(msg, "could not find remote ref") ||
			strings.Contains(msg, "remote ref not found") {
			return "", fmt.Errorf("github resolver: %s: %w", url, ErrPluginNotFound)
		}
		return "", fmt.Errorf("github resolver: fetch %s %s failed: %w", url, ref, err)
	}

	// Resolve the ref to a SHA.
	var shaOut []byte
	var resolveErr error
	if strings.HasPrefix(ref, "tag:") {
		tagName := strings.TrimPrefix(ref, "tag:")
		if tagName == "latest" {
			// Find the most recent tag by commit date.
			// NOTE(review): `git describe --tags --abbrev=0 HEAD` picks the
			// tag closest to HEAD, not necessarily the newest by date —
			// confirm this matches the intended "latest" semantics.
			tagCmd := exec.CommandContext(ctx, "git", "-C", workDir,
				"describe", "--tags", "--abbrev=0", "HEAD")
			tagOut, tagErr := tagCmd.CombinedOutput()
			if tagErr != nil {
				return "", fmt.Errorf("github resolver: no tags found in %s: %w (%s)",
					owner+"/"+repo, tagErr, string(tagOut))
			}
			resolvedTag := strings.TrimSpace(string(tagOut))
			// ^{commit} peels annotated tags down to the commit object.
			shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
				"rev-parse", "--verify", "refs/tags/"+resolvedTag+"^{commit}")
			shaOut, resolveErr = shaCmd.CombinedOutput()
		} else {
			shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
				"rev-parse", "--verify", "refs/tags/"+tagName+"^{commit}")
			shaOut, resolveErr = shaCmd.CombinedOutput()
		}
	} else {
		refName := ref
		if strings.HasPrefix(ref, "sha:") {
			refName = strings.TrimPrefix(ref, "sha:")
		}
		shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
			"rev-parse", "--verify", refName+"^{commit}")
		shaOut, resolveErr = shaCmd.CombinedOutput()
	}
	if resolveErr != nil {
		return "", fmt.Errorf("github resolver: rev-parse %s failed: %w (%s)",
			ref, resolveErr, string(shaOut))
	}
	return strings.TrimSpace(string(shaOut)), nil
}
|
||||
|
||||
@ -18,7 +18,6 @@ import (
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
|
||||
@ -27,7 +26,7 @@ import (
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle, pluginResolver plugins.SourceResolver) *gin.Engine {
|
||||
func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle) *gin.Engine {
|
||||
r := gin.Default()
|
||||
|
||||
// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
|
||||
@ -499,15 +498,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
r.POST("/admin/workspace-images/refresh", middleware.AdminAuth(db.DB), imgH.Refresh)
|
||||
}
|
||||
|
||||
// Admin — plugin version-subscription drift queue (core#123).
|
||||
// List pending drift entries and apply approved updates.
|
||||
{
|
||||
driftH := handlers.NewAdminPluginDriftHandler(plgh)
|
||||
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
|
||||
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
|
||||
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
|
||||
}
|
||||
|
||||
// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
|
||||
// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
|
||||
// fresh installs use to obtain their first admin bearer. Adding AdminAuth
|
||||
@ -625,16 +615,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
|
||||
).Scan(&instanceID)
|
||||
return instanceID, err
|
||||
}
|
||||
// pluginResolver: when provided (normal production), use it for plgh so
|
||||
// the drift sweeper (which also gets the same resolver in main.go) uses
|
||||
// identical resolver state. When nil (test / backward compat), let
|
||||
// NewPluginsHandler create its own default registry.
|
||||
plgh := handlers.NewPluginsHandler(pluginsDir, dockerCli, wh.RestartByID).
|
||||
WithRuntimeLookup(runtimeLookup).
|
||||
WithInstanceIDLookup(instanceIDLookup)
|
||||
if pluginResolver != nil {
|
||||
plgh = plgh.WithSourceResolver(pluginResolver)
|
||||
}
|
||||
r.GET("/plugins", plgh.ListRegistry)
|
||||
r.GET("/plugins/sources", plgh.ListSources)
|
||||
wsAuth.GET("/plugins", plgh.ListInstalled)
|
||||
|
||||
@ -1,10 +0,0 @@
|
||||
-- Down migration for plugin_drift_queue.
-- Reverses the two changes introduced by the up migration.

-- 1. Remove plugin_update_queue (all queued drift entries are discarded).
DROP TABLE IF EXISTS plugin_update_queue;

-- 2. Remove installed_sha column from workspace_plugins.
--    Existing drift sweeper rows are unaffected (sweeper doesn't exist yet
--    in this version of the codebase — this is a forward-only component).
--    Any recorded install SHAs are lost; if the up migration is re-applied
--    later, rows start over as "unknown SHA" until plugins are re-installed.
ALTER TABLE workspace_plugins DROP COLUMN IF EXISTS installed_sha;
|
||||
@ -1,64 +0,0 @@
|
||||
-- plugin_drift_queue: plugin_update_queue table + installed_sha column.
--
-- Migration order:
--   1. plugin_update_queue — new table, safe to create first.
--   2. installed_sha on workspace_plugins — added column; existing rows stay NULL
--      (no installed_sha until they are re-installed).
--
-- Why two changes in one migration: the drift detector reads from
-- workspace_plugins (needs installed_sha to compare) and writes to
-- plugin_update_queue. Both must exist before the sweeper starts.
-- The alternative — a separate migration for installed_sha — would mean
-- the sweeper could start after migration-1 but before migration-2,
-- writing queue rows with NULL installed_sha. Keeping them together
-- avoids that race without needing a schema-lock flag.

-- plugin_update_queue: records upstream drift for operator review before
-- the platform auto-applies the update.
--
-- Rows are created by the drift sweeper when workspace_plugins.tracked_ref
-- is not 'none' and the upstream SHA differs from installed_sha.
-- Rows are consumed by the admin apply endpoint (core#123).
--
-- Uniqueness: one pending row per (workspace_id, plugin_name). A new drift
-- while a row is still pending is a no-op — the existing pending row
-- reflects the same desired update.
CREATE TABLE IF NOT EXISTS plugin_update_queue (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
    plugin_name TEXT NOT NULL,
    tracked_ref TEXT NOT NULL,
    current_sha TEXT NOT NULL,   -- SHA we had installed
    latest_sha TEXT NOT NULL,    -- SHA upstream resolved to
    status TEXT NOT NULL DEFAULT 'pending',
    -- Valid statuses: pending | applied | dismissed
    --   'pending':   drift detected, awaiting operator review
    --   'applied':   operator confirmed, plugin re-installed
    --   'dismissed': operator explicitly ignored this drift
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT plugin_update_queue_status CHECK (status IN ('pending', 'applied', 'dismissed'))
);

CREATE UNIQUE INDEX IF NOT EXISTS plugin_update_queue_pending_unique
    ON plugin_update_queue(workspace_id, plugin_name)
    WHERE status = 'pending';

-- Partial index: the GET /admin/plugin-updates-pending query filters by
-- status = 'pending' on every call.
CREATE INDEX IF NOT EXISTS plugin_update_queue_status_pending
    ON plugin_update_queue(created_at)
    WHERE status = 'pending';

-- Add installed_sha to workspace_plugins. This column stores the SHA that
-- was last successfully installed. The drift sweeper compares this against
-- the upstream-resolved SHA for the tracked_ref to detect drift.
--
-- NULL means: the row exists (was written by an install before this
-- migration) but we don't know what SHA was installed. The drift sweeper
-- treats NULL as "no drift possible yet" — it only compares rows where
-- installed_sha IS NOT NULL.
--
-- The column is updated by:
--   (a) recordWorkspacePluginInstall at install time (always set)
--   (b) the admin apply endpoint after a successful re-install (always set)
--
-- IF NOT EXISTS added for consistency: every other statement in this
-- migration is idempotent, and a partially-applied run previously failed
-- on re-execution at this line.
ALTER TABLE workspace_plugins ADD COLUMN IF NOT EXISTS installed_sha TEXT;
|
||||
Loading…
Reference in New Issue
Block a user