traces(v1): per-workspace Langfuse config + upstream error handling #2029
+53
-24
@@ -234,23 +234,48 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
"Authorization": `Bearer ${tenantToken}`,
|
||||
"X-Molecule-Org-Id": orgID,
|
||||
};
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
model: "gpt-4o",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 400 || !ws.body?.id) {
|
||||
throw new Error(`Workspace create ${ws.status}: ${JSON.stringify(ws.body)}`);
|
||||
// Retry workspace creation on transient 5xx / timeout — staging CP can
|
||||
// return 502/503/504 under load and a single-shot failure kills the
|
||||
// entire E2E run. 3 attempts with exponential backoff between them (3s, then
|
||||
// 6s) add ~9s of waiting at most, well inside the 20-min provision envelope.
|
||||
let workspaceId = "";
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
const ws = await jsonFetch(`${tenantURL}/workspaces`, {
|
||||
method: "POST",
|
||||
headers: tenantAuth,
|
||||
body: JSON.stringify({
|
||||
name: "E2E Canvas Test",
|
||||
runtime: "hermes",
|
||||
tier: 2,
|
||||
model: "gpt-4o",
|
||||
}),
|
||||
});
|
||||
if (ws.status >= 200 && ws.status < 300 && ws.body?.id) {
|
||||
workspaceId = ws.body.id as string;
|
||||
break;
|
||||
}
|
||||
const isTransient = ws.status >= 500 || ws.status === 0;
|
||||
if (!isTransient || attempt === 3) {
|
||||
throw new Error(`Workspace create ${ws.status} (attempt ${attempt}): ${JSON.stringify(ws.body)}`);
|
||||
}
|
||||
const backoff = 3000 * Math.pow(2, attempt - 1);
|
||||
console.log(`[staging-setup] Workspace create transient ${ws.status}, retrying in ${backoff}ms...`);
|
||||
await new Promise((r) => setTimeout(r, backoff));
|
||||
}
|
||||
const workspaceId = ws.body.id as string;
|
||||
console.log(`[staging-setup] Workspace created: ${workspaceId}`);
|
||||
|
||||
// 6. Wait for workspace online
|
||||
//
|
||||
// Hermes cold-boot takes 10-13 min on slow apt days (apt + uv + hermes
|
||||
// install + npm browser-tools). The controlplane bootstrap-watcher
|
||||
// deadline fires at 5 min and sets status=failed prematurely; heartbeat
|
||||
// then transitions failed → online after install.sh finishes. So
|
||||
// 'failed' is a TRANSIENT state we must tolerate — log once and keep
|
||||
// polling, only hard-fail at the deadline. Pre-fix this was a flake
|
||||
// generator: workspace went failed→online inside our window but we
|
||||
// bailed at the failed read. See test_staging_full_saas.sh step 7/11
|
||||
// and issue #2632.
|
||||
let wsFailedLogged = false;
|
||||
await waitFor<boolean>(
|
||||
async () => {
|
||||
const r = await jsonFetch(`${tenantURL}/workspaces/${workspaceId}`, {
|
||||
@@ -259,17 +284,21 @@ export default async function globalSetup(_config: FullConfig): Promise<void> {
|
||||
if (r.status !== 200) return null;
|
||||
if (r.body?.status === "online") return true;
|
||||
if (r.body?.status === "failed") {
|
||||
// last_sample_error is often empty when the failure happens before
|
||||
// the agent emits a sample (e.g. boot crash, image pull error,
|
||||
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
|
||||
// body gives triage the boot_stage / last_error / image fields it
|
||||
// needs without a second probe. Otherwise this propagates as a
|
||||
// bare "Workspace failed: " — the exact useless message that
|
||||
// sent #2632 to the issue tracker.
|
||||
const detail = r.body.last_sample_error
|
||||
? r.body.last_sample_error
|
||||
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
|
||||
throw new Error(`Workspace failed: ${detail}`);
|
||||
if (!wsFailedLogged) {
|
||||
// last_sample_error is often empty when the failure happens before
|
||||
// the agent emits a sample (e.g. boot crash, image pull error,
|
||||
// missing PYTHONPATH, OpenAI quota at startup). Dumping the full
|
||||
// body gives triage the boot_stage / last_error / image fields it
|
||||
// needs without a second probe. Otherwise this propagates as a
|
||||
// bare "Workspace failed: " — the exact useless message that
|
||||
// sent #2632 to the issue tracker.
|
||||
const detail = r.body.last_sample_error
|
||||
? r.body.last_sample_error
|
||||
: `(no last_sample_error) full body: ${JSON.stringify(r.body)}`;
|
||||
console.log(`[staging-setup] workspace ${workspaceId} transiently failed — waiting for heartbeat recovery (bootstrap-watcher deadline, see cp#245). detail: ${detail}`);
|
||||
wsFailedLogged = true;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
},
|
||||
|
||||
@@ -3,6 +3,7 @@ package bundle
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
@@ -92,7 +93,9 @@ func Import(
|
||||
if err != nil {
|
||||
markFailed(provCtx, wsID, broadcaster, err)
|
||||
} else if url != "" {
|
||||
db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID)
|
||||
if _, dbErr := db.DB.ExecContext(provCtx, `UPDATE workspaces SET url = $1 WHERE id = $2`, url, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to update workspace URL for %s: %v", wsID, dbErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -139,12 +142,16 @@ func markFailed(ctx context.Context, wsID string, broadcaster *events.Broadcaste
|
||||
// markProvisionFailed in workspace-server/internal/handlers/
|
||||
// workspace_provision_shared.go.
|
||||
msg := err.Error()
|
||||
db.DB.ExecContext(ctx,
|
||||
if _, dbErr := db.DB.ExecContext(ctx,
|
||||
`UPDATE workspaces SET status = $1, last_sample_error = $2, updated_at = now() WHERE id = $3`,
|
||||
models.StatusFailed, msg, wsID)
|
||||
broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
models.StatusFailed, msg, wsID); dbErr != nil {
|
||||
log.Printf("bundle import: failed to mark workspace %s failed: %v", wsID, dbErr)
|
||||
}
|
||||
if bcErr := broadcaster.RecordAndBroadcast(ctx, string(events.EventWorkspaceProvisionFailed), wsID, map[string]interface{}{
|
||||
"error": msg,
|
||||
})
|
||||
}); bcErr != nil {
|
||||
log.Printf("bundle import: failed to broadcast provision failed for %s: %v", wsID, bcErr)
|
||||
}
|
||||
}
|
||||
|
||||
func nilIfEmpty(s string) interface{} {
|
||||
|
||||
@@ -375,21 +375,25 @@ func (m *Manager) HandleInbound(ctx context.Context, ch ChannelRow, msg *Inbound
|
||||
|
||||
// Update stats in DB
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, ch.ID)
|
||||
`, ch.ID); err != nil {
|
||||
log.Printf("Channels: failed to update inbound stats for channel %s: %v", ch.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast event
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"username": msg.Username,
|
||||
"direction": "inbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast inbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -420,19 +424,23 @@ func (m *Manager) SendOutbound(ctx context.Context, channelID string, text strin
|
||||
}
|
||||
|
||||
if db.DB != nil {
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE workspace_channels
|
||||
SET last_message_at = now(), message_count = message_count + 1, updated_at = now()
|
||||
WHERE id = $1
|
||||
`, channelID)
|
||||
`, channelID); err != nil {
|
||||
log.Printf("Channels: failed to update outbound stats for channel %s: %v", channelID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.broadcaster != nil {
|
||||
m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
if err := m.broadcaster.RecordAndBroadcast(ctx, string(events.EventChannelMessage), ch.WorkspaceID, map[string]interface{}{
|
||||
"channel_id": ch.ID,
|
||||
"channel_type": ch.ChannelType,
|
||||
"direction": "outbound",
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("Channels: failed to broadcast outbound event: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -498,7 +506,10 @@ func (m *Manager) FetchWorkspaceChannelContext(ctx context.Context, workspaceID
|
||||
return ""
|
||||
}
|
||||
var config map[string]interface{}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: failed to unmarshal channel config: %v", err)
|
||||
return ""
|
||||
}
|
||||
if err := DecryptSensitiveFields(config); err != nil {
|
||||
return ""
|
||||
}
|
||||
@@ -555,8 +566,12 @@ func (m *Manager) loadChannel(ctx context.Context, channelID string) (ChannelRow
|
||||
if err != nil {
|
||||
return ch, fmt.Errorf("channel %s not found: %w", channelID, err)
|
||||
}
|
||||
json.Unmarshal(configJSON, &ch.Config)
|
||||
json.Unmarshal(allowedJSON, &ch.AllowedUsers)
|
||||
if err := json.Unmarshal(configJSON, &ch.Config); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s config: %w", channelID, err)
|
||||
}
|
||||
if err := json.Unmarshal(allowedJSON, &ch.AllowedUsers); err != nil {
|
||||
return ch, fmt.Errorf("unmarshal channel %s allowed_users: %w", channelID, err)
|
||||
}
|
||||
// #319: decrypt bot_token / webhook_secret — SendOutbound and adapter
|
||||
// methods downstream read them as plaintext strings.
|
||||
if err := DecryptSensitiveFields(ch.Config); err != nil {
|
||||
|
||||
@@ -514,7 +514,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
|
||||
// Acknowledge the button press (removes loading spinner)
|
||||
ackCfg := tgbotapi.NewCallback(cb.ID, "Received")
|
||||
bot.Send(ackCfg)
|
||||
if _, err := bot.Send(ackCfg); err != nil {
|
||||
log.Printf("telegram: failed to send callback ack: %v", err)
|
||||
}
|
||||
|
||||
// Update the message to show what was clicked
|
||||
decision := "approved"
|
||||
@@ -526,7 +528,9 @@ func (t *TelegramAdapter) StartPolling(ctx context.Context, config map[string]in
|
||||
cb.Message.MessageID,
|
||||
cb.Message.Text+"\n\n✅ CEO "+decision,
|
||||
)
|
||||
bot.Send(editMsg)
|
||||
if _, err := bot.Send(editMsg); err != nil {
|
||||
log.Printf("telegram: failed to send edit message: %v", err)
|
||||
}
|
||||
|
||||
// Route the decision as an inbound message to the agent
|
||||
inbound := &InboundMessage{
|
||||
|
||||
@@ -51,23 +51,29 @@ func (h *ApprovalsHandler) Create(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalRequested), workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
"task_id": body.TaskID,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval requested: %v", err)
|
||||
}
|
||||
|
||||
// Auto-escalate to parent
|
||||
var parentID *string
|
||||
db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID)
|
||||
if err := db.DB.QueryRowContext(ctx, `SELECT parent_id FROM workspaces WHERE id = $1`, workspaceID).Scan(&parentID); err != nil {
|
||||
log.Printf("approvals: failed to lookup parent for escalation: %v", err)
|
||||
}
|
||||
if parentID != nil {
|
||||
h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, string(events.EventApprovalEscalated), *parentID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"from_workspace_id": workspaceID,
|
||||
"action": body.Action,
|
||||
"reason": body.Reason,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval escalated: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusCreated, gin.H{"approval_id": approvalID, "status": "pending"})
|
||||
@@ -80,10 +86,12 @@ func (h *ApprovalsHandler) ListAll(c *gin.Context) {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
// Auto-expire stale approvals (older than 10 min)
|
||||
db.DB.ExecContext(ctx, `
|
||||
if _, err := db.DB.ExecContext(ctx, `
|
||||
UPDATE approval_requests SET status = 'denied', decided_by = 'auto-expired', decided_at = now()
|
||||
WHERE status = 'pending' AND created_at < now() - interval '10 minutes'
|
||||
`)
|
||||
`); err != nil {
|
||||
log.Printf("approvals: failed to auto-expire stale approvals: %v", err)
|
||||
}
|
||||
|
||||
rows, err := db.DB.QueryContext(ctx, `
|
||||
SELECT a.id, a.workspace_id, w.name, a.action, a.reason, a.status, a.created_at
|
||||
@@ -205,11 +213,13 @@ func (h *ApprovalsHandler) Decide(c *gin.Context) {
|
||||
eventType = "APPROVAL_DENIED"
|
||||
}
|
||||
|
||||
h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
if err := h.broadcaster.RecordAndBroadcast(ctx, eventType, workspaceID, map[string]interface{}{
|
||||
"approval_id": approvalID,
|
||||
"decision": body.Decision,
|
||||
"decided_by": decidedBy,
|
||||
})
|
||||
}); err != nil {
|
||||
log.Printf("approvals: failed to broadcast approval decision: %v", err)
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"status": body.Decision, "approval_id": approvalID})
|
||||
}
|
||||
|
||||
@@ -67,7 +67,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
|
||||
}
|
||||
|
||||
var config map[string]interface{}
|
||||
json.Unmarshal(configJSON, &config)
|
||||
if err := json.Unmarshal(configJSON, &config); err != nil {
|
||||
log.Printf("Channels: unmarshal config on list for channel %s: %v", id, err)
|
||||
config = map[string]interface{}{}
|
||||
}
|
||||
// #319: decrypt sensitive fields first so the mask operates on
|
||||
// plaintext (first-4 / last-4 of the real token, not the ciphertext
|
||||
// prefix). Decrypt errors are logged but non-fatal — List must keep
|
||||
@@ -86,7 +89,10 @@ func (h *ChannelHandler) List(c *gin.Context) {
|
||||
}
|
||||
|
||||
var allowed []string
|
||||
json.Unmarshal(allowedJSON, &allowed)
|
||||
if err := json.Unmarshal(allowedJSON, &allowed); err != nil {
|
||||
log.Printf("Channels: unmarshal allowed_users on list for channel %s: %v", id, err)
|
||||
allowed = []string{}
|
||||
}
|
||||
|
||||
entry := map[string]interface{}{
|
||||
"id": id,
|
||||
|
||||
@@ -1,47 +1,235 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
var langfuseClient = &http.Client{Timeout: 10 * time.Second}
|
||||
|
||||
// langfuseHostValidator is the validation function used by resolveLangfuseConfig.
|
||||
// Overridable in tests that need to exercise loopback / private endpoints.
|
||||
var langfuseHostValidator = validateLangfuseHost
|
||||
|
||||
type TracesHandler struct{}
|
||||
|
||||
func NewTracesHandler() *TracesHandler {
|
||||
return &TracesHandler{}
|
||||
}
|
||||
|
||||
// langfuseConfig holds the resolved Langfuse connection parameters.
|
||||
// Workspace secrets override global secrets which override environment
|
||||
// variables, matching the precedence rules in workspace_provision.go — except Host, which is never read from workspace secrets.
|
||||
type langfuseConfig struct {
|
||||
Host string
|
||||
Public string
|
||||
Secret string
|
||||
}
|
||||
|
||||
// resolveLangfuseConfig looks up LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and
|
||||
// LANGFUSE_SECRET_KEY for a workspace.
|
||||
//
|
||||
// SECURITY BOUNDARY (RC 8337 / #2029):
|
||||
// LANGFUSE_HOST is admin-controlled ONLY — resolved from global_secrets
|
||||
// or environment variables. Workspace secrets CANNOT override the host.
|
||||
// This prevents SSRF: a workspace owner could otherwise set HOST to an
|
||||
// internal endpoint and exfiltrate data using the Langfuse credentials.
|
||||
//
|
||||
// LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY follow the normal
|
||||
// precedence: workspace_secrets → global_secrets → environment vars.
|
||||
// This preserves per-workspace project isolation without the SSRF vector.
|
||||
//
|
||||
// If any of the three keys is missing after all layers, the config is
|
||||
// considered incomplete and traces are disabled for the workspace.
|
||||
func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseConfig, error) {
|
||||
cfg := &langfuseConfig{}
|
||||
|
||||
// HOST — admin-controlled only (global → env). Workspace secrets are
|
||||
// intentionally NOT consulted to close the SSRF vector described above.
|
||||
resolveHost := func() string {
|
||||
var val []byte
|
||||
var ver int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`,
|
||||
"LANGFUSE_HOST").Scan(&val, &ver)
|
||||
if err == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(val, ver)
|
||||
if decErr == nil {
|
||||
return string(decrypted)
|
||||
}
|
||||
}
|
||||
return os.Getenv("LANGFUSE_HOST")
|
||||
}
|
||||
|
||||
// Keys — workspace secrets override global secrets override env.
|
||||
resolveKey := func(key string) string {
|
||||
var val []byte
|
||||
var ver int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1 AND key = $2`,
|
||||
workspaceID, key).Scan(&val, &ver)
|
||||
if err == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(val, ver)
|
||||
if decErr == nil {
|
||||
return string(decrypted)
|
||||
}
|
||||
}
|
||||
err = db.DB.QueryRowContext(ctx,
|
||||
`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`,
|
||||
key).Scan(&val, &ver)
|
||||
if err == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(val, ver)
|
||||
if decErr == nil {
|
||||
return string(decrypted)
|
||||
}
|
||||
}
|
||||
return os.Getenv(key)
|
||||
}
|
||||
|
||||
cfg.Host = resolveHost()
|
||||
cfg.Public = resolveKey("LANGFUSE_PUBLIC_KEY")
|
||||
cfg.Secret = resolveKey("LANGFUSE_SECRET_KEY")
|
||||
|
||||
// Incomplete config is not an error — it simply means tracing is
|
||||
// disabled for this workspace. Callers treat (nil, nil) as
|
||||
// "no traces available".
|
||||
if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Defense-in-depth: even admin-controlled hosts can be misconfigured
|
||||
// (accidentally or maliciously) to point at internal infrastructure.
|
||||
// Validate the resolved host before constructing outbound requests.
|
||||
if err := langfuseHostValidator(cfg.Host); err != nil {
|
||||
return nil, fmt.Errorf("invalid LANGFUSE_HOST: %w", err)
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// validateLangfuseHost rejects URLs that would create an SSRF or
// credential-leak vector against internal infrastructure. It is
// defense-in-depth on top of the admin-only host control boundary.
//
// The check is purely syntactic: hostnames are not resolved here, so a
// public DNS name that points at an internal address is not detected.
//
// Rejected targets:
//   - non-HTTP(S) schemes (file, ftp, gopher, unix, etc.)
//   - URLs containing userinfo, a path other than "/", a query, or a fragment
//   - IP literals that are unspecified (0.0.0.0, ::), loopback (127/8, ::1),
//     link-local (169.254/16 — which includes the 169.254.169.254 metadata
//     endpoint — and fe80::/10), or private (10/8, 172.16/12, 192.168/16,
//     fc00::/7)
//   - the Alibaba Cloud metadata address 100.100.100.200
//   - well-known internal hostnames (localhost, cloud metadata names)
//
// Before classification the hostname is lower-cased, trailing dots are
// removed and any IPv6 zone identifier is dropped, so spellings such as
// "127.0.0.1." or "[fe80::1%25eth0]" are judged as the address they denote.
//
// Returns nil when the URL is acceptable, otherwise an error describing the
// first rule that was violated.
func validateLangfuseHost(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("not a valid URL: %v", err)
	}

	// Scheme must be http or https.
	scheme := strings.ToLower(u.Scheme)
	if scheme != "http" && scheme != "https" {
		return fmt.Errorf("scheme %q not allowed (only http/https)", u.Scheme)
	}

	// Reject userinfo — prevents credential stuffing / info-leak.
	if u.User != nil {
		return fmt.Errorf("URL must not contain userinfo")
	}

	// Reject path, query, fragment — we only want a base host.
	if u.Path != "" && u.Path != "/" {
		return fmt.Errorf("URL must not contain a path")
	}
	if u.RawQuery != "" {
		return fmt.Errorf("URL must not contain a query string")
	}
	if u.Fragment != "" {
		return fmt.Errorf("URL must not contain a fragment")
	}

	// Normalise before classifying. A trailing dot makes net.ParseIP fail,
	// which previously let "127.0.0.1." through as an ordinary hostname.
	host := strings.ToLower(strings.TrimRight(u.Hostname(), "."))
	if host == "" {
		return fmt.Errorf("missing hostname")
	}

	// net.ParseIP does not accept zoned IPv6 literals ("fe80::1%eth0");
	// classify the address part on its own.
	ipText := host
	if i := strings.IndexByte(ipText, '%'); i >= 0 {
		ipText = ipText[:i]
	}

	// Reject IP literals (IPv4 or IPv6) that point at internal infrastructure.
	if ip := net.ParseIP(ipText); ip != nil {
		switch {
		case ip.IsUnspecified():
			return fmt.Errorf("unspecified address %q not allowed", host)
		case ip.IsLoopback():
			return fmt.Errorf("loopback address %q not allowed", host)
		case ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast():
			// Covers the 169.254.169.254 cloud metadata endpoint.
			return fmt.Errorf("link-local address %q not allowed", host)
		case ip.IsPrivate():
			return fmt.Errorf("private address %q not allowed", host)
		case ip.Equal(net.IPv4(100, 100, 100, 200)):
			// Alibaba Cloud metadata service. It sits outside every range
			// above, so it has to be matched here: IP literals never reach
			// the hostname blocklist below.
			return fmt.Errorf("metadata endpoint %q not allowed", host)
		}
		return nil
	}

	// Reject well-known internal hostnames.
	blocked := []string{
		"localhost",
		"metadata.google.internal",
		"metadata",
		"metadata.azure.internal",
	}
	for _, b := range blocked {
		if host == b {
			return fmt.Errorf("hostname %q not allowed", host)
		}
	}

	return nil
}
|
||||
|
||||
// List handles GET /workspaces/:id/traces
|
||||
// Proxies to Langfuse API to get recent traces for a workspace.
|
||||
func (h *TracesHandler) List(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
langfuseHost := os.Getenv("LANGFUSE_HOST")
|
||||
langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY")
|
||||
langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" {
|
||||
cfg, err := resolveLangfuseConfig(c.Request.Context(), workspaceID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"})
|
||||
return
|
||||
}
|
||||
if cfg == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch traces from Langfuse, filtered by workspace tag or name
|
||||
url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s",
|
||||
langfuseHost, workspaceID)
|
||||
cfg.Host, workspaceID)
|
||||
|
||||
req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
|
||||
return
|
||||
}
|
||||
req.SetBasicAuth(langfusePublic, langfuseSecret)
|
||||
req.SetBasicAuth(cfg.Public, cfg.Secret)
|
||||
|
||||
resp, err := langfuseClient.Do(req)
|
||||
if err != nil {
|
||||
@@ -51,6 +239,17 @@ func (h *TracesHandler) List(c *gin.Context) {
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
c.Data(resp.StatusCode, "application/json", body)
|
||||
body, readErr := io.ReadAll(resp.Body)
|
||||
if readErr != nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
// Upstream error — don't proxy HTML error pages or unexpected
|
||||
// JSON shapes to the Canvas client. Return empty so the UI
|
||||
// gracefully shows "no traces" rather than breaking on parse.
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.Data(http.StatusOK, "application/json", body)
|
||||
}
|
||||
|
||||
@@ -1,26 +1,45 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ==================== GET /workspaces/:id/traces ====================
|
||||
|
||||
func TestTracesList_NoLangfuseConfig(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Ensure Langfuse env vars are not set
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
// No global secrets, no env vars. HOST is admin-only (global/env) so
|
||||
// workspace_secrets is NOT queried for LANGFUSE_HOST.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
// env fallback is empty (default)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
@@ -43,23 +62,96 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_PartialLangfuseConfig(t *testing.T) {
|
||||
func TestTracesList_WorkspaceSecretsOverride(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Set only host, missing keys
|
||||
os.Setenv("LANGFUSE_HOST", "http://localhost:3000")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
defer func() {
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
}()
|
||||
// Encrypt test secrets — use a non-loopback, non-private hostname so
|
||||
// validateLangfuseHost does not reject it.
|
||||
encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
|
||||
verHost := crypto.CurrentEncryptionVersion()
|
||||
encPk, _ := crypto.Encrypt([]byte("pk-ws"))
|
||||
verPk := crypto.CurrentEncryptionVersion()
|
||||
encSk, _ := crypto.Encrypt([]byte("sk-ws"))
|
||||
verSk := crypto.CurrentEncryptionVersion()
|
||||
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
// HOST is admin-only: workspace secret for HOST is IGNORED.
|
||||
// Code queries global_secrets first, then falls through to env.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
|
||||
|
||||
// Keys follow normal precedence: workspace secrets override global.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-override", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-override", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-override"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-override/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
// We don't have a real Langfuse server, so the request will fail
|
||||
// network-wise and return empty (graceful fallback)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_GlobalSecretsFallback(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// No workspace secrets, but global secrets exist
|
||||
encHost, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
|
||||
verHost := crypto.CurrentEncryptionVersion()
|
||||
encPk, _ := crypto.Encrypt([]byte("pk-global"))
|
||||
verPk := crypto.CurrentEncryptionVersion()
|
||||
encSk, _ := crypto.Encrypt([]byte("sk-global"))
|
||||
verSk := crypto.CurrentEncryptionVersion()
|
||||
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-global", "LANGFUSE_HOST").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-global", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-global", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
@@ -70,7 +162,7 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list with partial config, got %d items", len(resp))
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -79,8 +171,9 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Set all env vars but point to unreachable host
|
||||
os.Setenv("LANGFUSE_HOST", "http://localhost:99999")
|
||||
// Set all env vars but point to unreachable host (must be a valid
|
||||
// public-looking hostname so validateLangfuseHost does not reject it).
|
||||
os.Setenv("LANGFUSE_HOST", "http://langfuse-down.example.com:99999")
|
||||
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
|
||||
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
|
||||
defer func() {
|
||||
@@ -89,6 +182,27 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
}()
|
||||
|
||||
// No global secrets, so env vars are used. HOST skips workspace_secrets.
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-down", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-down", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}}
|
||||
@@ -107,3 +221,274 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_PartialWorkspaceConfig(t *testing.T) {
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Workspace has PUBLIC_KEY but missing HOST and SECRET_KEY.
|
||||
// Config incomplete → empty response.
|
||||
encPk, _ := crypto.Encrypt([]byte("pk-partial"))
|
||||
verPk := crypto.CurrentEncryptionVersion()
|
||||
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
// HOST skipped for workspace secrets
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
|
||||
// SECRET_KEY missing everywhere
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-partial", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list with partial config, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_WorkspaceHostIgnored(t *testing.T) {
|
||||
// SECURITY REGRESSION: workspace secrets MUST NOT override LANGFUSE_HOST.
|
||||
// If they could, a workspace owner could set HOST to an internal
|
||||
// endpoint and exfiltrate data using the Langfuse credentials.
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Workspace tries to set a malicious HOST via workspace secrets.
|
||||
encHostWs, _ := crypto.Encrypt([]byte("http://169.254.169.254"))
|
||||
verHostWs := crypto.CurrentEncryptionVersion()
|
||||
encPk, _ := crypto.Encrypt([]byte("pk-ws"))
|
||||
verPk := crypto.CurrentEncryptionVersion()
|
||||
encSk, _ := crypto.Encrypt([]byte("sk-ws"))
|
||||
verSk := crypto.CurrentEncryptionVersion()
|
||||
|
||||
// Admin sets a legitimate HOST via global secrets (non-loopback).
|
||||
encHostGlobal, _ := crypto.Encrypt([]byte("http://langfuse.example.com:3000"))
|
||||
verHostGlobal := crypto.CurrentEncryptionVersion()
|
||||
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
// Code queries global_secrets for HOST first — workspace secret is never read.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHostGlobal, verHostGlobal))
|
||||
|
||||
// Keys still read workspace secrets.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-ssrf", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-ssrf", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-ssrf"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-ssrf/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_LangfuseUpstreamError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Start a mock Langfuse server that returns 500 with a non-JSON body.
|
||||
// httptest uses 127.0.0.1 which validateLangfuseHost rejects, so we
|
||||
// temporarily swap the validator for this test.
|
||||
origValidator := langfuseHostValidator
|
||||
langfuseHostValidator = func(string) error { return nil }
|
||||
defer func() { langfuseHostValidator = origValidator }()
|
||||
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/html")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("<html><body>Internal Server Error</body></html>"))
|
||||
}))
|
||||
defer upstream.Close()
|
||||
|
||||
os.Setenv("LANGFUSE_HOST", upstream.URL)
|
||||
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
|
||||
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
|
||||
defer func() {
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
}()
|
||||
|
||||
// HOST skips workspace secrets — falls through global → env.
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_HOST").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-500", "LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
|
||||
WithArgs("ws-traces-500", "LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
// Should return empty JSON (not proxy the 500 HTML) when Langfuse errors
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list on upstream error, got %d items", len(resp))
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== validateLangfuseHost ====================
|
||||
|
||||
func TestValidateLangfuseHost_Valid(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://langfuse.example.com",
|
||||
"https://langfuse.example.com:3000",
|
||||
"https://us.cloud.langfuse.com",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err != nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = %v, want nil", c, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_InvalidScheme(t *testing.T) {
|
||||
cases := []string{
|
||||
"ftp://langfuse.example.com",
|
||||
"file:///etc/passwd",
|
||||
"unix:///var/run/langfuse.sock",
|
||||
"gopher://langfuse.example.com",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_Loopback(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://localhost",
|
||||
"http://127.0.0.1",
|
||||
"http://127.0.0.1:3000",
|
||||
"http://[::1]",
|
||||
"http://[::1]:3000",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_Private(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://10.0.0.1",
|
||||
"http://192.168.1.1",
|
||||
"http://172.16.0.1",
|
||||
"http://172.31.255.255",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_LinkLocal(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://169.254.0.1",
|
||||
"http://169.254.169.254", // AWS metadata
|
||||
"http://[fe80::1]",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_BlockedHostnames(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://localhost",
|
||||
"http://metadata.google.internal",
|
||||
"http://metadata",
|
||||
"http://metadata.azure.internal",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_Userinfo(t *testing.T) {
|
||||
if err := validateLangfuseHost("http://user:pass@langfuse.example.com"); err == nil {
|
||||
t.Error("expected error for URL with userinfo")
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_PathQueryFragment(t *testing.T) {
|
||||
cases := []string{
|
||||
"http://langfuse.example.com/path",
|
||||
"http://langfuse.example.com?query=1",
|
||||
"http://langfuse.example.com#frag",
|
||||
}
|
||||
for _, c := range cases {
|
||||
if err := validateLangfuseHost(c); err == nil {
|
||||
t.Errorf("validateLangfuseHost(%q) = nil, want error", c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateLangfuseHost_MissingHostname(t *testing.T) {
|
||||
if err := validateLangfuseHost("http://"); err == nil {
|
||||
t.Error("expected error for missing hostname")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user