- Add agent/embeddings.py with Embedder protocol, FastEmbedEmbedder, OpenAIEmbedder - Factory function get_embedder() reads provider from config.yaml embeddings section - Lazy initialization — no startup impact, model loaded on first embed call - cosine_similarity() and cosine_similarity_matrix() utility functions included - Add fastembed as optional dependency in pyproject.toml - 30 unit tests, all passing Closes #675
220 lines
7.1 KiB
Python
220 lines
7.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Embedding Infrastructure — Configurable local (fastembed) + API (OpenAI) embedders.
|
|
|
|
Provides a shared embedding capability for cognitive memory recall (#509),
|
|
semantic codebase search (#489), and future similarity-based operations.
|
|
|
|
Usage:
|
|
embedder = get_embedder(config)
|
|
vector = embedder.embed_text("some text")
|
|
vectors = embedder.embed_texts(["text1", "text2"])
|
|
|
|
Config (config.yaml):
|
|
embeddings:
|
|
provider: "local" # "local" or "openai"
|
|
model: "sentence-transformers/all-MiniLM-L6-v2" # for local (same as FastEmbedEmbedder.DEFAULT_MODEL)
|
|
# model: "text-embedding-3-small" # for openai
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
from typing import Protocol, runtime_checkable
|
|
|
|
# Module-level logger named after this module (via __name__).
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Protocol (interface)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@runtime_checkable
class Embedder(Protocol):
    """Structural interface shared by every embedding backend.

    Any object exposing ``embed_text``, ``embed_texts`` and a ``dimensions``
    property satisfies this protocol; because it is ``runtime_checkable``,
    ``isinstance(obj, Embedder)`` checks for the presence of those members.
    """

    @property
    def dimensions(self) -> int:
        """Length of the vectors produced by this embedder."""

    def embed_text(self, text: str) -> list[float]:
        """Return the embedding vector for a single piece of text."""

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding vector per entry of *texts*."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Local embedder (fastembed)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class FastEmbedEmbedder:
    """Local embeddings via fastembed (default: all-MiniLM-L6-v2, 384 dims).

    ~100MB model downloaded on first use to ~/.cache/fastembed/.
    No API key needed, private, fast (~5ms per embed).
    Requires: pip install fastembed

    The fastembed model is created lazily on the first non-empty embed call,
    so constructing this object never imports fastembed.
    """

    DEFAULT_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

    # Output sizes of commonly used models, so ``dimensions`` is correct
    # before any vector has been produced.
    _DIMENSIONS = {
        "sentence-transformers/all-MiniLM-L6-v2": 384,
        "BAAI/bge-small-en-v1.5": 384,
        "BAAI/bge-base-en-v1.5": 768,
        "BAAI/bge-large-en-v1.5": 1024,
    }
    # Reported for unknown models until a real vector has been observed.
    _FALLBACK_DIMENSIONS = 384

    def __init__(self, model: str = DEFAULT_MODEL):
        """Store the model name; the model itself is loaded on first use."""
        self.model_name = model
        self._model = None  # Lazy initialization
        self._observed_dims = None  # Set from the first vector actually produced

    def _load(self):
        """Import fastembed and instantiate the model if not done yet.

        Raises:
            ImportError: If fastembed is not installed.
        """
        if self._model is not None:
            return
        try:
            from fastembed import TextEmbedding
        except ImportError as exc:
            # Chain the cause so the underlying import failure stays visible.
            raise ImportError(
                "fastembed is not installed. "
                "Install it with: pip install fastembed\n"
                "Or: pip install 'hermes-agent[embeddings]'"
            ) from exc
        logger.info("Loading fastembed model '%s' (first use may download ~100MB)...", self.model_name)
        self._model = TextEmbedding(model_name=self.model_name)
        logger.info("fastembed model loaded.")

    def embed_text(self, text: str) -> list[float]:
        """Return the embedding vector for a single text."""
        return self.embed_texts([text])[0]

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding vector per entry of *texts*.

        An empty input returns ``[]`` without loading the model.
        """
        if not texts:
            return []
        self._load()
        vectors = [vec.tolist() for vec in self._model.embed(texts)]
        if vectors:
            # Remember the real output size so ``dimensions`` is accurate
            # even for models missing from ``_DIMENSIONS``.
            self._observed_dims = len(vectors[0])
        return vectors

    @property
    def dimensions(self) -> int:
        """Vector length for the configured model.

        Prefers the size observed from vectors already produced, then the
        known-model table, then the 384 fallback.
        """
        if self._observed_dims is not None:
            return self._observed_dims
        return self._DIMENSIONS.get(self.model_name, self._FALLBACK_DIMENSIONS)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OpenAI embedder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OpenAIEmbedder:
    """API embeddings via OpenAI (default: text-embedding-3-small, 1536 dims).

    Uses existing OpenAI client from config.
    Higher quality but costs ~$0.02/1M tokens.
    Requires: openai (already a dependency)

    The client is created lazily on the first non-empty embed call.
    """

    DEFAULT_MODEL = "text-embedding-3-small"
    _DIMENSIONS = {
        "text-embedding-3-small": 1536,
        "text-embedding-3-large": 3072,
        "text-embedding-ada-002": 1536,
    }

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        api_key: str | None = None,
        base_url: str | None = None,
    ):
        """Store connection settings; no client is created here.

        Args:
            model: Embedding model name sent with every request.
            api_key: Passed to the client only when truthy.
            base_url: Passed to the client only when truthy.
        """
        self.model_name = model
        self._api_key = api_key
        self._base_url = base_url
        self._client = None  # Lazy initialization

    def _load(self):
        """Create the OpenAI client if it does not exist yet.

        Raises:
            ImportError: If the openai package is not installed.
        """
        if self._client is not None:
            return
        try:
            from openai import OpenAI
        except ImportError as exc:
            # Chain the cause so the underlying import failure stays visible.
            raise ImportError("openai package is not installed.") from exc
        # Only forward settings that were explicitly provided; the client is
        # constructed with no arguments otherwise.
        kwargs = {}
        if self._api_key:
            kwargs["api_key"] = self._api_key
        if self._base_url:
            kwargs["base_url"] = self._base_url
        self._client = OpenAI(**kwargs)

    def embed_text(self, text: str) -> list[float]:
        """Return the embedding vector for a single text."""
        return self.embed_texts([text])[0]

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        """Return one embedding vector per entry of *texts*.

        An empty input returns ``[]`` without creating a client or issuing
        a request.
        """
        if not texts:
            return []
        self._load()
        response = self._client.embeddings.create(input=texts, model=self.model_name)
        return [item.embedding for item in response.data]

    @property
    def dimensions(self) -> int:
        """Vector length for the configured model (1536 if the model is unknown)."""
        return self._DIMENSIONS.get(self.model_name, 1536)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Factory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_embedder(config: dict) -> Embedder:
    """Factory: returns configured embedder based on config dict.

    Args:
        config: Full config dict. Reads from config["embeddings"] section.
            A missing or empty (null) section selects the local embedder
            with its default model.

    Returns:
        An Embedder instance.

    Raises:
        ValueError: If provider is unknown.
        ImportError: If required package is not installed (raised later,
            on the first embed call, because embedders load lazily).
    """
    # `embeddings:` with no value parses to None in YAML, so a plain
    # .get("embeddings", {}) would hand back None and crash on the next .get.
    emb_config = (config or {}).get("embeddings") or {}
    raw_provider = emb_config.get("provider") or "local"
    # Match provider names regardless of case or surrounding whitespace.
    provider = raw_provider.strip().lower() if isinstance(raw_provider, str) else raw_provider
    model = emb_config.get("model")

    if provider == "local":
        return FastEmbedEmbedder(model=model or FastEmbedEmbedder.DEFAULT_MODEL)

    if provider == "openai":
        return OpenAIEmbedder(
            model=model or OpenAIEmbedder.DEFAULT_MODEL,
            api_key=emb_config.get("api_key"),
            base_url=emb_config.get("base_url"),
        )

    raise ValueError(
        f"Unknown embedding provider '{raw_provider}'. "
        "Supported providers: 'local', 'openai'"
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Utility functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Compute cosine similarity between two vectors.

    Returns a value in [-1, 1]. Higher = more similar.
    Returns 0.0 if either vector has zero magnitude.

    Raises:
        ValueError: If the vectors have different lengths.
    """
    if len(a) != len(b):
        raise ValueError(f"Vector dimensions must match: {len(a)} != {len(b)}")

    dot = sum(x * y for x, y in zip(a, b))
    mag_a = math.sqrt(sum(x * x for x in a))
    mag_b = math.sqrt(sum(x * x for x in b))

    if mag_a == 0.0 or mag_b == 0.0:
        return 0.0

    similarity = dot / (mag_a * mag_b)
    # Float rounding can push the ratio marginally outside [-1, 1]
    # (e.g. 1.0000000000000002 for identical vectors). Clamp with explicit
    # comparisons rather than min/max so that NaN inputs still yield NaN.
    if similarity > 1.0:
        return 1.0
    if similarity < -1.0:
        return -1.0
    return similarity
|
|
|
|
|
|
def cosine_similarity_matrix(vectors: list[list[float]]) -> list[list[float]]:
    """Compute NxN pairwise cosine similarity matrix.

    Useful for deduplication: if matrix[i][j] >= 0.98, items i and j are near-duplicates.

    Returns:
        NxN matrix where matrix[i][j] = cosine_similarity(vectors[i], vectors[j]).
        The matrix is symmetric. A diagonal entry is 1.0 for a vector with
        any non-zero component and 0.0 for an all-zero (or empty) vector,
        matching the zero-magnitude rule of cosine_similarity.
    """
    n = len(vectors)
    matrix = [[0.0] * n for _ in range(n)]
    for i, vec in enumerate(vectors):
        # Self-similarity is assigned directly (no float rounding), but a
        # zero vector must not be reported as similar to itself.
        matrix[i][i] = 1.0 if any(vec) else 0.0
        # Similarity is symmetric, so compute the upper triangle only and
        # mirror each value into the lower triangle.
        for j in range(i + 1, n):
            sim = cosine_similarity(vec, vectors[j])
            matrix[i][j] = sim
            matrix[j][i] = sim
    return matrix
|