
Commit

Merge branch 'develop'
clemlesne committed Aug 17, 2024
2 parents f23ec4e + e788b9f commit 4450ba0
Showing 4 changed files with 147 additions and 89 deletions.
12 changes: 6 additions & 6 deletions README.md
@@ -26,8 +26,8 @@ Scraper:
Indexer:

- [x] AI Search index is created automatically
-- [x] Chunck markdown while keeping the content coherent
-- [x] Embed chuncks with OpenAI embeddings
+- [x] Chunk markdown while keeping the content coherent
+- [x] Embed chunks with OpenAI embeddings
- [x] Indexed content is semantically searchable with [Azure AI Search](https://learn.microsoft.com/en-us/azure/search)

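The "semantically searchable" item above boils down to a vector query against the AI Search index. A minimal query-side sketch, assuming the index exposes `url` and `vectors` fields (as `IndexedIngestModel` further down in this commit suggests); the endpoints, keys, and deployment names are placeholders:

```python
import asyncio

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.aio import SearchClient
from azure.search.documents.models import VectorizedQuery
from openai import AsyncAzureOpenAI


async def semantic_search(query: str) -> None:
    # Embed the query with the same deployment used at indexing time (placeholder names).
    openai_client = AsyncAzureOpenAI(
        api_key="<openai-api-key>",
        api_version="2023-12-01-preview",
        azure_endpoint="https://<openai-resource>.openai.azure.com",
    )
    embedding = (
        await openai_client.embeddings.create(
            input=[query],
            model="<embedding-deployment>",
        )
    ).data[0].embedding

    # Run a k-NN vector query against the assumed "vectors" field of the index.
    async with SearchClient(
        endpoint="https://<search-resource>.search.windows.net",
        index_name="<index-name>",
        credential=AzureKeyCredential("<search-api-key>"),
    ) as search_client:
        results = await search_client.search(
            search_text=None,
            vector_queries=[
                VectorizedQuery(vector=embedding, k_nearest_neighbors=5, fields="vectors")
            ],
        )
        async for doc in results:
            print(doc["url"], doc.get("@search.score"))


asyncio.run(semantic_search("how do I configure the scraper?"))
```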
## How to use
@@ -156,7 +156,7 @@ graph LR
web["Website"]
subgraph "Azure Queue Storage"
-to_chunck["To chunck"]
+to_chunk["To chunk"]
to_scrape["To scrape"]
end
@@ -174,7 +174,7 @@ graph LR
cli -- 4. Update cache --> scraped
cli -- 5. Push state --> state
cli -- 6. Add message --> to_scrape
-cli -- 7. Add message --> to_chunck
+cli -- 7. Add message --> to_chunk
cli -- 8. Update state --> job
```

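Steps 6–7 of this graph are ordinary Azure Queue Storage messages. A minimal sketch of the producer side, with a placeholder connection string, queue name, and message payload (the repo's own helpers and message format may differ):

```python
import asyncio

from azure.core.exceptions import ResourceExistsError
from azure.storage.queue.aio import QueueServiceClient


async def enqueue_for_chunking(url: str) -> None:
    # Placeholder connection string and queue name; the repo derives these from its own config.
    async with QueueServiceClient.from_connection_string("<storage-connection-string>") as service:
        queue = service.get_queue_client("to-chunk")
        try:
            await queue.create_queue()
        except ResourceExistsError:
            pass  # Queue already exists, nothing to do
        # The payload here is just the URL; the real message format is defined by the app.
        await queue.send_message(url)


asyncio.run(enqueue_for_chunking("https://example.com/docs/page"))
```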
@@ -187,7 +187,7 @@ graph LR
embeddings["Azure OpenAI Embeddings"]
subgraph "Azure Queue Storage"
-to_chunck["To chunck"]
+to_chunk["To chunk"]
end
subgraph "Azure Blob Storage"
@@ -196,7 +196,7 @@ graph LR
end
end
-cli -- 1. Pull message --> to_chunck
+cli -- 1. Pull message --> to_chunk
cli -- 2. Get cache --> scraped
cli -- 3. Chunk --> cli
cli -- 4. Embed --> embeddings
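Steps 1–2 of the indexer graph above (pull a message, read the cached scrape from Blob Storage) can be sketched as follows; the queue and container names and the message payload are assumptions, not the repo's actual contract:

```python
import asyncio

from azure.storage.blob.aio import BlobServiceClient
from azure.storage.queue.aio import QueueServiceClient

CONNECTION_STRING = "<storage-connection-string>"  # placeholder


async def consume_once() -> None:
    async with QueueServiceClient.from_connection_string(CONNECTION_STRING) as queues:
        async with BlobServiceClient.from_connection_string(CONNECTION_STRING) as blobs:
            queue = queues.get_queue_client("to-chunk")
            container = blobs.get_container_client("scraped")

            # Pull a batch of messages; each message is assumed to carry a blob name.
            async for message in queue.receive_messages(messages_per_page=8):
                downloader = await container.download_blob(message.content)
                cached = await downloader.readall()
                print(f"{message.content}: {len(cached)} bytes of cached scrape")
                # Delete only after successful processing so failures are retried.
                await queue.delete_message(message)


asyncio.run(consume_once())
```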
22 changes: 12 additions & 10 deletions app/helpers/persistence.py
@@ -5,9 +5,6 @@
from azure.core.exceptions import ResourceExistsError
from azure.search.documents.aio import SearchClient
from azure.search.documents.indexes.aio import SearchIndexClient
-from azure.storage.blob.aio import BlobServiceClient, ContainerClient
-from azure.storage.queue.aio import QueueClient, QueueServiceClient
-from openai import AsyncAzureOpenAI
from azure.search.documents.indexes.models import (
AzureOpenAIParameters,
AzureOpenAIVectorizer,
@@ -21,6 +18,9 @@
VectorSearch,
VectorSearchProfile,
)
+from azure.storage.blob.aio import BlobServiceClient, ContainerClient
+from azure.storage.queue.aio import QueueClient, QueueServiceClient
+from openai import AsyncAzureOpenAI

from app.helpers.logging import logger

@@ -105,9 +105,9 @@ async def search_client(
deployment_id=azure_openai_embedding_deployment,
model_name=azure_openai_embedding_model,
resource_uri=azure_openai_endpoint,
-)
+),
)
-]
+],
)

# Create index if it does not exist
@@ -122,11 +122,13 @@ async def search_client(
credential=AzureKeyCredential(api_key),
) as client:
try:
-await client.create_index(SearchIndex(
-fields=fields,
-name=index,
-vector_search=vector_search,
-))
+await client.create_index(
+SearchIndex(
+fields=fields,
+name=index,
+vector_search=vector_search,
+)
+)
logger.info('Created Search "%s"', index)
except ResourceExistsError:
pass
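The hunk above is the "create the index if it does not exist" pattern. A stripped-down sketch of that pattern, with a toy schema in place of the repo's real fields and without the vectorizer wiring shown in the imports:

```python
import asyncio

from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ResourceExistsError
from azure.search.documents.indexes.aio import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchableField,
    SearchFieldDataType,
    SearchIndex,
    SimpleField,
)


async def ensure_index(endpoint: str, api_key: str, index: str) -> None:
    # Toy schema: a key plus one searchable text field.
    fields = [
        SimpleField(name="id", type=SearchFieldDataType.String, key=True),
        SearchableField(name="content"),
    ]
    async with SearchIndexClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(api_key),
    ) as client:
        try:
            await client.create_index(SearchIndex(name=index, fields=fields))
            print(f'Created Search "{index}"')
        except ResourceExistsError:
            # The index is already there; creation is treated as idempotent.
            pass


asyncio.run(ensure_index("https://<search-resource>.search.windows.net", "<api-key>", "demo-index"))
```

Catching the conflict instead of checking first keeps creation race-free when several workers start at the same time.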
37 changes: 21 additions & 16 deletions app/index.py
@@ -26,7 +26,12 @@
queue_client,
search_client,
)
-from app.helpers.resources import index_queue_name, hash_url, scrape_container_name, index_index_name
+from app.helpers.resources import (
+hash_url,
+index_index_name,
+index_queue_name,
+scrape_container_name,
+)
from app.helpers.threading import run_workers
from app.models.indexed import IndexedIngestModel
from app.models.scraped import ScrapedUrlModel
@@ -70,13 +75,13 @@ async def _process_one(
logger.info("%s data is invalid (code %i)", short_name, result.status)
return

-# Chunck to small markdown files
-chuncks = _markdown_chunck(
+# Chunk to small markdown files
+chunks = _markdown_chunk(
max_tokens=800,
text=result.content,
)
-doc_ids = [f"{hash_url(result.url)}-{i}" for i in range(len(chuncks))]
-logger.info("%s chunked into %i parts", short_name, len(chuncks))
+doc_ids = [f"{hash_url(result.url)}-{i}" for i in range(len(chunks))]
+logger.info("%s chunked into %i parts", short_name, len(chunks))

# Check if the document is already indexed
try:
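The `doc_ids` list above gives every chunk a stable identifier derived from the page URL, which is what makes the "already indexed?" lookup possible. A small illustration of the idea, using a hypothetical `hash_url` stand-in (the real helper lives in app/helpers/resources and may differ):

```python
import hashlib


def hash_url(url: str) -> str:
    # Hypothetical stand-in for app.helpers.resources.hash_url: a stable digest of the URL.
    return hashlib.sha256(url.encode("utf-8")).hexdigest()


chunks = ["# Intro\n...", "## Setup\n...", "## Usage\n..."]
url = "https://example.com/docs/page"

# One deterministic ID per chunk: re-running the indexer on the same URL
# targets the same documents instead of creating duplicates.
doc_ids = [f"{hash_url(url)}-{i}" for i in range(len(chunks))]
print(doc_ids[0])
```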
@@ -90,14 +95,14 @@ async def _process_one(
return
except (
ResourceNotFoundError
-): # If a chunck is not found, it is not indexed, thus we can re-process the document
+): # If a chunk is not found, it is not indexed, thus we can re-process the document
pass

# Generate the embeddings by block (mitigate API throughput limits)
embeddings = []
chunks_size = 10
-for i in range(0, len(chuncks), chunks_size):
-chunk_input = chuncks[i : i + chunks_size]
+for i in range(0, len(chunks), chunks_size):
+chunk_input = chunks[i : i + chunks_size]
res = await _embeddings(
embedding_deployment=embedding_deployment,
embedding_dimensions=embedding_dimensions,
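Generating embeddings "by block" as in the loop above means slicing the chunk list and sending each slice as a single embeddings request. A self-contained sketch with placeholder endpoint, key, and deployment values; the repo's `_embeddings` helper, retries, and exact batch size may differ:

```python
import asyncio

from openai import AsyncAzureOpenAI


async def embed_in_batches(chunks: list[str], batch_size: int = 10) -> list[list[float]]:
    client = AsyncAzureOpenAI(
        api_key="<openai-api-key>",
        api_version="2023-12-01-preview",
        azure_endpoint="https://<openai-resource>.openai.azure.com",
    )
    vectors: list[list[float]] = []
    # Send at most `batch_size` inputs per request to stay under API throughput limits.
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        res = await client.embeddings.create(
            input=batch,
            model="<embedding-deployment>",  # Azure deployment name, not the model family name
        )
        vectors.extend(item.embedding for item in res.data)
    return vectors


embeddings = asyncio.run(embed_in_batches([f"chunk {n}" for n in range(25)]))
print(len(embeddings), len(embeddings[0]))
```

Batching trades a little latency for far fewer requests, which is the throughput mitigation the comment in the diff refers to.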
@@ -114,7 +119,7 @@ async def _process_one(
url=result.url,
vectors=embedding.embedding,
)
-for doc_id, content, embedding in zip(doc_ids, chuncks, embeddings)
+for doc_id, content, embedding in zip(doc_ids, chunks, embeddings)
]

# Index the documents
@@ -176,7 +181,7 @@ async def _embeddings(
)


-def _markdown_chunck(
+def _markdown_chunk(
max_tokens: int,
text: str,
) -> list[str]:
@@ -262,19 +267,19 @@ def _rebuild_headings() -> str:
current_chunk.splitlines()[: -(to_remove + 1)]
).strip()

-# Chunck if is still too big
+# Chunk if is still too big
current_cleaned_count = math.ceil(_count_tokens(current_cleaned) / max_tokens)
-current_cleaned_chunck_size = math.ceil(
+current_cleaned_chunk_size = math.ceil(
len(current_cleaned) / current_cleaned_count
)
for i in range(current_cleaned_count): # Iterate over the chunks
-chunck_content = current_cleaned[
-i * current_cleaned_chunck_size : (i + 1) * current_cleaned_chunck_size
+chunk_content = current_cleaned[
+i * current_cleaned_chunk_size : (i + 1) * current_cleaned_chunk_size
]
if i == 0: # Headings only on the first chunk
-contents.append(chunck_content)
+contents.append(chunk_content)
else: # Re-apply the last heading to the next chunk
-contents.append(_rebuild_headings() + chunck_content)
+contents.append(_rebuild_headings() + chunk_content)

return _rebuild_headings()

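The tail of `_markdown_chunk` splits an oversized section into roughly equal pieces: it derives the number of pieces from the token count, then cuts the text by character offsets. A standalone sketch of that step, with a naive whitespace counter standing in for the repo's `_count_tokens`:

```python
import math


def _count_tokens(text: str) -> int:
    # Naive stand-in for the repo's tokenizer-based counter: one token per whitespace-separated word.
    return len(text.split())


def split_evenly(text: str, max_tokens: int) -> list[str]:
    # Number of slices needed so that each stays under the token budget.
    count = math.ceil(_count_tokens(text) / max_tokens)
    # Approximate each slice by an equal share of characters.
    size = math.ceil(len(text) / count)
    return [text[i * size : (i + 1) * size] for i in range(count)]


parts = split_evenly("word " * 2500, max_tokens=800)
print(len(parts), [len(p) for p in parts])  # 4 roughly equal character slices
```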
