diff --git a/README.md b/README.md
index 6d7360c3..44c116af 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,11 @@ Input example:
         }
     },
     "index_name": "my_index",
-    "encoder": "my_encoder"
+    "encoder": {
+        "type": "openai",
+        "name": "text-embedding-3-small",
+        "dimensions": 1536 // dimensions depend on the provider and model
+    },
     "webhook_url": "https://my-webhook-url"
 }
 ```
@@ -42,7 +46,10 @@ Input example:
         }
     },
     "index_name": "my_index",
-    "encoder": "my_encoder",
+    "encoder": {
+        "type": "openai",
+        "name": "text-embedding-3-small"
+    }
 }
 ```
 
diff --git a/api/delete.py b/api/delete.py
index 44527897..89d2a17e 100644
--- a/api/delete.py
+++ b/api/delete.py
@@ -10,11 +10,12 @@
 
 @router.delete("/delete", response_model=ResponsePayload)
 async def delete(payload: RequestPayload):
-    encoder = get_encoder(encoder_type=payload.encoder)
+    encoder = get_encoder(encoder_config=payload.encoder)
     vector_service: BaseVectorDatabase = get_vector_service(
         index_name=payload.index_name,
         credentials=payload.vector_database,
         encoder=encoder,
+        dimensions=payload.encoder.dimensions,
     )
     data = await vector_service.delete(file_url=payload.file_url)
     return ResponsePayload(success=True, data=data)
diff --git a/api/ingest.py b/api/ingest.py
index 707b3a21..0aa454c6 100644
--- a/api/ingest.py
+++ b/api/ingest.py
@@ -17,13 +17,14 @@ async def ingest(payload: RequestPayload) -> Dict:
         files=payload.files,
         index_name=payload.index_name,
         vector_credentials=payload.vector_database,
+        dimensions=payload.encoder.dimensions,
     )
     chunks = await embedding_service.generate_chunks()
-    encoder = get_encoder(encoder_type=payload.encoder)
+    encoder = get_encoder(encoder_config=payload.encoder)
     summary_documents = await embedding_service.generate_summary_documents(
         documents=chunks
     )
 
     await asyncio.gather(
         embedding_service.generate_and_upsert_embeddings(
             documents=chunks, encoder=encoder, index_name=payload.index_name
diff --git a/encoders/__init__.py b/encoders/__init__.py
deleted file mode 100644
index dd76ea90..00000000
--- a/encoders/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-from encoders.base import BaseEncoder
-from encoders.bm25 import BM25Encoder
-from encoders.cohere import CohereEncoder
-from encoders.huggingface import HuggingFaceEncoder
-from encoders.openai import OpenAIEncoder
-
-__all__ = [
-    "BaseEncoder",
-    "CohereEncoder",
-    "OpenAIEncoder",
-    "BM25Encoder",
-    "HuggingFaceEncoder",
-]
diff --git a/encoders/base.py b/encoders/base.py
deleted file mode 100644
index 8320b486..00000000
--- a/encoders/base.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from typing import List
-
-from pydantic.v1 import BaseModel, Field
-
-
-class BaseEncoder(BaseModel):
-    name: str
-    score_threshold: float
-    type: str = Field(default="base")
-    dimension: int = Field(default=1536)
-
-    class Config:
-        arbitrary_types_allowed = True
-
-    def __call__(self, docs: List[str]) -> List[List[float]]:
-        raise NotImplementedError("Subclasses must implement this method")
diff --git a/encoders/bm25.py b/encoders/bm25.py
deleted file mode 100644
index 1965fb6e..00000000
--- a/encoders/bm25.py
+++ /dev/null
@@ -1,67 +0,0 @@
-from typing import Any, Dict, List, Optional
-
-from semantic_router.encoders import BaseEncoder
-from semantic_router.utils.logger import logger
-
-
-class BM25Encoder(BaseEncoder):
-    model: Optional[Any] = None
-    idx_mapping: Optional[Dict[int, int]] = None
-    type: str = "sparse"
-
-    def __init__(
-        self,
-        name: str = "bm25",
-        score_threshold: float = 0.82,
-        use_default_params: bool = True,
-    ):
-        super().__init__(name=name, score_threshold=score_threshold)
-        try:
-            from pinecone_text.sparse import BM25Encoder as encoder
-        except ImportError:
-            raise ImportError(
-                "Please install pinecone-text to use BM25Encoder. "
-                "You can install it with: `pip install 'semantic-router[hybrid]'`"
-            )
-
-        self.model = encoder()
-
-        if use_default_params:
-            logger.info("Downloading and initializing default sBM25 model parameters.")
-            self.model = encoder.default()
-            self._set_idx_mapping()
-
-    def _set_idx_mapping(self):
-        params = self.model.get_params()
-        doc_freq = params["doc_freq"]
-        if isinstance(doc_freq, dict):
-            indices = doc_freq["indices"]
-            self.idx_mapping = {int(idx): i for i, idx in enumerate(indices)}
-        else:
-            raise TypeError("Expected a dictionary for 'doc_freq'")
-
-    def __call__(self, docs: List[str]) -> List[List[float]]:
-        if self.model is None or self.idx_mapping is None:
-            raise ValueError("Model or index mapping is not initialized.")
-        if len(docs) == 1:
-            sparse_dicts = self.model.encode_queries(docs)
-        elif len(docs) > 1:
-            sparse_dicts = self.model.encode_documents(docs)
-        else:
-            raise ValueError("No documents to encode.")
-
-        embeds = [[0.0] * len(self.idx_mapping)] * len(docs)
-        for i, output in enumerate(sparse_dicts):
-            indices = output["indices"]
-            values = output["values"]
-            for idx, val in zip(indices, values):
-                if idx in self.idx_mapping:
-                    position = self.idx_mapping[idx]
-                    embeds[i][position] = val
-        return embeds
-
-    def fit(self, docs: List[str]):
-        if self.model is None:
-            raise ValueError("Model is not initialized.")
-        self.model.fit(docs)
-        self._set_idx_mapping()
diff --git a/encoders/cohere.py b/encoders/cohere.py
deleted file mode 100644
index 76209ba1..00000000
--- a/encoders/cohere.py
+++ /dev/null
@@ -1,40 +0,0 @@
-from typing import List, Optional
-
-import cohere
-from decouple import config
-
-from encoders import BaseEncoder
-
-
-class CohereEncoder(BaseEncoder):
-    client: Optional[cohere.Client] = None
-    type: str = "cohere"
-    dimension: int = 1024  # https://docs.cohere.com/reference/embed
-
-    def __init__(
-        self,
-        name: Optional[str] = None,
-        cohere_api_key: Optional[str] = None,
-        score_threshold: float = 0.3,
-    ):
-        if name is None:
-            name = config("COHERE_MODEL_NAME", "embed-english-v3.0")
-        super().__init__(name=name, score_threshold=score_threshold)
-        cohere_api_key = cohere_api_key or config("COHERE_API_KEY")
-        if cohere_api_key is None:
-            raise ValueError("Cohere API key cannot be 'None'.")
-        try:
-            self.client = cohere.Client(cohere_api_key)
-        except Exception as e:
-            raise ValueError(
-                f"Cohere API client failed to initialize. Error: {e}"
-            ) from e
-
-    def __call__(self, docs: List[str]) -> List[List[float]]:
-        if self.client is None:
-            raise ValueError("Cohere client is not initialized.")
-        try:
-            embeds = self.client.embed(docs, input_type="search_query", model=self.name)
-            return embeds.embeddings
-        except Exception as e:
-            raise ValueError(f"Cohere API call failed. Error: {e}") from e
diff --git a/encoders/huggingface.py b/encoders/huggingface.py
deleted file mode 100644
index 63dfa54c..00000000
--- a/encoders/huggingface.py
+++ /dev/null
@@ -1,114 +0,0 @@
-from typing import Any, List, Optional
-
-from pydantic.v1 import PrivateAttr
-
-from encoders import BaseEncoder
-
-
-class HuggingFaceEncoder(BaseEncoder):
-    name: str = "sentence-transformers/all-MiniLM-L6-v2"
-    type: str = "huggingface"
-    score_threshold: float = 0.5
-    tokenizer_kwargs: dict = {}
-    model_kwargs: dict = {}
-    device: Optional[str] = None
-    _tokenizer: Any = PrivateAttr()
-    _model: Any = PrivateAttr()
-    _torch: Any = PrivateAttr()
-
-    def __init__(self, **data):
-        super().__init__(**data)
-        self._tokenizer, self._model = self._initialize_hf_model()
-
-    def _initialize_hf_model(self):
-        try:
-            from transformers import AutoModel, AutoTokenizer
-        except ImportError:
-            raise ImportError(
-                "Please install transformers to use HuggingFaceEncoder. "
-                "You can install it with: "
-                "`pip install semantic-router[local]`"
-            )
-
-        try:
-            import torch
-        except ImportError:
-            raise ImportError(
-                "Please install Pytorch to use HuggingFaceEncoder. "
-                "You can install it with: "
-                "`pip install semantic-router[local]`"
-            )
-
-        self._torch = torch
-
-        tokenizer = AutoTokenizer.from_pretrained(
-            self.name,
-            **self.tokenizer_kwargs,
-        )
-
-        model = AutoModel.from_pretrained(self.name, **self.model_kwargs)
-
-        if self.device:
-            model.to(self.device)
-
-        else:
-            device = "cuda" if self._torch.cuda.is_available() else "cpu"
-            model.to(device)
-            self.device = device
-
-        return tokenizer, model
-
-    def __call__(
-        self,
-        docs: List[str],
-        batch_size: int = 32,
-        normalize_embeddings: bool = True,
-        pooling_strategy: str = "mean",
-    ) -> List[List[float]]:
-        all_embeddings = []
-        for i in range(0, len(docs), batch_size):
-            batch_docs = docs[i : i + batch_size]
-
-            encoded_input = self._tokenizer(
-                batch_docs, padding=True, truncation=True, return_tensors="pt"
-            ).to(self.device)
-
-            with self._torch.no_grad():
-                model_output = self._model(**encoded_input)
-
-            if pooling_strategy == "mean":
-                embeddings = self._mean_pooling(
-                    model_output, encoded_input["attention_mask"]
-                )
-            elif pooling_strategy == "max":
-                embeddings = self._max_pooling(
-                    model_output, encoded_input["attention_mask"]
-                )
-            else:
-                raise ValueError(
-                    "Invalid pooling_strategy. Please use 'mean' or 'max'."
-                )
-
-            if normalize_embeddings:
-                embeddings = self._torch.nn.functional.normalize(embeddings, p=2, dim=1)
-
-            embeddings = embeddings.tolist()
-            all_embeddings.extend(embeddings)
-        return all_embeddings
-
-    def _mean_pooling(self, model_output, attention_mask):
-        token_embeddings = model_output[0]
-        input_mask_expanded = (
-            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
-        )
-        return self._torch.sum(
-            token_embeddings * input_mask_expanded, 1
-        ) / self._torch.clamp(input_mask_expanded.sum(1), min=1e-9)
-
-    def _max_pooling(self, model_output, attention_mask):
-        token_embeddings = model_output[0]
-        input_mask_expanded = (
-            attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
-        )
-        token_embeddings[input_mask_expanded == 0] = -1e9
-        return self._torch.max(token_embeddings, 1)[0]
diff --git a/encoders/openai.py b/encoders/openai.py
deleted file mode 100644
index f4857b12..00000000
--- a/encoders/openai.py
+++ /dev/null
@@ -1,65 +0,0 @@
-from time import sleep
-from typing import List, Optional
-
-import openai
-from decouple import config
-from openai import OpenAIError
-from openai.types import CreateEmbeddingResponse
-from semantic_router.utils.logger import logger
-
-from encoders import BaseEncoder
-
-
-class OpenAIEncoder(BaseEncoder):
-    client: Optional[openai.Client]
-    type: str = "openai"
-    dimension: int = 1536
-
-    def __init__(
-        self,
-        name: Optional[str] = None,
-        openai_api_key: Optional[str] = None,
-        score_threshold: float = 0.82,
-    ):
-        if name is None:
-            name = config("OPENAI_MODEL_NAME", "text-embedding-3-small")
-        super().__init__(name=name, score_threshold=score_threshold)
-        api_key = openai_api_key or config("OPENAI_API_KEY")
-        if api_key is None:
-            raise ValueError("OpenAI API key cannot be 'None'.")
-        try:
-            self.client = openai.Client(api_key=api_key)
-        except Exception as e:
-            raise ValueError(
-                f"OpenAI API client failed to initialize. Error: {e}"
-            ) from e
-
-    def __call__(self, docs: List[str]) -> List[List[float]]:
-        if self.client is None:
-            raise ValueError("OpenAI client is not initialized.")
-        embeds = None
-        error_message = ""
-
-        # Exponential backoff
-        for j in range(3):
-            try:
-                embeds = self.client.embeddings.create(input=docs, model=self.name)
-                if embeds.data:
-                    break
-            except OpenAIError as e:
-                sleep(2**j)
-                error_message = str(e)
-                logger.warning(f"Retrying in {2**j} seconds...")
-            except Exception as e:
-                logger.error(f"OpenAI API call failed. Error: {error_message}")
-                raise ValueError(f"OpenAI API call failed. Error: {e}") from e
-
-        if (
-            not embeds
-            or not isinstance(embeds, CreateEmbeddingResponse)
-            or not embeds.data
-        ):
-            raise ValueError(f"No embeddings returned. Error: {error_message}")
-
-        embeddings = [embeds_obj.embedding for embeds_obj in embeds.data]
-        return embeddings
diff --git a/models/delete.py b/models/delete.py
index 9b4bb899..5420bebe 100644
--- a/models/delete.py
+++ b/models/delete.py
@@ -1,6 +1,6 @@
 from pydantic import BaseModel
 
-from models.ingest import EncoderEnum
+from models.ingest import Encoder
 from models.vector_database import VectorDatabase
 
 
@@ -8,7 +8,7 @@ class RequestPayload(BaseModel):
     index_name: str
     file_url: str
     vector_database: VectorDatabase
-    encoder: EncoderEnum
+    encoder: Encoder
 
 
 class DeleteResponse(BaseModel):
diff --git a/models/ingest.py b/models/ingest.py
index 021c3a17..d61cd3f6 100644
--- a/models/ingest.py
+++ b/models/ingest.py
@@ -14,9 +14,15 @@ class EncoderEnum(str, Enum):
     fastembed = "fastembed"
 
 
+class Encoder(BaseModel):
+    name: str
+    type: str
+    dimensions: Optional[int] = None
+
+
 class RequestPayload(BaseModel):
     files: List[File]
-    encoder: EncoderEnum
+    encoder: Encoder
     vector_database: VectorDatabase
     index_name: str
     webhook_url: Optional[str] = None
diff --git a/models/query.py b/models/query.py
index f5f217b8..9d2a897e 100644
--- a/models/query.py
+++ b/models/query.py
@@ -2,7 +2,7 @@
 
 from pydantic import BaseModel
 
-from models.ingest import EncoderEnum
+from models.ingest import Encoder
 from models.vector_database import VectorDatabase
 
 
@@ -10,7 +10,7 @@ class RequestPayload(BaseModel):
     input: str
     vector_database: VectorDatabase
     index_name: str
-    encoder: EncoderEnum = EncoderEnum.openai
+    encoder: Encoder
 
 
 class ResponseData(BaseModel):
diff --git a/service/embedding.py b/service/embedding.py
index 082a3a1b..25ad0481 100644
--- a/service/embedding.py
+++ b/service/embedding.py
@@ -11,21 +11,32 @@
 from unstructured.documents.elements import Element
 from unstructured.partition.auto import partition
 
-import encoders
-from encoders import BaseEncoder
+from semantic_router.encoders import (
+    BaseEncoder,
+    CohereEncoder,
+    OpenAIEncoder,
+    HuggingFaceEncoder,
+)
 from models.document import BaseDocument, BaseDocumentChunk
 from models.file import File
-from models.ingest import EncoderEnum
+from models.ingest import EncoderEnum, Encoder
 from utils.logger import logger
 from utils.summarise import completion
 from vectordbs import get_vector_service
 
 
 class EmbeddingService:
-    def __init__(self, files: List[File], index_name: str, vector_credentials: dict):
+    def __init__(
+        self,
+        files: List[File],
+        index_name: str,
+        vector_credentials: dict,
+        dimensions: Optional[int],
+    ):
         self.files = files
         self.index_name = index_name
         self.vector_credentials = vector_credentials
+        self.dimensions = dimensions
 
     def _get_datasource_suffix(self, type: str) -> str:
         suffixes = {
@@ -170,6 +181,7 @@ async def generate_embedding(
             index_name=index_name or self.index_name,
             credentials=self.vector_credentials,
             encoder=encoder,
+            dimensions=self.dimensions,
         )
         try:
             await vector_service.upsert(chunks=chunks_with_embeddings)
@@ -199,14 +211,15 @@ async def generate_summary_documents(
         return summary_documents
 
 
-def get_encoder(*, encoder_type: EncoderEnum) -> encoders.BaseEncoder:
+def get_encoder(*, encoder_config: Encoder) -> BaseEncoder:
     encoder_mapping = {
-        EncoderEnum.cohere: encoders.CohereEncoder,
-        EncoderEnum.openai: encoders.OpenAIEncoder,
-        EncoderEnum.huggingface: encoders.HuggingFaceEncoder,
+        EncoderEnum.cohere: CohereEncoder,
+        EncoderEnum.openai: OpenAIEncoder,
+        EncoderEnum.huggingface: HuggingFaceEncoder,
     }
-
-    encoder_class = encoder_mapping.get(encoder_type)
+    encoder_provider = encoder_config.type
+    model_name = encoder_config.name
+    encoder_class = encoder_mapping.get(encoder_provider)
     if encoder_class is None:
-        raise ValueError(f"Unsupported encoder: {encoder_type}")
-    return encoder_class()
+        raise ValueError(f"Unsupported provider: {encoder_provider}")
+    return encoder_class(name=model_name)
diff --git a/service/router.py b/service/router.py
index 8573d48c..41f7619e 100644
--- a/service/router.py
+++ b/service/router.py
@@ -44,7 +44,7 @@ async def get_documents(
 async def query(payload: RequestPayload) -> list[BaseDocumentChunk]:
     rl = create_route_layer()
     decision = rl(payload.input).name
-    encoder = get_encoder(encoder_type=payload.encoder)
+    encoder = get_encoder(encoder_config=payload.encoder)
 
     if decision == "summarize":
         vector_service: BaseVectorDatabase = get_vector_service(
diff --git a/vectordbs/__init__.py b/vectordbs/__init__.py
index 3383dd25..970efbed 100644
--- a/vectordbs/__init__.py
+++ b/vectordbs/__init__.py
@@ -1,5 +1,8 @@
-from encoders import BaseEncoder
-from encoders.openai import OpenAIEncoder
+from typing import Optional
+
+from dotenv import load_dotenv
+from semantic_router.encoders import BaseEncoder
+from semantic_router.encoders.openai import OpenAIEncoder
 from models.vector_database import VectorDatabase
 from vectordbs.astra import AstraService
 from vectordbs.base import BaseVectorDatabase
@@ -7,12 +10,15 @@ from vectordbs.pinecone import PineconeService
 from vectordbs.qdrant import QdrantService
 from vectordbs.weaviate import WeaviateService
 
+load_dotenv()
+
 
 def get_vector_service(
     *,
     index_name: str,
     credentials: VectorDatabase,
    encoder: BaseEncoder = OpenAIEncoder(),
+    dimensions: Optional[int] = None,
 ) -> BaseVectorDatabase:
     services = {
         "pinecone": PineconeService,
@@ -23,11 +29,13 @@
         # e.g "weaviate": WeaviateVectorService,
     }
     service = services.get(credentials.type.value)
+
     if service is None:
         raise ValueError(f"Unsupported provider: {credentials.type.value}")
+
     return service(
         index_name=index_name,
-        dimension=encoder.dimension,
+        dimension=dimensions,
         credentials=dict(credentials.config),
         encoder=encoder,
     )
diff --git a/vectordbs/astra.py b/vectordbs/astra.py
index 05e9f846..5a3785e8 100644
--- a/vectordbs/astra.py
+++ b/vectordbs/astra.py
@@ -3,7 +3,7 @@
 from astrapy.db import AstraDB
 from tqdm import tqdm
 
-from encoders.base import BaseEncoder
+from semantic_router.encoders import BaseEncoder
 from models.document import BaseDocumentChunk
 from vectordbs.base import BaseVectorDatabase
 
diff --git a/vectordbs/base.py b/vectordbs/base.py
index 3931169a..b515faf7 100644
--- a/vectordbs/base.py
+++ b/vectordbs/base.py
@@ -4,7 +4,7 @@
 from decouple import config
 from tqdm import tqdm
 
-from encoders.base import BaseEncoder
+from semantic_router.encoders import BaseEncoder
 from models.delete import DeleteResponse
 from models.document import BaseDocumentChunk
 from utils.logger import logger
diff --git a/vectordbs/pinecone.py b/vectordbs/pinecone.py
index 2ebf827e..0e1e77e9 100644
--- a/vectordbs/pinecone.py
+++ b/vectordbs/pinecone.py
@@ -3,7 +3,7 @@
 from pinecone import Pinecone, ServerlessSpec
 from tqdm import tqdm
 
-from encoders.base import BaseEncoder
+from semantic_router.encoders import BaseEncoder
 from models.delete import DeleteResponse
 from models.document import BaseDocumentChunk
 from utils.logger import logger
diff --git a/vectordbs/qdrant.py b/vectordbs/qdrant.py
index 51438335..f70587ed 100644
--- a/vectordbs/qdrant.py
+++ b/vectordbs/qdrant.py
@@ -4,7 +4,7 @@
 from qdrant_client.http import models as rest
 from tqdm import tqdm
 
-from encoders.base import BaseEncoder
+from semantic_router.encoders import BaseEncoder
 from models.document import BaseDocumentChunk
 from vectordbs.base import BaseVectorDatabase
 
diff --git a/vectordbs/weaviate.py b/vectordbs/weaviate.py
index 1516f54d..1c9f5262 100644
--- a/vectordbs/weaviate.py
+++ b/vectordbs/weaviate.py
@@ -4,7 +4,7 @@
 import weaviate
 from tqdm import tqdm
 
-from encoders.base import BaseEncoder
+from semantic_router.encoders import BaseEncoder
 from models.delete import DeleteResponse
 from models.document import BaseDocumentChunk
 from utils.logger import logger
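Reviewer note (not part of the diff): a minimal sketch of how the new structured `encoder` payload flows through `get_encoder` and `get_vector_service` after this change. The `Encoder` fields and both function signatures come from the diff above; the `VectorDatabase` values are illustrative placeholders, and the plain-string lookup of `type` against `EncoderEnum` keys works only because `EncoderEnum` is a `str`-mixin enum.

```python
# Sketch only, assuming the modules touched in this diff; credential
# values below are placeholders, not a real configuration.
from models.ingest import Encoder
from models.vector_database import VectorDatabase
from service.embedding import get_encoder
from vectordbs import get_vector_service

# The API now accepts a structured encoder object instead of a bare enum string.
encoder_config = Encoder(
    type="openai",                  # matched against EncoderEnum keys in get_encoder
    name="text-embedding-3-small",  # provider-specific model name
    dimensions=1536,                # optional; forwarded to the vector index
)

encoder = get_encoder(encoder_config=encoder_config)

# `dimensions` now comes from the request payload rather than from the
# (removed) local encoder classes' `dimension` attribute.
vector_service = get_vector_service(
    index_name="my_index",
    credentials=VectorDatabase(
        type="pinecone",                            # placeholder provider
        config={"api_key": "...", "region": "..."},  # placeholder credentials
    ),
    encoder=encoder,
    dimensions=encoder_config.dimensions,
)
```

One behavioral consequence worth flagging: `get_vector_service` now passes `dimension=dimensions` straight through, so a payload that omits `dimensions` hands the vector service `None` where it previously fell back to `encoder.dimension`. Callers relying on the old default should send an explicit value.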