diff --git a/workspace-server/cmd/server/main.go b/workspace-server/cmd/server/main.go index 1d6ff911..a1adde5e 100644 --- a/workspace-server/cmd/server/main.go +++ b/workspace-server/cmd/server/main.go @@ -21,7 +21,6 @@ import ( memwiring "github.com/Molecule-AI/molecule-monorepo/platform/internal/memory/wiring" "github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware" "github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins" "github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner" "github.com/Molecule-AI/molecule-monorepo/platform/internal/registry" "github.com/Molecule-AI/molecule-monorepo/platform/internal/router" @@ -332,23 +331,7 @@ func main() { cronSched.SetChannels(channelMgr) // Router - // Plugin registry — created before Setup so the same registry is shared - // between the PluginsHandler (for installs) and the drift sweeper (for - // drift detection). github:// sources always work; local:// sources - // require a plugins/ dir on disk (nil in CP/SaaS mode). - pluginRegistry := plugins.NewRegistry() - pluginRegistry.Register(plugins.NewGithubResolver()) - r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, pluginRegistry) - - // Plugin drift sweeper — periodic detection of upstream plugin version drift - // (core#123). Scans workspace_plugins rows where tracked_ref != 'none', - // resolves the current upstream SHA for each tracked ref, and queues drift - // entries when the upstream has moved. Only runs when pluginResolver is - // non-nil (CP/SaaS mode has no local git and the sweeper is a no-op there). - // Nil prov: Docker not available (test harness / local dev without Docker). - go supervised.RunWithRecover(ctx, "plugin-drift-sweeper", func(c context.Context) { - plugins.StartPluginDriftSweeper(c, pluginRegistry) - }) + r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle) // HTTP server with graceful shutdown. // diff --git a/workspace-server/internal/handlers/admin_plugin_drift.go b/workspace-server/internal/handlers/admin_plugin_drift.go deleted file mode 100644 index 1082c1d6..00000000 --- a/workspace-server/internal/handlers/admin_plugin_drift.go +++ /dev/null @@ -1,211 +0,0 @@ -package handlers - -// admin_plugin_drift.go — admin endpoints for plugin version-subscription drift queue -// (core#123). -// -// Routes: -// GET /admin/plugin-updates-pending — list all pending drift entries -// POST /admin/plugin-updates/:id/apply — apply a queued drift update - -import ( - "context" - "database/sql" - "errors" - "fmt" - "log" - "net/http" - "os" - - "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" - "github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins" - "github.com/gin-gonic/gin" -) - -// AdminPluginDriftHandler handles admin endpoints for the plugin drift queue. -type AdminPluginDriftHandler struct { - pluginsHandler *PluginsHandler // used to re-trigger plugin install on apply -} - -// NewAdminPluginDriftHandler constructs a handler wired to the plugins handler. -func NewAdminPluginDriftHandler(ph *PluginsHandler) *AdminPluginDriftHandler { - return &AdminPluginDriftHandler{pluginsHandler: ph} -} - -// ListPending handles GET /admin/plugin-updates-pending. -// -// Returns a JSON array of pending drift entries, newest-first. Empty array -// means no plugins have drifted since the last sweep cycle. 
-func (h *AdminPluginDriftHandler) ListPending(c *gin.Context) {
-	rows, err := plugins.ListPendingUpdates(c.Request.Context())
-	if err != nil {
-		log.Printf("AdminPluginDrift: ListPendingUpdates failed: %v", err)
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list pending updates"})
-		return
-	}
-	c.JSON(http.StatusOK, rows)
-}
-
-// Apply handles POST /admin/plugin-updates/:id/apply.
-//
-// 1. Reads the queue entry and verifies it's still pending.
-// 2. Reads the workspace_plugins row to get the plugin's source.
-// 3. Re-installs the plugin from source_raw (re-fetch from upstream at the
-//    same tracked ref — the drift was caused by upstream moving).
-// 4. Marks the queue entry as applied.
-// 5. Triggers workspace restart.
-//
-// Idempotent: if the entry is already 'applied', returns 200 with the
-// workspace_id and plugin_name so callers can still poll for confirmation.
-func (h *AdminPluginDriftHandler) Apply(c *gin.Context) {
-	queueID := c.Param("id")
-	if queueID == "" {
-		c.JSON(http.StatusBadRequest, gin.H{"error": "queue id is required"})
-		return
-	}
-
-	ctx := c.Request.Context()
-
-	// Step 1: read the queue entry. (No row lock is taken; concurrent
-	// applies race benignly, since both re-install the same content.)
-	var entry struct {
-		WorkspaceID string `json:"workspace_id"`
-		PluginName  string `json:"plugin_name"`
-		TrackedRef  string `json:"tracked_ref"`
-		Status      string `json:"status"`
-	}
-	err := db.DB.QueryRowContext(ctx, `
-		SELECT workspace_id, plugin_name, tracked_ref, status
-		FROM plugin_update_queue
-		WHERE id = $1
-	`, queueID).Scan(&entry.WorkspaceID, &entry.PluginName, &entry.TrackedRef, &entry.Status)
-	if err == sql.ErrNoRows {
-		c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("queue entry %s not found", queueID)})
-		return
-	}
-	if err != nil {
-		log.Printf("AdminPluginDrift: apply: query queue entry: %v", err)
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read queue entry"})
-		return
-	}
-
-	if entry.Status == "applied" {
-		// Idempotent — already applied.
-		c.JSON(http.StatusOK, gin.H{
-			"status":       "already_applied",
-			"workspace_id": entry.WorkspaceID,
-			"plugin_name":  entry.PluginName,
-			"message":      "drift update was already applied",
-		})
-		return
-	}
-
-	if entry.Status == "dismissed" {
-		c.JSON(http.StatusConflict, gin.H{
-			"error":        "queue entry was dismissed",
-			"workspace_id": entry.WorkspaceID,
-			"plugin_name":  entry.PluginName,
-		})
-		return
-	}
-
-	// Step 2: read the workspace_plugins row to get source_raw.
-	var sourceRaw string
-	err = db.DB.QueryRowContext(ctx, `
-		SELECT source_raw FROM workspace_plugins
-		WHERE workspace_id = $1 AND plugin_name = $2
-	`, entry.WorkspaceID, entry.PluginName).Scan(&sourceRaw)
-	if err == sql.ErrNoRows {
-		c.JSON(http.StatusNotFound, gin.H{
-			"error":        "workspace_plugins row not found — plugin may have been uninstalled",
-			"workspace_id": entry.WorkspaceID,
-			"plugin_name":  entry.PluginName,
-		})
-		return
-	}
-	if err != nil {
-		log.Printf("AdminPluginDrift: apply: query workspace_plugins: %v", err)
-		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read plugin record"})
-		return
-	}
-
-	// Step 3: re-install the plugin.
-	//
-	// Rather than issuing an HTTP subrequest, we reuse the plugins handler's
-	// full install pipeline (resolve → stage → deliver → record) through its
-	// exported *ForApply helpers. Construct the installRequest with the same
-	// source as the existing row and the same tracked_ref. The source_raw
-	// already encodes the pinned ref (e.g. "github://owner/repo#tag:v1.0.0"),
-	// so the resolver fetches the latest commit at that ref, which is
-	// exactly the drift being applied.
- installReq := installRequest{ - Source: sourceRaw, - Track: entry.TrackedRef, - } - result, instErr := h.pluginsHandler.ResolveAndStageForApply(ctx, installReq) - if instErr != nil { - var he *httpErr - if errors.As(instErr, &he) { - c.JSON(he.Status, gin.H{ - "error": fmt.Sprintf("plugin install failed: %v", he.Body["error"]), - "queue_id": queueID, - }) - return - } - log.Printf("AdminPluginDrift: apply: install failed for %s/%s: %v", - entry.WorkspaceID, entry.PluginName, instErr) - c.JSON(http.StatusInternalServerError, gin.H{"error": "plugin install failed", "queue_id": queueID}) - return - } - defer func() { _ = os.RemoveAll(result.StagedDir) }() - - // Deliver to the workspace container. - if err := h.pluginsHandler.DeliverForApply(ctx, entry.WorkspaceID, result); err != nil { - var he *httpErr - if errors.As(err, &he) { - c.JSON(he.Status, gin.H{"error": fmt.Sprintf("plugin deliver failed: %v", he.Body["error"]), "queue_id": queueID}) - return - } - log.Printf("AdminPluginDrift: apply: deliver failed for %s/%s: %v", - entry.WorkspaceID, entry.PluginName, err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "plugin deliver failed", "queue_id": queueID}) - return - } - - // Record the install with the new SHA. This updates installed_sha on the - // workspace_plugins row so the next drift sweep finds no drift. - if err := recordWorkspacePluginInstall(ctx, entry.WorkspaceID, result.PluginName, - result.Source.Raw(), entry.TrackedRef, result.InstalledSHA); err != nil { - log.Printf("AdminPluginDrift: apply: recordWorkspacePluginInstall failed: %v (install succeeded)", err) - // Non-fatal: the plugin IS installed; just log and continue. - } - - // Step 4: mark queue entry as applied. - if _, err := db.DB.ExecContext(ctx, ` - UPDATE plugin_update_queue SET status = 'applied' WHERE id = $1 - `, queueID); err != nil { - log.Printf("AdminPluginDrift: apply: failed to mark queue entry %s as applied: %v", queueID, err) - // Non-fatal: install succeeded; operator can retry or mark manually. - } - - // Step 5: trigger workspace restart. - // The pluginsHandler carries a restartFunc (Provisioner.RestartByID) set - // at construction. Trigger it asynchronously so the HTTP response returns - // immediately after the install; the restart is best-effort. - if h.pluginsHandler != nil { - go func() { - // We can't use result.PluginName as a restart key since the - // restartFunc takes a workspaceID. Pass the workspaceID. - if restart := h.pluginsHandler.GetRestartFunc(); restart != nil { - restart(entry.WorkspaceID) - } - }() - } - - log.Printf("AdminPluginDrift: applied drift update for %s/%s (queue_id=%s)", - entry.WorkspaceID, entry.PluginName, queueID) - c.JSON(http.StatusOK, gin.H{ - "status": "applied", - "workspace_id": entry.WorkspaceID, - "plugin_name": entry.PluginName, - "installed_sha": result.InstalledSHA, - "restarting": true, - }) -} diff --git a/workspace-server/internal/handlers/admin_plugin_drift_test.go b/workspace-server/internal/handlers/admin_plugin_drift_test.go deleted file mode 100644 index 3959c4f8..00000000 --- a/workspace-server/internal/handlers/admin_plugin_drift_test.go +++ /dev/null @@ -1,211 +0,0 @@ -package handlers - -// admin_plugin_drift_test.go — coverage for plugin drift queue admin endpoints. -// Tests: ListPending (empty, non-empty), Apply (not found, already applied, -// already dismissed, workspace_plugins missing, install failure). 
-
-import (
-	"encoding/json"
-	"errors"
-	"net/http"
-	"net/http/httptest"
-	"testing"
-	"time"
-
-	"github.com/DATA-DOG/go-sqlmock"
-	"github.com/gin-gonic/gin"
-)
-
-func TestAdminPluginDrift_ListPending_Empty(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	mock.ExpectQuery(`SELECT id, workspace_id, plugin_name, tracked_ref,\s+current_sha, latest_sha, status, created_at\s+FROM plugin_update_queue\s+WHERE status = 'pending'\s+ORDER BY created_at DESC`).
-		WillReturnRows(sqlmock.NewRows([]string{
-			"id", "workspace_id", "plugin_name", "tracked_ref",
-			"current_sha", "latest_sha", "status", "created_at",
-		}))
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
-	h.ListPending(c)
-
-	if w.Code != http.StatusOK {
-		t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
-	}
-	var rows []map[string]interface{}
-	if err := json.Unmarshal(w.Body.Bytes(), &rows); err != nil {
-		t.Fatalf("body parse: %v", err)
-	}
-	if len(rows) != 0 {
-		t.Errorf("expected empty array, got %d rows", len(rows))
-	}
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unmet expectations: %v", err)
-	}
-}
-
-func TestAdminPluginDrift_ListPending_WithRows(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	now := time.Now()
-	mock.ExpectQuery(`SELECT id, workspace_id, plugin_name, tracked_ref,\s+current_sha, latest_sha, status, created_at\s+FROM plugin_update_queue\s+WHERE status = 'pending'\s+ORDER BY created_at DESC`).
-		WillReturnRows(sqlmock.NewRows([]string{
-			"id", "workspace_id", "plugin_name", "tracked_ref",
-			"current_sha", "latest_sha", "status", "created_at",
-		}).AddRow(
-			"queue-id-1", "ws-uuid-1", "my-plugin", "tag:v1.0.0",
-			"abc123def456", "def456abc789", "pending", now,
-		).AddRow(
-			"queue-id-2", "ws-uuid-2", "other-plugin", "tag:latest",
-			"111111aaaaaa", "222222bbbbbb", "pending", now,
-		))
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
-	h.ListPending(c)
-
-	if w.Code != http.StatusOK {
-		t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
-	}
-	var rows []map[string]interface{}
-	if err := json.Unmarshal(w.Body.Bytes(), &rows); err != nil {
-		t.Fatalf("body parse: %v", err)
-	}
-	if len(rows) != 2 {
-		t.Errorf("expected 2 rows, got %d", len(rows))
-	}
-	// Verify first row fields.
-	if got := rows[0]["plugin_name"]; got != "my-plugin" {
-		t.Errorf("plugin_name: expected my-plugin, got %v", got)
-	}
-	if got := rows[0]["status"]; got != "pending" {
-		t.Errorf("status: expected pending, got %v", got)
-	}
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unmet expectations: %v", err)
-	}
-}
-
-func TestAdminPluginDrift_ListPending_DBError(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	mock.ExpectQuery(`SELECT id, workspace_id`).WillReturnError(
-		errors.New("force error"),
-	)
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("GET", "/admin/plugin-updates-pending", nil)
-	h.ListPending(c)
-
-	if w.Code != http.StatusInternalServerError {
-		t.Errorf("expected 500 on DB error, got %d", w.Code)
-	}
-}
-
-func TestAdminPluginDrift_Apply_NotFound(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
-		WithArgs("nonexistent-queue-id").
-		WillReturnRows(sqlmock.NewRows([]string{
-			"workspace_id", "plugin_name", "tracked_ref", "status",
-		})) // empty = no rows
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/nonexistent-queue-id/apply", nil)
-	c.Params = []gin.Param{{Key: "id", Value: "nonexistent-queue-id"}}
-	h.Apply(c)
-
-	if w.Code != http.StatusNotFound {
-		t.Errorf("expected 404, got %d: %s", w.Code, w.Body.String())
-	}
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unmet expectations: %v", err)
-	}
-}
-
-func TestAdminPluginDrift_Apply_AlreadyApplied(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
-		WithArgs("queue-id-1").
-		WillReturnRows(sqlmock.NewRows([]string{
-			"workspace_id", "plugin_name", "tracked_ref", "status",
-		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "applied"))
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
-	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
-	h.Apply(c)
-
-	if w.Code != http.StatusOK {
-		t.Errorf("expected 200 for already-applied, got %d: %s", w.Code, w.Body.String())
-	}
-	var body map[string]interface{}
-	if err := json.Unmarshal(w.Body.Bytes(), &body); err != nil {
-		t.Fatalf("body parse: %v", err)
-	}
-	if got := body["status"]; got != "already_applied" {
-		t.Errorf("status: expected already_applied, got %v", got)
-	}
-}
-
-func TestAdminPluginDrift_Apply_AlreadyDismissed(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
-		WithArgs("queue-id-1").
-		WillReturnRows(sqlmock.NewRows([]string{
-			"workspace_id", "plugin_name", "tracked_ref", "status",
-		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "dismissed"))
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
-	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
-	h.Apply(c)
-
-	if w.Code != http.StatusConflict {
-		t.Errorf("expected 409 for dismissed, got %d: %s", w.Code, w.Body.String())
-	}
-}
-
-func TestAdminPluginDrift_Apply_WorkspacePluginsMissing(t *testing.T) {
-	_, mock := setupTestDB(t)
-	h := NewAdminPluginDriftHandler(nil)
-
-	// Queue entry found, pending.
-	mock.ExpectQuery(`SELECT workspace_id, plugin_name, tracked_ref, status\s+FROM plugin_update_queue\s+WHERE id = \$1`).
-		WithArgs("queue-id-1").
-		WillReturnRows(sqlmock.NewRows([]string{
-			"workspace_id", "plugin_name", "tracked_ref", "status",
-		}).AddRow("ws-uuid-1", "my-plugin", "tag:v1.0.0", "pending"))
-
-	// workspace_plugins row not found (plugin uninstalled after drift detected).
-	mock.ExpectQuery(`SELECT source_raw FROM workspace_plugins\s+WHERE workspace_id = \$1 AND plugin_name = \$2`).
-		WithArgs("ws-uuid-1", "my-plugin").
-		WillReturnRows(sqlmock.NewRows([]string{"source_raw"})) // empty
-
-	w := httptest.NewRecorder()
-	c, _ := gin.CreateTestContext(w)
-	c.Request = httptest.NewRequest("POST", "/admin/plugin-updates/queue-id-1/apply", nil)
-	c.Params = []gin.Param{{Key: "id", Value: "queue-id-1"}}
-	h.Apply(c)
-
-	if w.Code != http.StatusNotFound {
-		t.Errorf("expected 404 when workspace_plugins row missing, got %d: %s", w.Code, w.Body.String())
-	}
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unmet expectations: %v", err)
-	}
-}
diff --git a/workspace-server/internal/handlers/plugins.go b/workspace-server/internal/handlers/plugins.go
index 78e182ba..317c4ce4 100644
--- a/workspace-server/internal/handlers/plugins.go
+++ b/workspace-server/internal/handlers/plugins.go
@@ -110,12 +110,6 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH
 	return h
 }
 
-// Sources returns the underlying plugin source registry. Used by main.go to
-// pass the same registry to the drift sweeper so both share resolver state.
-func (h *PluginsHandler) Sources() plugins.SourceResolver {
-	return h.sources
-}
-
 // pluginInfo is the API response for a plugin.
 type pluginInfo struct {
 	Name string `json:"name"`
diff --git a/workspace-server/internal/handlers/plugins_install.go b/workspace-server/internal/handlers/plugins_install.go
index c335bf50..32232727 100644
--- a/workspace-server/internal/handlers/plugins_install.go
+++ b/workspace-server/internal/handlers/plugins_install.go
@@ -95,7 +95,7 @@ func (h *PluginsHandler) Install(c *gin.Context) {
 	// foundation). Best-effort: DB write failure is logged but doesn't fail
 	// the install — the plugin IS in the container; surfacing a 500 here
 	// would mislead the caller about the install state.
- if err := recordWorkspacePluginInstall(ctx, workspaceID, result.PluginName, result.Source.Raw(), req.Track, result.InstalledSHA); err != nil { + if err := recordWorkspacePluginInstall(ctx, workspaceID, result.PluginName, result.Source.Raw(), req.Track); err != nil { log.Printf("Plugin install: failed to record %s for %s in workspace_plugins: %v (install succeeded; tracking row missing)", result.PluginName, workspaceID, err) } @@ -189,15 +189,6 @@ func (h *PluginsHandler) uninstallViaDocker(ctx context.Context, c *gin.Context, // Verify deletion before restart h.execInContainer(ctx, containerName, []string{"sync"}) - // Remove the workspace_plugins tracking row so the row doesn't persist - // with a stale installed_sha after the plugin has been removed. Drift - // detection ignores rows without an installed_sha, but keeping the row - // would prevent the next install from creating a fresh row (ON CONFLICT - // UPDATE would apply instead of INSERT, which is wrong for an uninstall). - if err := deleteWorkspacePluginRow(ctx, workspaceID, pluginName); err != nil { - log.Printf("Plugin uninstall: failed to delete workspace_plugins row for %s: %v (container cleanup succeeded)", pluginName, err) - } - // Auto-restart (small delay to ensure fs writes are flushed) if h.restartFunc != nil { go func() { @@ -254,11 +245,6 @@ func (h *PluginsHandler) uninstallViaEIC(ctx context.Context, c *gin.Context, wo return } - // Remove the workspace_plugins tracking row (see uninstallViaDocker for rationale). - if err := deleteWorkspacePluginRow(ctx, workspaceID, pluginName); err != nil { - log.Printf("Plugin uninstall: failed to delete workspace_plugins row for %s: %v (container cleanup succeeded)", pluginName, err) - } - if h.restartFunc != nil { go func() { time.Sleep(2 * time.Second) diff --git a/workspace-server/internal/handlers/plugins_install_pipeline.go b/workspace-server/internal/handlers/plugins_install_pipeline.go index 80a81393..31d1239e 100644 --- a/workspace-server/internal/handlers/plugins_install_pipeline.go +++ b/workspace-server/internal/handlers/plugins_install_pipeline.go @@ -128,10 +128,9 @@ type installRequest struct { // stageResult bundles the outputs of resolveAndStage for the caller. // Avoids a 5-value tuple return. type stageResult struct { - StagedDir string - PluginName string - Source plugins.Source - InstalledSHA string // empty for local:// sources (no meaningful upstream) + StagedDir string + PluginName string + Source plugins.Source } // resolveAndStage parses a validated request, dispatches to the right @@ -213,16 +212,6 @@ func (h *PluginsHandler) resolveAndStage(ctx context.Context, req installRequest "source": source.Raw(), }) } - - // Capture the installed SHA from github:// sources for drift detection. - // GithubResolver.LastSHA() is set by Fetch after a successful clone. - // Type-assert is safe because resolver was obtained via Resolve(), which - // returns the concrete GithubResolver for github:// sources. 
- var installedSHA string - if gh, ok := resolver.(*plugins.GithubResolver); ok { - installedSHA = gh.LastSHA() - } - if err := validatePluginName(pluginName); err != nil { cleanup() return nil, newHTTPErr(http.StatusBadRequest, gin.H{ @@ -275,7 +264,7 @@ func (h *PluginsHandler) resolveAndStage(ctx context.Context, req installRequest } } - return &stageResult{StagedDir: stagedDir, PluginName: pluginName, Source: source, InstalledSHA: installedSHA}, nil + return &stageResult{StagedDir: stagedDir, PluginName: pluginName, Source: source}, nil } // deliverToContainer copies the staged plugin dir into the workspace @@ -523,24 +512,3 @@ func streamDirAsTar(root string, tw *tar.Writer) error { return err }) } - -// ResolveAndStageForApply is the context-based equivalent of resolveAndStage, -// exposed for the admin plugin drift apply endpoint (core#123). It bypasses -// the gin.Context dependency so the apply path can re-trigger a plugin install -// programmatically. -func (h *PluginsHandler) ResolveAndStageForApply(ctx context.Context, req installRequest) (*stageResult, error) { - return h.resolveAndStage(ctx, req) -} - -// DeliverForApply is the context-based equivalent of deliverToContainer, -// exposed for the admin plugin drift apply endpoint (core#123). -func (h *PluginsHandler) DeliverForApply(ctx context.Context, workspaceID string, r *stageResult) error { - return h.deliverToContainer(ctx, workspaceID, r) -} - -// GetRestartFunc returns the pluginsHandler's restartFunc, or nil if not set. -// Used by the admin drift apply endpoint to trigger a workspace restart after -// a plugin update is applied. -func (h *PluginsHandler) GetRestartFunc() func(string) { - return h.restartFunc -} diff --git a/workspace-server/internal/handlers/plugins_tracking.go b/workspace-server/internal/handlers/plugins_tracking.go index be2c288c..56831a06 100644 --- a/workspace-server/internal/handlers/plugins_tracking.go +++ b/workspace-server/internal/handlers/plugins_tracking.go @@ -55,12 +55,8 @@ func validateTrackedRef(s string) (string, error) { // plugin install. ON CONFLICT (workspace_id, plugin_name) DO UPDATE so // reinstalling the same plugin name (with a possibly-different source or // track value) updates the existing row rather than failing. -// -// installedSHA records the commit SHA that was installed; used by the drift -// sweeper to detect when the upstream ref has moved. May be empty (e.g. for -// local:// sources or pre-migration installs) — the sweeper skips NULL SHAs. 
 func recordWorkspacePluginInstall(
-	ctx context.Context, workspaceID, pluginName, sourceRaw, track, installedSHA string,
+	ctx context.Context, workspaceID, pluginName, sourceRaw, track string,
 ) error {
 	if workspaceID == "" || pluginName == "" || sourceRaw == "" {
 		return errors.New("recordWorkspacePluginInstall: missing required field")
 	}
@@ -70,24 +66,13 @@ func recordWorkspacePluginInstall(
 		return err
 	}
 	_, err = db.DB.ExecContext(ctx, `
-		INSERT INTO workspace_plugins (workspace_id, plugin_name, source_raw, tracked_ref, installed_sha)
-		VALUES ($1, $2, $3, $4, $5)
+		INSERT INTO workspace_plugins (workspace_id, plugin_name, source_raw, tracked_ref)
+		VALUES ($1, $2, $3, $4)
 		ON CONFLICT (workspace_id, plugin_name) DO UPDATE SET
-			source_raw = EXCLUDED.source_raw,
-			tracked_ref = EXCLUDED.tracked_ref,
-			installed_sha = EXCLUDED.installed_sha,
-			updated_at = NOW()
-	`, workspaceID, pluginName, sourceRaw, canonicalTrack, installedSHA)
-	return err
-}
-
-// deleteWorkspacePluginRow removes the workspace_plugins row for a workspace/plugin
-// pair. Called by the uninstall path so the row doesn't persist with a stale
-// installed_sha after the plugin has been removed from the container.
-func deleteWorkspacePluginRow(ctx context.Context, workspaceID, pluginName string) error {
-	_, err := db.DB.ExecContext(ctx, `
-		DELETE FROM workspace_plugins WHERE workspace_id = $1 AND plugin_name = $2
-	`, workspaceID, pluginName)
+			source_raw = EXCLUDED.source_raw,
+			tracked_ref = EXCLUDED.tracked_ref,
+			updated_at = NOW()
+	`, workspaceID, pluginName, sourceRaw, canonicalTrack)
 	return err
 }
diff --git a/workspace-server/internal/handlers/restart_signals.go b/workspace-server/internal/handlers/restart_signals.go
deleted file mode 100644
index 81cb9200..00000000
--- a/workspace-server/internal/handlers/restart_signals.go
+++ /dev/null
@@ -1,155 +0,0 @@
-package handlers
-
-// restart_signals.go — #125 Phase 1: graceful pre-restart drain for
-// native-session workspaces.
-//
-// Before a container restart, the platform sends POST /signals/restart_pending
-// to the workspace agent. The agent receives this as a JSON-RPC signal and
-// begins draining in-flight work. The platform listens briefly for an
-// acknowledgment but does not block the restart on it: stopForRestart
-// proceeds regardless (see gracefulPreRestart below).
-//
-// This preserves in-flight A2A requests that would otherwise be lost when
-// the container dies mid-request (the core bug: native_session targets bypass
-// the platform's a2a_queue buffering, so any message dispatched directly to
-// the SDK session disappears when the container restarts).
-//
-// Phase 2 (not yet implemented): workspace SDK actually processes the signal
-// and drains its message loop. This file implements the platform-side call
-// site; the SDK-side handler is in molecule-workspace (adapter_base.py or
-// similar).
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"log"
-	"net/http"
-	"time"
-
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
-)
-
-const (
-	// restartSignalTimeout is how long the platform waits for the workspace
-	// to acknowledge the pre-restart signal. A workspace that doesn't implement
-	// the handler returns 404 (or the connection fails outright); either way
-	// the platform proceeds with the stop anyway, which is the same as the
-	// pre-fix behaviour (no graceful drain).
-	restartSignalTimeout = 10 * time.Second
-
-	// restartSignalDrainDuration is the drain budget advertised to the
-	// workspace: how long it may spend finishing in-flight A2A requests.
-	// The agent should acknowledge promptly and drain in the background,
-	// since the platform only waits restartSignalTimeout for the ack.
-	// Sent as JSON-RPC signal.params.drain_seconds in the POST body.
-	restartSignalDrainDuration = 20 * time.Second
-)
-
-// gracefulPreRestart sends the pre-restart drain signal to the workspace
-// agent before the container is stopped. Called from runRestartCycle.
-//
-// Returns immediately — the signal is fire-and-forget with a 10s timeout.
-// If the workspace doesn't implement the handler (404) or times out, the
-// platform proceeds with the stop anyway (same as pre-fix behaviour).
-//
-// The signal is sent via HTTP POST to the workspace's internal agent URL.
-// On self-hosted (platform-in-Docker), the platform rewrites 127.0.0.1 to
-// the Docker-DNS form ws-<workspaceID>:8000. On SaaS/CP, the stored agent
-// URL (an externally routable address) is used directly.
-func (h *WorkspaceHandler) gracefulPreRestart(ctx context.Context, workspaceID string) {
-	// Non-blocking send — don't stall the restart cycle.
-	// Run in a detached goroutine so the caller (runRestartCycle) can
-	// proceed to stopForRestart without waiting.
-	go func() {
-		signalCtx, cancel := context.WithTimeout(context.Background(), restartSignalTimeout)
-		defer cancel()
-
-		url, err := h.resolveAgentURLForRestartSignal(signalCtx, workspaceID)
-		if err != nil {
-			log.Printf("A2AGracefulRestart: resolve URL failed for %s: %v — proceeding with stop", workspaceID, err)
-			return
-		}
-		url = url + "/signals/restart_pending"
-
-		payload := map[string]interface{}{
-			"jsonrpc": "2.0",
-			"method":  "signals/restart_pending",
-			"params": map[string]interface{}{
-				"drain_seconds": int(restartSignalDrainDuration.Seconds()),
-				"workspace_id":  workspaceID,
-			},
-			"id": nil,
-		}
-		body, _ := json.Marshal(payload)
-
-		req, reqErr := http.NewRequestWithContext(signalCtx, http.MethodPost, url, bytes.NewReader(body))
-		if reqErr != nil {
-			log.Printf("A2AGracefulRestart: build request failed for %s: %v — proceeding with stop", workspaceID, reqErr)
-			return
-		}
-		req.Header.Set("Content-Type", "application/json")
-		// X-Restart-Signal header identifies this as a platform-initiated
-		// restart signal (not a regular A2A message). The SDK can check
-		// for this header to distinguish a restart signal from other messages.
-		req.Header.Set("X-Restart-Signal", "true")
-
-		client := &http.Client{Timeout: restartSignalTimeout}
-		resp, doErr := client.Do(req)
-		if doErr != nil {
-			// Timeout, connection refused, etc. — workspace is either not
-			// listening or didn't implement the handler. Proceed with stop.
-			log.Printf("A2AGracefulRestart: signal failed for %s: %v — proceeding with stop", workspaceID, doErr)
-			return
-		}
-		defer resp.Body.Close()
-
-		// 200 = workspace acknowledged and will drain. 404 = old SDK version
-		// without the handler — same as no handler, proceed. 5xx = workspace
-		// error but it's still alive — proceed. Any other status = also proceed.
-		if resp.StatusCode == http.StatusOK {
-			log.Printf("A2AGracefulRestart: %s acknowledged pre-restart signal (status=%d)", workspaceID, resp.StatusCode)
-		} else {
-			log.Printf("A2AGracefulRestart: %s returned status %d — proceeding with stop", workspaceID, resp.StatusCode)
-		}
-	}()
-}
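
Phase 2 (the workspace-side handler) is explicitly out of scope in the file above, but for orientation, here is a minimal sketch of what the receiving end the payload implies could look like. The route, headers, and JSON-RPC shape mirror the platform call above; the port and the drain mechanics are assumptions, not the molecule-workspace SDK's actual code:

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// restartSignal mirrors the JSON-RPC body the platform POSTs above.
type restartSignal struct {
	Method string `json:"method"`
	Params struct {
		DrainSeconds int    `json:"drain_seconds"`
		WorkspaceID  string `json:"workspace_id"`
	} `json:"params"`
}

func main() {
	http.HandleFunc("/signals/restart_pending", func(w http.ResponseWriter, r *http.Request) {
		var sig restartSignal
		if err := json.NewDecoder(r.Body).Decode(&sig); err != nil || sig.Method != "signals/restart_pending" {
			http.Error(w, "bad signal payload", http.StatusBadRequest)
			return
		}
		// Ack promptly (the platform only waits restartSignalTimeout),
		// then drain in-flight work in the background for up to drain_seconds.
		go log.Printf("draining %s for up to %ds before restart",
			sig.Params.WorkspaceID, sig.Params.DrainSeconds)
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string]any{
			"jsonrpc": "2.0",
			"result":  map[string]any{"acknowledged": true},
		})
	})
	log.Fatal(http.ListenAndServe(":8000", nil))
}
```
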
-// resolveAgentURLForRestartSignal returns the routable URL for the workspace
-// agent, suitable for the pre-restart signal HTTP call. Falls back to the DB
-// value on a Redis cache miss. On self-hosted (platform-in-Docker), it
-// rewrites 127.0.0.1 to the Docker-DNS form ws-<workspaceID>:8000.
-func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context, workspaceID string) (string, error) {
-	// Try Redis cache first.
-	agentURL, err := db.GetCachedURL(ctx, workspaceID)
-	if err == nil && agentURL != "" {
-		return h.rewriteForDocker(agentURL, workspaceID), nil
-	}
-
-	// Cache miss — fall back to DB.
-	var urlNullable *string
-	err = db.DB.QueryRowContext(ctx,
-		`SELECT url FROM workspaces WHERE id = $1`, workspaceID,
-	).Scan(&urlNullable)
-	if err != nil {
-		return "", err
-	}
-	if urlNullable == nil || *urlNullable == "" {
-		return "", nil // workspace has no URL yet — shouldn't happen at restart time
-	}
-	agentURL = *urlNullable
-	_ = db.CacheURL(ctx, workspaceID, agentURL)
-	return h.rewriteForDocker(agentURL, workspaceID), nil
-}
-
-// rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
-// when the platform is running inside a Docker container. A method on
-// *WorkspaceHandler because the rewrite needs the handler's provisioner.
-// When the platform is on the host (non-Docker), 127.0.0.1 IS the host and
-// the original URL works.
-func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
-	if platformInDocker && h.provisioner != nil {
-		// Only rewrite if the URL points to localhost (the ephemeral port
-		// binding the container published to the host). Internal Docker
-		// URLs (e.g. http://ws-abc123def:8000) are already correct.
-		if len(agentURL) >= 17 && agentURL[:16] == "http://127.0.0.1" {
-			return provisioner.InternalURL(workspaceID)
-		}
-	}
-	return agentURL
-}
diff --git a/workspace-server/internal/handlers/restart_signals_test.go b/workspace-server/internal/handlers/restart_signals_test.go
deleted file mode 100644
index d9278e2c..00000000
--- a/workspace-server/internal/handlers/restart_signals_test.go
+++ /dev/null
@@ -1,330 +0,0 @@
-package handlers
-
-import (
-	"context"
-	"database/sql"
-	"encoding/json"
-	"net/http"
-	"net/http/httptest"
-	"testing"
-	"time"
-
-	"github.com/DATA-DOG/go-sqlmock"
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
-	"github.com/alicebob/miniredis/v2"
-	"github.com/redis/go-redis/v9"
-)
-
-// stubLocalProv is a minimal LocalProvisionerAPI stub used to make
-// h.provisioner non-nil for the Docker-URL-rewrite tests.
-// All methods panic — rewriteForDocker only checks h.provisioner != nil.
-type stubLocalProv struct{}
-
-func (s *stubLocalProv) Start(_ context.Context, _ provisioner.WorkspaceConfig) (string, error) {
-	panic("stubLocalProv.Start not implemented in test")
-}
-func (s *stubLocalProv) Stop(_ context.Context, _ string) error {
-	panic("stubLocalProv.Stop not implemented in test")
-}
-func (s *stubLocalProv) IsRunning(_ context.Context, _ string) (bool, error) {
-	panic("stubLocalProv.IsRunning not implemented in test")
-}
-func (s *stubLocalProv) ExecRead(_ context.Context, _, _ string) ([]byte, error) {
-	panic("stubLocalProv.ExecRead not implemented in test")
-}
-func (s *stubLocalProv) RemoveVolume(_ context.Context, _ string) error {
-	panic("stubLocalProv.RemoveVolume not implemented in test")
-}
-func (s *stubLocalProv) VolumeHasFile(_ context.Context, _, _ string) (bool, error) {
-	panic("stubLocalProv.VolumeHasFile not implemented in test")
-}
-func (s *stubLocalProv) WriteAuthTokenToVolume(_ context.Context, _, _ string) error {
-	panic("stubLocalProv.WriteAuthTokenToVolume not implemented in test")
-}
-
-// Compile-time assertion: stubLocalProv satisfies LocalProvisionerAPI.
-var _ provisioner.LocalProvisionerAPI = (*stubLocalProv)(nil)
-
-// TestRewriteForDocker_NonDockerHostUrlUnchanged verifies that a non-Docker
-// URL passes through rewriteForDocker unchanged when platform is not in Docker.
-func TestRewriteForDocker_NonDockerHostUrlUnchanged(t *testing.T) {
-	restore := setPlatformInDockerForTest(false)
-	defer restore()
-
-	h := newHandlerWithTestDeps(t)
-	url := h.rewriteForDocker("http://example.com:8000/agent", "ws-test-123")
-	if url != "http://example.com:8000/agent" {
-		t.Errorf("expected unchanged URL, got %q", url)
-	}
-}
-
-// TestRewriteForDocker_LocalhostUrlUnchanged_NoProvisioner verifies that a
-// localhost URL is NOT rewritten when h.provisioner is nil (SaaS/CP mode).
-func TestRewriteForDocker_LocalhostUrlUnchanged_NoProvisioner(t *testing.T) {
-	restore := setPlatformInDockerForTest(true)
-	defer restore()
-
-	h := newHandlerWithTestDeps(t)
-	// h.provisioner is nil → no Docker rewrite even when platformInDocker=true
-	url := h.rewriteForDocker("http://127.0.0.1:49152/agent", "ws-test-123")
-	if url != "http://127.0.0.1:49152/agent" {
-		t.Errorf("expected localhost URL unchanged (no provisioner), got %q", url)
-	}
-}
-
-// TestRewriteForDocker_LocalhostUrlRewritten verifies that a localhost URL
-// IS rewritten to the Docker-DNS form when platform is in Docker AND a
-// provisioner is wired.
-func TestRewriteForDocker_LocalhostUrlRewritten(t *testing.T) {
-	restore := setPlatformInDockerForTest(true)
-	defer restore()
-
-	h := newHandlerWithTestDeps(t)
-	h.provisioner = &stubLocalProv{} // non-nil → triggers Docker rewrite
-
-	url := h.rewriteForDocker("http://127.0.0.1:49152/agent", "ws-test-123")
-	// Docker DNS form: ws-<workspaceID>:8000
-	if url == "http://127.0.0.1:49152/agent" {
-		t.Error("expected localhost URL to be rewritten to Docker DNS form")
-	}
-	// Verify the rewrite matches the expected Docker internal URL format
-	expectedInternal := "http://ws-ws-test-123:8000"
-	if url != expectedInternal {
-		t.Errorf("expected %q, got %q", expectedInternal, url)
-	}
-}
-
-// TestResolveAgentURLForRestartSignal_CacheHit verifies that a Redis-cached
-// URL is returned without hitting the DB.
-func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-	_ = setupTestRedisWithURL(t, "http://cached.internal:9000/agent")
-
-	h := newHandlerWithTestDepsWithDB(t, mockDB)
-
-	// Redis cache hit → DB should NOT be queried
-	url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-hit-123")
-	if err != nil {
-		t.Fatalf("resolveAgentURLForRestartSignal failed: %v", err)
-	}
-	if url == "" {
-		t.Fatal("expected non-empty URL from cache")
-	}
-	// DB should not be queried (no rows returned to sqlmock)
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unfulfilled DB expectations: %v", err)
-	}
-}
-
-// TestResolveAgentURLForRestartSignal_DBError verifies that a DB error is
-// returned and propagated when neither Redis cache nor DB lookup succeeds.
-func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
-	_ = setupTestRedis(t) // empty → cache miss
-
-	h := newHandlerWithTestDepsWithDB(t, mockDB)
-
-	mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
-		WithArgs("ws-db-err-789").
-		WillReturnError(context.DeadlineExceeded)
-
-	_, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-db-err-789")
-	if err == nil {
-		t.Fatal("expected DB error to be returned")
-	}
-
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unfulfilled DB expectations: %v", err)
-	}
-}
-
-// TestResolveAgentURLForRestartSignal_CacheMiss verifies that on Redis miss,
-// the URL is fetched from the DB and cached.
-func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
-	mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
-	mr := setupTestRedis(t)        // empty → cache miss
-
-	h := newHandlerWithTestDepsWithDB(t, mockDB)
-
-	mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
-		WithArgs("ws-cache-miss-456").
-		WillReturnRows(sqlmock.NewRows([]string{"url"}).
-			AddRow("http://db.internal:8000/agent"))
-
-	url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-miss-456")
-	if err != nil {
-		t.Fatalf("resolveAgentURLForRestartSignal failed: %v", err)
-	}
-	if url != "http://db.internal:8000/agent" {
-		t.Errorf("expected DB URL, got %q", url)
-	}
-
-	// Verify the URL was cached in Redis (miniredis reads take the bare key).
-	cached, err := mr.Get("ws:ws-cache-miss-456:url")
-	if err != nil {
-		t.Fatalf("URL was not cached in Redis: %v", err)
-	}
-	if cached != "http://db.internal:8000/agent" {
-		t.Errorf("expected cached URL %q, got %q", "http://db.internal:8000/agent", cached)
-	}
-	if err := mock.ExpectationsWereMet(); err != nil {
-		t.Errorf("unfulfilled DB expectations: %v", err)
-	}
-}
-
-// TestGracefulPreRestart_Success verifies that when the workspace returns 200,
-// the signal is logged as acknowledged without error.
-func TestGracefulPreRestart_Success(t *testing.T) {
-	_, _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-	_ = setupTestRedisWithURL(t, "http://localhost:18000/agent")
-
-	// httptest server simulating the workspace container's /signals/restart_pending
-	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if r.Method != http.MethodPost {
-			t.Errorf("expected POST, got %s", r.Method)
-		}
-		if r.Header.Get("Content-Type") != "application/json" {
-			t.Errorf("expected Content-Type: application/json, got %s", r.Header.Get("Content-Type"))
-		}
-		if r.Header.Get("X-Restart-Signal") != "true" {
-			t.Error("expected X-Restart-Signal: true header")
-		}
-
-		var req map[string]interface{}
-		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
-			t.Errorf("failed to decode request body: %v", err)
-		}
-		if req["method"] != "signals/restart_pending" {
-			t.Errorf("expected method signals/restart_pending, got %v", req["method"])
-		}
-
-		w.WriteHeader(http.StatusOK)
-		json.NewEncoder(w).Encode(map[string]interface{}{
-			"jsonrpc": "2.0",
-			"result":  map[string]interface{}{"acknowledged": true},
-		})
-	}))
-	defer srv.Close()
-
-	h := newHandlerWithTestDeps(t)
-	// Re-point the cached agent URL at the test server; gracefulPreRestart
-	// resolves through the Redis cache, so no stubbing of the resolve path
-	// is needed.
-	if err := db.CacheURL(context.Background(), "ws-ack-789", srv.URL); err != nil {
-		t.Fatalf("cache URL: %v", err)
-	}
-
-	// gracefulPreRestart runs in a goroutine with its own timeout.
-	// We give it time to complete before the test ends.
-	h.gracefulPreRestart(context.Background(), "ws-ack-789")
-	time.Sleep(200 * time.Millisecond)
-}
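
The fixed 200 ms sleeps in these tests are a known flake risk. Using the same helpers this file already defines (setupTestRedis, newHandlerWithTestDeps, db.CacheURL), the test could instead block on an explicit signal from the stub server. A sketch, not part of the original suite:

```go
func TestGracefulPreRestart_SignalledViaChannel(t *testing.T) {
	hit := make(chan struct{}, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		hit <- struct{}{}
	}))
	defer srv.Close()

	_ = setupTestRedis(t) // resolve hits the Redis cache only, so no DB stubbing needed
	h := newHandlerWithTestDeps(t)
	if err := db.CacheURL(context.Background(), "ws-chan-123", srv.URL); err != nil {
		t.Fatalf("cache URL: %v", err)
	}

	h.gracefulPreRestart(context.Background(), "ws-chan-123")
	select {
	case <-hit: // the platform reached the workspace; no fixed sleep required
	case <-time.After(2 * time.Second):
		t.Fatal("workspace was never signalled")
	}
}
```
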
-
-// TestGracefulPreRestart_NotImplemented verifies that when the workspace returns
-// 404 (old SDK version), the platform proceeds gracefully (log + no error).
-func TestGracefulPreRestart_NotImplemented(t *testing.T) {
-	_, _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-	_ = setupTestRedisWithURL(t, "http://localhost:18001/agent")
-
-	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusNotFound)
-	}))
-	defer srv.Close()
-
-	h := newHandlerWithTestDeps(t)
-	// Point the cached agent URL at the 404-only server.
-	if err := db.CacheURL(context.Background(), "ws-noimpl-999", srv.URL); err != nil {
-		t.Fatalf("cache URL: %v", err)
-	}
-
-	h.gracefulPreRestart(context.Background(), "ws-noimpl-999")
-	time.Sleep(200 * time.Millisecond)
-	// No panic or error expected — graceful degradation
-}
-
-// TestGracefulPreRestart_ConnectionRefused verifies that when the workspace
-// is unreachable, the platform proceeds gracefully without error.
-func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
-	_, _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
-
-	// Nothing is listening on 19999 — the cached URL points into the void.
-	_ = setupTestRedisWithURL(t, "http://localhost:19999/agent")
-
-	h := newHandlerWithTestDeps(t)
-	h.gracefulPreRestart(context.Background(), "ws-unreachable-000")
-	time.Sleep(200 * time.Millisecond)
-	// No panic or error expected — proceeds with stop as documented
-}
-
-// TestGracefulPreRestart_URLResolutionError verifies that when URL resolution
-// fails, the platform proceeds gracefully without blocking the restart.
-func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
-	mockDB, mock := setupTestDB(t)
-	_ = setupTestRedis(t) // empty → cache miss, forcing the DB fallback
-
-	h := newHandlerWithTestDepsWithDB(t, mockDB)
-
-	// Make the DB fallback fail so URL resolution errors out.
-	mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
-		WithArgs("ws-url-err-111").
-		WillReturnError(context.DeadlineExceeded)
-
-	h.gracefulPreRestart(context.Background(), "ws-url-err-111")
-	time.Sleep(200 * time.Millisecond)
-	// No panic or error expected — proceeds with stop as documented
-}
-
-// ─── helpers ─────────────────────────────────────────────────────────────────
-
-// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
-// provisioner is nil so rewriteForDocker returns URL unchanged.
-func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
-	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
-}
-
-// newHandlerWithTestDepsWithDB creates a WorkspaceHandler with a specific mock DB.
-// Use this when you need to control the DB mock expectations.
-func newHandlerWithTestDepsWithDB(t *testing.T, mockDB *sql.DB) *WorkspaceHandler {
-	// We need to temporarily replace db.DB with our mock
-	origDB := db.DB
-	db.DB = mockDB
-	t.Cleanup(func() { db.DB = origDB })
-
-	return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
-}
-
-// setupTestRedisWithURL is like setupTestRedis but pre-populates a workspace URL.
-func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
-	mr, err := miniredis.Run()
-	if err != nil {
-		t.Fatalf("failed to start miniredis: %v", err)
-	}
-	db.RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()})
-	// Pre-populate a URL for the test workspace IDs used in these tests
-	for _, wsID := range []string{"ws-cache-hit-123", "ws-cache-miss-456", "ws-ack-789", "ws-noimpl-999", "ws-unreachable-000"} {
-		if err := db.CacheURL(context.Background(), wsID, url); err != nil {
-			t.Fatalf("failed to cache URL for %s: %v", wsID, err)
-		}
-	}
-	t.Cleanup(func() { mr.Close() })
-	return mr
-}
diff --git a/workspace-server/internal/handlers/workspace_restart.go b/workspace-server/internal/handlers/workspace_restart.go
index 6e3bb424..2af5291c 100644
--- a/workspace-server/internal/handlers/workspace_restart.go
+++ b/workspace-server/internal/handlers/workspace_restart.go
@@ -564,18 +564,6 @@ func (h *WorkspaceHandler) runRestartCycle(workspaceID string) {
 	log.Printf("Auto-restart: restarting %s (%s) runtime=%q (was: %s)",
 		wsName, workspaceID, dbRuntime, status)
 
-	// #125 Phase 1: send pre-restart drain signal to the workspace agent.
-	// For native_session targets, A2A messages go directly to the SDK session
-	// and bypass the platform's a2a_queue buffering. If the container dies
-	// mid-request, those messages are lost. The pre-restart signal gives the
-	// SDK a chance to drain in-flight work before the container stops.
-	//
-	// Fire-and-forget: gracefulPreRestart runs in a detached goroutine with its
-	// own 10s timeout. If the workspace doesn't implement the handler (404) or
-	// times out, we proceed with the stop anyway — identical to the pre-fix
-	// behaviour.
-	h.gracefulPreRestart(ctx, workspaceID)
-
 	h.stopForRestart(ctx, workspaceID)
 
 	db.DB.ExecContext(ctx,
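
The file removed next implemented the sweep loop. Per its own comments it follows the same shape as pendinguploads/sweeper.go and registry/orphan_sweeper.go: run once at startup, then tick until the context is cancelled. That pattern distills to this self-contained sketch (names here are illustrative, not the package's API):

```go
package main

import (
	"context"
	"log"
	"time"
)

// runEvery executes tick immediately, then on every interval until ctx is
// cancelled. A ticker fire that races cancellation is skipped.
func runEvery(ctx context.Context, interval time.Duration, tick func(context.Context)) {
	t := time.NewTicker(interval)
	defer t.Stop()
	tick(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if ctx.Err() != nil {
				continue // cancelled while the tick was pending
			}
			tick(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	runEvery(ctx, time.Second, func(context.Context) { log.Println("sweep") })
}
```
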
diff --git a/workspace-server/internal/plugins/drift_sweeper.go b/workspace-server/internal/plugins/drift_sweeper.go
deleted file mode 100644
index 9b6399d5..00000000
--- a/workspace-server/internal/plugins/drift_sweeper.go
+++ /dev/null
@@ -1,317 +0,0 @@
-package plugins
-
-// drift_sweeper.go — periodic drift detection for the plugin version-subscription
-// model (core#113 / #123).
-//
-// How it works
-// ─────────────
-// Every DriftSweepInterval the sweeper:
-//  1. SELECTs workspace_plugins rows where tracked_ref != 'none'
-//     AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
-//  2. For each row, resolves the tracked ref to its current upstream SHA
-//     using the appropriate SourceResolver.
-//  3. If the resolved SHA differs from installed_sha → drift detected.
-//  4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
-//     a re-drift while a row is still pending is a no-op).
-//
-// Thread-safety
-// ─────────────
-// The sweeper holds no mutable state between ticks. Each tick runs a fresh
-// sweep in the loop goroutine, and the loop exits when the passed context
-// is cancelled. This matches the pattern used by pendinguploads/sweeper.go
-// and registry/orphan_sweeper.go.
-//
-// Gitea compatibility
-// ───────────────────
-// Gitea's REST API is a GitHub-API-compatible surface, so the GithubResolver
-// with BaseURL pointing at a Gitea instance works for Gitea-hosted plugin
-// sources too. The source_raw in workspace_plugins stores the full spec
-// (e.g. "github://owner/repo#tag:v1.0.0") which the resolver parses.
-// For "local://" sources the resolver has no SHA concept, so those rows
-// are skipped (local plugins have no upstream to drift against).
-//
-// Resource cost
-// ─────────────
-// Each tick runs O(N) resolves where N is the count of tracked plugins.
-// Each resolve does a --depth=1 git fetch, bounded by the network round-trip
-// to GitHub/Gitea. With 1000 tracked plugins and 1h interval, worst case is
-// ~1,000 network calls per hour. The per-row timeout (ResolveRefDeadline)
-// prevents a slow/hanging fetch from blocking the entire sweep cycle.
-
-import (
-	"context"
-	"database/sql"
-	"fmt"
-	"log"
-	"strings"
-	"time"
-
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
-)
-
-// DriftSweepInterval is the cadence between drift-sweep cycles.
-// 1 hour is a reasonable balance: fast enough to surface new tag releases
-// within a reasonable window, sparse enough to not hammer GitHub's API with
-// 1000s of concurrent requests across a large deployment.
-const DriftSweepInterval = 1 * time.Hour
-
-// ResolveRefDeadline bounds the git fetch for a single plugin. A
-// --depth=1 clone of any reasonable plugin repo should complete well
-// within 30s on a healthy connection; 60s is the conservative ceiling
-// that handles Gitea instances on high-latency links.
-const ResolveRefDeadline = 60 * time.Second
-
-// SourceResolver resolves plugin sources to installable directories.
-// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
-type SourceResolver interface {
-	Resolve(source Source) (SourceResolver, error)
-	Schemes() []string
-}
-
-// StartPluginDriftSweeper runs the drift-detection loop until ctx is cancelled.
-// Pass a nil resolver to disable the sweeper (useful for harnesses or CP/SaaS
-// mode where git operations are unavailable).
-//
-// Started from cmd/server/main.go inside supervised.RunWithRecover, so a
-// panic in a sweep is recovered and the loop exits cleanly on SIGTERM-driven
-// context cancellation.
-func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
-	if resolver == nil {
-		log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
-		return
-	}
-	log.Printf("Plugin drift sweeper started — interval %s", DriftSweepInterval)
-	ticker := time.NewTicker(DriftSweepInterval)
-	defer ticker.Stop()
-
-	// Run once on startup so we detect drift immediately rather than waiting
-	// for the first tick.
-	sweepDriftOnce(ctx, resolver)
-	for {
-		select {
-		case <-ctx.Done():
-			log.Println("Plugin drift sweeper: shutdown")
-			return
-		case <-ticker.C:
-			// ctx.Err() guard: the ticker may fire just as ctx is cancelled
-			// (both select cases can be ready at once, and select picks
-			// between them nondeterministically). Skip the sweep so we don't
-			// start a ResolveRef cycle after shutdown that would pollute the
-			// next test's baseline.
-			if ctx.Err() != nil {
-				continue
-			}
-			sweepDriftOnce(ctx, resolver)
-		}
-	}
-}
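
queueDriftEntry further down relies on the partial unique index plugin_update_queue_pending_unique to dedup pending rows. The migration that creates it is not part of this diff; the following is a hypothetical reconstruction of what it plausibly looks like, kept as a Go constant in the style the file uses for embedded SQL:

```go
// Hypothetical reconstruction; the real migration lives outside this diff.
// The index only covers rows with status = 'pending', so applied/dismissed
// history rows never block a new drift entry for the same plugin.
const createPluginUpdateQueuePendingUnique = `
CREATE UNIQUE INDEX IF NOT EXISTS plugin_update_queue_pending_unique
    ON plugin_update_queue (workspace_id, plugin_name)
    WHERE status = 'pending';
`
```
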
-// sweepDriftOnce runs one full drift-detection cycle.
-// Errors are non-fatal — each row is handled independently, so a single
-// failing row doesn't abort the rest of the sweep.
-func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
-	ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
-	defer cancel()
-
-	rows, err := db.DB.QueryContext(ctx, `
-		SELECT wp.id, wp.workspace_id, wp.plugin_name, wp.source_raw,
-		       wp.tracked_ref, wp.installed_sha
-		FROM workspace_plugins wp
-		WHERE wp.tracked_ref != 'none'
-		  AND wp.installed_sha IS NOT NULL
-	`)
-	if err != nil {
-		log.Printf("Plugin drift sweeper: SELECT failed: %v", err)
-		return
-	}
-	defer rows.Close()
-
-	for rows.Next() {
-		var row struct {
-			id           string
-			workspaceID  string
-			pluginName   string
-			sourceRaw    string
-			trackedRef   string
-			installedSHA string
-		}
-		if scanErr := rows.Scan(&row.id, &row.workspaceID, &row.pluginName,
-			&row.sourceRaw, &row.trackedRef, &row.installedSHA); scanErr != nil {
-			log.Printf("Plugin drift sweeper: row scan failed: %v", scanErr)
-			continue
-		}
-
-		latestSHA, resolveErr := resolveLatestSHA(ctx, resolver, row.sourceRaw, row.trackedRef)
-		if resolveErr != nil {
-			// Log and skip — don't queue drift if we couldn't resolve.
-			// Transient network errors self-heal on the next cycle.
-			log.Printf("Plugin drift sweeper: resolve %s@%s failed: %v — skipping",
-				row.pluginName, row.trackedRef, resolveErr)
-			continue
-		}
-
-		if latestSHA == row.installedSHA {
-			continue // no drift
-		}
-
-		// Log the full SHAs: truncating with [:8] would panic on a short or
-		// empty installed_sha.
-		log.Printf("Plugin drift sweeper: drift detected for %s (workspace=%s): "+
-			"installed=%s upstream=%s", row.pluginName, row.workspaceID,
-			row.installedSHA, latestSHA)
-
-		if queueErr := queueDriftEntry(ctx, row.workspaceID, row.pluginName,
-			row.trackedRef, row.installedSHA, latestSHA); queueErr != nil {
-			log.Printf("Plugin drift sweeper: queue drift for %s failed: %v",
-				row.pluginName, queueErr)
-		}
-	}
-	if iterErr := rows.Err(); iterErr != nil {
-		log.Printf("Plugin drift sweeper: rows iteration failed: %v", iterErr)
-	}
-}
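
resolveLatestSHA below rebuilds the fetch spec from source_raw plus tracked_ref. For orientation, the spec grammar it assumes ("scheme://owner/repo#ref") splits like this; splitSpec is an illustrative helper, not the plugins package API:

```go
package main

import (
	"fmt"
	"strings"
)

// splitSpec splits "github://owner/repo#tag:v1.0.0" into scheme, repo path,
// and ref fragment.
func splitSpec(raw string) (scheme, path, ref string) {
	rest := raw
	if i := strings.Index(rest, "://"); i >= 0 {
		scheme, rest = rest[:i], rest[i+3:]
	}
	if i := strings.Index(rest, "#"); i >= 0 {
		rest, ref = rest[:i], rest[i+1:]
	}
	return scheme, rest, ref
}

func main() {
	s, p, r := splitSpec("github://owner/repo#tag:v1.0.0")
	fmt.Println(s, p, r) // github owner/repo tag:v1.0.0
}
```
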
-// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
-// Handles both github:// and local:// sources; local sources are skipped
-// (no meaningful upstream to drift against).
-func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
-	// Strip the scheme prefix to get the raw spec.
-	// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
-	spec := sourceRaw
-	for _, scheme := range resolver.Schemes() {
-		if strings.HasPrefix(spec, scheme+"://") {
-			spec = strings.TrimPrefix(spec, scheme+"://")
-			break
-		}
-	}
-
-	// Re-attach the tracked ref (e.g. "tag:v1.0.0") as the # fragment so the
-	// resolver fetches the right ref. All tracked_ref shapes — "tag:", "sha:",
-	// or a bare ref (shouldn't survive validateTrackedRef, but be safe) — are
-	// attached the same way.
-	refSuffix := "#" + trackedRef
-
-	// If spec already has a # fragment, replace it with the tracked ref.
-	// (In practice source_raw always has one, but handle both cases.)
-	if strings.Contains(spec, "#") {
-		spec = strings.SplitN(spec, "#", 2)[0] + refSuffix
-	} else {
-		spec = spec + refSuffix
-	}
-
-	// Use the github resolver directly — it handles the fetch + rev-parse.
-	gh := NewGithubResolver()
-	resolvedSHA, err := gh.ResolveRef(ctx, spec)
-	if err != nil {
-		return "", fmt.Errorf("resolve %s: %w", spec, err)
-	}
-	return resolvedSHA, nil
-}
-
-// queueDriftEntry inserts a pending drift entry into plugin_update_queue.
-// ON CONFLICT (workspace_id, plugin_name) WHERE status = 'pending' DO NOTHING
-// makes this idempotent — re-drift while a row is already pending is a no-op.
-// Uses the partial unique index plugin_update_queue_pending_unique as the
-// inference target; the WHERE clause (required for Postgres to infer a
-// partial unique index) ensures we only dedup pending rows.
-func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
-	_, err := db.DB.ExecContext(ctx, `
-		INSERT INTO plugin_update_queue
-			(workspace_id, plugin_name, tracked_ref, current_sha, latest_sha)
-		VALUES ($1, $2, $3, $4, $5)
-		ON CONFLICT (workspace_id, plugin_name) WHERE status = 'pending' DO NOTHING
-	`, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
-	return err
-}
-
-// ─────────────────────────────────────────────────────────────────────────────
-// Test helpers
-// ─────────────────────────────────────────────────────────────────────────────
-
-// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
-func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
-	sweepDriftOnce(parent, resolver)
-}
-
-// QueueDriftEntryForTest exposes queueDriftEntry for package-level testing.
-func QueueDriftEntryForTest(ctx context.Context, workspaceID, pluginName, trackedRef, currentSHA, latestSHA string) error {
-	return queueDriftEntry(ctx, workspaceID, pluginName, trackedRef, currentSHA, latestSHA)
-}
-
-// PluginUpdateQueueRow is the Go struct mirroring a plugin_update_queue row.
-// Exported for tests and for the admin handler to consume.
-type PluginUpdateQueueRow struct {
-	ID          string    `json:"id"`
-	WorkspaceID string    `json:"workspace_id"`
-	PluginName  string    `json:"plugin_name"`
-	TrackedRef  string    `json:"tracked_ref"`
-	CurrentSHA  string    `json:"current_sha"`
-	LatestSHA   string    `json:"latest_sha"`
-	Status      string    `json:"status"`
-	CreatedAt   time.Time `json:"created_at"`
-}
-
-// ListPendingUpdates returns all pending drift entries, newest first.
-func ListPendingUpdates(ctx context.Context) ([]PluginUpdateQueueRow, error) {
-	rows, err := db.DB.QueryContext(ctx, `
-		SELECT id, workspace_id, plugin_name, tracked_ref,
-		       current_sha, latest_sha, status, created_at
-		FROM plugin_update_queue
-		WHERE status = 'pending'
-		ORDER BY created_at DESC
-	`)
-	if err != nil {
-		return nil, fmt.Errorf("list pending updates: %w", err)
-	}
-	defer rows.Close()
-
-	var result []PluginUpdateQueueRow
-	for rows.Next() {
-		var r PluginUpdateQueueRow
-		if scanErr := rows.Scan(&r.ID, &r.WorkspaceID, &r.PluginName,
-			&r.TrackedRef, &r.CurrentSHA, &r.LatestSHA, &r.Status, &r.CreatedAt); scanErr != nil {
-			return nil, fmt.Errorf("scan row: %w", scanErr)
-		}
-		result = append(result, r)
-	}
-	return result, rows.Err()
-}
-
-// ApplyDriftUpdate marks a queue entry as applied (or already-applied idempotently)
-// and returns the workspace_id and plugin_name so the caller can trigger a restart.
-func ApplyDriftUpdate(ctx context.Context, queueID string) (workspaceID, pluginName string, err error) {
-	var row struct {
-		WorkspaceID string
-		PluginName  string
-		Status      sql.NullString
-	}
-	err = db.DB.QueryRowContext(ctx, `
-		SELECT workspace_id, plugin_name, status
-		FROM plugin_update_queue
-		WHERE id = $1
-	`, queueID).Scan(&row.WorkspaceID, &row.PluginName, &row.Status)
-	if err == sql.ErrNoRows {
-		return "", "", fmt.Errorf("queue entry %s not found", queueID)
-	}
-	if err != nil {
-		return "", "", fmt.Errorf("query queue entry: %w", err)
-	}
-
-	if row.Status.Valid && row.Status.String == "applied" {
-		// Idempotent — already applied.
-		return row.WorkspaceID, row.PluginName, nil
-	}
-
-	res, execErr := db.DB.ExecContext(ctx, `
-		UPDATE plugin_update_queue
-		SET status = 'applied'
-		WHERE id = $1
-		  AND status = 'pending'
-	`, queueID)
-	if execErr != nil {
-		return "", "", fmt.Errorf("update status: %w", execErr)
-	}
-	// A dismissed (or otherwise non-pending) entry matches zero rows;
-	// surface that instead of reporting a silent success.
-	if n, raErr := res.RowsAffected(); raErr == nil && n == 0 {
-		return "", "", fmt.Errorf("queue entry %s is not pending", queueID)
-	}
-	return row.WorkspaceID, row.PluginName, nil
-}
diff --git a/workspace-server/internal/plugins/drift_sweeper_test.go b/workspace-server/internal/plugins/drift_sweeper_test.go
deleted file mode 100644
index 3370dce1..00000000
--- a/workspace-server/internal/plugins/drift_sweeper_test.go
+++ /dev/null
@@ -1,163 +0,0 @@
-package plugins
-
-import (
-	"context"
-	"errors"
-	"testing"
-)
-
-// stubResolver is a SourceResolver that always returns a stub github resolver.
-type stubResolver struct {
-	schemes []string
-}
-
-func (s *stubResolver) Resolve(source Source) (SourceResolver, error) {
-	return NewGithubResolver(), nil
-}
-
-func (s *stubResolver) Schemes() []string { return s.schemes }
-
-func TestResolveRef_RejectsBareSpec(t *testing.T) {
-	r := NewGithubResolver()
-	_, err := r.ResolveRef(context.Background(), "org/repo")
-	if err == nil {
-		t.Error("bare spec (no ref) should be rejected")
-	}
-}
-
-func TestResolveRef_RejectsInvalidSpec(t *testing.T) {
-	r := NewGithubResolver()
-	for _, spec := range []string{"", "single-segment", "a/b/c"} {
-		t.Run(spec, func(t *testing.T) {
-			_, err := r.ResolveRef(context.Background(), spec)
-			if err == nil {
-				t.Errorf("spec %q should be rejected", spec)
-			}
-		})
-	}
-}
-
-func TestResolveRef_PropagatesGitError(t *testing.T) {
-	r := &GithubResolver{
-		GitRunner: func(ctx context.Context, dir string, args ...string) error {
-			if len(args) > 0 && args[0] == "init" {
-				return nil // init succeeds; the subsequent fetch fails
-			}
-			return errors.New("simulated network failure")
-		},
-	}
-	_, err := r.ResolveRef(context.Background(), "org/repo#v1.0.0")
-	if err == nil {
-		t.Error("expected error from git runner")
-	}
-}
-
-func TestResolveRef_MapsNotFoundToErrPluginNotFound(t *testing.T) {
-	r := &GithubResolver{
-		GitRunner: func(ctx context.Context, dir string, args ...string) error {
-			if len(args) > 0 && args[0] == "init" {
-				return nil // init succeeds; the subsequent fetch fails
-			}
-			return errors.New("remote: Repository not found")
-		},
-	}
-	_, err := r.ResolveRef(context.Background(), "org/repo#v1.0.0")
-	if !errors.Is(err, ErrPluginNotFound) {
-		t.Errorf("expected ErrPluginNotFound, got %v", err)
-	}
-}
-
-// stubGitForResolveRef builds a GitRunner stub that accepts the git commands
-// ResolveRef routes through the runner and rejects anything else.
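-//
-// Usage (illustrative):
-//
-//	r := &GithubResolver{GitRunner: stubGitForResolveRef()}
-//	_, _ = r.ResolveRef(context.Background(), "org/repo#tag:v1.0.0")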
-func stubGitForResolveRef() func(ctx context.Context, dir string, args ...string) error {
-	return func(ctx context.Context, dir string, args ...string) error {
-		if ctx.Err() != nil {
-			return ctx.Err()
-		}
-		if len(args) < 1 {
-			return errors.New("no args")
-		}
-		switch args[0] {
-		case "init", "fetch":
-			// ResolveRef routes init + fetch through the runner. The runner
-			// only reports errors, so there is no output to fake here.
-			return nil
-		case "rev-parse", "describe":
-			// Ref resolution currently shells out to git directly, but
-			// accept these too in case that ever changes.
-			return nil
-		}
-		return errors.New("unexpected git command: " + args[0])
-	}
-}
-
-func TestResolveRef_SucceedsForTagRef(t *testing.T) {
-	// Happy-path plumbing: every git command routed through the runner
-	// succeeds, so ResolveRef must issue a fetch and must not panic. The
-	// final rev-parse shells out to real git against an empty temp repo,
-	// so the returned error (if any) is tolerated; a full success path
-	// needs a real git binary and a real remote.
-	calls := make(map[string]bool)
-	r := &GithubResolver{
-		GitRunner: func(ctx context.Context, dir string, args ...string) error {
-			if ctx.Err() != nil {
-				return ctx.Err()
-			}
-			calls[args[0]] = true
-			return nil
-		},
-	}
-	_, _ = r.ResolveRef(context.Background(), "org/repo#tag:v1.0.0")
-	if !calls["fetch"] {
-		t.Error("expected ResolveRef to issue a git fetch via the runner")
-	}
-}
-
-// TestResolveRef_DoesNotPanic verifies that ResolveRef handles all ref shapes
-// without panicking on nil dereference or similar.
-func TestResolveRef_DoesNotPanic(t *testing.T) {
-	r := NewGithubResolver()
-	refs := []string{
-		"org/repo#tag:v1.0.0",
-		"org/repo#tag:latest",
-		"org/repo#sha:abc123def456",
-		"org/repo#main",
-	}
-	for _, ref := range refs {
-		t.Run(ref, func(t *testing.T) {
-			// Without real git, just verify no panic.
-			_, _ = r.ResolveRef(context.Background(), ref)
-		})
-	}
-}
-
-// TestQueueDriftEntry_HandlesNilDB documents, rather than executes, the
-// nil-DB contract: queueDriftEntry and sweepDriftOnce assume db.DB is wired.
-// Calling them with a nil DB panics in QueryContext/ExecContext, which is why
-// main.go starts the sweeper only after DB init. Real coverage needs a test
-// database.
-func TestQueueDriftEntry_HandlesNilDB(t *testing.T) {
-	t.Skip("requires a test database")
-}
-
-// TestPluginUpdateQueueRow_Struct covers the struct field names.
-func TestPluginUpdateQueueRow_Struct(t *testing.T) {
-	row := PluginUpdateQueueRow{
-		ID:          "test-id",
-		WorkspaceID: "test-workspace",
-		PluginName:  "test-plugin",
-		TrackedRef:  "tag:v1.0.0",
-		CurrentSHA:  "abc123",
-		LatestSHA:   "def456",
-		Status:      "pending",
-	}
-	if row.Status != "pending" {
-		t.Errorf("expected status pending, got %s", row.Status)
-	}
-}
-
-// TestSourceResolverInterface_StubResolver verifies at compile time that the
-// stub resolver satisfies the SourceResolver interface.
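-// The `var _ SourceResolver = (*stubResolver)(nil)` form is the standard
-// compile-time assertion pattern: it allocates nothing at runtime and fails
-// the build, rather than a test, if the interface is no longer satisfied.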
-func TestSourceResolverInterface_StubResolver(t *testing.T) {
-	var _ SourceResolver = (*stubResolver)(nil)
-}
diff --git a/workspace-server/internal/plugins/github.go b/workspace-server/internal/plugins/github.go
index f1eed9a7..3059ff18 100644
--- a/workspace-server/internal/plugins/github.go
+++ b/workspace-server/internal/plugins/github.go
@@ -30,11 +30,6 @@ type GithubResolver struct {
 	// BaseURL defaults to https://github.com. Tests point it at a local
 	// file:// bare repo.
 	BaseURL string
-
-	// LastFetchSHA is set by Fetch after a successful clone. It holds the
-	// commit SHA that was checked out. Callers can retrieve it via LastSHA().
-	// Only valid after a successful Fetch call; reset on each Fetch.
-	LastFetchSHA string
 }
 
 // NewGithubResolver constructs a resolver with sensible defaults.
@@ -45,10 +40,6 @@ func NewGithubResolver() *GithubResolver {
 	}
 }
 
-// LastSHA returns the SHA of the last successful Fetch call, or "" if
-// Fetch has not been called or the last call failed.
-func (r *GithubResolver) LastSHA() string { return r.LastFetchSHA }
-
 // Scheme returns "github".
 func (r *GithubResolver) Scheme() string { return "github" }
 
@@ -123,15 +114,6 @@ func (r *GithubResolver) Fetch(ctx context.Context, spec string, dst string) (st
 		return "", fmt.Errorf("github resolver: clone %s failed: %w", url, err)
 	}
 
-	// Capture the SHA before we strip .git. This is the commit that will
-	// be installed, used by the drift detector to seed installed_sha so
-	// subsequent cycles can detect drift.
-	// runGitOneLine captures output; errors are non-fatal — an unknown SHA
-	// just means drift detection can't work for this row, which is acceptable.
-	if shaOut, shaErr := runGitOneLine(ctx, cloneTarget, "rev-parse", "--verify", "HEAD"); shaErr == nil {
-		r.LastFetchSHA = strings.TrimSpace(shaOut)
-	}
-
 	// Strip .git so the plugin dir doesn't become a nested repo in the
 	// workspace container's filesystem.
 	if err := os.RemoveAll(filepath.Join(cloneTarget, ".git")); err != nil {
@@ -146,24 +128,6 @@ func (r *GithubResolver) Fetch(ctx context.Context, spec string, dst string) (st
 	return repo, nil
 }
 
-// runGitOneLine runs git with args in dir and returns stdout trimmed.
-// Returns an error (and "") on failure; the caller decides whether to
-// treat it as fatal.
-func runGitOneLine(ctx context.Context, dir string, args ...string) (string, error) {
-	cmd := exec.CommandContext(ctx, "git", args...)
-	cmd.Dir = dir
-	childEnv := os.Environ()
-	if os.Getenv("HOME") == "" && dir != "" {
-		childEnv = append(childEnv, "HOME="+dir)
-	}
-	childEnv = append(childEnv, "LANG=C", "LC_ALL=C")
-	cmd.Env = childEnv
-	out, err := cmd.CombinedOutput()
-	if err != nil {
-		return "", fmt.Errorf("git %v: %w (output: %s)", args, err, string(out))
-	}
-	return string(out), nil
-}
-
 // defaultGitRunner shells out to the system `git`. `dir` is the working
 // directory for the command (empty means the current process cwd).
@@ -190,116 +154,3 @@ func defaultGitRunner(ctx context.Context, dir string, args ...string) error {
 	}
 	return nil
 }
-
-// ResolveRef resolves a plugin spec to a full commit SHA.
-//
-// Used by the drift sweeper to compare the SHA installed in a workspace
-// against the current upstream SHA for the tracked ref.
-//
-// Spec shapes:
-//   - "owner/repo#tag:v1.0.0" → fetch the tag, return its commit SHA
-//   - "owner/repo#tag:latest" → fetch tags, find the latest tag, return its SHA
-//   - "owner/repo#sha:abc123" → already a full SHA; validate and return as-is
-//   - "owner/repo#main"       → fetch the branch, return its tip SHA
-//
-// Returns ErrPluginNotFound if the ref does not exist upstream.
-func (r *GithubResolver) ResolveRef(ctx context.Context, spec string) (string, error) {
-	spec = strings.TrimSpace(spec)
-	m := repoRE.FindStringSubmatch(spec)
-	if m == nil {
-		return "", fmt.Errorf("github resolver: spec %q must be <owner>/<repo>[#<ref>]", spec)
-	}
-	owner, repo, ref := m[1], m[2], m[3]
-	if ref == "" {
-		return "", fmt.Errorf("github resolver: ResolveRef requires a ref (got bare %q)", spec)
-	}
-
-	base := r.BaseURL
-	if base == "" {
-		base = "https://github.com"
-	}
-	url := fmt.Sprintf("%s/%s/%s.git", base, owner, repo)
-
-	// Fetch shallowly into a fresh temp repo, then resolve the SHA.
-	// --depth=1 keeps the network cost bounded regardless of repo size.
-	workDir, err := os.MkdirTemp("", "molecule-resolve-ref-*")
-	if err != nil {
-		return "", fmt.Errorf("github resolver: tempdir: %w", err)
-	}
-	defer os.RemoveAll(workDir)
-
-	runner := r.GitRunner
-	if runner == nil {
-		runner = defaultGitRunner
-	}
-
-	// git fetch needs a repository to fetch into; init an empty one first.
-	if err := runner(ctx, workDir, "init", "--quiet"); err != nil {
-		return "", fmt.Errorf("github resolver: init temp repo: %w", err)
-	}
-
-	// Build ref to fetch: for "tag:latest" we fetch all tags; for
-	// "tag:vX.Y.Z" we fetch that specific tag; for bare refs we fetch
-	// the branch/commit directly.
-	fetchArgs := []string{"fetch", "--depth=1"}
-	switch {
-	case strings.HasPrefix(ref, "tag:"):
-		tagName := strings.TrimPrefix(ref, "tag:")
-		if tagName == "latest" {
-			// Fetch all tags so we can find the latest one.
-			fetchArgs = []string{"fetch", "--depth=1", "--tags", "--", url}
-		} else {
-			fetchArgs = append(fetchArgs, "--", url, "tag", tagName)
-		}
-	case strings.HasPrefix(ref, "sha:"):
-		// Already a SHA; just fetch it directly.
-		sha := strings.TrimPrefix(ref, "sha:")
-		fetchArgs = append(fetchArgs, "--", url, sha)
-	default:
-		// Branch or other named ref.
-		fetchArgs = append(fetchArgs, "--", url, ref)
-	}
-
-	if err := runner(ctx, workDir, fetchArgs...); err != nil {
-		msg := strings.ToLower(err.Error())
-		if strings.Contains(msg, "repository not found") ||
-			strings.Contains(msg, "could not find remote ref") ||
-			strings.Contains(msg, "remote ref not found") {
-			return "", fmt.Errorf("github resolver: %s: %w", url, ErrPluginNotFound)
-		}
-		return "", fmt.Errorf("github resolver: fetch %s %s failed: %w", url, ref, err)
-	}
-
-	// Resolve the ref to a SHA.
-	var shaOut []byte
-	var resolveErr error
-	if strings.HasPrefix(ref, "tag:") {
-		tagName := strings.TrimPrefix(ref, "tag:")
-		if tagName == "latest" {
-			// Find the most recent tag reachable from the fetched head.
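-			// Equivalent shell, for reference (illustrative):
-			//   git -C "$workDir" describe --tags --abbrev=0 FETCH_HEAD
-			//   git -C "$workDir" rev-parse --verify "refs/tags/$TAG^{commit}"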
-			tagCmd := exec.CommandContext(ctx, "git", "-C", workDir,
-				"describe", "--tags", "--abbrev=0", "FETCH_HEAD")
-			tagOut, tagErr := tagCmd.CombinedOutput()
-			if tagErr != nil {
-				return "", fmt.Errorf("github resolver: no tags found in %s: %w (%s)",
-					owner+"/"+repo, tagErr, string(tagOut))
-			}
-			resolvedTag := strings.TrimSpace(string(tagOut))
-			shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
-				"rev-parse", "--verify", "refs/tags/"+resolvedTag+"^{commit}")
-			shaOut, resolveErr = shaCmd.CombinedOutput()
-		} else {
-			shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
-				"rev-parse", "--verify", "refs/tags/"+tagName+"^{commit}")
-			shaOut, resolveErr = shaCmd.CombinedOutput()
-		}
-	} else {
-		// sha: refs name an object we just fetched; bare branch refs are
-		// only recorded in FETCH_HEAD (the fetch stores no local branch).
-		refName := "FETCH_HEAD"
-		if strings.HasPrefix(ref, "sha:") {
-			refName = strings.TrimPrefix(ref, "sha:")
-		}
-		shaCmd := exec.CommandContext(ctx, "git", "-C", workDir,
-			"rev-parse", "--verify", refName+"^{commit}")
-		shaOut, resolveErr = shaCmd.CombinedOutput()
-	}
-	if resolveErr != nil {
-		return "", fmt.Errorf("github resolver: rev-parse %s failed: %w (%s)",
-			ref, resolveErr, string(shaOut))
-	}
-	return strings.TrimSpace(string(shaOut)), nil
-}
diff --git a/workspace-server/internal/router/router.go b/workspace-server/internal/router/router.go
index 585e4f7c..770b0a66 100644
--- a/workspace-server/internal/router/router.go
+++ b/workspace-server/internal/router/router.go
@@ -18,7 +18,6 @@ import (
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/metrics"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/middleware"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/pendinguploads"
-	"github.com/Molecule-AI/molecule-monorepo/platform/internal/plugins"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/provisioner"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/supervised"
 	"github.com/Molecule-AI/molecule-monorepo/platform/internal/ws"
@@ -27,7 +26,7 @@ import (
 	"github.com/gin-gonic/gin"
 )
 
-func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle, pluginResolver plugins.SourceResolver) *gin.Engine {
+func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provisioner, platformURL, configsDir string, wh *handlers.WorkspaceHandler, channelMgr *channels.Manager, memBundle *memwiring.Bundle) *gin.Engine {
 	r := gin.Default()
 
 	// Issue #179 — trust no reverse-proxy headers. Without this call Gin's
@@ -499,15 +498,6 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		r.POST("/admin/workspace-images/refresh", middleware.AdminAuth(db.DB), imgH.Refresh)
 	}
 
-	// Admin — plugin version-subscription drift queue (core#123).
-	// List pending drift entries and apply approved updates.
-	{
-		driftH := handlers.NewAdminPluginDriftHandler(plgh)
-		adminAuth := r.Group("", middleware.AdminAuth(db.DB))
-		adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
-		adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
-	}
-
 	// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
 	// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
 	// fresh installs use to obtain their first admin bearer.
 	// Adding AdminAuth here would lock callers out of the very endpoint
 	// that mints their first admin token.
@@ -625,16 +615,9 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
 		).Scan(&instanceID)
 		return instanceID, err
 	}
-	// pluginResolver: when provided (normal production), use it for plgh so
-	// the drift sweeper (which also gets the same resolver in main.go) uses
-	// identical resolver state. When nil (test / backward compat), let
-	// NewPluginsHandler create its own default registry.
 	plgh := handlers.NewPluginsHandler(pluginsDir, dockerCli, wh.RestartByID).
 		WithRuntimeLookup(runtimeLookup).
 		WithInstanceIDLookup(instanceIDLookup)
-	if pluginResolver != nil {
-		plgh = plgh.WithSourceResolver(pluginResolver)
-	}
 	r.GET("/plugins", plgh.ListRegistry)
 	r.GET("/plugins/sources", plgh.ListSources)
 	wsAuth.GET("/plugins", plgh.ListInstalled)
diff --git a/workspace-server/migrations/20260510000000_plugin_drift_queue.down.sql b/workspace-server/migrations/20260510000000_plugin_drift_queue.down.sql
deleted file mode 100644
index 3f29cc9b..00000000
--- a/workspace-server/migrations/20260510000000_plugin_drift_queue.down.sql
+++ /dev/null
@@ -1,10 +0,0 @@
--- Down migration for plugin_drift_queue.
--- Reverses the two changes introduced by the up migration.
-
--- 1. Remove plugin_update_queue (all queued drift entries are discarded).
-DROP TABLE IF EXISTS plugin_update_queue;
-
--- 2. Remove installed_sha column from workspace_plugins. Any recorded SHAs
---    are lost; the rows themselves are untouched. This is safe because the
---    drift sweeper, the only reader of the column, does not exist in the
---    rolled-back version of the codebase.
-ALTER TABLE workspace_plugins DROP COLUMN IF EXISTS installed_sha;
diff --git a/workspace-server/migrations/20260510000000_plugin_drift_queue.up.sql b/workspace-server/migrations/20260510000000_plugin_drift_queue.up.sql
deleted file mode 100644
index 19cdf903..00000000
--- a/workspace-server/migrations/20260510000000_plugin_drift_queue.up.sql
+++ /dev/null
@@ -1,64 +0,0 @@
--- plugin_drift_queue: plugin_update_queue table + installed_sha column.
---
--- Migration order:
---   1. plugin_update_queue — new table, safe to create first.
---   2. installed_sha on workspace_plugins — added column; existing rows stay
---      NULL (no installed_sha until they are re-installed).
---
--- Why two changes in one migration: the drift detector reads from
--- workspace_plugins (needs installed_sha to compare) and writes to
--- plugin_update_queue, so both must exist before the sweeper starts.
--- The alternative — a separate migration for installed_sha — would mean
--- the sweeper could start after migration 1 but before migration 2 and
--- fail its scan against the missing installed_sha column. Keeping them
--- together avoids that race without needing a schema-lock flag.
-
--- plugin_update_queue: records upstream drift for operator review before
--- the platform auto-applies the update.
---
--- Rows are created by the drift sweeper when workspace_plugins.tracked_ref
--- is not 'none' and the upstream SHA differs from installed_sha.
--- Rows are consumed by the admin apply endpoint (core#123).
---
--- Uniqueness: one pending row per (workspace_id, plugin_name). A new drift
--- while a row is still pending is a no-op — the existing pending row
--- reflects the same desired update.
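---
--- The sweeper's INSERT must name both the index columns and the predicate
--- so Postgres can pick the partial unique index below as its conflict
--- arbiter (illustrative; mirrors queueDriftEntry in drift_sweeper.go):
---
---   INSERT INTO plugin_update_queue
---     (workspace_id, plugin_name, tracked_ref, current_sha, latest_sha)
---   VALUES ($1, $2, $3, $4, $5)
---   ON CONFLICT (workspace_id, plugin_name) WHERE status = 'pending' DO NOTHING;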
-CREATE TABLE IF NOT EXISTS plugin_update_queue (
-    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-    workspace_id UUID NOT NULL REFERENCES workspaces(id) ON DELETE CASCADE,
-    plugin_name  TEXT NOT NULL,
-    tracked_ref  TEXT NOT NULL,
-    current_sha  TEXT NOT NULL, -- SHA we had installed
-    latest_sha   TEXT NOT NULL, -- SHA upstream resolved to
-    status       TEXT NOT NULL DEFAULT 'pending',
-    -- Valid statuses: pending | applied | dismissed
-    --   'pending':   drift detected, awaiting operator review
-    --   'applied':   operator confirmed, plugin re-installed
-    --   'dismissed': operator explicitly ignored this drift
-    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-    CONSTRAINT plugin_update_queue_status CHECK (status IN ('pending', 'applied', 'dismissed'))
-);
-
-CREATE UNIQUE INDEX IF NOT EXISTS plugin_update_queue_pending_unique
-    ON plugin_update_queue(workspace_id, plugin_name)
-    WHERE status = 'pending';
-
--- Partial index: the GET /admin/plugin-updates-pending query filters by
--- status = 'pending' and orders by created_at DESC on every call.
-CREATE INDEX IF NOT EXISTS plugin_update_queue_status_pending
-    ON plugin_update_queue(created_at)
-    WHERE status = 'pending';
-
--- Add installed_sha to workspace_plugins. This column stores the SHA that
--- was last successfully installed. The drift sweeper compares this against
--- the upstream-resolved SHA for the tracked_ref to detect drift.
---
--- NULL means: the row exists (was written by an install before this
--- migration) but we don't know what SHA was installed. The drift sweeper
--- treats NULL as "no drift possible yet" — it only compares rows where
--- installed_sha IS NOT NULL.
---
--- The column is updated by:
---   (a) recordWorkspacePluginInstall at install time (always set)
---   (b) the admin apply endpoint after a successful re-install (always set)
---
--- IF NOT EXISTS keeps this statement idempotent, matching the CREATE
--- statements above.
-ALTER TABLE workspace_plugins ADD COLUMN IF NOT EXISTS installed_sha TEXT;
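-
--- Install-time write, for reference (illustrative; the real statement lives
--- in recordWorkspacePluginInstall on the Go side and may be an upsert):
---
---   UPDATE workspace_plugins
---      SET installed_sha = $3
---    WHERE workspace_id = $1 AND plugin_name = $2;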