diff --git a/workspace-server/internal/handlers/admin_plugin_drift.go b/workspace-server/internal/handlers/admin_plugin_drift.go index 1082c1d6..3ceb1166 100644 --- a/workspace-server/internal/handlers/admin_plugin_drift.go +++ b/workspace-server/internal/handlers/admin_plugin_drift.go @@ -8,7 +8,6 @@ package handlers // POST /admin/plugin-updates/:id/apply — apply a queued drift update import ( - "context" "database/sql" "errors" "fmt" diff --git a/workspace-server/internal/handlers/delegation_test.go b/workspace-server/internal/handlers/delegation_test.go index 427e71b2..38c63206 100644 --- a/workspace-server/internal/handlers/delegation_test.go +++ b/workspace-server/internal/handlers/delegation_test.go @@ -1262,4 +1262,3 @@ func TestExecuteDelegation_CleanProxyResponse_Unchanged(t *testing.T) { t.Errorf("unmet sqlmock expectations: %v", err) } } -} diff --git a/workspace-server/internal/handlers/plugins.go b/workspace-server/internal/handlers/plugins.go index 78e182ba..d26db674 100644 --- a/workspace-server/internal/handlers/plugins.go +++ b/workspace-server/internal/handlers/plugins.go @@ -112,7 +112,10 @@ func (h *PluginsHandler) WithInstanceIDLookup(lookup InstanceIDLookup) *PluginsH // Sources returns the underlying plugin source registry. Used by main.go to // pass the same registry to the drift sweeper so both share resolver state. -func (h *PluginsHandler) Sources() plugins.SourceResolver { +// Returns the narrow pluginSources interface so callers receive only the +// methods they need (Register, Resolve, Schemes), not the full SourceResolver +// contract with Fetch. +func (h *PluginsHandler) Sources() pluginSources { return h.sources } diff --git a/workspace-server/internal/handlers/restart_signals.go b/workspace-server/internal/handlers/restart_signals.go index 81cb9200..a947a560 100644 --- a/workspace-server/internal/handlers/restart_signals.go +++ b/workspace-server/internal/handlers/restart_signals.go @@ -120,7 +120,7 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context, // Try Redis cache first. agentURL, err := db.GetCachedURL(ctx, workspaceID) if err == nil && agentURL != "" { - return rewriteForDocker(agentURL, workspaceID), nil + return h.rewriteForDocker(agentURL, workspaceID), nil } // Cache miss — fall back to DB. @@ -136,13 +136,13 @@ func (h *WorkspaceHandler) resolveAgentURLForRestartSignal(ctx context.Context, } agentURL = *urlNullable _ = db.CacheURL(ctx, workspaceID, agentURL) - return rewriteForDocker(agentURL, workspaceID), nil + return h.rewriteForDocker(agentURL, workspaceID), nil } // rewriteForDocker rewrites a 127.0.0.1 agent URL to the Docker-DNS form // when the platform is running inside a Docker container. When platform is // on the host (non-Docker), 127.0.0.1 IS the host and the original URL works. -func rewriteForDocker(agentURL, workspaceID string) string { +func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string { if platformInDocker && h.provisioner != nil { // Only rewrite if the URL points to localhost (the ephemeral port // binding the container published to the host). Internal Docker diff --git a/workspace-server/internal/handlers/restart_signals_test.go b/workspace-server/internal/handlers/restart_signals_test.go index d9278e2c..196170a9 100644 --- a/workspace-server/internal/handlers/restart_signals_test.go +++ b/workspace-server/internal/handlers/restart_signals_test.go @@ -97,10 +97,10 @@ func TestRewriteForDocker_LocalhostUrlRewritten(t *testing.T) { // TestResolveAgentURLForRestartSignal_CacheHit verifies that a Redis-cached // URL is returned without hitting the DB. func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) { - mockDB, mock := setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct + _ = setupTestDB(t) // db.DB must be set before setupTestRedisWithURL _ = setupTestRedisWithURL(t, "http://cached.internal:9000/agent") - h := newHandlerWithTestDepsWithDB(t, mockDB) + h := newHandlerWithTestDeps(t) // Redis cache hit → DB should NOT be queried url, err := h.resolveAgentURLForRestartSignal(context.Background(), "ws-cache-hit-123") @@ -110,19 +110,18 @@ func TestResolveAgentURLForRestartSignal_CacheHit(t *testing.T) { if url == "" { t.Fatal("expected non-empty URL from cache") } - // DB should not be queried (no rows returned to sqlmock) - if err := mock.ExpectationsWereMet(); err != nil { - t.Errorf("unfulfilled DB expectations: %v", err) + if url != "http://cached.internal:9000/agent" { + t.Errorf("expected cached URL, got %q", url) } } // TestResolveAgentURLForRestartSignal_DBError verifies that a DB error is // returned and propagated when neither Redis cache nor DB lookup succeeds. func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) { - mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct - _ = setupTestRedis(t) // empty → cache miss + mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct + _ = setupTestRedis(t) // empty → cache miss - h := newHandlerWithTestDepsWithDB(t, mockDB) + h := newHandlerWithTestDeps(t) mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`). WithArgs("ws-db-err-789"). @@ -141,10 +140,10 @@ func TestResolveAgentURLForRestartSignal_DBError(t *testing.T) { // TestResolveAgentURLForRestartSignal_CacheMiss verifies that on Redis miss, // the URL is fetched from the DB and cached. func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) { - mockDB, mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct - mr := setupTestRedis(t) // empty → cache miss + mock := setupTestDB(t) // must come before setupTestRedis so db.DB is correct + _ = setupTestRedis(t) // empty → cache miss - h := newHandlerWithTestDepsWithDB(t, mockDB) + h := newHandlerWithTestDeps(t) mock.ExpectQuery(`SELECT url FROM workspaces WHERE id =`). WithArgs("ws-cache-miss-456"). @@ -159,10 +158,12 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) { t.Errorf("expected DB URL, got %q", url) } - // Verify the URL was cached in Redis - cached, err := mr.Get(context.Background(), "ws:ws-cache-miss-456:url").Result() + // Verify the URL was cached in Redis via db.GetCachedURL. + // GetCachedURL takes workspaceID and builds the key internally, so + // pass "ws-cache-miss-456" (not the full "ws:ws-cache-miss-456:url"). + cached, err := db.GetCachedURL(context.Background(), "ws-cache-miss-456") if err != nil { - t.Fatalf("URL was not cached in Redis: %v", err) + t.Fatalf("URL cache read failed: %v", err) } if cached != "http://db.internal:8000/agent" { t.Errorf("expected cached URL %q, got %q", "http://db.internal:8000/agent", cached) @@ -175,9 +176,7 @@ func TestResolveAgentURLForRestartSignal_CacheMiss(t *testing.T) { // TestGracefulPreRestart_Success verifies that when the workspace returns 200, // the signal is logged as acknowledged without error. func TestGracefulPreRestart_Success(t *testing.T) { - _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct - - mr := setupTestRedisWithURL(t, "http://localhost:18000/agent") + _ = setupTestDB(t) // httptest server simulating the workspace container's /signals/restart_pending srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -206,44 +205,40 @@ func TestGracefulPreRestart_Success(t *testing.T) { }) })) defer srv.Close() - mr.Set("ws:ws-ack-789:url", srv.URL, 5*time.Minute) - // Patch the handler's resolveAgentURLForRestartSignal to return the test server URL - // (avoids needing a real provisioner for this test) - h := newHandlerWithTestDeps(t) - origResolve := h.resolveAgentURLForRestartSignal - h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) { - return srv.URL + "/agent", nil + // Pre-populate Redis cache with the test server URL + _ = setupTestRedisWithURL(t, srv.URL) + + // Use an embedded struct to override resolveAgentURLForRestartSignal. + hWrapper := &resolveURLTestWrapper{ + WorkspaceHandler: newHandlerWithTestDeps(t), + testURL: srv.URL + "/agent", } - defer func() { h.resolveAgentURLForRestartSignal = origResolve }() // gracefulPreRestart runs in a goroutine with its own timeout. // We give it time to complete before the test ends. - h.gracefulPreRestart(context.Background(), "ws-ack-789") + hWrapper.gracefulPreRestart(context.Background(), "ws-ack-789") time.Sleep(200 * time.Millisecond) } // TestGracefulPreRestart_NotImplemented verifies that when the workspace returns // 404 (old SDK version), the platform proceeds gracefully (log + no error). func TestGracefulPreRestart_NotImplemented(t *testing.T) { - _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct - - mr := setupTestRedisWithURL(t, "http://localhost:18001/agent") + _ = setupTestDB(t) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound) })) defer srv.Close() - mr.Set("ws:ws-noimpl-999:url", srv.URL, 5*time.Minute) - h := newHandlerWithTestDeps(t) - origResolve := h.resolveAgentURLForRestartSignal - h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) { - return srv.URL + "/agent", nil + _ = setupTestRedisWithURL(t, srv.URL) + + hWrapper := &resolveURLTestWrapper{ + WorkspaceHandler: newHandlerWithTestDeps(t), + testURL: srv.URL + "/agent", } - defer func() { h.resolveAgentURLForRestartSignal = origResolve }() - h.gracefulPreRestart(context.Background(), "ws-noimpl-999") + hWrapper.gracefulPreRestart(context.Background(), "ws-noimpl-999") time.Sleep(200 * time.Millisecond) // No panic or error expected — graceful degradation } @@ -251,19 +246,17 @@ func TestGracefulPreRestart_NotImplemented(t *testing.T) { // TestGracefulPreRestart_ConnectionRefused verifies that when the workspace // is unreachable, the platform proceeds gracefully without error. func TestGracefulPreRestart_ConnectionRefused(t *testing.T) { - _ = setupTestDB(t) // must come before setupTestRedisWithURL so db.DB is correct + _ = setupTestDB(t) mr := setupTestRedisWithURL(t, "http://localhost:19999/agent") // nothing listening on 19999 - mr.Set("ws:ws-unreachable-000:url", "http://localhost:19999/agent", 5*time.Minute) + _ = mr - h := newHandlerWithTestDeps(t) - origResolve := h.resolveAgentURLForRestartSignal - h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) { - return "http://localhost:19999/agent", nil + hWrapper := &resolveURLTestWrapper{ + WorkspaceHandler: newHandlerWithTestDeps(t), + testURL: "http://localhost:19999/agent", } - defer func() { h.resolveAgentURLForRestartSignal = origResolve }() - h.gracefulPreRestart(context.Background(), "ws-unreachable-000") + hWrapper.gracefulPreRestart(context.Background(), "ws-unreachable-000") time.Sleep(200 * time.Millisecond) // No panic or error expected — proceeds with stop as documented } @@ -274,36 +267,35 @@ func TestGracefulPreRestart_URLResolutionError(t *testing.T) { _ = setupTestDB(t) _ = setupTestRedis(t) // empty → URL resolution will fail in resolveAgentURLForRestartSignal - h := newHandlerWithTestDeps(t) - - // Override resolveAgentURLForRestartSignal to return an error - origResolve := h.resolveAgentURLForRestartSignal - h.resolveAgentURLForRestartSignal = func(ctx context.Context, wsID string) (string, error) { - return "", context.DeadlineExceeded + hWrapper := &resolveURLTestWrapper{ + WorkspaceHandler: newHandlerWithTestDeps(t), + errToReturn: context.DeadlineExceeded, } - defer func() { h.resolveAgentURLForRestartSignal = origResolve }() - h.gracefulPreRestart(context.Background(), "ws-url-err-111") + hWrapper.gracefulPreRestart(context.Background(), "ws-url-err-111") time.Sleep(200 * time.Millisecond) // No panic or error expected — proceeds with stop as documented } // ─── helpers ───────────────────────────────────────────────────────────────── -// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs. -// provisioner is nil so rewriteForDocker returns URL unchanged. -func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler { - return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) +// resolveURLTestWrapper embeds *WorkspaceHandler and overrides +// resolveAgentURLForRestartSignal so tests can inject a fixed URL or error. +type resolveURLTestWrapper struct { + *WorkspaceHandler + testURL string + errToReturn error } -// newHandlerWithTestDepsWithDB creates a WorkspaceHandler with a specific mock DB. -// Use this when you need to control the DB mock expectations. -func newHandlerWithTestDepsWithDB(t *testing.T, mockDB *sql.DB) *WorkspaceHandler { - // We need to temporarily replace db.DB with our mock - origDB := db.DB - db.DB = mockDB - t.Cleanup(func() { db.DB = origDB }) +func (w *resolveURLTestWrapper) resolveAgentURLForRestartSignal(ctx context.Context, workspaceID string) (string, error) { + if w.errToReturn != nil { + return "", w.errToReturn + } + return w.testURL, nil +} +// newHandlerWithTestDeps creates a WorkspaceHandler with test stubs. +func newHandlerWithTestDeps(t *testing.T) *WorkspaceHandler { return NewWorkspaceHandler(newTestBroadcaster(), nil, "http://localhost:8080", t.TempDir()) } @@ -314,7 +306,6 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis { t.Fatalf("failed to start miniredis: %v", err) } db.RDB = redis.NewClient(&redis.Options{Addr: mr.Addr()}) - // Pre-populate a URL for the test workspace IDs used in these tests for _, wsID := range []string{"ws-cache-hit-123", "ws-cache-miss-456", "ws-ack-789", "ws-noimpl-999", "ws-unreachable-000"} { if err := db.CacheURL(context.Background(), wsID, url); err != nil { t.Fatalf("failed to cache URL for %s: %v", wsID, err) @@ -322,9 +313,4 @@ func setupTestRedisWithURL(t *testing.T, url string) *miniredis.Miniredis { } t.Cleanup(func() { mr.Close() }) return mr -} - -// rewriteForDocker is exported from restart_signals.go so it can be tested here. -func (h *WorkspaceHandler) rewriteForDocker(agentURL, workspaceID string) string { - return rewriteForDocker(agentURL, workspaceID) -} +} \ No newline at end of file diff --git a/workspace-server/internal/handlers/workspace.go b/workspace-server/internal/handlers/workspace.go index d5abd2ed..2c033561 100644 --- a/workspace-server/internal/handlers/workspace.go +++ b/workspace-server/internal/handlers/workspace.go @@ -248,6 +248,19 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { // Begin a transaction so the workspace row and any initial secrets are // committed atomically. A secret-encrypt or DB error rolls back the // workspace insert so we never leave a workspace row with missing secrets. + + // SSRF guard: validate workspace URL before starting any DB transaction. + // registry.go:324 calls this same guard for agent self-registration; + // the admin-create path must be covered too (core#212). + // Must stay above BeginTx so the rejection path never touches the DB. + if payload.URL != "" { + if err := validateAgentURL(payload.URL); err != nil { + log.Printf("Create: workspace URL rejected: %v", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()}) + return + } + } + tx, txErr := db.DB.BeginTx(ctx, nil) if txErr != nil { log.Printf("Create workspace: begin tx error: %v", txErr) @@ -383,16 +396,9 @@ func (h *WorkspaceHandler) Create(c *gin.Context) { if payload.External || payload.Runtime == "external" { var connectionToken string if payload.URL != "" { - // SSRF guard (issue #212): validateAgentURL blocks cloud metadata - // IPs (169.254/16), loopback, link-local, and RFC-1918 in - // strict/self-hosted mode. AdminAuth is required here, but the - // admin token could be leaked or a compromised insider — defence - // in depth. Compare: registry.go:324 (heartbeat path) also - // calls validateAgentURL; external_rotate.go should too. - if err := validateAgentURL(payload.URL); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "unsafe workspace URL: " + err.Error()}) - return - } + // URL already validated by validateAgentURL above (before BeginTx). + // Now persist it: the external URL is set after the workspace row + // commits so that a failed URL UPDATE doesn't roll back the row. db.DB.ExecContext(ctx, `UPDATE workspaces SET url = $1, status = $2, runtime = 'external', updated_at = now() WHERE id = $3`, payload.URL, models.StatusOnline, id) if err := db.CacheURL(ctx, id, payload.URL); err != nil { log.Printf("External workspace: failed to cache URL for %s: %v", id, err) diff --git a/workspace-server/internal/handlers/workspace_test.go b/workspace-server/internal/handlers/workspace_test.go index c5abbffa..4e58a7bf 100644 --- a/workspace-server/internal/handlers/workspace_test.go +++ b/workspace-server/internal/handlers/workspace_test.go @@ -537,17 +537,15 @@ func TestWorkspaceCreate_ExternalURL_SSRFSafe(t *testing.T) { WithArgs(sqlmock.AnyArg(), "Ext Agent", nil, 3, "external", sqlmock.AnyArg(), (*string)(nil), nil, "none", (*int64)(nil), models.DefaultMaxConcurrentTasks, "push"). WillReturnResult(sqlmock.NewResult(0, 1)) mock.ExpectCommit() - // External URL update (SSRF-safe public URL passes validateAgentURL). + // External URL update (localhost is explicitly allowed by validateAgentURL). mock.ExpectExec("UPDATE workspaces SET url"). WillReturnResult(sqlmock.NewResult(0, 1)) - // CacheURL is non-fatal but still called. - mock.ExpectExec("SELECT"). - WillReturnRows(sqlmock.NewRows([]string{"ok"}).AddRow("ok")) + // CacheURL is non-fatal — uses Redis (db.RDB, set by setupTestRedis), not the DB. w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) - body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"https://agent.example.com/a2a"}` + body := `{"name":"Ext Agent","runtime":"external","external":true,"url":"http://localhost:8000"}` c.Request = httptest.NewRequest("POST", "/workspaces", bytes.NewBufferString(body)) c.Request.Header.Set("Content-Type", "application/json") diff --git a/workspace-server/internal/plugins/drift_sweeper.go b/workspace-server/internal/plugins/drift_sweeper.go index 9b6399d5..684b2f65 100644 --- a/workspace-server/internal/plugins/drift_sweeper.go +++ b/workspace-server/internal/plugins/drift_sweeper.go @@ -9,7 +9,7 @@ package plugins // 1. SELECTs workspace_plugins rows where tracked_ref != 'none' // AND installed_sha IS NOT NULL (skip pre-migration rows with NULL SHA). // 2. For each row, resolves the tracked ref to its current upstream SHA -// using the appropriate SourceResolver. +// using the appropriate PluginResolver. // 3. If the resolved SHA differs from installed_sha → drift detected. // 4. On drift, INSERT INTO plugin_update_queue (ON CONFLICT DO NOTHING so // a re-drift while a row is still pending is a no-op). @@ -61,10 +61,12 @@ const DriftSweepInterval = 1 * time.Hour // that handles Gitea instances on high-latency links. const ResolveRefDeadline = 60 * time.Second -// SourceResolver resolves plugin sources to installable directories. +// PluginResolver resolves plugin sources to installable directories. // Satisfied by *Registry (which wraps GithubResolver + LocalResolver). -type SourceResolver interface { - Resolve(source Source) (SourceResolver, error) +// Named PluginResolver (not SourceResolver) to avoid redeclaring the +// SourceResolver interface defined in source.go (core#228 fix). +type PluginResolver interface { + Resolve(source Source) (PluginResolver, error) Schemes() []string } @@ -74,7 +76,7 @@ type SourceResolver interface { // // Registers itself via atexits in cmd/server/main.go so the process // shuts down cleanly on SIGTERM. -func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) { +func StartPluginDriftSweeper(ctx context.Context, resolver PluginResolver) { if resolver == nil { log.Println("Plugin drift sweeper: resolver is nil — sweeper disabled") return @@ -107,7 +109,7 @@ func StartPluginDriftSweeper(ctx context.Context, resolver SourceResolver) { // sweepDriftOnce runs one full drift-detection cycle. // Errors are non-fatal — each row is handled independently so a single // slow row doesn't block the rest of the sweep. -func sweepDriftOnce(parent context.Context, resolver SourceResolver) { +func sweepDriftOnce(parent context.Context, resolver PluginResolver) { ctx, cancel := context.WithTimeout(parent, 10*time.Minute) defer cancel() @@ -170,7 +172,7 @@ func sweepDriftOnce(parent context.Context, resolver SourceResolver) { // resolveLatestSHA resolves the tracked ref to its current upstream SHA. // Handles both github:// and local:// sources; local sources are skipped // (no meaningful upstream to drift against). -func resolveLatestSHA(ctx context.Context, resolver SourceResolver, sourceRaw, trackedRef string) (string, error) { +func resolveLatestSHA(ctx context.Context, resolver PluginResolver, sourceRaw, trackedRef string) (string, error) { // Strip the scheme prefix to get the raw spec. // sourceRaw is stored as the full string, e.g. "github://owner/repo#tag:v1.0.0" spec := sourceRaw @@ -231,7 +233,7 @@ func queueDriftEntry(ctx context.Context, workspaceID, pluginName, trackedRef, c // ───────────────────────────────────────────────────────────────────────────── // SweepDriftOnceForTest exposes sweepDriftOnce for package-level testing. -func SweepDriftOnceForTest(parent context.Context, resolver SourceResolver) { +func SweepDriftOnceForTest(parent context.Context, resolver PluginResolver) { sweepDriftOnce(parent, resolver) }