traces(v1): per-workspace Langfuse config + upstream error handling #2029

Closed
core-be wants to merge 6 commits from feat/traces-v1-workspace-secrets-2976 into staging
8 changed files with 735 additions and 80 deletions
+53 -24
View File
@@ -234,23 +234,48 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
"Authorization": `Bearer ${tenantToken}`,
"X-Molecule-Org-Id": orgID,
};
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
method: "POST",
headers: tenantAuth,
body: JSON.stringify({
name: "E2E Canvas Test",
runtime: "hermes",
tier: 2,
model: "gpt-4o",
}),
});
if (ws.status >= 400 || !ws.body?.id) {
throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`);
// Retry workspace creation on transient 5xx / timeout — staging CP can
// return 502/503/504 under load and a single-shot failure kills the
// entire E2E run. 3 attempts with exponential backoff between them (3s
// after the first failure, 6s after the second; a third failure throws)
// add at most ~9s of waiting, well inside the 20-min provision envelope.
let workspaceId = "";
for (let attempt = 1; attempt <= 3; attempt++) {
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
method: "POST",
headers: tenantAuth,
body: JSON.stringify({
name: "E2E Canvas Test",
runtime: "hermes",
tier: 2,
model: "gpt-4o",
}),
});
if (ws.status >= 200 && ws.status < 300 && ws.body?.id) {
workspaceId = ws.body.id as string;
break;
}
const isTransient = ws.status >= 500 || ws.status === 0;
if (!isTransient || attempt === 3) {
throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`);
}
const backoff = 3000 * Math.pow(2, attempt - 1);
console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`);
await new Promise((r) => setTimeout(r, backoff));
}
const workspaceId = ws.body.id as string;
console.log(`[staging-setup] Workspace created: ${workspaceId}`);
// 6. Wait for workspace online
//
// Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes
// install + npm browser-tools). The controlplane bootstrap-watcher
// deadline fires at 5 min and sets status=failed prematurely; heartbeat
// then transitions failed → online after install.sh finishes. So
// 'failed' is a TRANSIENT state we must tolerate — log once and keep
// polling, only hard-fail at the deadline. Pre-fix this was a flake
// generator: workspace went failed→online inside our window but we
// bailed at the failed read. See test_staging_full_saas.sh step 7/11
// and issue #2632.
let wsFailedLogged = false;
await waitFor<boolean>(
async () => {
const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, {
@@ -259,17 +284,21 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
if (r.status !== 200) return null;
if (r.body?.status === "online") return true;
if (r.body?.status === "failed") {
// last_sample_error is often empty when the failure happens before
// the agent emits a sample (e.g. boot crash, image pull error,
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
// body gives triage the boot_stage / last_error / image fields it
// needs without a second probe. Otherwise this propagates as a
// bare "Workspace failed: " — the exact useless message that
// sent #2632 to the issue tracker.
const detail = r.body.last_sample_error
? r.body.last_sample_error
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
throw new Error(`Workspace failed: ${detail}`);
if (!wsFailedLogged) {
// last_sample_error is often empty when the failure happens before
// the agent emits a sample (e.g. boot crash, image pull error,
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
// body gives triage the boot_stage / last_error / image fields it
// needs without a second probe. Otherwise this propagates as a
// bare "Workspace failed: " — the exact useless message that
// sent #2632 to the issue tracker.
const detail = r.body.last_sample_error
? r.body.last_sample_error
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
console.log(`[staging-setup] workspace ${workspaceId} transiently failed — waiting for heartbeat recovery (bootstrap-watcher deadline, see cp#245). detail: ${detail}`);
wsFailedLogged = true;
}
return null;
}
return null;
},
+12 -5
View File
@@ -3,6 +3,7 @@ package bundle
import (
"context"
"fmt"
"log"
"strings"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
@@ -92,7 +93,9 @@ func Import(
if err != nil {
markFailed(provCtx, wsID, broadcaster, err)
} else if url != "" {
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
}
}
}()
}
@@ -139,12 +142,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
// markProvisionFailed in workspace-server/internal/handlers/
// workspace_provision_shared.go.
msg := err.Error()
db.DB.ExecContext(ctx,
if _, dbErr := db.DB.ExecContext(ctx,
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
models.StatusFailed, msg, wsID)
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
models.StatusFailed, msg, wsID); dbErr != nil {
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
}
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
"error": msg,
})
}); bcErr != nil {
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
}
}
func nilIfEmpty(s string) interface{} {
+26 -11
View File
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
// Update stats in DB
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, ch.ID)
`, ch.ID); err != nil {
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
}
}
// Broadcast event
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"username": msg.Username,
"direction": "inbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast inbound event: %v", err)
}
}
return nil
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
}
if db.DB != nil {
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE workspace_channels
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
WHERE id = $1
`, channelID)
`, channelID); err != nil {
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
}
}
if m.broadcaster != nil {
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
"channel_id": ch.ID,
"channel_type": ch.ChannelType,
"direction": "outbound",
})
}); err != nil {
log.Printf("Channels: failed to broadcast outbound event: %v", err)
}
}
return nil
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
return ""
}
var config map[string]interface{}
json.Unmarshal(configJSON, &config)
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("Channels: failed to unmarshal channel config: %v", err)
return ""
}
if err := DecryptSensitiveFields(config); err != nil {
return ""
}
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
if err != nil {
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
}
json.Unmarshal(configJSON, &ch.Config)
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
}
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
}
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
// methods downstream read them as plaintext strings.
if err := DecryptSensitiveFields(ch.Config); err != nil {
@@ -514,7 +514,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
// Acknowledge the button press (removes loading spinner)
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
bot.Send(ackCfg)
if _, err := bot.Send(ackCfg); err != nil {
log.Printf("telegram: failed to send callback ack: %v", err)
}
// Update the message to show what was clicked
decision := "approved"
@@ -526,7 +528,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
cb.Message.MessageID,
cb.Message.Text+"\n\n✅ CEO "+decision,
)
bot.Send(editMsg)
if _, err := bot.Send(editMsg); err != nil {
log.Printf("telegram: failed to send edit message: %v", err)
}
// Route the decision as an inbound message to the agent
inbound := &InboundMessage{
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
return
}
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
"approval_id": approvalID,
"action": body.Action,
"reason": body.Reason,
"task_id": body.TaskID,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval requested: %v", err)
}
// Auto-escalate to parent
var parentID *string
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
}
if parentID != nil {
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
"approval_id": approvalID,
"from_workspace_id": workspaceID,
"action": body.Action,
"reason": body.Reason,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
}
}
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
ctx := c.Request.Context()
// Auto-expire stale approvals (older than 10 min)
db.DB.ExecContext(ctx, `
if _, err := db.DB.ExecContext(ctx, `
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
`)
`); err != nil {
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
}
rows, err := db.DB.QueryContext(ctx, `
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
@@ -205,11 +213,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
eventType = "APPROVAL_DENIED"
}
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
"approval_id": approvalID,
"decision": body.Decision,
"decided_by": decidedBy,
})
}); err != nil {
log.Printf("approvals: failed to broadcast approval decision: %v", err)
}
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
}
@@ -67,7 +67,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
}
var config map[string]interface{}
json.Unmarshal(configJSON, &config)
if err := json.Unmarshal(configJSON, &config); err != nil {
log.Printf("Channels: unmarshal config on list for channel %s: %v", id, err)
config = map[string]interface{}{}
}
// #319: decrypt sensitive fields first so the mask operates on
// plaintext (first-4 / last-4 of the real token, not the ciphertext
// prefix). Decrypt errors are logged but non-fatal — List must keep
@@ -86,7 +89,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
}
var allowed []string
json.Unmarshal(allowedJSON, &allowed)
if err := json.Unmarshal(allowedJSON, &allowed); err != nil {
log.Printf("Channels: unmarshal allowed_users on list for channel %s: %v", id, err)
allowed = []string{}
}
entry := map[string]interface{}{
"id": id,
+208 -9
View File
@@ -1,47 +1,235 @@
package handlers
import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
// langfuseClient is the shared HTTP client used for outbound requests to the
// Langfuse API. The 10-second timeout bounds how long a single traces request
// can wait on the upstream server.
var langfuseClient = &http.Client{Timeout: 10 * time.Second}
// langfuseHostValidator is the validation function used by resolveLangfuseConfig.
// Overridable in tests that need to exercise loopback / private endpoints.
var langfuseHostValidator = validateLangfuseHost
// TracesHandler serves the workspace traces endpoint
// (GET /workspaces/:id/traces). It carries no state of its own.
type TracesHandler struct{}

// NewTracesHandler returns a ready-to-use TracesHandler.
func NewTracesHandler() *TracesHandler {
	return new(TracesHandler)
}
// langfuseConfig holds the resolved Langfuse connection parameters.
// For Public and Secret, workspace secrets override global secrets which
// override environment variables, matching the precedence rules in
// workspace_provision.go. Host is the exception: resolveLangfuseConfig
// reads it from global secrets or the environment only and never from
// workspace secrets.
type langfuseConfig struct {
// Host is the Langfuse base URL (LANGFUSE_HOST) that API paths are appended to.
Host string
// Public is LANGFUSE_PUBLIC_KEY, sent as the HTTP basic-auth username.
Public string
// Secret is LANGFUSE_SECRET_KEY, sent as the HTTP basic-auth password.
Secret string
}
// resolveLangfuseConfig looks up LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and
// LANGFUSE_SECRET_KEY for a workspace.
//
// SECURITY BOUNDARY (RC 8337 / #2029):
// LANGFUSE_HOST is admin-controlled ONLY — resolved from global_secrets
// or environment variables. Workspace secrets CANNOT override the host.
// This prevents SSRF: a workspace owner could otherwise set HOST to an
// internal endpoint and exfiltrate data using the Langfuse credentials.
//
// LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY follow the normal
// precedence: workspace_secrets → global_secrets → environment vars.
// This preserves per-workspace project isolation without the SSRF vector.
//
// Lookup order is fixed (host, then public key, then secret key) and a
// layer is skipped whenever its row cannot be read or decrypted.
// NOTE(review): that means a workspace secret that exists but fails to
// decrypt silently falls back to the global/env value — confirm this is
// the intended behavior rather than an error.
//
// Returns:
//   - (cfg, nil) when all three values are present and the host passes
//     langfuseHostValidator. cfg.Host has surrounding whitespace and any
//     trailing "/" removed so callers can append "/api/..." directly.
//   - (nil, nil) when any value is missing: tracing is simply disabled
//     for this workspace.
//   - (nil, err) when the resolved host is rejected by the validator.
func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseConfig, error) {
	const (
		globalSecretQuery    = `SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`
		workspaceSecretQuery = `SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1 AND key = $2`
	)

	// lookup runs one secret query and decrypts the single row it returns.
	// found is false when the row is missing, unreadable, or undecryptable;
	// a row that decrypts to the empty string still counts as found.
	lookup := func(query string, args ...interface{}) (value string, found bool) {
		var (
			encrypted []byte
			version   int
		)
		if err := db.DB.QueryRowContext(ctx, query, args...).Scan(&encrypted, &version); err != nil {
			return "", false
		}
		plain, err := crypto.DecryptVersioned(encrypted, version)
		if err != nil {
			return "", false
		}
		return string(plain), true
	}

	// adminValue resolves a key from the admin-controlled layers only
	// (global secrets, then environment).
	adminValue := func(key string) string {
		if value, found := lookup(globalSecretQuery, key); found {
			return value
		}
		return os.Getenv(key)
	}

	// workspaceValue lets a workspace secret override the admin layers.
	workspaceValue := func(key string) string {
		if value, found := lookup(workspaceSecretQuery, workspaceID, key); found {
			return value
		}
		return adminValue(key)
	}

	cfg := &langfuseConfig{}
	// HOST — admin-controlled only. Workspace secrets are intentionally NOT
	// consulted to close the SSRF vector described above. Whitespace is
	// trimmed so a stray newline in an env var or secret does not turn into
	// a URL parse failure.
	cfg.Host = strings.TrimSpace(adminValue("LANGFUSE_HOST"))
	cfg.Public = workspaceValue("LANGFUSE_PUBLIC_KEY")
	cfg.Secret = workspaceValue("LANGFUSE_SECRET_KEY")

	// Incomplete config is not an error — it simply means tracing is
	// disabled for this workspace. Callers treat (nil, nil) as
	// "no traces available".
	if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" {
		return nil, nil
	}

	// Defense-in-depth: even admin-controlled hosts can be misconfigured
	// (accidentally or maliciously) to point at internal infrastructure.
	// Validate the resolved host before constructing outbound requests.
	if err := langfuseHostValidator(cfg.Host); err != nil {
		return nil, fmt.Errorf("invalid LANGFUSE_HOST: %w", err)
	}

	// The validator accepts a bare "/" path, but request URLs are built as
	// Host + "/api/...", so drop the trailing slash to avoid "//api/...".
	cfg.Host = strings.TrimRight(cfg.Host, "/")
	return cfg, nil
}
// validateLangfuseHost rejects URLs that would create an SSRF or
// credential-leak vector against internal infrastructure. It is
// defense-in-depth on top of the admin-only host control boundary.
//
// Rejected targets:
//   - non-HTTP(S) schemes (file, ftp, gopher, unix, etc.)
//   - URLs containing userinfo, a path other than "/", a query, or a fragment
//   - loopback addresses (127/8, ::1)
//   - link-local addresses (169.254/16, fe80::/10); this range includes the
//     169.254.169.254 cloud metadata endpoint
//   - private ranges (10/8, 172.16/12, 192.168/16, fc00::/7)
//   - unspecified addresses (0.0.0.0, ::)
//   - the Alibaba Cloud metadata address 100.100.100.200
//   - well-known internal hostnames (localhost, metadata.google.internal,
//     metadata, metadata.azure.internal)
//
// A trailing dot on the host ("127.0.0.1.", "localhost.") and an IPv6 zone
// ("fe80::1%eth0") are stripped before the checks so they cannot be used to
// dodge them.
//
// Hostnames are not resolved here, so a DNS name that points at an internal
// address still passes; this check is best-effort only.
//
// Returns nil when raw is acceptable, otherwise an error describing the
// first rule it violates.
func validateLangfuseHost(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("not a valid URL: %w", err)
	}

	// Scheme must be http or https.
	if scheme := strings.ToLower(u.Scheme); scheme != "http" && scheme != "https" {
		return fmt.Errorf("scheme %q not allowed (only http/https)", u.Scheme)
	}

	// Reject userinfo — prevents credential stuffing / info-leak.
	if u.User != nil {
		return fmt.Errorf("URL must not contain userinfo")
	}

	// Reject path, query, fragment — we only want a base host.
	if u.Path != "" && u.Path != "/" {
		return fmt.Errorf("URL must not contain a path")
	}
	if u.RawQuery != "" {
		return fmt.Errorf("URL must not contain a query string")
	}
	if u.Fragment != "" {
		return fmt.Errorf("URL must not contain a fragment")
	}

	// Normalize the fully-qualified form ("host.") before any check, so the
	// dotted spelling of an IP literal or blocked name is treated the same
	// as the plain one.
	host := strings.TrimRight(u.Hostname(), ".")
	if host == "" {
		return fmt.Errorf("missing hostname")
	}

	// net.ParseIP does not understand IPv6 zones; drop the zone so a
	// link-local literal such as "fe80::1%eth0" is still recognized.
	literal := host
	if zone := strings.IndexByte(literal, '%'); zone >= 0 {
		literal = literal[:zone]
	}

	// IP literals: reject anything that targets internal infrastructure.
	if ip := net.ParseIP(literal); ip != nil {
		switch {
		case ip.IsLoopback():
			return fmt.Errorf("loopback address %q not allowed", host)
		case ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast():
			// Also covers the 169.254.169.254 metadata endpoint.
			return fmt.Errorf("link-local address %q not allowed", host)
		case ip.IsPrivate():
			return fmt.Errorf("private address %q not allowed", host)
		case ip.IsUnspecified():
			// 0.0.0.0 / :: are commonly routed to the local host.
			return fmt.Errorf("unspecified address %q not allowed", host)
		case ip.Equal(net.ParseIP("100.100.100.200")):
			// Alibaba Cloud metadata service; not in any range above.
			return fmt.Errorf("metadata endpoint %q not allowed", host)
		}
		return nil
	}

	// Hostnames: reject well-known internal names.
	switch strings.ToLower(host) {
	case "localhost",
		"metadata.google.internal",
		"metadata",
		"metadata.azure.internal":
		return fmt.Errorf("hostname %q not allowed", host)
	}
	return nil
}
// List handles GET /workspaces/:id/traces
// Proxies to Langfuse API to get recent traces for a workspace.
func (h *TracesHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
langfuseHost := os.Getenv("LANGFUSE_HOST")
langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY")
langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY")
if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" {
cfg, err := resolveLangfuseConfig(c.Request.Context(), workspaceID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"})
return
}
if cfg == nil {
c.JSON(http.StatusOK, []interface{}{})
return
}
// Fetch traces from Langfuse, filtered by workspace tag or name
url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s",
langfuseHost, workspaceID)
cfg.Host, workspaceID)
req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
return
}
req.SetBasicAuth(langfusePublic, langfuseSecret)
req.SetBasicAuth(cfg.Public, cfg.Secret)
resp, err := langfuseClient.Do(req)
if err != nil {
@@ -51,6 +239,17 @@ func (h *TracesHandler) List(c *gin.Context) {
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
c.Data(resp.StatusCode, "application/json", body)
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
c.JSON(http.StatusOK, []interface{}{})
return
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// Upstream error — don't proxy HTML error pages or unexpected
// JSON shapes to the Canvas client. Return empty so the UI
// gracefully shows "no traces" rather than breaking on parse.
c.JSON(http.StatusOK, []interface{}{})
return
}
c.Data(http.StatusOK, "application/json", body)
}
+403 -18
View File
@@ -1,26 +1,45 @@
package handlers
import (
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/gin-gonic/gin"
)
// ==================== GET /workspaces/:id/traces ====================
func TestTracesList_NoLangfuseConfig(t *testing.T) {
setupTestDB(t)
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Ensure Langfuse env vars are not set
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
// No global secrets, no env vars. HOST is admin-only (global/env) so
// workspace_secrets is NOT queried for LANGFUSE_HOST.
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
// env fallback is empty (default)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -43,23 +62,96 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) {
}
}
func TestTracesList_PartialLangfuseConfig(t *testing.T) {
func TestTracesList_WorkspaceSecretsOverride(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Set only host, missing keys
os.Setenv("LANGFUSE_HOST", "http://localhost:3000")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
defer func() {
os.Unsetenv("LANGFUSE_HOST")
}()
// Encrypt test secrets — use a non-loopback, non-private hostname so
// validateLangfuseHost does not reject it.
encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
verHost := crypto.CurrentEncryptionVersion()
encPk, _ := crypto.Encrypt([]byte("pk-ws"))
verPk := crypto.CurrentEncryptionVersion()
encSk, _ := crypto.Encrypt([]byte("sk-ws"))
verSk := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
// HOST is admin-only: workspace secret for HOST is IGNORED.
// Code queries global_secrets first, then falls through to env.
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
// Keys follow normal precedence: workspace secrets override global.
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-override", "LANGFUSE_PUBLIC_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-override", "LANGFUSE_SECRET_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-override"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-override/traces", nil)
handler.List(c)
// We don't have a real Langfuse server, so the request will fail
// network-wise and return empty (graceful fallback)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
func TestTracesList_GlobalSecretsFallback(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// No workspace secrets, but global secrets exist
encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
verHost := crypto.CurrentEncryptionVersion()
encPk, _ := crypto.Encrypt([]byte("pk-global"))
verPk := crypto.CurrentEncryptionVersion()
encSk, _ := crypto.Encrypt([]byte("sk-global"))
verSk := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil)
handler.List(c)
@@ -70,7 +162,7 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
@@ -79,8 +171,9 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
setupTestRedis(t)
handler := NewTracesHandler()
// Set all env vars but point to unreachable host
os.Setenv("LANGFUSE_HOST", "http://localhost:99999")
// Set all env vars but point to unreachable host (must be a valid
// public-looking hostname so validateLangfuseHost does not reject it).
os.Setenv("LANGFUSE_HOST", "http://langfuse-down.example.com:99999")
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
defer func() {
@@ -89,6 +182,27 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
os.Unsetenv("LANGFUSE_SECRET_KEY")
}()
// No global secrets, so env vars are used. HOST skips workspace_secrets.
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-down", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-down", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}}
@@ -107,3 +221,274 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
func TestTracesList_PartialWorkspaceConfig(t *testing.T) {
handler := NewTracesHandler()
// Workspace has PUBLIC_KEY but missing HOST and SECRET_KEY.
// Config incomplete → empty response.
encPk, _ := crypto.Encrypt([]byte("pk-partial"))
verPk := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
// HOST skipped for workspace secrets
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
// SECRET_KEY missing everywhere
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-partial", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
}
}
// TestTracesList_WorkspaceHostIgnored is a security regression test:
// workspace secrets MUST NOT override LANGFUSE_HOST. If they could, a
// workspace owner could set HOST to an internal endpoint (for example the
// cloud metadata address http://169.254.169.254) and exfiltrate data using
// the Langfuse credentials.
//
// The mock deliberately registers no workspace_secrets expectation for
// LANGFUSE_HOST: HOST is only offered from global_secrets, while the public
// and secret keys are offered from workspace_secrets.
//
// NOTE(review): the assertions below only check for 200 + an empty list.
// Consider also asserting mock.ExpectationsWereMet() so the test fails if the
// handler stops issuing exactly these lookups — confirm setupTestDB does not
// already do so.
func TestTracesList_WorkspaceHostIgnored(t *testing.T) {
	handler := NewTracesHandler()

	// Workspace-scoped credentials.
	encPk, err := crypto.Encrypt([]byte("pk-ws"))
	if err != nil {
		t.Fatalf("encrypt LANGFUSE_PUBLIC_KEY fixture: %v", err)
	}
	verPk := crypto.CurrentEncryptionVersion()

	encSk, err := crypto.Encrypt([]byte("sk-ws"))
	if err != nil {
		t.Fatalf("encrypt LANGFUSE_SECRET_KEY fixture: %v", err)
	}
	verSk := crypto.CurrentEncryptionVersion()

	// Admin sets a legitimate HOST via global secrets (non-loopback).
	encHostGlobal, err := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
	if err != nil {
		t.Fatalf("encrypt LANGFUSE_HOST fixture: %v", err)
	}
	verHostGlobal := crypto.CurrentEncryptionVersion()

	mock := setupTestDB(t)
	setupTestRedis(t)

	// Code queries global_secrets for HOST first — workspace secret is never read.
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
		WithArgs("LANGFUSE_HOST").
		WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHostGlobal, verHostGlobal))

	// Keys still read workspace secrets.
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
		WithArgs("ws-traces-ssrf", "LANGFUSE_PUBLIC_KEY").
		WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
		WithArgs("ws-traces-ssrf", "LANGFUSE_SECRET_KEY").
		WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-traces-ssrf"}}
	c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-ssrf/traces", nil)

	handler.List(c)

	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}

	// NOTE(review): langfuse.example.com is presumably unreachable from the
	// test environment, which is why an empty list is expected — confirm.
	//
	// Fail on a decode error: otherwise a non-JSON body would leave resp nil
	// and the emptiness check below would pass vacuously.
	var resp []interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not a JSON list: %v (body: %s)", err, w.Body.String())
	}
	if len(resp) != 0 {
		t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
	}
}
// TestTracesList_LangfuseUpstreamError verifies that when the Langfuse
// upstream answers 500 with a non-JSON (HTML) body, List responds with 200
// and an empty JSON list instead of proxying the upstream error page.
//
// No secrets exist in the mocked database, so the Langfuse configuration is
// supplied through the LANGFUSE_* environment variables.
func TestTracesList_LangfuseUpstreamError(t *testing.T) {
	mock := setupTestDB(t)
	setupTestRedis(t)
	handler := NewTracesHandler()

	// Start a mock Langfuse server that returns 500 with a non-JSON body.
	// httptest uses 127.0.0.1 which validateLangfuseHost rejects, so we
	// temporarily swap the validator for this test.
	origValidator := langfuseHostValidator
	langfuseHostValidator = func(string) error { return nil }
	defer func() { langfuseHostValidator = origValidator }()

	upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/html")
		w.WriteHeader(http.StatusInternalServerError)
		_, _ = w.Write([]byte("<html><body>Internal Server Error</body></html>"))
	}))
	defer upstream.Close()

	// setEnv sets key for the duration of the test and restores whatever was
	// there before (including "not set"), so a pre-existing value in the
	// developer's or CI environment is not clobbered.
	setEnv := func(key, value string) {
		prev, had := os.LookupEnv(key)
		if err := os.Setenv(key, value); err != nil {
			t.Fatalf("setenv %s: %v", key, err)
		}
		t.Cleanup(func() {
			if had {
				_ = os.Setenv(key, prev)
				return
			}
			_ = os.Unsetenv(key)
		})
	}
	setEnv("LANGFUSE_HOST", upstream.URL)
	setEnv("LANGFUSE_PUBLIC_KEY", "pk-test")
	setEnv("LANGFUSE_SECRET_KEY", "sk-test")

	// HOST skips workspace secrets — falls through global → env.
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
		WithArgs("LANGFUSE_HOST").
		WillReturnError(sql.ErrNoRows)

	// Both keys miss in workspace_secrets and then in global_secrets.
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
		WithArgs("ws-traces-500", "LANGFUSE_PUBLIC_KEY").
		WillReturnError(sql.ErrNoRows)
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
		WithArgs("LANGFUSE_PUBLIC_KEY").
		WillReturnError(sql.ErrNoRows)
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
		WithArgs("ws-traces-500", "LANGFUSE_SECRET_KEY").
		WillReturnError(sql.ErrNoRows)
	mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
		WithArgs("LANGFUSE_SECRET_KEY").
		WillReturnError(sql.ErrNoRows)

	w := httptest.NewRecorder()
	c, _ := gin.CreateTestContext(w)
	c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}}
	c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil)

	handler.List(c)

	// Should return empty JSON (not proxy the 500 HTML) when Langfuse errors
	if w.Code != http.StatusOK {
		t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
	}

	// The decode error must be checked: if the upstream HTML were proxied,
	// Unmarshal would fail, resp would stay nil, and the length check below
	// would pass even though the handler misbehaved.
	var resp []interface{}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("response is not a JSON list: %v (body: %s)", err, w.Body.String())
	}
	if len(resp) != 0 {
		t.Errorf("expected empty list on upstream error, got %d items", len(resp))
	}
}
// ==================== validateLangfuseHost ====================
// TestValidateLangfuseHost_Valid checks that hostname-based http and https
// URLs, with or without an explicit port, are accepted.
func TestValidateLangfuseHost_Valid(t *testing.T) {
	accepted := []string{
		"http://langfuse.example.com",
		"https://langfuse.example.com:3000",
		"https://us.cloud.langfuse.com",
	}
	for i := range accepted {
		host := accepted[i]
		got := validateLangfuseHost(host)
		if got == nil {
			continue
		}
		t.Errorf("validateLangfuseHost(%q) = %v, want nil", host, got)
	}
}
// TestValidateLangfuseHost_InvalidScheme checks that ftp, file, unix and
// gopher URLs are rejected.
func TestValidateLangfuseHost_InvalidScheme(t *testing.T) {
	for _, rawURL := range []string{
		"ftp://langfuse.example.com",
		"file:///etc/passwd",
		"unix:///var/run/langfuse.sock",
		"gopher://langfuse.example.com",
	} {
		if validateLangfuseHost(rawURL) != nil {
			continue // rejected, as required
		}
		t.Errorf("validateLangfuseHost(%q) = nil, want error", rawURL)
	}
}
// TestValidateLangfuseHost_Loopback checks that localhost and the IPv4/IPv6
// loopback literals are rejected, with and without a port.
func TestValidateLangfuseHost_Loopback(t *testing.T) {
	loopbackURLs := []string{
		"http://localhost",
		"http://127.0.0.1",
		"http://127.0.0.1:3000",
		"http://[::1]",
		"http://[::1]:3000",
	}
	for idx := 0; idx < len(loopbackURLs); idx++ {
		target := loopbackURLs[idx]
		result := validateLangfuseHost(target)
		if result == nil {
			t.Errorf("validateLangfuseHost(%q) = nil, want error", target)
		}
	}
}
// TestValidateLangfuseHost_Private checks that addresses in the 10.x,
// 192.168.x and 172.16–172.31 ranges are rejected.
func TestValidateLangfuseHost_Private(t *testing.T) {
	for _, addr := range []string{
		"http://10.0.0.1",
		"http://192.168.1.1",
		"http://172.16.0.1",
		"http://172.31.255.255",
	} {
		verdict := validateLangfuseHost(addr)
		if verdict != nil {
			continue // rejected, as required
		}
		t.Errorf("validateLangfuseHost(%q) = nil, want error", addr)
	}
}
// TestValidateLangfuseHost_LinkLocal checks that 169.254.x.x addresses
// (including the AWS metadata endpoint) and an fe80:: IPv6 address are
// rejected.
func TestValidateLangfuseHost_LinkLocal(t *testing.T) {
	linkLocal := []string{
		"http://169.254.0.1",
		"http://169.254.169.254", // AWS metadata
		"http://[fe80::1]",
	}
	for n := range linkLocal {
		if outcome := validateLangfuseHost(linkLocal[n]); outcome == nil {
			t.Errorf("validateLangfuseHost(%q) = nil, want error", linkLocal[n])
		}
	}
}
// TestValidateLangfuseHost_BlockedHostnames checks that localhost and the
// listed metadata hostnames are rejected.
func TestValidateLangfuseHost_BlockedHostnames(t *testing.T) {
	blocked := []string{
		"http://localhost",
		"http://metadata.google.internal",
		"http://metadata",
		"http://metadata.azure.internal",
	}
	for _, name := range blocked {
		switch validateLangfuseHost(name) {
		case nil:
			t.Errorf("validateLangfuseHost(%q) = nil, want error", name)
		default:
			// rejected, as required
		}
	}
}
// TestValidateLangfuseHost_Userinfo checks that a URL carrying embedded
// user:password credentials is rejected.
func TestValidateLangfuseHost_Userinfo(t *testing.T) {
	const withCreds = "http://user:pass@langfuse.example.com"

	got := validateLangfuseHost(withCreds)
	if got != nil {
		return // rejected, as required
	}
	t.Error("expected error for URL with userinfo")
}
// TestValidateLangfuseHost_PathQueryFragment checks that a host value with a
// path, a query string, or a fragment appended is rejected.
func TestValidateLangfuseHost_PathQueryFragment(t *testing.T) {
	withExtras := []string{
		"http://langfuse.example.com/path",
		"http://langfuse.example.com?query=1",
		"http://langfuse.example.com#frag",
	}
	for pos := 0; pos < len(withExtras); pos++ {
		candidate := withExtras[pos]
		if validateLangfuseHost(candidate) != nil {
			continue // rejected, as required
		}
		t.Errorf("validateLangfuseHost(%q) = nil, want error", candidate)
	}
}
// TestValidateLangfuseHost_MissingHostname checks that a URL consisting of a
// scheme only, with no hostname, is rejected.
func TestValidateLangfuseHost_MissingHostname(t *testing.T) {
	const schemeOnly = "http://"

	result := validateLangfuseHost(schemeOnly)
	if result != nil {
		return // rejected, as required
	}
	t.Error("expected error for missing hostname")
}