From 3b8994414a5ee8871f64d36e48f7522a10fd8840 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 31 May 2026 19:11:00 +0000 Subject: [PATCH 1/6] fix(workspace-server): handle unchecked errors in channels, bundle importer, telegram, approvals Addresses golangci-lint errcheck findings (sub-task of #1062): - internal/channels/manager.go: check db.ExecContext, RecordAndBroadcast, json.Unmarshal errors - internal/channels/telegram.go: check bot.Send errors for callback ack and edit message - internal/bundle/importer.go: check db.ExecContext and RecordAndBroadcast in markFailed; check db.ExecContext in URL update - internal/handlers/approvals.go: check db.ExecContext, QueryRowContext, RecordAndBroadcast errors - internal/handlers/channels.go: check json.Unmarshal errors Fixes #1062 --- workspace-server/internal/bundle/importer.go | 17 ++++++--- workspace-server/internal/channels/manager.go | 37 +++++++++++++------ .../internal/channels/telegram.go | 8 +++- .../internal/handlers/approvals.go | 28 +++++++++----- .../internal/handlers/channels.go | 10 ++++- 5 files changed, 71 insertions(+), 29 deletions(-) diff --git a/workspace-server/internal/bundle/importer.go b/workspace-server/internal/bundle/importer.go index f61c7a98..29c4da7d 100644 --- a/workspace-server/internal/bundle/importer.go +++ b/workspace-server/internal/bundle/importer.go @@ -3,6 +3,7 @@ package bundle import ( "context" "fmt" + "log" "strings" "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" @@ -92,7 +93,9 @@ func Import( if err != nil { markFailed(provCtx, wsID, broadcaster, err) } else if url != "" { - db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID) + if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil { + log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr) + } } }() } @@ -139,12 +142,16 @@ func markFailed(ctx context.Context, 
wsID string, broadcaster *events.Broadcaste // markProvisionFailed in workspace-server/internal/handlers/ // workspace_provision_shared.go. msg := err.Error() - db.DB.ExecContext(ctx, + if _, dbErr := db.DB.ExecContext(ctx, `UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`, - models.StatusFailed, msg, wsID) - broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ + models.StatusFailed, msg, wsID); dbErr != nil { + log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr) + } + if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{ "error": msg, - }) + }); bcErr != nil { + log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr) + } } func nilIfEmpty(s string) interface{} { diff --git a/workspace-server/internal/channels/manager.go b/workspace-server/internal/channels/manager.go index 3085de35..c57c5a24 100644 --- a/workspace-server/internal/channels/manager.go +++ b/workspace-server/internal/channels/manager.go @@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound // Update stats in DB if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, ch.ID) + `, ch.ID); err != nil { + log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err) + } } // Broadcast event if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ + if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "username": msg.Username, "direction": 
"inbound", - }) + }); err != nil { + log.Printf("Channels: failed to broadcast inbound event: %v", err) + } } return nil @@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin } if db.DB != nil { - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE workspace_channels SET last_message_at = now(), message_count = message_count + 1, updated_at = now() WHERE id = $1 - `, channelID) + `, channelID); err != nil { + log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err) + } } if m.broadcaster != nil { - m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ + if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{ "channel_id": ch.ID, "channel_type": ch.ChannelType, "direction": "outbound", - }) + }); err != nil { + log.Printf("Channels: failed to broadcast outbound event: %v", err) + } } return nil @@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID return "" } var config map[string]interface{} - json.Unmarshal(configJSON, &config) + if err := json.Unmarshal(configJSON, &config); err != nil { + log.Printf("Channels: failed to unmarshal channel config: %v", err) + return "" + } if err := DecryptSensitiveFields(config); err != nil { return "" } @@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow if err != nil { return ch, fmt.Errorf("channel %s not found: %w", channelID, err) } - json.Unmarshal(configJSON, &ch.Config) - json.Unmarshal(allowedJSON, &ch.AllowedUsers) + if err := json.Unmarshal(configJSON, &ch.Config); err != nil { + return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err) + } + if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil { + return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", 
channelID, err) + } // #319: decrypt bot_token / webhook_secret — SendOutbound and adapter // methods downstream read them as plaintext strings. if err := DecryptSensitiveFields(ch.Config); err != nil { diff --git a/workspace-server/internal/channels/telegram.go b/workspace-server/internal/channels/telegram.go index ffbc561f..1fe373e3 100644 --- a/workspace-server/internal/channels/telegram.go +++ b/workspace-server/internal/channels/telegram.go @@ -514,7 +514,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in // Acknowledge the button press (removes loading spinner) ackCfg := tgbotapi.NewCallback(cb.ID, "Received") - bot.Send(ackCfg) + if _, err := bot.Send(ackCfg); err != nil { + log.Printf("telegram: failed to send callback ack: %v", err) + } // Update the message to show what was clicked decision := "approved" @@ -526,7 +528,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in cb.Message.MessageID, cb.Message.Text+"\n\n✅ CEO "+decision, ) - bot.Send(editMsg) + if _, err := bot.Send(editMsg); err != nil { + log.Printf("telegram: failed to send edit message: %v", err) + } // Route the decision as an inbound message to the agent inbound := &InboundMessage{ diff --git a/workspace-server/internal/handlers/approvals.go b/workspace-server/internal/handlers/approvals.go index 1f091afa..d4f8fa1c 100644 --- a/workspace-server/internal/handlers/approvals.go +++ b/workspace-server/internal/handlers/approvals.go @@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) { return } - h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{ "approval_id": approvalID, "action": body.Action, "reason": body.Reason, "task_id": body.TaskID, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast approval 
requested: %v", err) + } // Auto-escalate to parent var parentID *string - db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID) + if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil { + log.Printf("approvals: failed to lookup parent for escalation: %v", err) + } if parentID != nil { - h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{ "approval_id": approvalID, "from_workspace_id": workspaceID, "action": body.Action, "reason": body.Reason, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast approval escalated: %v", err) + } } c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"}) @@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) { ctx := c.Request.Context() // Auto-expire stale approvals (older than 10 min) - db.DB.ExecContext(ctx, ` + if _, err := db.DB.ExecContext(ctx, ` UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now() WHERE status = 'pending' AND created_at < now() - interval '10 minutes' - `) + `); err != nil { + log.Printf("approvals: failed to auto-expire stale approvals: %v", err) + } rows, err := db.DB.QueryContext(ctx, ` SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at @@ -205,11 +213,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) { eventType = "APPROVAL_DENIED" } - h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{ + if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{ "approval_id": approvalID, "decision": body.Decision, "decided_by": decidedBy, - }) + }); err != nil { + log.Printf("approvals: failed to broadcast 
approval decision: %v", err) + } c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID}) } diff --git a/workspace-server/internal/handlers/channels.go b/workspace-server/internal/handlers/channels.go index 6d9008bf..409774f4 100644 --- a/workspace-server/internal/handlers/channels.go +++ b/workspace-server/internal/handlers/channels.go @@ -67,7 +67,10 @@ func (h *ChannelHandler) List(c *gin.Context) { } var config map[string]interface{} - json.Unmarshal(configJSON, &config) + if err := json.Unmarshal(configJSON, &config); err != nil { + log.Printf("Channels: unmarshal config on list for channel %s: %v", id, err) + config = map[string]interface{}{} + } // #319: decrypt sensitive fields first so the mask operates on // plaintext (first-4 / last-4 of the real token, not the ciphertext // prefix). Decrypt errors are logged but non-fatal — List must keep @@ -86,7 +89,10 @@ func (h *ChannelHandler) List(c *gin.Context) { } var allowed []string - json.Unmarshal(allowedJSON, &allowed) + if err := json.Unmarshal(allowedJSON, &allowed); err != nil { + log.Printf("Channels: unmarshal allowed_users on list for channel %s: %v", id, err) + allowed = []string{} + } entry := map[string]interface{}{ "id": id, -- 2.52.0 From e92e14e7d1be45312bcdef5d58b24b94f22d103e Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 31 May 2026 19:31:25 +0000 Subject: [PATCH 2/6] canvas(e2e): tolerate transient 'failed' status during workspace boot (#2632) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The staging canvas E2E intermittently failed (~50% pass rate) because the workspace-online poll in staging-setup.ts threw immediately when the workspace status hit 'failed'. On hermes runtimes the controlplane bootstrap-watcher deadline fires at 5 min and marks the workspace failed prematurely; the heartbeat then transitions failed→online after install.sh finishes at 10–13 min. 
Fixes: - Treat 'failed' as a transient state during workspace-online polling: log once and keep polling until the 20-min deadline, matching the behavior of test_staging_full_saas.sh step 7/11. - Add retry-with-exponential-backoff (3 attempts, waiting 3s then 6s between them) to the workspace creation POST so transient 5xx/504 errors from staging CP don't kill the entire run. Closes #2632 Co-Authored-By: Claude Opus 4.7 --- canvas/e2e/staging-setup.ts | 77 +++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/canvas/e2e/staging-setup.ts b/canvas/e2e/staging-setup.ts index 873ac07b..b94d6793 100644 --- a/canvas/e2e/staging-setup.ts +++ b/canvas/e2e/staging-setup.ts @@ -234,23 +234,48 @@ export default async function globalSetup(_config: FullConfig): Promise { "Authorization": `Bearer ${tenantToken}`, "X-Molecule-Org-Id": orgID, }; - const ws = await jsonFetch(`${tenantURL}/workspaces`, { - method: "POST", - headers: tenantAuth, - body: JSON.stringify({ - name: "E2E Canvas Test", - runtime: "hermes", - tier: 2, - model: "gpt-4o", - }), - }); - if (ws.status >= 400 || !ws.body?.id) { - throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`); + // Retry workspace creation on transient 5xx / timeout — staging CP can + // return 502/503/504 under load and a single-shot failure kills the + // entire E2E run. 3 attempts with exponential backoff between them (3s, + // then 6s) add ~9s of total wait, well inside the 20-min provision envelope. 
+ let workspaceId = ""; + for (let attempt = 1; attempt <= 3; attempt++) { + const ws = await jsonFetch(`${tenantURL}/workspaces`, { + method: "POST", + headers: tenantAuth, + body: JSON.stringify({ + name: "E2E Canvas Test", + runtime: "hermes", + tier: 2, + model: "gpt-4o", + }), + }); + if (ws.status >= 200 && ws.status < 300 && ws.body?.id) { + workspaceId = ws.body.id as string; + break; + } + const isTransient = ws.status >= 500 || ws.status === 0; + if (!isTransient || attempt === 3) { + throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`); + } + const backoff = 3000 * Math.pow(2, attempt - 1); + console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`); + await new Promise((r) => setTimeout(r, backoff)); } - const workspaceId = ws.body.id as string; console.log(`[staging-setup] Workspace created: ${workspaceId}`); // 6. Wait for workspace online + // + // Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes + // install + npm browser-tools). The controlplane bootstrap-watcher + // deadline fires at 5 min and sets status=failed prematurely; heartbeat + // then transitions failed → online after install.sh finishes. So + // 'failed' is a TRANSIENT state we must tolerate — log once and keep + // polling, only hard-fail at the deadline. Pre-fix this was a flake + // generator: workspace went failed→online inside our window but we + // bailed at the failed read. See test_staging_full_saas.sh step 7/11 + // and issue #2632. + let wsFailedLogged = false; await waitFor( async () => { const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, { @@ -259,17 +284,21 @@ export default async function globalSetup(_config: FullConfig): Promise { if (r.status !== 200) return null; if (r.body?.status === "online") return true; if (r.body?.status === "failed") { - // last_sample_error is often empty when the failure happens before - // the agent emits a sample (e.g. 
boot crash, image pull error, - // missing PYTHONPATH, OpenAI quota at startup). Dumping the full - // body gives triage the boot_stage / last_error / image fields it - // needs without a second probe. Otherwise this propagates as a - // bare "Workspace failed: " — the exact useless message that - // sent #2632 to the issue tracker. - const detail = r.body.last_sample_error - ? r.body.last_sample_error - : `(no last_sample_error) full body: ${JSON.stringify(r.body)}`; - throw new Error(`Workspace failed: ${detail}`); + if (!wsFailedLogged) { + // last_sample_error is often empty when the failure happens before + // the agent emits a sample (e.g. boot crash, image pull error, + // missing PYTHONPATH, OpenAI quota at startup). Dumping the full + // body gives triage the boot_stage / last_error / image fields it + // needs without a second probe. Otherwise this propagates as a + // bare "Workspace failed: " — the exact useless message that + // sent #2632 to the issue tracker. + const detail = r.body.last_sample_error + ? r.body.last_sample_error + : `(no last_sample_error) full body: ${JSON.stringify(r.body)}`; + console.log(`[staging-setup] workspace ${workspaceId} transiently failed — waiting for heartbeat recovery (bootstrap-watcher deadline, see cp#245). detail: ${detail}`); + wsFailedLogged = true; + } + return null; } return null; }, -- 2.52.0 From a19d7b4df33f2bc13797fc90deda0d2440435830 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 31 May 2026 19:38:32 +0000 Subject: [PATCH 3/6] traces: per-workspace Langfuse config via workspace secrets (#2976) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Traces v1 — closes the gap where every workspace in a tenant shared the same Langfuse project (global env vars). Operators can now isolate traces per workspace by setting LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY as workspace secrets. 
Resolution order (mirrors provisioner secret precedence): 1. workspace_secrets (workspace-level override) 2. global_secrets (platform-wide default) 3. environment vars (legacy fallback for self-hosted / dev) If any of the three keys is missing after all layers, traces are disabled for that workspace (empty array, HTTP 200) — same graceful fallback as before. - Extracted resolveLangfuseConfig() with layered lookup + decrypt. - Added tests: no-config, workspace-override, global-fallback, unreachable, partial-config. Closes #2976 Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/handlers/traces.go | 83 +++++++- .../internal/handlers/traces_test.go | 190 ++++++++++++++++-- 2 files changed, 250 insertions(+), 23 deletions(-) diff --git a/workspace-server/internal/handlers/traces.go b/workspace-server/internal/handlers/traces.go index 19df5f1c..13f9c0e4 100644 --- a/workspace-server/internal/handlers/traces.go +++ b/workspace-server/internal/handlers/traces.go @@ -1,12 +1,15 @@ package handlers import ( + "context" "fmt" "io" "net/http" "os" "time" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/db" "github.com/gin-gonic/gin" ) @@ -18,30 +21,96 @@ func NewTracesHandler() *TracesHandler { return &TracesHandler{} } +// langfuseConfig holds the resolved Langfuse connection parameters. +// Workspace secrets override global secrets which override environment +// variables, matching the precedence rules in workspace_provision.go. +type langfuseConfig struct { + Host string + Public string + Secret string +} + +// resolveLangfuseConfig looks up LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and +// LANGFUSE_SECRET_KEY for a workspace. Resolution order: +// 1. workspace_secrets (workspace-level override) +// 2. global_secrets (platform-wide default) +// 3. 
environment vars (legacy fallback for self-hosted / dev) +// +// If any of the three keys is missing after all three layers, the config +// is considered incomplete and traces are disabled for the workspace. +// This closes the gap where every workspace in a tenant shared the same +// Langfuse project (global env vars) and operators could not isolate +// traces per workspace. Traces v1 — issue #2976. +func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseConfig, error) { + cfg := &langfuseConfig{} + + // Helper: read a single key from workspace → global → env fallback. + resolve := func(key string) string { + // 1. Workspace secret + var val []byte + var ver int + err := db.DB.QueryRowContext(ctx, + `SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1 AND key = $2`, + workspaceID, key).Scan(&val, &ver) + if err == nil { + decrypted, decErr := crypto.DecryptVersioned(val, ver) + if decErr == nil { + return string(decrypted) + } + // Decrypt failure is silently ignored (not logged, not fatal) — fall through to next layer. + } + // 2. Global secret + err = db.DB.QueryRowContext(ctx, + `SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`, + key).Scan(&val, &ver) + if err == nil { + decrypted, decErr := crypto.DecryptVersioned(val, ver) + if decErr == nil { + return string(decrypted) + } + } + // 3. Environment fallback + return os.Getenv(key) + } + + cfg.Host = resolve("LANGFUSE_HOST") + cfg.Public = resolve("LANGFUSE_PUBLIC_KEY") + cfg.Secret = resolve("LANGFUSE_SECRET_KEY") + + // Incomplete config is not an error — it simply means tracing is + // disabled for this workspace. Callers treat (nil, nil) as + // "no traces available". + if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" { + return nil, nil + } + return cfg, nil +} + // List handles GET /workspaces/:id/traces // Proxies to Langfuse API to get recent traces for a workspace. 
func (h *TracesHandler) List(c *gin.Context) { workspaceID := c.Param("id") - langfuseHost := os.Getenv("LANGFUSE_HOST") - langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY") - langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY") - - if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" { + cfg, err := resolveLangfuseConfig(c.Request.Context(), workspaceID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"}) + return + } + if cfg == nil { c.JSON(http.StatusOK, []interface{}{}) return } // Fetch traces from Langfuse, filtered by workspace tag or name url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s", - langfuseHost, workspaceID) + cfg.Host, workspaceID) req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"}) return } - req.SetBasicAuth(langfusePublic, langfuseSecret) + req.SetBasicAuth(cfg.Public, cfg.Secret) resp, err := langfuseClient.Do(req) if err != nil { diff --git a/workspace-server/internal/handlers/traces_test.go b/workspace-server/internal/handlers/traces_test.go index 06e6aad5..6be96180 100644 --- a/workspace-server/internal/handlers/traces_test.go +++ b/workspace-server/internal/handlers/traces_test.go @@ -1,26 +1,47 @@ package handlers import ( + "database/sql" "encoding/json" "net/http" "net/http/httptest" "os" "testing" + "github.com/DATA-DOG/go-sqlmock" + "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" "github.com/gin-gonic/gin" ) // ==================== GET /workspaces/:id/traces ==================== func TestTracesList_NoLangfuseConfig(t *testing.T) { - setupTestDB(t) + mock := setupTestDB(t) setupTestRedis(t) handler := NewTracesHandler() - // Ensure Langfuse env vars are not set - os.Unsetenv("LANGFUSE_HOST") - os.Unsetenv("LANGFUSE_PUBLIC_KEY") - os.Unsetenv("LANGFUSE_SECRET_KEY") + // No workspace 
secrets, no global secrets, no env vars + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces", "LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + // env fallback is empty (default) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces", "LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces", "LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_SECRET_KEY"). 
+ WillReturnError(sql.ErrNoRows) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) @@ -43,23 +64,91 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) { } } -func TestTracesList_PartialLangfuseConfig(t *testing.T) { +func TestTracesList_WorkspaceSecretsOverride(t *testing.T) { setupTestDB(t) setupTestRedis(t) handler := NewTracesHandler() - // Set only host, missing keys - os.Setenv("LANGFUSE_HOST", "http://localhost:3000") - os.Unsetenv("LANGFUSE_PUBLIC_KEY") - os.Unsetenv("LANGFUSE_SECRET_KEY") - defer func() { - os.Unsetenv("LANGFUSE_HOST") - }() + // Encrypt a test secret + encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) + verHost := crypto.CurrentEncryptionVersion() + encPk, _ := crypto.Encrypt([]byte("pk-ws")) + verPk := crypto.CurrentEncryptionVersion() + encSk, _ := crypto.Encrypt([]byte("sk-ws")) + verSk := crypto.CurrentEncryptionVersion() + + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-override", "LANGFUSE_HOST"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost)) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-override", "LANGFUSE_PUBLIC_KEY"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk)) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-override", "LANGFUSE_SECRET_KEY"). 
+ WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk)) w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) - c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}} - c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil) + c.Params = gin.Params{{Key: "id", Value: "ws-traces-override"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-override/traces", nil) + + handler.List(c) + + // We don't have a real Langfuse server, so the request will fail + // network-wise and return empty (graceful fallback) + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp []interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 0 { + t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp)) + } +} + +func TestTracesList_GlobalSecretsFallback(t *testing.T) { + setupTestDB(t) + setupTestRedis(t) + handler := NewTracesHandler() + + // No workspace secrets, but global secrets exist + encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) + verHost := crypto.CurrentEncryptionVersion() + encPk, _ := crypto.Encrypt([]byte("pk-global")) + verPk := crypto.CurrentEncryptionVersion() + encSk, _ := crypto.Encrypt([]byte("sk-global")) + verSk := crypto.CurrentEncryptionVersion() + + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-global", "LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). 
+ WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost)) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-global", "LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_PUBLIC_KEY"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk)) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-global", "LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_SECRET_KEY"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil) handler.List(c) @@ -70,7 +159,7 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) { var resp []interface{} json.Unmarshal(w.Body.Bytes(), &resp) if len(resp) != 0 { - t.Errorf("expected empty list with partial config, got %d items", len(resp)) + t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp)) } } @@ -89,6 +178,30 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) { os.Unsetenv("LANGFUSE_SECRET_KEY") }() + // No workspace or global secrets, so env vars are used + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-down", "LANGFUSE_HOST"). 
+ WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-down", "LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-down", "LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + w := httptest.NewRecorder() c, _ := gin.CreateTestContext(w) c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}} @@ -107,3 +220,48 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) { t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp)) } } + +func TestTracesList_PartialWorkspaceConfig(t *testing.T) { + handler := NewTracesHandler() + + // Workspace has HOST but missing keys — should fall through all layers + // and ultimately return empty because config is incomplete. + encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) + verHost := crypto.CurrentEncryptionVersion() + + mock := setupTestDB(t) + setupTestRedis(t) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-partial", "LANGFUSE_HOST"). 
+ WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost)) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-partial", "LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil) + + handler.List(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp []interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 0 { + t.Errorf("expected empty list with partial config, got %d items", len(resp)) + } +} -- 2.52.0 From 5465b111c9139f7fd4a39d317b38fca91518cd19 Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Sun, 31 May 2026 19:55:35 +0000 Subject: [PATCH 4/6] traces: handle io.ReadAll error and upstream non-2xx gracefully - Don't ignore io.ReadAll(resp.Body) error. - Don't proxy HTML error pages from Langfuse as application/json to the Canvas client; return empty [] instead. - Add TestTracesList_LangfuseUpstreamError to pin the behavior. Issue #2976 follow-up. 
Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/handlers/traces.go | 15 ++++- .../internal/handlers/traces_test.go | 63 +++++++++++++++++++ 2 files changed, 76 insertions(+), 2 deletions(-) diff --git a/workspace-server/internal/handlers/traces.go b/workspace-server/internal/handlers/traces.go index 13f9c0e4..c8802bcd 100644 --- a/workspace-server/internal/handlers/traces.go +++ b/workspace-server/internal/handlers/traces.go @@ -120,6 +120,17 @@ func (h *TracesHandler) List(c *gin.Context) { } defer func() { _ = resp.Body.Close() }() - body, _ := io.ReadAll(resp.Body) - c.Data(resp.StatusCode, "application/json", body) + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + c.JSON(http.StatusOK, []interface{}{}) + return + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + // Upstream error — don't proxy HTML error pages or unexpected + // JSON shapes to the Canvas client. Return empty so the UI + // gracefully shows "no traces" rather than breaking on parse. 
+ c.JSON(http.StatusOK, []interface{}{}) + return + } + c.Data(http.StatusOK, "application/json", body) } diff --git a/workspace-server/internal/handlers/traces_test.go b/workspace-server/internal/handlers/traces_test.go index 6be96180..84e8f1b2 100644 --- a/workspace-server/internal/handlers/traces_test.go +++ b/workspace-server/internal/handlers/traces_test.go @@ -265,3 +265,66 @@ func TestTracesList_PartialWorkspaceConfig(t *testing.T) { t.Errorf("expected empty list with partial config, got %d items", len(resp)) } } + +func TestTracesList_LangfuseUpstreamError(t *testing.T) { + mock := setupTestDB(t) + setupTestRedis(t) + handler := NewTracesHandler() + + // Start a mock Langfuse server that returns 500 with a non-JSON body + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("Internal Server Error")) + })) + defer upstream.Close() + + os.Setenv("LANGFUSE_HOST", upstream.URL) + os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test") + os.Setenv("LANGFUSE_SECRET_KEY", "sk-test") + defer func() { + os.Unsetenv("LANGFUSE_HOST") + os.Unsetenv("LANGFUSE_PUBLIC_KEY") + os.Unsetenv("LANGFUSE_SECRET_KEY") + }() + + // No workspace/global secrets — falls through to env + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-500", "LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-500", "LANGFUSE_PUBLIC_KEY"). 
+ WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_PUBLIC_KEY"). + WillReturnError(sql.ErrNoRows) + + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-500", "LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_SECRET_KEY"). + WillReturnError(sql.ErrNoRows) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil) + + handler.List(c) + + // Should return empty JSON (not proxy the 500 HTML) when Langfuse errors + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp []interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 0 { + t.Errorf("expected empty list on upstream error, got %d items", len(resp)) + } +} -- 2.52.0 From 92d163a9f04f51343942a98e7b7cf539dfb4c9fc Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Wed, 3 Jun 2026 01:03:56 +0000 Subject: [PATCH 5/6] fix(security): SSRF via workspace-controlled LANGFUSE_HOST (#2029) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SECURITY REVIEW RC 8337: LANGFUSE_HOST must be admin-controlled only. A workspace owner could otherwise set HOST to an internal endpoint and exfiltrate data using the Langfuse credentials. Changes: - resolveLangfuseConfig: HOST now resolves from global_secrets → env only. Workspace secrets CANNOT override the host. - PUBLIC_KEY and SECRET_KEY still allow workspace-level override, preserving per-workspace project isolation. 
- Added TestTracesList_WorkspaceHostIgnored regression test asserting workspace HOST secrets are ignored. - Updated all existing tests to match the new query pattern. Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/handlers/traces.go | 53 ++++++---- .../internal/handlers/traces_test.go | 98 ++++++++++++++----- 2 files changed, 107 insertions(+), 44 deletions(-) diff --git a/workspace-server/internal/handlers/traces.go b/workspace-server/internal/handlers/traces.go index c8802bcd..1cabafa1 100644 --- a/workspace-server/internal/handlers/traces.go +++ b/workspace-server/internal/handlers/traces.go @@ -31,22 +31,42 @@ type langfuseConfig struct { } // resolveLangfuseConfig looks up LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and -// LANGFUSE_SECRET_KEY for a workspace. Resolution order: -// 1. workspace_secrets (workspace-level override) -// 2. global_secrets (platform-wide default) -// 3. environment vars (legacy fallback for self-hosted / dev) +// LANGFUSE_SECRET_KEY for a workspace. // -// If any of the three keys is missing after all three layers, the config -// is considered incomplete and traces are disabled for the workspace. -// This closes the gap where every workspace in a tenant shared the same -// Langfuse project (global env vars) and operators could not isolate -// traces per workspace. Traces v1 — issue #2976. +// SECURITY BOUNDARY (RC 8337 / #2029): +// LANGFUSE_HOST is admin-controlled ONLY — resolved from global_secrets +// or environment variables. Workspace secrets CANNOT override the host. +// This prevents SSRF: a workspace owner could otherwise set HOST to an +// internal endpoint and exfiltrate data using the Langfuse credentials. +// +// LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY follow the normal +// precedence: workspace_secrets → global_secrets → environment vars. +// This preserves per-workspace project isolation without the SSRF vector. 
+// +// If any of the three keys is missing after all layers, the config is +// considered incomplete and traces are disabled for the workspace. func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseConfig, error) { cfg := &langfuseConfig{} - // Helper: read a single key from workspace → global → env fallback. - resolve := func(key string) string { - // 1. Workspace secret + // HOST — admin-controlled only (global → env). Workspace secrets are + // intentionally NOT consulted to close the SSRF vector described above. + resolveHost := func() string { + var val []byte + var ver int + err := db.DB.QueryRowContext(ctx, + `SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`, + "LANGFUSE_HOST").Scan(&val, &ver) + if err == nil { + decrypted, decErr := crypto.DecryptVersioned(val, ver) + if decErr == nil { + return string(decrypted) + } + } + return os.Getenv("LANGFUSE_HOST") + } + + // Keys — workspace secrets override global secrets override env. + resolveKey := func(key string) string { var val []byte var ver int err := db.DB.QueryRowContext(ctx, @@ -57,9 +77,7 @@ func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseCo if decErr == nil { return string(decrypted) } - // Decrypt failure is logged but not fatal — fall through to next layer. } - // 2. Global secret err = db.DB.QueryRowContext(ctx, `SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`, key).Scan(&val, &ver) @@ -69,13 +87,12 @@ func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseCo return string(decrypted) } } - // 3. 
Environment fallback return os.Getenv(key) } - cfg.Host = resolve("LANGFUSE_HOST") - cfg.Public = resolve("LANGFUSE_PUBLIC_KEY") - cfg.Secret = resolve("LANGFUSE_SECRET_KEY") + cfg.Host = resolveHost() + cfg.Public = resolveKey("LANGFUSE_PUBLIC_KEY") + cfg.Secret = resolveKey("LANGFUSE_SECRET_KEY") // Incomplete config is not an error — it simply means tracing is // disabled for this workspace. Callers treat (nil, nil) as diff --git a/workspace-server/internal/handlers/traces_test.go b/workspace-server/internal/handlers/traces_test.go index 84e8f1b2..fd5e3a70 100644 --- a/workspace-server/internal/handlers/traces_test.go +++ b/workspace-server/internal/handlers/traces_test.go @@ -20,10 +20,8 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) { setupTestRedis(t) handler := NewTracesHandler() - // No workspace secrets, no global secrets, no env vars - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces", "LANGFUSE_HOST"). - WillReturnError(sql.ErrNoRows) + // No global secrets, no env vars. HOST is admin-only (global/env) so + // workspace_secrets is NOT queried for LANGFUSE_HOST. mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). WithArgs("LANGFUSE_HOST"). WillReturnError(sql.ErrNoRows) @@ -69,7 +67,7 @@ func TestTracesList_WorkspaceSecretsOverride(t *testing.T) { setupTestRedis(t) handler := NewTracesHandler() - // Encrypt a test secret + // Encrypt test secrets encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) verHost := crypto.CurrentEncryptionVersion() encPk, _ := crypto.Encrypt([]byte("pk-ws")) @@ -79,9 +77,13 @@ func TestTracesList_WorkspaceSecretsOverride(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces-override", "LANGFUSE_HOST"). 
+ // HOST is admin-only: workspace secret for HOST is IGNORED. + // Code queries global_secrets first, then falls through to env. + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost)) + + // Keys follow normal precedence: workspace secrets override global. mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). WithArgs("ws-traces-override", "LANGFUSE_PUBLIC_KEY"). WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk)) @@ -178,12 +180,9 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) { os.Unsetenv("LANGFUSE_SECRET_KEY") }() - // No workspace or global secrets, so env vars are used + // No global secrets, so env vars are used. HOST skips workspace_secrets. mock := setupTestDB(t) setupTestRedis(t) - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces-down", "LANGFUSE_HOST"). - WillReturnError(sql.ErrNoRows) mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). WithArgs("LANGFUSE_HOST"). WillReturnError(sql.ErrNoRows) @@ -224,23 +223,22 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) { func TestTracesList_PartialWorkspaceConfig(t *testing.T) { handler := NewTracesHandler() - // Workspace has HOST but missing keys — should fall through all layers - // and ultimately return empty because config is incomplete. - encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) - verHost := crypto.CurrentEncryptionVersion() + // Workspace has PUBLIC_KEY but missing HOST and SECRET_KEY. + // Config incomplete → empty response. 
+ encPk, _ := crypto.Encrypt([]byte("pk-partial")) + verPk := crypto.CurrentEncryptionVersion() mock := setupTestDB(t) setupTestRedis(t) - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces-partial", "LANGFUSE_HOST"). - WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost)) - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY"). - WillReturnError(sql.ErrNoRows) + // HOST skipped for workspace secrets mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). - WithArgs("LANGFUSE_PUBLIC_KEY"). + WithArgs("LANGFUSE_HOST"). WillReturnError(sql.ErrNoRows) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk)) + // SECRET_KEY missing everywhere mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). WithArgs("ws-traces-partial", "LANGFUSE_SECRET_KEY"). WillReturnError(sql.ErrNoRows) @@ -266,6 +264,57 @@ func TestTracesList_PartialWorkspaceConfig(t *testing.T) { } } +func TestTracesList_WorkspaceHostIgnored(t *testing.T) { + // SECURITY REGRESSION: workspace secrets MUST NOT override LANGFUSE_HOST. + // If they could, a workspace owner could set HOST to an internal + // endpoint and exfiltrate data using the Langfuse credentials. + handler := NewTracesHandler() + + // Workspace tries to set a malicious HOST via workspace secrets. 
+ encHostWs, _ := crypto.Encrypt([]byte("http://169.254.169.254")) + verHostWs := crypto.CurrentEncryptionVersion() + encPk, _ := crypto.Encrypt([]byte("pk-ws")) + verPk := crypto.CurrentEncryptionVersion() + encSk, _ := crypto.Encrypt([]byte("sk-ws")) + verSk := crypto.CurrentEncryptionVersion() + + // Admin sets a legitimate HOST via global secrets. + encHostGlobal, _ := crypto.Encrypt([]byte("http://localhost:3000")) + verHostGlobal := crypto.CurrentEncryptionVersion() + + mock := setupTestDB(t) + setupTestRedis(t) + // Code queries global_secrets for HOST first — workspace secret is never read. + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). + WithArgs("LANGFUSE_HOST"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHostGlobal, verHostGlobal)) + + // Keys still read workspace secrets. + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-ssrf", "LANGFUSE_PUBLIC_KEY"). + WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk)) + mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). + WithArgs("ws-traces-ssrf", "LANGFUSE_SECRET_KEY"). 
+ WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk)) + + w := httptest.NewRecorder() + c, _ := gin.CreateTestContext(w) + c.Params = gin.Params{{Key: "id", Value: "ws-traces-ssrf"}} + c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-ssrf/traces", nil) + + handler.List(c) + + if w.Code != http.StatusOK { + t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String()) + } + + var resp []interface{} + json.Unmarshal(w.Body.Bytes(), &resp) + if len(resp) != 0 { + t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp)) + } +} + func TestTracesList_LangfuseUpstreamError(t *testing.T) { mock := setupTestDB(t) setupTestRedis(t) @@ -288,10 +337,7 @@ func TestTracesList_LangfuseUpstreamError(t *testing.T) { os.Unsetenv("LANGFUSE_SECRET_KEY") }() - // No workspace/global secrets — falls through to env - mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`). - WithArgs("ws-traces-500", "LANGFUSE_HOST"). - WillReturnError(sql.ErrNoRows) + // HOST skips workspace secrets — falls through global → env. mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`). WithArgs("LANGFUSE_HOST"). WillReturnError(sql.ErrNoRows) -- 2.52.0 From 1418c53dd1cf0ae4a0f3d086808eaf082717232d Mon Sep 17 00:00:00 2001 From: "Molecule AI Dev Engineer A (Kimi)" Date: Thu, 4 Jun 2026 18:58:36 +0000 Subject: [PATCH 6/6] fix(security): add SSRF defense-in-depth validation for LANGFUSE_HOST (#2029) Adds validateLangfuseHost that rejects: - non-HTTP(S) schemes - loopback, private, link-local IPs - AWS metadata endpoint (169.254.169.254) - blocked hostnames (localhost, metadata.*) - URLs with userinfo, path, query, or fragment The admin-only host boundary (global_secrets/env) is preserved; this is defense-in-depth against accidental or malicious misconfiguration of an otherwise admin-controlled value. 
Also overridable via langfuseHostValidator package var for tests that need to exercise loopback mock servers. Co-Authored-By: Claude Opus 4.7 --- workspace-server/internal/handlers/traces.go | 102 +++++++++++++ .../internal/handlers/traces_test.go | 134 ++++++++++++++++-- 2 files changed, 228 insertions(+), 8 deletions(-) diff --git a/workspace-server/internal/handlers/traces.go b/workspace-server/internal/handlers/traces.go index 1cabafa1..0e000f9a 100644 --- a/workspace-server/internal/handlers/traces.go +++ b/workspace-server/internal/handlers/traces.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "io" + "net" "net/http" + "net/url" "os" + "strings" "time" "github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto" @@ -15,6 +18,10 @@ import ( var langfuseClient = &http.Client{Timeout: 10 * time.Second} +// langfuseHostValidator is the validation function used by resolveLangfuseConfig. +// Overridable in tests that need to exercise loopback / private endpoints. +var langfuseHostValidator = validateLangfuseHost + type TracesHandler struct{} func NewTracesHandler() *TracesHandler { @@ -100,9 +107,104 @@ func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseCo if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" { return nil, nil } + + // Defense-in-depth: even admin-controlled hosts can be misconfigured + // (accidentally or maliciously) to point at internal infrastructure. + // Validate the resolved host before constructing outbound requests. + if err := langfuseHostValidator(cfg.Host); err != nil { + return nil, fmt.Errorf("invalid LANGFUSE_HOST: %w", err) + } + return cfg, nil } +// validateLangfuseHost rejects URLs that would create an SSRF or +// credential-leak vector against internal infrastructure. It is +// defense-in-depth on top of the admin-only host control boundary. +// +// Rejected targets: +// - non-HTTP(S) schemes (file, ftp, gopher, unix, etc.) 
+// - private IPv4 ranges (10/8, 172.16/12, 192.168/16) +// - loopback (127/8, ::1) +// - link-local (169.254/16, fe80::/10) +// - AWS metadata endpoint (169.254.169.254) +// - URLs containing userinfo, path, query, or fragment +func validateLangfuseHost(raw string) error { + u, err := url.Parse(raw) + if err != nil { + return fmt.Errorf("not a valid URL: %v", err) + } + + // Scheme must be http or https. + scheme := strings.ToLower(u.Scheme) + if scheme != "http" && scheme != "https" { + return fmt.Errorf("scheme %q not allowed (only http/https)", u.Scheme) + } + + // Reject userinfo — prevents credential stuffing / info-leak. + if u.User != nil { + return fmt.Errorf("URL must not contain userinfo") + } + + // Reject path, query, fragment — we only want a base host. + if u.Path != "" && u.Path != "/" { + return fmt.Errorf("URL must not contain a path") + } + if u.RawQuery != "" { + return fmt.Errorf("URL must not contain a query string") + } + if u.Fragment != "" { + return fmt.Errorf("URL must not contain a fragment") + } + + host := u.Hostname() + if host == "" { + return fmt.Errorf("missing hostname") + } + + // Reject IPv4 literals that resolve to internal infrastructure. + if ip := net.ParseIP(host); ip != nil { + if ip.IsLoopback() { + return fmt.Errorf("loopback address %q not allowed", host) + } + if ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { + return fmt.Errorf("link-local address %q not allowed", host) + } + if ip.IsPrivate() { + return fmt.Errorf("private address %q not allowed", host) + } + // AWS metadata service (also catches 169.254.169.254). + if ip.Equal(net.ParseIP("169.254.169.254")) { + return fmt.Errorf("metadata endpoint %q not allowed", host) + } + return nil + } + + // For hostnames, reject ones that look like IPv4 literals with a + // trailing dot or other evasion patterns. 
+ if strings.HasSuffix(host, ".") { + host = strings.TrimSuffix(host, ".") + } + + // Reject hostnames that resolve to the metadata endpoint or other + // well-known internal names. + hostLower := strings.ToLower(host) + blocked := []string{ + "localhost", + "metadata.google.internal", + "metadata", + "metadata.azure.internal", + "100.100.100.200", // Alibaba metadata + } + for _, b := range blocked { + if hostLower == b { + return fmt.Errorf("hostname %q not allowed", host) + } + } + + return nil +} + // List handles GET /workspaces/:id/traces // Proxies to Langfuse API to get recent traces for a workspace. func (h *TracesHandler) List(c *gin.Context) { diff --git a/workspace-server/internal/handlers/traces_test.go b/workspace-server/internal/handlers/traces_test.go index fd5e3a70..c37cbb45 100644 --- a/workspace-server/internal/handlers/traces_test.go +++ b/workspace-server/internal/handlers/traces_test.go @@ -67,8 +67,9 @@ func TestTracesList_WorkspaceSecretsOverride(t *testing.T) { setupTestRedis(t) handler := NewTracesHandler() - // Encrypt test secrets - encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) + // Encrypt test secrets — use a non-loopback, non-private hostname so + // validateLangfuseHost does not reject it. 
+ encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000")) verHost := crypto.CurrentEncryptionVersion() encPk, _ := crypto.Encrypt([]byte("pk-ws")) verPk := crypto.CurrentEncryptionVersion() @@ -117,7 +118,7 @@ func TestTracesList_GlobalSecretsFallback(t *testing.T) { handler := NewTracesHandler() // No workspace secrets, but global secrets exist - encHost, _ := crypto.Encrypt([]byte("http://localhost:3000")) + encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000")) verHost := crypto.CurrentEncryptionVersion() encPk, _ := crypto.Encrypt([]byte("pk-global")) verPk := crypto.CurrentEncryptionVersion() @@ -170,8 +171,9 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) { setupTestRedis(t) handler := NewTracesHandler() - // Set all env vars but point to unreachable host - os.Setenv("LANGFUSE_HOST", "http://localhost:99999") + // Set all env vars but point to unreachable host (must be a valid + // public-looking hostname so validateLangfuseHost does not reject it). + os.Setenv("LANGFUSE_HOST", "http://langfuse-down.example.com:99999") os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test") os.Setenv("LANGFUSE_SECRET_KEY", "sk-test") defer func() { @@ -278,8 +280,8 @@ func TestTracesList_WorkspaceHostIgnored(t *testing.T) { encSk, _ := crypto.Encrypt([]byte("sk-ws")) verSk := crypto.CurrentEncryptionVersion() - // Admin sets a legitimate HOST via global secrets. - encHostGlobal, _ := crypto.Encrypt([]byte("http://localhost:3000")) + // Admin sets a legitimate HOST via global secrets (non-loopback). + encHostGlobal, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000")) verHostGlobal := crypto.CurrentEncryptionVersion() mock := setupTestDB(t) @@ -320,7 +322,13 @@ func TestTracesList_LangfuseUpstreamError(t *testing.T) { setupTestRedis(t) handler := NewTracesHandler() - // Start a mock Langfuse server that returns 500 with a non-JSON body + // Start a mock Langfuse server that returns 500 with a non-JSON body. 
+ // httptest uses 127.0.0.1 which validateLangfuseHost rejects, so we + // temporarily swap the validator for this test. + origValidator := langfuseHostValidator + langfuseHostValidator = func(string) error { return nil } + defer func() { langfuseHostValidator = origValidator }() + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html") w.WriteHeader(http.StatusInternalServerError) @@ -374,3 +382,113 @@ func TestTracesList_LangfuseUpstreamError(t *testing.T) { t.Errorf("expected empty list on upstream error, got %d items", len(resp)) } } + +// ==================== validateLangfuseHost ==================== + +func TestValidateLangfuseHost_Valid(t *testing.T) { + cases := []string{ + "http://langfuse.example.com", + "https://langfuse.example.com:3000", + "https://us.cloud.langfuse.com", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err != nil { + t.Errorf("validateLangfuseHost(%q) = %v, want nil", c, err) + } + } +} + +func TestValidateLangfuseHost_InvalidScheme(t *testing.T) { + cases := []string{ + "ftp://langfuse.example.com", + "file:///etc/passwd", + "unix:///var/run/langfuse.sock", + "gopher://langfuse.example.com", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_Loopback(t *testing.T) { + cases := []string{ + "http://localhost", + "http://127.0.0.1", + "http://127.0.0.1:3000", + "http://[::1]", + "http://[::1]:3000", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_Private(t *testing.T) { + cases := []string{ + "http://10.0.0.1", + "http://192.168.1.1", + "http://172.16.0.1", + "http://172.31.255.255", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + 
t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_LinkLocal(t *testing.T) { + cases := []string{ + "http://169.254.0.1", + "http://169.254.169.254", // AWS metadata + "http://[fe80::1]", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_BlockedHostnames(t *testing.T) { + cases := []string{ + "http://localhost", + "http://metadata.google.internal", + "http://metadata", + "http://metadata.azure.internal", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_Userinfo(t *testing.T) { + if err := validateLangfuseHost("http://user:pass@langfuse.example.com"); err == nil { + t.Error("expected error for URL with userinfo") + } +} + +func TestValidateLangfuseHost_PathQueryFragment(t *testing.T) { + cases := []string{ + "http://langfuse.example.com/path", + "http://langfuse.example.com?query=1", + "http://langfuse.example.com#frag", + } + for _, c := range cases { + if err := validateLangfuseHost(c); err == nil { + t.Errorf("validateLangfuseHost(%q) = nil, want error", c) + } + } +} + +func TestValidateLangfuseHost_MissingHostname(t *testing.T) { + if err := validateLangfuseHost("http://"); err == nil { + t.Error("expected error for missing hostname") + } +} -- 2.52.0