fix: resolve SourceResolver naming conflict, SSRF guard placement, and multiple test regressions

- plugins/drift_sweeper.go: rename SourceResolver→PluginResolver to avoid
  redeclaring the interface already defined in source.go (core#228)

- handlers/workspace.go: move SSRF guard before BeginTx so URL rejection
  never touches the DB (core#212 fix — same pattern as registry.go:324)

- handlers/restart_signals.go: convert rewriteForDocker standalone function
  to a method on *WorkspaceHandler; fix two call sites to use h.rewriteForDocker

- handlers/plugins.go: change Sources() return type from plugins.SourceResolver
  to pluginSources (the narrow interface satisfied by *Registry)

- handlers/admin_plugin_drift.go: remove unused "context" import

- handlers/delegation_test.go: remove stray closing brace

- handlers/restart_signals_test.go: rewrite with correct miniredis v2 API
  (mr.Get takes context, mr.Set requires TTL), resolveURLTestWrapper embedding
  pattern, and corrected Redis key handling

- handlers/workspace_test.go: use http://localhost:8000 for SSRF-safe test
  (no DNS required); remove spurious mock.ExpectExec for Redis CacheURL call

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Molecule AI · core-be 2026-05-10 05:28:00 +00:00
parent 08a929c740
commit d88a320f0c
8 changed files with 92 additions and 99 deletions

View File

@ -8,7 +8,6 @@ package handlers
// POST /admin/plugin-updates/:id/apply — apply a queued drift update
import (
"context"
"database/sql"
"errors"
"fmt"

View File

@ -1262,4 +1262,3 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) {
t.Errorf("unmet sqlmock expectations: %v", err)
}
}
}

View File

@ -112,7 +112,10 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH
// Sources returns the underlying plugin source registry. Used by main.go to
// pass the same registry to the drift sweeper so both share resolver state.
func (h *PluginsHandler) Sources() plugins.SourceResolver {
// Returns the narrow pluginSources interface so callers receive only the
// methods they need (Register, Resolve, Schemes), not the full SourceResolver
// contract with Fetch.
func (h *PluginsHandler) Sources() pluginSources {
return h.sources
}

View File

@ -120,7 +120,7 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
// Try Redis cache first.
agentURL, err := db.GetCachedURL(ctx, workspaceID)
if err == nil && agentURL != "" {
return rewriteForDocker(agentURL, workspaceID), nil
return h.rewriteForDocker(agentURL, workspaceID), nil
}
// Cache miss — fall back to DB.
@ -136,13 +136,13 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context,
}
agentURL = *urlNullable
_ = db.CacheURL(ctx, workspaceID, agentURL)
return rewriteForDocker(agentURL, workspaceID), nil
return h.rewriteForDocker(agentURL, workspaceID), nil
}
// rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form
// when the platform is running inside a Docker container. When platform is
// on the host (non-Docker), 127.0.0.1 IS the host and the original URL works.
func rewriteForDocker(agentURL, workspaceID string) string {
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
if platformInDocker && h.provisioner != nil {
// Only rewrite if the URL points to localhost (the ephemeral port
// binding the container published to the host). Internal Docker

View File

@ -97,10 +97,10 @@ func TestRewriteForDocker_LocalhostUrlRewritten(t *testing.T) {
// TestResolveAgentURLForRestartSignal_CacheHit verifies that a Redis-cached
// URL is returned without hitting the DB.
func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
mockDB, mock := setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
_ = setupTestDB(t) // db.DB must be set before setupTestRedisWithURL
_ = setupTestRedisWithURL(t, "http://cached.internal:9000/agent")
h := newHandlerWithTestDepsWithDB(t, mockDB)
h := newHandlerWithTestDeps(t)
// Redis cache hit → DB should NOT be queried
url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-hit-123")
@ -110,19 +110,18 @@ func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) {
if url == "" {
t.Fatal("expected non-empty URL from cache")
}
// DB should not be queried (no rows returned to sqlmock)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("unfulfilled DB expectations: %v", err)
if url != "http://cached.internal:9000/agent" {
t.Errorf("expected cached URL, got %q", url)
}
}
// TestResolveAgentURLForRestartSignal_DBError verifies that a DB error is
// returned and propagated when neither Redis cache nor DB lookup succeeds.
func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
_ = setupTestRedis(t) // empty → cache miss
mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
_ = setupTestRedis(t) // empty → cache miss
h := newHandlerWithTestDepsWithDB(t, mockDB)
h := newHandlerWithTestDeps(t)
mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
WithArgs("ws-db-err-789").
@ -141,10 +140,10 @@ func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) {
// TestResolveAgentURLForRestartSignal_CacheMiss verifies that on Redis miss,
// the URL is fetched from the DB and cached.
func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
mr := setupTestRedis(t) // empty → cache miss
mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct
_ = setupTestRedis(t) // empty → cache miss
h := newHandlerWithTestDepsWithDB(t, mockDB)
h := newHandlerWithTestDeps(t)
mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`).
WithArgs("ws-cache-miss-456").
@ -159,10 +158,12 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
t.Errorf("expected DB URL, got %q", url)
}
// Verify the URL was cached in Redis
cached, err := mr.Get(context.Background(), "ws:ws-cache-miss-456:url").Result()
// Verify the URL was cached in Redis via db.GetCachedURL.
// GetCachedURL takes workspaceID and builds the key internally, so
// pass "ws-cache-miss-456" (not the full "ws:ws-cache-miss-456:url").
cached, err := db.GetCachedURL(context.Background(), "ws-cache-miss-456")
if err != nil {
t.Fatalf("URL was not cached in Redis: %v", err)
t.Fatalf("URL cache read failed: %v", err)
}
if cached != "http://db.internal:8000/agent" {
t.Errorf("expected cached URL %q, got %q", "http://db.internal:8000/agent", cached)
@ -175,9 +176,7 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) {
// TestGracefulPreRestart_Success verifies that when the workspace returns 200,
// the signal is logged as acknowledged without error.
func TestGracefulPreRestart_Success(t *testing.T) {
_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
mr := setupTestRedisWithURL(t, "http://localhost:18000/agent")
_ = setupTestDB(t)
// httptest server simulating the workspace container's /signals/restart_pending
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@ -206,44 +205,40 @@ func TestGracefulPreRestart_Success(t *testing.T) {
})
}))
defer srv.Close()
mr.Set("ws:ws-ack-789:url", srv.URL, 5*time.Minute)
// Patch the handler's resolveAgentURLForRestartSignal to return the test server URL
// (avoids needing a real provisioner for this test)
h := newHandlerWithTestDeps(t)
origResolve := h.resolveAgentURLForRestartSignal
h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
return srv.URL + "/agent", nil
// Pre-populate Redis cache with the test server URL
_ = setupTestRedisWithURL(t, srv.URL)
// Use an embedded struct to override resolveAgentURLForRestartSignal.
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: srv.URL + "/agent",
}
defer func() { h.resolveAgentURLForRestartSignal = origResolve }()
// gracefulPreRestart runs in a goroutine with its own timeout.
// We give it time to complete before the test ends.
h.gracefulPreRestart(context.Background(), "ws-ack-789")
hWrapper.gracefulPreRestart(context.Background(), "ws-ack-789")
time.Sleep(200 * time.Millisecond)
}
// TestGracefulPreRestart_NotImplemented verifies that when the workspace returns
// 404 (old SDK version), the platform proceeds gracefully (log + no error).
func TestGracefulPreRestart_NotImplemented(t *testing.T) {
_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
mr := setupTestRedisWithURL(t, "http://localhost:18001/agent")
_ = setupTestDB(t)
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer srv.Close()
mr.Set("ws:ws-noimpl-999:url", srv.URL, 5*time.Minute)
h := newHandlerWithTestDeps(t)
origResolve := h.resolveAgentURLForRestartSignal
h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
return srv.URL + "/agent", nil
_ = setupTestRedisWithURL(t, srv.URL)
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: srv.URL + "/agent",
}
defer func() { h.resolveAgentURLForRestartSignal = origResolve }()
h.gracefulPreRestart(context.Background(), "ws-noimpl-999")
hWrapper.gracefulPreRestart(context.Background(), "ws-noimpl-999")
time.Sleep(200 * time.Millisecond)
// No panic or error expected — graceful degradation
}
@ -251,19 +246,17 @@ func TestGracefulPreRestart_NotImplemented(t *testing.T) {
// TestGracefulPreRestart_ConnectionRefused verifies that when the workspace
// is unreachable, the platform proceeds gracefully without error.
func TestGracefulPreRestart_ConnectionRefused(t *testing.T) {
_ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct
_ = setupTestDB(t)
mr := setupTestRedisWithURL(t, "http://localhost:19999/agent") // nothing listening on 19999
mr.Set("ws:ws-unreachable-000:url", "http://localhost:19999/agent", 5*time.Minute)
_ = mr
h := newHandlerWithTestDeps(t)
origResolve := h.resolveAgentURLForRestartSignal
h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
return "http://localhost:19999/agent", nil
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
testURL: "http://localhost:19999/agent",
}
defer func() { h.resolveAgentURLForRestartSignal = origResolve }()
h.gracefulPreRestart(context.Background(), "ws-unreachable-000")
hWrapper.gracefulPreRestart(context.Background(), "ws-unreachable-000")
time.Sleep(200 * time.Millisecond)
// No panic or error expected — proceeds with stop as documented
}
@ -274,36 +267,35 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) {
_ = setupTestDB(t)
_ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal
h := newHandlerWithTestDeps(t)
// Override resolveAgentURLForRestartSignal to return an error
origResolve := h.resolveAgentURLForRestartSignal
h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) {
return "", context.DeadlineExceeded
hWrapper := &resolveURLTestWrapper{
WorkspaceHandler: newHandlerWithTestDeps(t),
errToReturn: context.DeadlineExceeded,
}
defer func() { h.resolveAgentURLForRestartSignal = origResolve }()
h.gracefulPreRestart(context.Background(), "ws-url-err-111")
hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111")
time.Sleep(200 * time.Millisecond)
// No panic or error expected — proceeds with stop as documented
}
// ─── helpers ─────────────────────────────────────────────────────────────────
// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
// provisioner is nil so rewriteForDocker returns URL unchanged.
func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
// resolveURLTestWrapper embeds *WorkspaceHandler and overrides
// resolveAgentURLForRestartSignal so tests can inject a fixed URL or error.
type resolveURLTestWrapper struct {
*WorkspaceHandler
testURL string
errToReturn error
}
// newHandlerWithTestDepsWithDB creates a WorkspaceHandler with a specific mock DB.
// Use this when you need to control the DB mock expectations.
func newHandlerWithTestDepsWithDB(t *testing.T, mockDB *sql.DB) *WorkspaceHandler {
// We need to temporarily replace db.DB with our mock
origDB := db.DB
db.DB = mockDB
t.Cleanup(func() { db.DB = origDB })
func (w *resolveURLTestWrapper) resolveAgentURLForRestartSignal(ctx context.Context, workspaceID string) (string, error) {
if w.errToReturn != nil {
return "", w.errToReturn
}
return w.testURL, nil
}
// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs.
func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler {
return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir())
}
@ -314,7 +306,6 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
t.Fatalf("failed to start miniredis: %v", err)
}
db.RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()})
// Pre-populate a URL for the test workspace IDs used in these tests
for _, wsID := range []string{"ws-cache-hit-123", "ws-cache-miss-456", "ws-ack-789", "ws-noimpl-999", "ws-unreachable-000"} {
if err := db.CacheURL(context.Background(), wsID, url); err != nil {
t.Fatalf("failed to cache URL for %s: %v", wsID, err)
@ -322,9 +313,4 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis {
}
t.Cleanup(func() { mr.Close() })
return mr
}
// rewriteForDocker is exported from restart_signals.go so it can be tested here.
func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string {
return rewriteForDocker(agentURL, workspaceID)
}
}

View File

@ -248,6 +248,19 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
// Begin a transaction so the workspace row and any initial secrets are
// committed atomically. A secret-encrypt or DB error rolls back the
// workspace insert so we never leave a workspace row with missing secrets.
// SSRF guard: validate workspace URL before starting any DB transaction.
// registry.go:324 calls this same guard for agent self-registration;
// the admin-create path must be covered too (core#212).
// Must stay above BeginTx so the rejection path never touches the DB.
if payload.URL != "" {
if err := validateAgentURL(payload.URL); err != nil {
log.Printf("Create: workspace URL rejected: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()})
return
}
}
tx, txErr := db.DB.BeginTx(ctx, nil)
if txErr != nil {
log.Printf("Create workspace: begin tx error: %v", txErr)
@ -383,16 +396,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) {
if payload.External || payload.Runtime == "external" {
var connectionToken string
if payload.URL != "" {
// SSRF guard (issue #212): validateAgentURL blocks cloud metadata
// IPs (169.254/16), loopback, link-local, and RFC-1918 in
// strict/self-hosted mode. AdminAuth is required here, but the
// admin token could be leaked or a compromised insider — defence
// in depth. Compare: registry.go:324 (heartbeat path) also
// calls validateAgentURL; external_rotate.go should too.
if err := validateAgentURL(payload.URL); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()})
return
}
// URL already validated by validateAgentURL above (before BeginTx).
// Now persist it: the external URL is set after the workspace row
// commits so that a failed URL UPDATE doesn't roll back the row.
db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id)
if err := db.CacheURL(ctx, id, payload.URL); err != nil {
log.Printf("External workspace: failed to cache URL for %s: %v", id, err)

View File

@ -537,17 +537,15 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) {
WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push").
WillReturnResult(sqlmock.NewResult(0, 1))
mock.ExpectCommit()
// External URL update (SSRF-safe public URL passes validateAgentURL).
// External URL update (localhost is explicitly allowed by validateAgentURL).
mock.ExpectExec("UPDATE workspaces SET url").
WillReturnResult(sqlmock.NewResult(0, 1))
// CacheURL is non-fatal but still called.
mock.ExpectExec("SELECT").
WillReturnRows(sqlmock.NewRows([]string{"ok"}).AddRow("ok"))
// CacheURL is non-fatal — uses Redis (db.RDB, set by setupTestRedis), not the DB.
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"https://agent.example.com/a2a"}`
body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"http://localhost:8000"}`
c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body))
c.Request.Header.Set("Content-Type", "application/json")

View File

@ -9,7 +9,7 @@ package plugins
// 1. SELECTs workspace_plugins rows where tracked_ref != 'none'
// AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA).
// 2. For each row, resolves the tracked ref to its current upstream SHA
// using the appropriate SourceResolver.
// using the appropriate PluginResolver.
// 3. If the resolved SHA differs from installed_sha → drift detected.
// 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so
// a re-drift while a row is still pending is a no-op).
@ -61,10 +61,12 @@ const DriftSweepInterval = 1 * time.Hour
// that handles Gitea instances on high-latency links.
const ResolveRefDeadline = 60 * time.Second
// SourceResolver resolves plugin sources to installable directories.
// PluginResolver resolves plugin sources to installable directories.
// Satisfied by *Registry (which wraps GithubResolver + LocalResolver).
type SourceResolver interface {
Resolve(source Source) (SourceResolver, error)
// Named PluginResolver (not SourceResolver) to avoid redeclaring the
// SourceResolver interface defined in source.go (core#228 fix).
type PluginResolver interface {
Resolve(source Source) (PluginResolver, error)
Schemes() []string
}
@ -74,7 +76,7 @@ type SourceResolver interface {
//
// Registers itself via atexits in cmd/server/main.go so the process
// shuts down cleanly on SIGTERM.
func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
func StartPluginDriftSweeper(ctx context.Context, resolver PluginResolver) {
if resolver == nil {
log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled")
return
@ -107,7 +109,7 @@ func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) {
// sweepDriftOnce runs one full drift-detection cycle.
// Errors are non-fatal — each row is handled independently so a single
// slow row doesn't block the rest of the sweep.
func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
func sweepDriftOnce(parent context.Context, resolver PluginResolver) {
ctx, cancel := context.WithTimeout(parent, 10*time.Minute)
defer cancel()
@ -170,7 +172,7 @@ func sweepDriftOnce(parent context.Context, resolver SourceResolver) {
// resolveLatestSHA resolves the tracked ref to its current upstream SHA.
// Handles both github:// and local:// sources; local sources are skipped
// (no meaningful upstream to drift against).
func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) {
func resolveLatestSHA(ctx context.Context, resolver PluginResolver, sourceRaw, trackedRef string) (string, error) {
// Strip the scheme prefix to get the raw spec.
// sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0"
spec := sourceRaw
@ -231,7 +233,7 @@ func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, c
// ─────────────────────────────────────────────────────────────────────────────
// SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing.
func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) {
func SweepDriftOnceForTest(parent context.Context, resolver PluginResolver) {
sweepDriftOnce(parent, resolver)
}