molecule-core/workspace/builtin_tools/telemetry.py
Hongming Wang d8026347e5 chore: open-source restructure — rename dirs, remove internal files, scrub secrets
Renames:
- platform/ → workspace-server/ (Go module path stays as "platform" for
  external dep compat — will update after plugin module republish)
- workspace-template/ → workspace/

Removed (moved to separate repos or deleted):
- PLAN.md — internal roadmap (move to private project board)
- HANDOFF.md, AGENTS.md — one-time internal session docs
- .claude/ — gitignored entirely (local agent config)
- infra/cloudflare-worker/ → Molecule-AI/molecule-tenant-proxy
- org-templates/molecule-dev/ → standalone template repo
- .mcp-eval/ → molecule-mcp-server repo
- test-results/ — ephemeral, gitignored

Security scrubbing:
- Cloudflare account/zone/KV IDs → placeholders
- Real EC2 IPs → <EC2_IP> in all docs
- CF token prefix, Neon project ID, Fly app names → redacted
- Langfuse dev credentials → parameterized
- Personal runner username/machine name → generic

Community files:
- CONTRIBUTING.md — build, test, branch conventions
- CODE_OF_CONDUCT.md — Contributor Covenant 2.1

All Dockerfiles, CI workflows, docker-compose, railway.toml, render.yaml,
README, CLAUDE.md updated for new directory names.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-18 00:24:44 -07:00


"""OpenTelemetry (OTEL) instrumentation for the Molecule AI workspace runtime.

Architecture
------------
* One global ``TracerProvider`` is initialised at startup via ``setup_telemetry()``.
* Up to three exporters are wired in:

  1. **OTLP/HTTP**: activated when ``OTEL_EXPORTER_OTLP_ENDPOINT`` is set.
     Point this at any compatible collector (Jaeger, Tempo, Grafana OTEL, …).
  2. **Langfuse OTLP bridge**: activated when the ``LANGFUSE_HOST``,
     ``LANGFUSE_PUBLIC_KEY`` and ``LANGFUSE_SECRET_KEY`` env vars are all present.
     Langfuse ≥4 accepts OTLP/HTTP at ``<host>/api/public/otel``.
     This is a *second* exporter alongside the existing Langfuse LangChain
     callback handler in agent.py; both paths emit spans simultaneously.
  3. **Console** (debug): activated when ``OTEL_DEBUG=1``.

* **W3C TraceContext** propagation (``traceparent`` / ``tracestate``) is used for
  cross-workspace context injection and extraction so A2A hops form a single
  distributed trace.
* ``make_trace_middleware()`` returns an ASGI middleware that extracts incoming
  trace context from HTTP headers and stores it in a ``ContextVar`` so the
  A2A executor can access it to parent its spans correctly.

GenAI semantic conventions
--------------------------
Attribute constants for ``gen_ai.*`` follow OpenTelemetry GenAI SemConv 1.26.

Usage example
-------------
    # main.py: call once at startup
    from builtin_tools.telemetry import setup_telemetry, make_trace_middleware
    setup_telemetry(service_name=workspace_id)
    instrumented = make_trace_middleware(app.build())

    # Any module
    from builtin_tools.telemetry import get_tracer
    tracer = get_tracer()
    with tracer.start_as_current_span("my_span") as span:
        span.set_attribute("key", "value")

    # Outgoing HTTP: inject W3C headers
    from builtin_tools.telemetry import inject_trace_headers
    headers = inject_trace_headers({"Content-Type": "application/json"})
    await client.post(url, headers=headers, ...)

    # Incoming HTTP: extract context (done automatically by middleware)
    from builtin_tools.telemetry import extract_trace_context
    ctx = extract_trace_context(dict(request.headers))
"""
from __future__ import annotations

import base64
import logging
import os
from contextvars import ContextVar
from typing import Any, Optional

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# GenAI Semantic Convention attribute keys (OTel SemConv 1.26)
# https://opentelemetry.io/docs/specs/semconv/gen-ai/
# ---------------------------------------------------------------------------
GEN_AI_SYSTEM = "gen_ai.system"
GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
GEN_AI_RESPONSE_FINISH_REASONS = "gen_ai.response.finish_reasons"
# ---------------------------------------------------------------------------
# Workspace / A2A attribute keys
# ---------------------------------------------------------------------------
WORKSPACE_ID_ATTR = "workspace.id"
A2A_SOURCE_WORKSPACE = "a2a.source_workspace_id"
A2A_TARGET_WORKSPACE = "a2a.target_workspace_id"
A2A_TASK_ID = "a2a.task_id"
MEMORY_SCOPE = "memory.scope"
MEMORY_QUERY = "memory.query"
# ---------------------------------------------------------------------------
# Module-level state
# ---------------------------------------------------------------------------
WORKSPACE_ID: str = os.environ.get("WORKSPACE_ID", "unknown")
_initialized: bool = False
_tracer: Any = None # opentelemetry.trace.Tracer | _NoopTracer
# ContextVar that carries incoming trace context from the ASGI middleware to
# the A2A executor. Using a ContextVar (rather than a global) is safe with
# asyncio because each task inherits a copy of the context at creation time.
_incoming_trace_context: ContextVar[Optional[Any]] = ContextVar(
    "otel_incoming_trace_context", default=None
)
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def setup_telemetry(service_name: Optional[str] = None) -> None:
    """Initialise the global ``TracerProvider``. Safe to call multiple times.

    Reads configuration from environment variables:

    ``OTEL_EXPORTER_OTLP_ENDPOINT``
        Base URL of an OTLP-compatible collector (e.g. ``http://jaeger:4318``).
        Spans are sent to ``<endpoint>/v1/traces``.

    ``LANGFUSE_HOST`` + ``LANGFUSE_PUBLIC_KEY`` + ``LANGFUSE_SECRET_KEY``
        When all three are set, a second OTLP exporter is wired to Langfuse's
        ingest endpoint using HTTP Basic auth.

    ``OTEL_DEBUG``
        Set to ``1``, ``true`` or ``yes`` (case-insensitive) to also print
        spans to stdout.
    """
    global _initialized, _tracer

    if _initialized:
        return

    try:
        from opentelemetry import propagate, trace
        from opentelemetry.baggage.propagation import W3CBaggagePropagator
        from opentelemetry.propagators.composite import CompositePropagator
        from opentelemetry.sdk.resources import SERVICE_NAME as OTEL_SERVICE_NAME
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
        from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    except ImportError as exc:
        logger.warning(
            "OTEL: opentelemetry packages not installed; telemetry disabled. "
            "Add opentelemetry-api, opentelemetry-sdk, "
            "opentelemetry-exporter-otlp-proto-http to requirements.txt. "
            "Error: %s",
            exc,
        )
        return

    svc = service_name or f"molecule-{WORKSPACE_ID}"
    resource = Resource.create(
        {
            OTEL_SERVICE_NAME: svc,
            "service.version": "1.0.0",
            WORKSPACE_ID_ATTR: WORKSPACE_ID,
        }
    )
    provider = TracerProvider(resource=resource)

    # -- Exporter 1: Generic OTLP/HTTP ----------------------------------------
    otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "").rstrip("/")
    if otlp_endpoint:
        try:
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

            exporter = OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces")
            provider.add_span_processor(BatchSpanProcessor(exporter))
            logger.info("OTEL: OTLP/HTTP exporter → %s", otlp_endpoint)
        except ImportError:
            logger.warning(
                "OTEL: OTEL_EXPORTER_OTLP_ENDPOINT is set but "
                "opentelemetry-exporter-otlp-proto-http is not installed"
            )
        except Exception as exc:
            logger.warning("OTEL: OTLP exporter init failed: %s", exc)

    # -- Exporter 2: Langfuse OTLP bridge -------------------------------------
    # Langfuse ≥4 accepts OTLP at <host>/api/public/otel (Basic auth).
    lf_host = os.environ.get("LANGFUSE_HOST", "").rstrip("/")
    lf_public = os.environ.get("LANGFUSE_PUBLIC_KEY", "")
    lf_secret = os.environ.get("LANGFUSE_SECRET_KEY", "")
    if lf_host and lf_public and lf_secret:
        try:
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

            lf_endpoint = f"{lf_host}/api/public/otel/v1/traces"
            # HTTP Basic auth: base64("<public_key>:<secret_key>")
            token = base64.b64encode(f"{lf_public}:{lf_secret}".encode()).decode()
            lf_exporter = OTLPSpanExporter(
                endpoint=lf_endpoint,
                headers={"Authorization": f"Basic {token}"},
            )
            provider.add_span_processor(BatchSpanProcessor(lf_exporter))
            logger.info("OTEL: Langfuse OTLP bridge → %s", lf_endpoint)
        except ImportError:
            logger.warning(
                "OTEL: Langfuse env vars set but "
                "opentelemetry-exporter-otlp-proto-http is not installed"
            )
        except Exception as exc:
            logger.warning("OTEL: Langfuse OTLP bridge init failed: %s", exc)

    # -- Exporter 3: Console (debug) ------------------------------------------
    if os.environ.get("OTEL_DEBUG", "").lower() in ("1", "true", "yes"):
        provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
        logger.info("OTEL: console debug exporter enabled")

    # -- Register global provider + W3C propagators ---------------------------
    trace.set_tracer_provider(provider)
    propagate.set_global_textmap(
        CompositePropagator(
            [
                TraceContextTextMapPropagator(),
                W3CBaggagePropagator(),
            ]
        )
    )

    _tracer = trace.get_tracer(
        "molecule.workspace",
        schema_url="https://opentelemetry.io/schemas/1.26.0",
    )
    _initialized = True
    logger.info("OTEL: telemetry initialised for service '%s'", svc)


def get_tracer() -> Any:
    """Return the global ``Tracer``. Lazily calls ``setup_telemetry()`` if needed.

    Returns a no-op tracer when the opentelemetry packages are not installed so
    that instrumented code never raises ``ImportError``.
    """
    global _tracer

    if not _initialized:
        setup_telemetry()

    if _tracer is None:
        # Packages unavailable: hand back a no-op implementation
        try:
            from opentelemetry import trace

            return trace.get_tracer("molecule.noop")
        except ImportError:
            return _NoopTracer()

    return _tracer


def inject_trace_headers(headers: dict) -> dict:
    """Inject W3C ``traceparent`` / ``tracestate`` into *headers* and return it.

    Mutates the dict in-place so it can be used directly::

        headers = inject_trace_headers({"Content-Type": "application/json"})
        await client.post(url, headers=headers, ...)
    """
    try:
        from opentelemetry import propagate

        propagate.inject(headers)
    except Exception:
        pass  # Never let telemetry break the caller
    return headers


def extract_trace_context(carrier: dict) -> Any:
    """Extract W3C trace context from a header mapping.

    Returns an OpenTelemetry ``Context`` object suitable for::

        tracer.start_as_current_span("name", context=ctx)

    Returns ``None`` when extraction fails, for example because the
    opentelemetry packages are unavailable. When the carrier holds no trace
    headers, the propagator still returns a ``Context``, just one without a
    remote parent span.
    """
    try:
        from opentelemetry import propagate

        return propagate.extract(carrier)
    except Exception:
        return None


def get_current_traceparent() -> Optional[str]:
    """Return the W3C ``traceparent`` string for the active span, or ``None``."""
    try:
        from opentelemetry import trace

        span = trace.get_current_span()
        ctx = span.get_span_context()
        if not ctx.is_valid:
            return None
        trace_id = format(ctx.trace_id, "032x")
        span_id = format(ctx.span_id, "016x")
        # Emit the whole 8-bit flags field as two hex digits, so that a
        # non-sampled span with other flag bits set is not reported as "01".
        flags = format(ctx.trace_flags, "02x")
        return f"00-{trace_id}-{span_id}-{flags}"
    except Exception:
        return None


def make_trace_middleware(asgi_app: Any) -> Any:
    """Wrap an ASGI application with W3C trace-context extraction middleware.

    The middleware reads ``traceparent`` / ``tracestate`` from every incoming
    HTTP request and stores the extracted ``Context`` in the
    ``_incoming_trace_context`` ContextVar. The A2A executor reads that
    ContextVar to parent its ``task_receive`` span correctly, forming an
    unbroken distributed trace across workspace hops.

    Usage::

        built = app.build()
        instrumented = make_trace_middleware(built)
        uvicorn.Config(instrumented, ...)
    """

    async def _middleware(scope: dict, receive: Any, send: Any) -> None:  # type: ignore[override]
        if scope.get("type") != "http":
            await asgi_app(scope, receive, send)
            return

        # Decode byte-headers from the ASGI scope (latin-1 per HTTP/1.1 spec)
        raw_headers: list[tuple[bytes, bytes]] = scope.get("headers", [])
        str_headers: dict[str, str] = {
            k.decode("latin-1"): v.decode("latin-1") for k, v in raw_headers
        }
        ctx = extract_trace_context(str_headers)
        token = _incoming_trace_context.set(ctx)
        try:
            await asgi_app(scope, receive, send)
        finally:
            # Restore the previous value even if the wrapped app raises
            _incoming_trace_context.reset(token)

    return _middleware

# ---------------------------------------------------------------------------
# Helpers for GenAI attributes
# ---------------------------------------------------------------------------

def gen_ai_system_from_model(model_str: str) -> str:
    """Map a ``provider:model`` string to a ``gen_ai.system`` value."""
    if ":" not in model_str:
        return "unknown"
    provider = model_str.split(":", 1)[0].lower()
    # Unlisted providers fall through unchanged (lower-cased)
    return {
        "anthropic": "anthropic",
        "openai": "openai",
        "openrouter": "openrouter",
        "groq": "groq",
        "google_genai": "google",
        "ollama": "ollama",
    }.get(provider, provider)


def record_llm_token_usage(span: Any, result: dict) -> None:
    """Extract token counts from a LangGraph ainvoke result and set span attrs.

    Handles both Anthropic (``usage``) and OpenAI (``token_usage``) metadata
    shapes. Silently skips if metadata is absent.
    """
    try:
        messages = result.get("messages", [])
        # Walk backwards so the most recent message with usage metadata wins
        for msg in reversed(messages):
            meta = getattr(msg, "response_metadata", {}) or {}
            # Anthropic
            usage = meta.get("usage", {})
            if usage:
                inp = usage.get("input_tokens") or usage.get("prompt_tokens")
                out = usage.get("output_tokens") or usage.get("completion_tokens")
                if inp is not None:
                    span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, int(inp))
                if out is not None:
                    span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, int(out))
                return
            # OpenAI
            token_usage = meta.get("token_usage", {})
            if token_usage:
                inp = token_usage.get("prompt_tokens")
                out = token_usage.get("completion_tokens")
                if inp is not None:
                    span.set_attribute(GEN_AI_USAGE_INPUT_TOKENS, int(inp))
                if out is not None:
                    span.set_attribute(GEN_AI_USAGE_OUTPUT_TOKENS, int(out))
                return
    except Exception:
        pass  # Best-effort: never break the caller

# ---------------------------------------------------------------------------
# No-op fallbacks (used when opentelemetry packages are absent)
# ---------------------------------------------------------------------------

class _NoopSpan:
    """Transparent no-op span that satisfies the context-manager protocol."""

    def set_attribute(self, key: str, value: Any) -> None:  # noqa: ARG002
        pass

    def set_status(self, *args: Any, **kwargs: Any) -> None:
        pass

    def record_exception(self, exc: BaseException, *args: Any, **kwargs: Any) -> None:
        pass

    def add_event(self, name: str, *args: Any, **kwargs: Any) -> None:
        pass

    def end(self, *args: Any, **kwargs: Any) -> None:
        # Callers of start_span() end the span by hand, so accept that call too
        pass

    def __enter__(self) -> "_NoopSpan":
        return self

    def __exit__(self, *args: Any) -> None:
        pass


class _NoopTracer:
    """Transparent no-op tracer returned when the SDK is unavailable."""

    def start_as_current_span(self, name: str, *args: Any, **kwargs: Any) -> _NoopSpan:  # noqa: ARG002
        return _NoopSpan()

    def start_span(self, name: str, *args: Any, **kwargs: Any) -> _NoopSpan:  # noqa: ARG002
        return _NoopSpan()
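
The ContextVar hand-off that ``make_trace_middleware()`` relies on can be tried out without OpenTelemetry installed. What follows is a minimal sketch, not the module's own code: ``incoming``, ``make_middleware`` and ``app`` are hypothetical stand-ins, and the stored value is a plain dict where the real middleware stores an OpenTelemetry ``Context``. It shows that the wrapped app sees the value set for its request, that non-HTTP scopes pass through untouched, and that the variable is reset afterwards.

```python
import asyncio
from contextvars import ContextVar
from typing import Any, Optional

# Hypothetical stand-in for the module's _incoming_trace_context.
incoming: ContextVar[Optional[dict]] = ContextVar("incoming", default=None)


def make_middleware(asgi_app: Any) -> Any:
    """Wrap an ASGI app so it can read the request's traceparent header."""

    async def middleware(scope: dict, receive: Any, send: Any) -> None:
        if scope.get("type") != "http":
            # Lifespan / websocket scopes are passed through unchanged.
            await asgi_app(scope, receive, send)
            return
        # ASGI delivers headers as (bytes, bytes) pairs with lower-case names.
        headers = {
            k.decode("latin-1"): v.decode("latin-1")
            for k, v in scope.get("headers", [])
        }
        token = incoming.set({"traceparent": headers.get("traceparent")})
        try:
            await asgi_app(scope, receive, send)
        finally:
            # Restore the previous value even if the app raised.
            incoming.reset(token)

    return middleware


seen: list = []


async def app(scope: dict, receive: Any, send: Any) -> None:
    # Plays the role of the A2A executor reading the stored context.
    seen.append(incoming.get())


async def main() -> str:
    # Example traceparent: version-traceid-spanid-flags
    traceparent = "00-" + "ab" * 16 + "-" + "cd" * 8 + "-01"
    wrapped = make_middleware(app)
    http_scope = {
        "type": "http",
        "headers": [(b"traceparent", traceparent.encode("latin-1"))],
    }
    await wrapped(http_scope, None, None)
    await wrapped({"type": "lifespan"}, None, None)
    return traceparent


tp = asyncio.run(main())
```

After the run, ``seen`` holds the dict recorded during the HTTP request followed by ``None`` for the lifespan scope, and ``incoming.get()`` at module level is back to its default. Resetting with the token rather than setting ``None`` keeps nested or re-entrant use from clobbering an outer value.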