Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve create index #868

Closed
wants to merge 16 commits into from
10 changes: 5 additions & 5 deletions evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ def _bind_create_index_statement(self, node: CreateIndexStatement):
# TODO: create index currently only works on TableInfo, but will extend later.
assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo"

self.bind(node.col_list[0])

if not node.udf_func:
# Feature table type needs to be float32 numpy array.
col_def = node.col_list[0]
table_ref_obj = node.table_ref.table.table_obj
col = [col for col in table_ref_obj.columns if col.name == col_def.name][0]
col_entry = node.col_list[0].col_object
assert (
col.array_type == NdArrayType.FLOAT32
col_entry.array_type == NdArrayType.FLOAT32
), "Index input needs to be float32."
assert len(col.array_dimensions) == 2
assert len(col_entry.array_dimensions) == 2
else:
# Output of the UDF should be 2 dimension and float32 type.
udf_obj = self._catalog().get_udf_catalog_entry_by_name(node.udf_func.name)
Expand Down
1 change: 1 addition & 0 deletions evadb/catalog/catalog_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class ImageColumnName(EvaDBEnum):

class DocumentColumnName(EvaDBEnum):
name # noqa: F821
chunk_id # noqa: F821
data # noqa: F821
metadata # noqa: F821

Expand Down
60 changes: 49 additions & 11 deletions evadb/catalog/catalog_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
UdfCacheCatalogEntry,
UdfCatalogEntry,
)
from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.configuration.configuration_manager import ConfigurationManager
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
Expand Down Expand Up @@ -119,8 +120,14 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]:

def get_document_table_column_definitions() -> List[ColumnDefinition]:
"""
name: file path
data: file extracted data
name: file path
chunk_id: chunk id (0-indexed for each file)
data: text data associated with the chunk
"""
columns = [
ColumnDefinition(
Expand All @@ -130,6 +137,9 @@ def get_document_table_column_definitions() -> List[ColumnDefinition]:
None,
ColConstraintInfo(unique=True),
),
ColumnDefinition(
DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None
),
ColumnDefinition(
DocumentColumnName.data.name,
ColumnType.TEXT,
Expand Down Expand Up @@ -161,17 +171,45 @@ def get_pdf_table_column_definitions() -> List[ColumnDefinition]:
return columns


def get_table_primary_columns(table_catalog_obj: TableCatalogEntry):
def get_table_primary_columns(
table_catalog_obj: TableCatalogEntry,
) -> List[ColumnDefinition]:
"""
Get the primary columns for a table based on its table type.

Args:
table_catalog_obj (TableCatalogEntry): The table catalog object.

Returns:
List[ColumnDefinition]: The list of primary columns for the table.
"""
primary_columns = [
ColumnDefinition(IDENTIFIER_COLUMN, ColumnType.INTEGER, None, None)
]
# _row_id is a primary column for all TableTypes; for video, PDF, and document tables we additionally use id (frame id), paragraph, and chunk_id respectively as part of the unique key
if table_catalog_obj.table_type == TableType.VIDEO_DATA:
return get_video_table_column_definitions()[:2]
elif table_catalog_obj.table_type == TableType.IMAGE_DATA:
return get_image_table_column_definitions()[:1]
elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA:
return get_document_table_column_definitions()[:1]
# _row_id, id
primary_columns.append(
ColumnDefinition(VideoColumnName.id.name, ColumnType.INTEGER, None, None),
)

elif table_catalog_obj.table_type == TableType.PDF_DATA:
return get_pdf_table_column_definitions()[:3]
else:
raise Exception(f"Unexpected table type {table_catalog_obj.table_type}")
# _row_id, paragraph
primary_columns.append(
ColumnDefinition(
PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None
)
)

elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA:
# _row_id, chunk_id
primary_columns.append(
ColumnDefinition(
DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None
)
)

return primary_columns


def xform_column_definitions_to_catalog_entries(
Expand Down
1 change: 1 addition & 0 deletions evadb/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
IFRAMES = "IFRAMES"
AUDIORATE = "AUDIORATE"
DEFAULT_FUNCTION_EXPRESSION_COST = 100
MAGIC_NUMBER = 32000000000000 # 3.2e+13
37 changes: 7 additions & 30 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
from evadb.storage.storage_engine import StorageEngine
from evadb.third_party.vector_stores.types import FeaturePayload
from evadb.third_party.vector_stores.utils import VectorStoreFactory
from evadb.utils.logging_manager import logger
Expand Down Expand Up @@ -59,38 +57,17 @@ def _get_index_save_path(self) -> Path:

def _create_index(self):
try:
# Get feature tables.
feat_catalog_entry = self.node.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.node.col_list[0].name
feat_column = [
col for col in feat_catalog_entry.columns if col.name == feat_col_name
][0]
feat_column = self.node.col_list[0].col_object

# Add features to index.
# TODO: batch size is hardcoded for now.
input_dim = -1
storage_engine = StorageEngine.factory(self.db, feat_catalog_entry)
for input_batch in storage_engine.read(feat_catalog_entry):
if self.node.udf_func:
# Create index through UDF expression.
# UDF(input column) -> 2 dimension feature vector.
input_batch.modify_column_alias(feat_catalog_entry.name.lower())
feat_batch = self.node.udf_func.evaluate(input_batch)
feat_batch.drop_column_alias()
input_batch.drop_column_alias()
feat = feat_batch.column_as_numpy_array("features")
else:
# Create index on the feature table directly.
# Pandas wraps numpy array as an object inside a numpy
# array. Use zero index to get the actual numpy array.
feat = input_batch.column_as_numpy_array(feat_col_name)

row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN)
for input_batch in self.children[0].exec():
row_ids = input_batch.column_as_numpy_array(input_batch.columns[0])
features = input_batch.column_as_numpy_array(input_batch.columns[1])

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)
for row_id, feat in zip(row_ids, features):
row_feat = feat.reshape(1, -1)
if self.index is None:
input_dim = row_feat.shape[1]
self.index = VectorStoreFactory.init_vector_store(
Expand All @@ -103,7 +80,7 @@ def _create_index(self):
self.index.create(input_dim)

# Row ID for mapping back to the row.
self.index.add([FeaturePayload(row_id[i], row_feat)])
self.index.add([FeaturePayload(row_id, row_feat)])

# Persist index.
self.index.persist()
Expand Down
6 changes: 6 additions & 0 deletions evadb/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,9 @@ def apply(self, before: LogicalJoin, context: OptimizerContext):
yield tracker


# Vector Index Queries


class CombineSimilarityOrderByAndLimitToVectorIndexScan(Rule):
"""
This rule currently rewrites Order By + Limit to a vector index scan.
Expand Down Expand Up @@ -767,6 +770,7 @@ def apply(self, before: LogicalCreateUDF, context: OptimizerContext):
class LogicalCreateIndexToVectorIndex(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALCREATEINDEX)
pattern.append_child(Pattern(OperatorType.DUMMY))
super().__init__(RuleType.LOGICAL_CREATE_INDEX_TO_VECTOR_INDEX, pattern)

def promise(self):
Expand All @@ -783,6 +787,8 @@ def apply(self, before: LogicalCreateIndex, context: OptimizerContext):
before.vector_store_type,
before.udf_func,
)
for child in before.children:
after.append_child(child)
yield after


Expand Down
80 changes: 79 additions & 1 deletion evadb/optimizer/statement_to_opr_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from evadb.expression.abstract_expression import AbstractExpression
from evadb.catalog.catalog_type import TableType
from evadb.catalog.catalog_utils import get_table_primary_columns
from evadb.constants import MAGIC_NUMBER
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.arithmetic_expression import ArithmeticExpression
from evadb.expression.constant_value_expression import ConstantValueExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.optimizer.operators import (
LogicalCreate,
LogicalCreateIndex,
Expand Down Expand Up @@ -313,13 +319,85 @@ def visit_explain(self, statement: ExplainStatement):
self._plan = explain_opr

def visit_create_index(self, statement: CreateIndexStatement):
# convert it into a following plan
# LogicalCreateIndex
# |
# LogicalProject
# |
# LogicalGet

table_ref = statement.table_ref
catalog_entry = table_ref.table.table_obj
logical_get = LogicalGet(table_ref, catalog_entry, table_ref.alias)
project_exprs = statement.col_list
# if there is a function expr, make col as its children
if statement.udf_func:
project_exprs = [statement.udf_func.copy()]

# We need to also store the primary keys of the table in the index to later do
# the mapping. Typically, the vector indexes support setting an integer ID with
# the embedding. Unfortunately, we cannot support multi-column primary keys for
# cases like video table and document table.
# Hack: In such cases, we convert them into a unique ID using the following
# logic: We assume that the maximum number of files in the table is <=
# MAGIC_NUMBER and the number of frames or chunks for each video/document is <=
# MAGIC_NUMBER. Based on this assumption, we can safely say that
# `_row_id` * MAGIC_NUMBER + `chunk_id` for document table and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you determine the magic number? And when will the chunking be done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now, I set it to an arbitrarily large value. The idea is that as long as the number of files in the table is fewer than this number, our assumption will hold.

Reg chunking: It is done when we read the document similar to frame decoding in videos.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. And for video, each video will get a unique ID and each frame will be assigned a different frame ID? Is this assumption correct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have these IDs (_row_id for video and id for frame). Building on top of it. The assumption is these ids won't change across runs. _row_id is persisted, so no issue there. id is generated at runtime, and as long as the reader is deterministic across runs, we don't have a problem.

# `_row_id` * MAGIC_NUMBER + `id` for video table, and
# `_row_id` * MAGIC_NUMBER + `paragraph` for PDF table
# will be unique.
def _build_expression(row_id_expr, id_expr):
left = ArithmeticExpression(
ExpressionType.ARITHMETIC_MULTIPLY,
row_id_expr,
ConstantValueExpression(MAGIC_NUMBER),
)
right = id_expr
return ArithmeticExpression(ExpressionType.ARITHMETIC_ADD, left, right)

primary_cols = get_table_primary_columns(catalog_entry)

# primary_col to TupleValueExpressions
primary_exprs = []
for col in primary_cols:
col_obj = next(
(obj for obj in catalog_entry.columns if obj.name == col.name), None
)
assert (
col_obj is not None
), f"Invalid Primary Column {col.name} for table {catalog_entry.name}"
primary_exprs.append(
TupleValueExpression(
col_name=col.name,
table_alias=table_ref.alias,
col_object=col_obj,
col_alias=f"{table_ref.alias}.{col.name}",
)
)

unique_col = primary_exprs[0]

if catalog_entry.table_type in [
TableType.VIDEO_DATA,
TableType.DOCUMENT_DATA,
TableType.PDF_DATA,
]:
unique_col = _build_expression(primary_exprs[0], primary_exprs[1])

project_exprs = [unique_col] + project_exprs

logical_project = LogicalProject(project_exprs)
logical_project.append_child(logical_get)

create_index_opr = LogicalCreateIndex(
statement.name,
statement.table_ref,
statement.col_list,
statement.vector_store_type,
statement.udf_func,
)
create_index_opr.append_child(logical_project)

self._plan = create_index_opr

def visit_delete(self, statement: DeleteTableStatement):
Expand Down
5 changes: 1 addition & 4 deletions evadb/parser/lark_visitor/_create_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,7 @@ def create_index(self, tree):
index_elem = index_elem.children[0]
index_elem = [index_elem]

col_list = [
ColumnDefinition(tv_expr.col_name, None, None, None)
for tv_expr in index_elem
]
col_list = index_elem

return CreateIndexStatement(
index_name, table_ref, col_list, vector_store_type, udf_func
Expand Down
6 changes: 4 additions & 2 deletions evadb/readers/document/document_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,7 @@ def _read(self) -> Iterator[Dict]:
)

for data in loader.load():
for row in langchain_text_splitter.split_documents([data]):
yield {"data": row.page_content}
for chunk_id, row in enumerate(
langchain_text_splitter.split_documents([data])
):
yield {"chunk_id": chunk_id, "data": row.page_content}
4 changes: 3 additions & 1 deletion evadb/storage/document_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class DocumentStorageEngine(AbstractMediaStorageEngine):
def __init__(self, db: EvaDBDatabase):
super().__init__(db)

def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]:
def read(
self, table: TableCatalogEntry, chunk_params: dict = {}
) -> Iterator[Batch]:
for doc_files in self._rdb_handler.read(self._get_metadata_table(table), 12):
for _, (row_id, file_name) in doc_files.iterrows():
system_file_name = self._xform_file_url_to_file_name(file_name)
Expand Down
4 changes: 4 additions & 0 deletions test/integration_tests/test_create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,7 @@ def test_should_create_index_with_udf(self):

# Cleanup.
self.evadb.catalog().drop_index_catalog_entry("testCreateIndexName")


if __name__ == "__main__":
unittest.main()