fix(security): #2029 traces v1 SSRF — admin-only Langfuse host source #2133
@@ -1,12 +1,15 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/crypto"
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/db"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
@@ -18,30 +21,69 @@ func NewTracesHandler() *TracesHandler {
|
||||
return &TracesHandler{}
|
||||
}
|
||||
|
||||
type langfuseConfig struct {
|
||||
Host string
|
||||
Public string
|
||||
Secret string
|
||||
}
|
||||
|
||||
// resolveLangfuseConfig resolves Langfuse connection settings from
|
||||
// admin-controlled global secrets first, then process env for legacy/dev use.
|
||||
// Workspace secrets are intentionally excluded: a workspace-controlled
|
||||
// LANGFUSE_HOST would allow SSRF with BasicAuth attached (#2029).
|
||||
func resolveLangfuseConfig(ctx context.Context) (*langfuseConfig, error) {
|
||||
cfg := &langfuseConfig{}
|
||||
|
||||
resolve := func(key string) string {
|
||||
var val []byte
|
||||
var ver int
|
||||
err := db.DB.QueryRowContext(ctx,
|
||||
`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = $1`,
|
||||
key).Scan(&val, &ver)
|
||||
if err == nil {
|
||||
decrypted, decErr := crypto.DecryptVersioned(val, ver)
|
||||
if decErr == nil {
|
||||
return string(decrypted)
|
||||
}
|
||||
}
|
||||
return os.Getenv(key)
|
||||
}
|
||||
|
||||
cfg.Host = resolve("LANGFUSE_HOST")
|
||||
cfg.Public = resolve("LANGFUSE_PUBLIC_KEY")
|
||||
cfg.Secret = resolve("LANGFUSE_SECRET_KEY")
|
||||
|
||||
if cfg.Host == "" || cfg.Public == "" || cfg.Secret == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// List handles GET /workspaces/:id/traces
|
||||
// Proxies to Langfuse API to get recent traces for a workspace.
|
||||
func (h *TracesHandler) List(c *gin.Context) {
|
||||
workspaceID := c.Param("id")
|
||||
|
||||
langfuseHost := os.Getenv("LANGFUSE_HOST")
|
||||
langfusePublic := os.Getenv("LANGFUSE_PUBLIC_KEY")
|
||||
langfuseSecret := os.Getenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
if langfuseHost == "" || langfusePublic == "" || langfuseSecret == "" {
|
||||
cfg, err := resolveLangfuseConfig(c.Request.Context())
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to resolve trace config"})
|
||||
return
|
||||
}
|
||||
if cfg == nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
|
||||
// Fetch traces from Langfuse, filtered by workspace tag or name
|
||||
url := fmt.Sprintf("%s/api/public/traces?limit=20&orderBy=timestamp&orderDir=desc&tags=%s",
|
||||
langfuseHost, workspaceID)
|
||||
cfg.Host, workspaceID)
|
||||
|
||||
req, err := http.NewRequestWithContext(c.Request.Context(), "GET", url, nil)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create request"})
|
||||
return
|
||||
}
|
||||
req.SetBasicAuth(langfusePublic, langfuseSecret)
|
||||
req.SetBasicAuth(cfg.Public, cfg.Secret)
|
||||
|
||||
resp, err := langfuseClient.Do(req)
|
||||
if err != nil {
|
||||
@@ -51,10 +93,14 @@ func (h *TracesHandler) List(c *gin.Context) {
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read response body"})
|
||||
body, readErr := io.ReadAll(resp.Body)
|
||||
if readErr != nil {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.Data(resp.StatusCode, "application/json", body)
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
c.JSON(http.StatusOK, []interface{}{})
|
||||
return
|
||||
}
|
||||
c.Data(http.StatusOK, "application/json", body)
|
||||
}
|
||||
|
||||
@@ -1,27 +1,31 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"git.moleculesai.app/molecule-ai/molecule-core/workspace-server/internal/crypto"
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// ==================== GET /workspaces/:id/traces ====================
|
||||
|
||||
func TestTracesList_NoLangfuseConfig(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Ensure Langfuse env vars are not set
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
expectMissingGlobalLangfuseSecrets(mock)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces"}}
|
||||
@@ -41,14 +45,16 @@ func TestTracesList_NoLangfuseConfig(t *testing.T) {
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when Langfuse not configured, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_PartialLangfuseConfig(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
// Set only host, missing keys
|
||||
os.Setenv("LANGFUSE_HOST", "http://localhost:3000")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
@@ -56,6 +62,8 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
}()
|
||||
|
||||
expectMissingGlobalLangfuseSecrets(mock)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
|
||||
@@ -72,10 +80,13 @@ func TestTracesList_PartialLangfuseConfig(t *testing.T) {
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list with partial config, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
setupTestDB(t)
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
@@ -89,6 +100,8 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
}()
|
||||
|
||||
expectMissingGlobalLangfuseSecrets(mock)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-down"}}
|
||||
@@ -106,4 +119,171 @@ func TestTracesList_LangfuseUnreachable(t *testing.T) {
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_GlobalSecretsFallback(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
expectGlobalLangfuseSecret(mock, "LANGFUSE_HOST", "http://localhost:3000")
|
||||
expectGlobalLangfuseSecret(mock, "LANGFUSE_PUBLIC_KEY", "pk-global")
|
||||
expectGlobalLangfuseSecret(mock, "LANGFUSE_SECRET_KEY", "sk-global")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-global"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-global/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when Langfuse unreachable, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_GlobalPartialConfig(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
expectGlobalLangfuseSecret(mock, "LANGFUSE_HOST", "http://localhost:3000")
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_PUBLIC_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs("LANGFUSE_SECRET_KEY").
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-partial"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-partial/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list with partial config, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_LangfuseUpstreamError(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/html")
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte("<html><body>Internal Server Error</body></html>"))
|
||||
}))
|
||||
defer upstream.Close()
|
||||
|
||||
os.Setenv("LANGFUSE_HOST", upstream.URL)
|
||||
os.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
|
||||
os.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
|
||||
defer func() {
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
}()
|
||||
|
||||
expectMissingGlobalLangfuseSecrets(mock)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-500"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-500/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list on upstream error, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTracesList_WorkspaceSecretsIgnored(t *testing.T) {
|
||||
mock := setupTestDB(t)
|
||||
setupTestRedis(t)
|
||||
handler := NewTracesHandler()
|
||||
|
||||
os.Unsetenv("LANGFUSE_HOST")
|
||||
os.Unsetenv("LANGFUSE_PUBLIC_KEY")
|
||||
os.Unsetenv("LANGFUSE_SECRET_KEY")
|
||||
|
||||
expectMissingGlobalLangfuseSecrets(mock)
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(w)
|
||||
c.Params = gin.Params{{Key: "id", Value: "ws-traces-ssrf"}}
|
||||
c.Request = httptest.NewRequest("GET", "/workspaces/ws-traces-ssrf/traces", nil)
|
||||
|
||||
handler.List(c)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Errorf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []interface{}
|
||||
json.Unmarshal(w.Body.Bytes(), &resp)
|
||||
if len(resp) != 0 {
|
||||
t.Errorf("expected empty list when workspace secrets ignored, got %d items", len(resp))
|
||||
}
|
||||
if err := mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("sqlmock expectations not met: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func expectMissingGlobalLangfuseSecrets(mock sqlmock.Sqlmock) {
|
||||
for _, key := range []string{"LANGFUSE_HOST", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY"} {
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs(key).
|
||||
WillReturnError(sql.ErrNoRows)
|
||||
}
|
||||
}
|
||||
|
||||
func expectGlobalLangfuseSecret(mock sqlmock.Sqlmock, key, value string) {
|
||||
enc, _ := crypto.Encrypt([]byte(value))
|
||||
ver := crypto.CurrentEncryptionVersion()
|
||||
mock.ExpectQuery(`SELECT encrypted_value, encryption_version FROM global_secrets WHERE key = \$1`).
|
||||
WithArgs(key).
|
||||
WillReturnRows(sqlmock.NewRows([]string{"encrypted_value", "encryption_version"}).AddRow(enc, ver))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user