[infra-lead-agent] fix(workspace-server): unmask compile errors blocking main #282

Closed
infra-lead wants to merge 1 commits from infra/fix-source-resolver-dup into main
9 changed files with 78 additions and 41 deletions

View File

@ -332,13 +332,18 @@ func main() {
cronSched.SetChannels(channelMgr)
// Router
// Plugin registry — created before Setup so the same registry is shared
// between the PluginsHandler (for installs) and the drift sweeper (for
// drift detection). github:// sources always work; local:// sources
// require a plugins/ dir on disk (nil in CP/SaaS mode).
// Plugin resolver + registry — the github:// resolver is shared between
// the PluginsHandler (for installs, via router.Setup → WithSourceResolver)
// and a local registry that the drift sweeper consumes (for drift
// detection). Sharing the resolver instance preserves the documented
// intent of unified resolver state (e.g. rate limiters, in-process
// caches) across both surfaces. local:// sources require a plugins/
// dir on disk and live entirely inside PluginsHandler's internal
// registry, which is fine — drift detection is github-only by design.
githubResolver := plugins.NewGithubResolver()
pluginRegistry := plugins.NewRegistry()
pluginRegistry.Register(plugins.NewGithubResolver())
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, pluginRegistry)
pluginRegistry.Register(githubResolver)
r := router.Setup(hub, broadcaster, prov, platformURL, configsDir, wh, channelMgr, memBundle, githubResolver)
// Plugin drift sweeper — periodic detection of upstream plugin version drift
// (core#123). Scans workspace_plugins rows where tracked_ref != 'none',

View File

@ -8,7 +8,6 @@ package handlers
// POST /admin/plugin-updates/:id/apply — apply a queued drift update
import (
"context"
"database/sql"
"errors"
"fmt"

View File

@ -1262,4 +1262,3 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
}

View File

@ -112,7 +112,15 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH
// Sources returns the underlying plugin source registry. Used by main.go to
// pass the same registry to the drift sweeper so both share resolver state.
func (h *PluginsHandler) Sources() plugins.SourceResolver {
//
// Returns plugins.RegistryResolver (the slim Schemes()-only interface the
// drift sweeper consumes), not plugins.SourceResolver — `h.sources` is the
// registry shape (Register/Resolve/Schemes), not the per-scheme fetcher
// shape (Scheme/Fetch). The previous return type was a leftover from
// before #1814 narrowed `h.sources` to the `pluginSources` interface and
// caused a compile error once the masking duplicate `SourceResolver` in
// `internal/plugins/drift_sweeper.go` was removed.
func (h *PluginsHandler) Sources() plugins.RegistryResolver {
return h.sources
}

View File

@ -120,7 +120,7 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
// Try Redis cache first.
agentURL, err := db.GetCachedURL(ctx, workspaceID)
if err == nil && agentURL != "" {
return rewriteForDocker(agentURL, workspaceID), nil
return h.rewriteForDocker(agentURL, workspaceID), nil
}
// Cache miss — fall back to DB.
@ -136,13 +136,18 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
}
agentURL = *urlNullable
_ = db.CacheURL(ctx, workspaceID, agentURL)
return rewriteForDocker(agentURL, workspaceID), nil
return h.rewriteForDocker(agentURL, workspaceID), nil
}
// rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
// when the platform is running inside a Docker container. When platform is
// on the host (non-Docker), 127.0.0.1 IS the host and the original URL works.
func rewriteForDocker(agentURL, workspaceID string) string {
//
// Method receiver `h` is required to access h.provisioner; was previously
// declared as a package-level function which referred to an undefined `h`
// — compile error masked by the upstream `SourceResolver` redeclaration in
// internal/plugins/drift_sweeper.go.
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
if platformInDocker && h.provisioner != nil {
// Only rewrite if the URL points to localhost (the ephemeral port
// binding the container published to the host). Internal Docker

View File

@ -324,7 +324,6 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
return mr
}
// rewriteForDocker is exported from restart_signals.go so it can be tested here.
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
return rewriteForDocker(agentURL, workspaceID)
}
// (the previous test-only shim wrapping a package-level rewriteForDocker
// has been removed: production rewriteForDocker is now a *WorkspaceHandler
// method directly — see internal/handlers/restart_signals.go.)

View File

@ -9,7 +9,8 @@ package plugins
// 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
// AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
// 2. For each row, resolves the tracked ref to its current upstream SHA
// using the appropriate SourceResolver.
// using the appropriate per-scheme SourceResolver (via the
// RegistryResolver passed in at sweeper start).
// 3. If the resolved SHA differs from installed_sha → drift detected.
// 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
// a re-drift while a row is still pending is a no-op).
@ -61,10 +62,21 @@ const DriftSweepInterval = 1 * time.Hour
// that handles Gitea instances on high-latency links.
const ResolveRefDeadline = 60 * time.Second
// SourceResolver resolves plugin sources to installable directories.
// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
type SourceResolver interface {
Resolve(source Source) (SourceResolver, error)
// RegistryResolver is the slim shape of a plugin source-scheme registry that
// the drift sweeper depends on. It exists distinct from `SourceResolver` (the
// per-scheme fetcher interface in source.go) so the two type names don't
// collide inside this package — historically both were named `SourceResolver`,
// which broke `go build` (issue: docker build fails with "SourceResolver
// redeclared in this block"). Satisfied by *Registry, which holds the set of
// per-scheme resolvers and exposes the list of registered scheme prefixes.
//
// Only `Schemes()` is used by the sweeper today (to strip the scheme prefix
// from `source_raw` before handing the spec to GithubResolver). If the
// sweeper ever needs to dispatch via the registry (e.g. to support
// non-github schemes for drift detection), add the resolution method back
// here — but keep it returning `SourceResolver` so it stays compatible with
// `*Registry.Resolve`.
type RegistryResolver interface {
Schemes() []string
}
@ -74,7 +86,7 @@ type SourceResolver interface {
//
// Registers itself via atexits in cmd/server/main.go so the process
// shuts down cleanly on SIGTERM.
func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
func StartPluginDriftSweeper(ctx context.Context, resolver RegistryResolver) {
if resolver == nil {
log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
return
@ -107,7 +119,7 @@ func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
// sweepDriftOnce runs one full drift-detection cycle.
// Errors are non-fatal — each row is handled independently so a single
// slow row doesn't block the rest of the sweep.
func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
func sweepDriftOnce(parent context.Context, resolver RegistryResolver) {
ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
defer cancel()
@ -170,7 +182,7 @@ func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
// Handles both github:// and local:// sources; local sources are skipped
// (no meaningful upstream to drift against).
func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
func resolveLatestSHA(ctx context.Context, resolver RegistryResolver, sourceRaw, trackedRef string) (string, error) {
// Strip the scheme prefix to get the raw spec.
// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
spec := sourceRaw
@ -231,7 +243,7 @@ func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, c
// ─────────────────────────────────────────────────────────────────────────────
// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
func SweepDriftOnceForTest(parent context.Context, resolver RegistryResolver) {
sweepDriftOnce(parent, resolver)
}

View File

@ -2,20 +2,17 @@ package plugins
import (
"context"
"database/sql"
"errors"
"testing"
)
// stubResolver is a SourceResolver that always returns a stub github resolver.
// stubResolver satisfies plugins.RegistryResolver — the slim shape the
// drift sweeper consumes (just `Schemes()`). Tests that dispatch by scheme
// build a per-scheme SourceResolver directly via NewGithubResolver().
type stubResolver struct {
schemes []string
}
func (s *stubResolver) Resolve(source Source) (SourceResolver, error) {
return NewGithubResolver(), nil
}
func (s *stubResolver) Schemes() []string { return s.schemes }
func TestResolveRef_RejectsBareSpec(t *testing.T) {
@ -156,8 +153,10 @@ func TestPluginUpdateQueueRow_Struct(t *testing.T) {
}
}
// TestSourceResolverInterface_StubResolver verifies that a stub resolver
// satisfies the SourceResolver interface.
func TestSourceResolverInterface_StubResolver(t *testing.T) {
var _ SourceResolver = (*stubResolver)(nil)
// TestRegistryResolverInterface_StubResolver verifies that a stub resolver
// satisfies the RegistryResolver interface — the slim shape the drift
// sweeper consumes. (The per-scheme SourceResolver interface lives in
// source.go and is exercised by GithubResolver / LocalResolver tests.)
func TestRegistryResolverInterface_StubResolver(t *testing.T) {
var _ RegistryResolver = (*stubResolver)(nil)
}

View File

@ -501,12 +501,10 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// Admin — plugin version-subscription drift queue (core#123).
// List pending drift entries and apply approved updates.
{
driftH := handlers.NewAdminPluginDriftHandler(plgh)
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
}
// Moved below plgh declaration — was previously here but referenced
// `plgh` before its `:=` at the same function level, which started
// failing once the upstream `SourceResolver` redeclaration was fixed
// (the prior compile-time block was masking the forward reference).
// Admin — test token minting (issue #6). Hidden in production via TestTokensEnabled().
// NOT behind AdminAuth — this is the bootstrap endpoint E2E tests and
@ -646,6 +644,19 @@ func Setup(hub *ws.Hub, broadcaster *events.Broadcaster, prov *provisioner.Provi
// unpack locally instead of going through Docker exec.
wsAuth.GET("/plugins/:name/download", plgh.Download)
// Admin — plugin version-subscription drift queue (core#123).
// List pending drift entries and apply approved updates.
// Wired here (after plgh) rather than in the admin block above so the
// `plgh` reference resolves — the previous placement was a forward
// reference, masked by the upstream `SourceResolver` redeclaration in
// internal/plugins/drift_sweeper.go.
{
driftH := handlers.NewAdminPluginDriftHandler(plgh)
adminAuth := r.Group("", middleware.AdminAuth(db.DB))
adminAuth.GET("/admin/plugin-updates-pending", driftH.ListPending)
adminAuth.POST("/admin/plugin-updates/:id/apply", driftH.Apply)
}
// Bundles — #164 + #165: both gated behind AdminAuth.
// POST /bundles/import — CRITICAL: anon creation of arbitrary workspaces
// with user-supplied config (system prompts,