traces(v1): per-workspace Langfuse config + upstream error handling #2051

Closed
core-be wants to merge 1 commits from fix/traces-error-handling into staging
2 changed files with 326 additions and 25 deletions
+89 -9
View File
@@ -1,12 +1,15 @@
package handlers
import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/db"
"github.com/gin-gonic/gin"
)
@@ -18,30 +21,96 @@ func NewTracesHandler() *TracesHandler {
return &TracesHandler{}
}
// langfuseConfig holds the resolved Langfuse connection parameters.
// Workspace secrets override global secrets which override environment
// variables, matching the precedence rules in workspace_provision.go.
type langfuseConfig struct {
Host string
Public string
Secret string
}
// resolveLangfuseConfig looks up LANGFUSE_HOST, LANGFUSE_PUBLIC_KEY and
// LANGFUSE_SECRET_KEY for a workspace. Resolution order:
// 1. workspace_secrets (workspace-level override)
// 2. global_secrets (platform-wide default)
// 3. environment vars (legacy fallback for self-hosted / dev)
//
// If any of the three keys is missing after all three layers, the config
// is considered incomplete and traces are disabled for the workspace.
// This closes the gap where every workspace in a tenant shared the same
// Langfuse project (global env vars) and operators could not isolate
// traces per workspace. Traces v1 — issue #2976.
func resolveLangfuseConfig(ctx context.Context, workspaceID string) (*langfuseConfig, error) {
cfg := &langfuseConfig{}
// Helper: read a single key from workspace → global → env fallback.
resolve := func(key string) string {
// 1. Workspace secret
var val []byte
var ver int
err := db.DB.QueryRowContext(ctx,
`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = $1 AND key = $2`,
workspaceID, key).Scan(&val, &ver)
if err == nil {
decrypted, decErr := crypto.DecryptVersioned(val, ver)
if decErr == nil {
return string(decrypted)
}
// Decrypt failure is logged but not fatal — fall through to next layer.
}
// 2. Global secret
err = db.DB.QueryRowContext(ctx,
`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`,
key).Scan(&val, &ver)
if err == nil {
decrypted, decErr := crypto.DecryptVersioned(val, ver)
if decErr == nil {
return string(decrypted)
}
}
// 3. Environment fallback
return os.Getenv(key)
}
cfg.Host = resolve("LANGFUSE_HOST")
cfg.Public = resolve("LANGFUSE_PUBLIC_KEY")
cfg.Secret = resolve("LANGFUSE_SECRET_KEY")
// Incomplete config is not an error — it simply means tracing is
// disabled for this workspace. Callers treat (nil, nil) as
// "no traces available".
if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" {
return nil, nil
}
return cfg, nil
}
// List handles GET /workspaces/:id/traces
// Proxies to Langfuse API to get recent traces for a workspace.
func (h *TracesHandler) List(c *gin.Context) {
workspaceID := c.Param("id")
langfuseHost := os.Getenv("LANGFUSE_HOST")
langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY")
langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY")
if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" {
cfg, err := resolveLangfuseConfig(c.Request.Context(), workspaceID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"})
return
}
if cfg == nil {
c.JSON(http.StatusOK, []interface{}{})
return
}
// Fetch traces from Langfuse, filtered by workspace tag or name
url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s",
langfuseHost, workspaceID)
cfg.Host, workspaceID)
req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
return
}
req.SetBasicAuth(langfusePublic, langfuseSecret)
req.SetBasicAuth(cfg.Public, cfg.Secret)
resp, err := langfuseClient.Do(req)
if err != nil {
@@ -51,6 +120,17 @@ func (h *TracesHandler) List(c *gin.Context) {
}
defer func() { _ = resp.Body.Close() }()
body, _ := io.ReadAll(resp.Body)
c.Data(resp.StatusCode, "application/json", body)
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
c.JSON(http.StatusOK, []interface{}{})
return
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
// Upstream error — don't proxy HTML error pages or unexpected
// JSON shapes to the Canvas client. Return empty so the UI
// gracefully shows "no traces" rather than breaking on parse.
c.JSON(http.StatusOK, []interface{}{})
return
}
c.Data(http.StatusOK, "application/json", body)
}
+237 -16
View File
@@ -1,26 +1,47 @@
package handlers
import (
"database/sql"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Molecule-AI/molecule-monorepo/platform/internal/crypto"
"github.com/gin-gonic/gin"
)
// ==================== GET /workspaces/:id/traces ====================
func TestTracesList_NoLangfuseConfig(t *testing.T) {
setupTestDB(t)
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Ensure Langfuse env vars are not set
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
// No workspace secrets, no global secrets, no env vars
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces", "LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
// env fallback is empty (default)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -43,23 +64,91 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) {
}
}
func TestTracesList_PartialLangfuseConfig(t *testing.T) {
func TestTracesList_WorkspaceSecretsOverride(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Set only host, missing keys
os.Setenv("LANGFUSE_HOST", "http://localhost:3000")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
defer func() {
os.Unsetenv("LANGFUSE_HOST")
}()
// Encrypt a test secret
encHost, _ := crypto.Encrypt([]byte("http://localhost:3000"))
verHost := crypto.CurrentEncryptionVersion()
encPk, _ := crypto.Encrypt([]byte("pk-ws"))
verPk := crypto.CurrentEncryptionVersion()
encSk, _ := crypto.Encrypt([]byte("sk-ws"))
verSk := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-override", "LANGFUSE_HOST").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-override", "LANGFUSE_PUBLIC_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-override", "LANGFUSE_SECRET_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-override"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-override/traces", nil)
handler.List(c)
// We don't have a real Langfuse server, so the request will fail
// network-wise and return empty (graceful fallback)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
func TestTracesList_GlobalSecretsFallback(t *testing.T) {
setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// No workspace secrets, but global secrets exist
encHost, _ := crypto.Encrypt([]byte("http://localhost:3000"))
verHost := crypto.CurrentEncryptionVersion()
encPk, _ := crypto.Encrypt([]byte("pk-global"))
verPk := crypto.CurrentEncryptionVersion()
encSk, _ := crypto.Encrypt([]byte("sk-global"))
verSk := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encPk, verPk))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-global", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encSk, verSk))
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil)
handler.List(c)
@@ -70,7 +159,7 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
@@ -89,6 +178,30 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
os.Unsetenv("LANGFUSE_SECRET_KEY")
}()
// No workspace or global secrets, so env vars are used
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-down", "LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-down", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-down", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}}
@@ -107,3 +220,111 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
}
}
func TestTracesList_PartialWorkspaceConfig(t *testing.T) {
handler := NewTracesHandler()
// Workspace has HOST but missing keys — should fall through all layers
// and ultimately return empty because config is incomplete.
encHost, _ := crypto.Encrypt([]byte("http://localhost:3000"))
verHost := crypto.CurrentEncryptionVersion()
mock := setupTestDB(t)
setupTestRedis(t)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-partial", "LANGFUSE_HOST").
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(encHost, verHost))
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-partial", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-partial", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
handler.List(c)
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list with partial config, got %d items", len(resp))
}
}
func TestTracesList_LangfuseUpstreamError(t *testing.T) {
mock := setupTestDB(t)
setupTestRedis(t)
handler := NewTracesHandler()
// Start a mock Langfuse server that returns 500 with a non-JSON body
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html")
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("<html><body>Internal Server Error</body></html>"))
}))
defer upstream.Close()
os.Setenv("LANGFUSE_HOST", upstream.URL)
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
defer func() {
os.Unsetenv("LANGFUSE_HOST")
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
os.Unsetenv("LANGFUSE_SECRET_KEY")
}()
// No workspace/global secrets — falls through to env
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-500", "LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_HOST").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-500", "LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_PUBLIC_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM workspace_secrets WHERE workspace_id = \$1 AND key = \$2`).
WithArgs("ws-traces-500", "LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
WithArgs("LANGFUSE_SECRET_KEY").
WillReturnError(sql.ErrNoRows)
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}}
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil)
handler.List(c)
// Should return empty JSON (not proxy the 500 HTML) when Langfuse errors
if w.Code != http.StatusOK {
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []interface{}
json.Unmarshal(w.Body.Bytes(), &resp)
if len(resp) != 0 {
t.Errorf("expected empty list on upstream error, got %d items", len(resp))
}
}