From 7a9e1a7d438141ddac7b6f4cfe3774025a21631e Mon Sep 17 00:00:00 2001 From: Gaurav Date: Wed, 14 Jun 2023 19:46:58 -0400 Subject: [PATCH 01/11] checkpoint --- evadb/catalog/catalog_type.py | 1 + evadb/catalog/catalog_utils.py | 10 ++++++- evadb/constants.py | 1 + evadb/optimizer/rules/rules.py | 1 + evadb/optimizer/statement_to_opr_converter.py | 27 ++++++++++++++++--- evadb/storage/document_storage_engine.py | 4 ++- 6 files changed, 39 insertions(+), 5 deletions(-) diff --git a/evadb/catalog/catalog_type.py b/evadb/catalog/catalog_type.py index e86a376b0..8ca674c55 100644 --- a/evadb/catalog/catalog_type.py +++ b/evadb/catalog/catalog_type.py @@ -123,6 +123,7 @@ class ImageColumnName(EvaDBEnum): class DocumentColumnName(EvaDBEnum): name # noqa: F821 + chunk_id # noqa: F821 data # noqa: F821 metadata # noqa: F821 diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 250f98796..aff0691e1 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py @@ -31,6 +31,7 @@ UdfCacheCatalogEntry, UdfCatalogEntry, ) +from evadb.catalog.sql_config import IDENTIFIER_COLUMN from evadb.configuration.configuration_manager import ConfigurationManager from evadb.expression.function_expression import FunctionExpression from evadb.expression.tuple_value_expression import TupleValueExpression @@ -120,6 +121,7 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]: def get_document_table_column_definitions() -> List[ColumnDefinition]: """ name: file path + chunk_id: chunk id data: file extracted data """ columns = [ @@ -130,6 +132,9 @@ def get_document_table_column_definitions() -> List[ColumnDefinition]: None, ColConstraintInfo(unique=True), ), + ColumnDefinition( + DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None + ), ColumnDefinition( DocumentColumnName.data.name, ColumnType.TEXT, @@ -162,14 +167,17 @@ def get_pdf_table_column_definitions() -> List[ColumnDefinition]: def 
get_table_primary_columns(table_catalog_obj: TableCatalogEntry): + if table_catalog_obj.table_type == TableType.VIDEO_DATA: return get_video_table_column_definitions()[:2] elif table_catalog_obj.table_type == TableType.IMAGE_DATA: return get_image_table_column_definitions()[:1] elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA: - return get_document_table_column_definitions()[:1] + return get_document_table_column_definitions()[:2] elif table_catalog_obj.table_type == TableType.PDF_DATA: return get_pdf_table_column_definitions()[:3] + elif table_catalog_obj.table_type == TableType.STRUCTURED_DATA: + return [ColumnDefinition(IDENTIFIER_COLUMN, ColumnType.INTEGER, None, None)] else: raise Exception(f"Unexpected table type {table_catalog_obj.table_type}") diff --git a/evadb/constants.py b/evadb/constants.py index 7c8a0a41c..fc9267287 100644 --- a/evadb/constants.py +++ b/evadb/constants.py @@ -21,3 +21,4 @@ IFRAMES = "IFRAMES" AUDIORATE = "AUDIORATE" DEFAULT_FUNCTION_EXPRESSION_COST = 100 +MAGIC_NUMBER = 32000000000000 # 3.2e+13 diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index d6fdf71b6..2b6fd718c 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -486,6 +486,7 @@ def apply(self, before: LogicalJoin, context: OptimizerContext): tracker.append_child(detector) yield tracker +# Vector Index Queries class CombineSimilarityOrderByAndLimitToVectorIndexScan(Rule): """ diff --git a/evadb/optimizer/statement_to_opr_converter.py b/evadb/optimizer/statement_to_opr_converter.py index 11c1508dd..53cdbd838 100644 --- a/evadb/optimizer/statement_to_opr_converter.py +++ b/evadb/optimizer/statement_to_opr_converter.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
from evadb.expression.abstract_expression import AbstractExpression +from evadb.expression.arithmetic_expression import ArithmeticExpression from evadb.optimizer.operators import ( LogicalCreate, LogicalCreateIndex, @@ -313,13 +314,33 @@ def visit_explain(self, statement: ExplainStatement): self._plan = explain_opr def visit_create_index(self, statement: CreateIndexStatement): + # convert it into a following plan + # LogicalCreateIndex + # | + # LogicalProject + # | + # LogicalGet + table_ref = statement.table_ref + catalog_entry = table_ref.table.table_obj + logical_get = LogicalGet(table_ref, catalog_entry, table_ref.alias) + project_exprs = statement.col_list + # if there is a function expr, make col as its children + if statement.udf_func: + statement.udf_func.children = statement.col_list + project_exprs = [statement.udf_func] + + # hack: we also project the unique columns to identify the tuple + + ArithmeticExpression() + logical_project = LogicalProject(project_exprs) + logical_project.append_child(logical_get) + create_index_opr = LogicalCreateIndex( statement.name, - statement.table_ref, - statement.col_list, statement.vector_store_type, - statement.udf_func, ) + create_index_opr.append_child(logical_get) + self._plan = create_index_opr def visit_delete(self, statement: DeleteTableStatement): diff --git a/evadb/storage/document_storage_engine.py b/evadb/storage/document_storage_engine.py index 6d213bbe8..5f7ae0245 100644 --- a/evadb/storage/document_storage_engine.py +++ b/evadb/storage/document_storage_engine.py @@ -26,7 +26,9 @@ class DocumentStorageEngine(AbstractMediaStorageEngine): def __init__(self, db: EvaDBDatabase): super().__init__(db) - def read(self, table: TableCatalogEntry, chunk_params: dict) -> Iterator[Batch]: + def read( + self, table: TableCatalogEntry, chunk_params: dict = {} + ) -> Iterator[Batch]: for doc_files in self._rdb_handler.read(self._get_metadata_table(table), 12): for _, (row_id, file_name) in doc_files.iterrows(): 
system_file_name = self._xform_file_url_to_file_name(file_name) From 01dcab5faf1db55ae40e849aea10b1ab179ecbf0 Mon Sep 17 00:00:00 2001 From: Gaurav Date: Wed, 14 Jun 2023 21:15:25 -0400 Subject: [PATCH 02/11] fix: minor fix to the catalog utils. --- evadb/catalog/catalog_utils.py | 39 ++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 250f98796..1b4274cac 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py @@ -31,6 +31,7 @@ UdfCacheCatalogEntry, UdfCatalogEntry, ) +from evadb.catalog.sql_config import IDENTIFIER_COLUMN from evadb.configuration.configuration_manager import ConfigurationManager from evadb.expression.function_expression import FunctionExpression from evadb.expression.tuple_value_expression import TupleValueExpression @@ -161,17 +162,37 @@ def get_pdf_table_column_definitions() -> List[ColumnDefinition]: return columns -def get_table_primary_columns(table_catalog_obj: TableCatalogEntry): +def get_table_primary_columns( + table_catalog_obj: TableCatalogEntry, +) -> List[ColumnDefinition]: + """ + Get the primary columns for a table based on its table type. + + Args: + table_catalog_obj (TableCatalogEntry): The table catalog object. + + Returns: + List[ColumnDefinition]: The list of primary columns for the table. 
+ """ + primary_columns = [ + ColumnDefinition(IDENTIFIER_COLUMN, ColumnType.INTEGER, None, None) + ] + # _row_id for all the TableTypes, however for Video data and PDF data we also add frame_id (id) and paragraph as part of unique key if table_catalog_obj.table_type == TableType.VIDEO_DATA: - return get_video_table_column_definitions()[:2] - elif table_catalog_obj.table_type == TableType.IMAGE_DATA: - return get_image_table_column_definitions()[:1] - elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA: - return get_document_table_column_definitions()[:1] + # _row_id, id + primary_columns.append( + ColumnDefinition(VideoColumnName.id.name, ColumnType.INTEGER, None, None), + ) + elif table_catalog_obj.table_type == TableType.PDF_DATA: - return get_pdf_table_column_definitions()[:3] - else: - raise Exception(f"Unexpected table type {table_catalog_obj.table_type}") + # _row_id, paragraph + primary_columns.append( + ColumnDefinition( + PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None + ) + ) + + return primary_columns def xform_column_definitions_to_catalog_entries( From 5b30fc61cb43bc22b82a085362fae32aabed1ab2 Mon Sep 17 00:00:00 2001 From: Gaurav Date: Wed, 14 Jun 2023 21:36:28 -0400 Subject: [PATCH 03/11] add chunk_id to document table --- evadb/catalog/catalog_type.py | 1 + evadb/catalog/catalog_utils.py | 6 +++++- evadb/readers/document/document_reader.py | 6 ++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/evadb/catalog/catalog_type.py b/evadb/catalog/catalog_type.py index e86a376b0..8ca674c55 100644 --- a/evadb/catalog/catalog_type.py +++ b/evadb/catalog/catalog_type.py @@ -123,6 +123,7 @@ class ImageColumnName(EvaDBEnum): class DocumentColumnName(EvaDBEnum): name # noqa: F821 + chunk_id # noqa: F821 data # noqa: F821 metadata # noqa: F821 diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 1b4274cac..60dca9018 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py 
@@ -121,7 +121,8 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]: def get_document_table_column_definitions() -> List[ColumnDefinition]: """ name: file path - data: file extracted data + chunk_id: chunk id (0-indexed for each file) + data: text data associated with the chunk """ columns = [ ColumnDefinition( @@ -131,6 +132,9 @@ def get_document_table_column_definitions() -> List[ColumnDefinition]: None, ColConstraintInfo(unique=True), ), + ColumnDefinition( + DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None + ), ColumnDefinition( DocumentColumnName.data.name, ColumnType.TEXT, diff --git a/evadb/readers/document/document_reader.py b/evadb/readers/document/document_reader.py index 95da49d3a..9011424a1 100644 --- a/evadb/readers/document/document_reader.py +++ b/evadb/readers/document/document_reader.py @@ -45,5 +45,7 @@ def _read(self) -> Iterator[Dict]: ) for data in loader.load(): - for row in langchain_text_splitter.split_documents([data]): - yield {"data": row.page_content} + for chunk_id, row in enumerate( + langchain_text_splitter.split_documents([data]) + ): + yield {"chunk_id": chunk_id, "data": row.page_content} From 92b09f14785d59dc464518e6879a1fe24bb7cd7d Mon Sep 17 00:00:00 2001 From: Gaurav Date: Wed, 14 Jun 2023 21:40:33 -0400 Subject: [PATCH 04/11] minor fix --- evadb/catalog/catalog_utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 60dca9018..496da35eb 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py @@ -196,6 +196,14 @@ def get_table_primary_columns( ) ) + elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA: + # _row_id, chunk_id + primary_columns.append( + ColumnDefinition( + DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None + ) + ) + return primary_columns From c70d8c37d0930530f83556bd470e3a31cc504881 Mon Sep 17 00:00:00 2001 From: Gaurav Date: Thu, 15 Jun 2023 00:11:53 
-0400 Subject: [PATCH 05/11] support creating index on doc tables, image tables, video tables --- evadb/binder/statement_binder.py | 10 +-- evadb/catalog/catalog_utils.py | 16 ++-- evadb/executor/create_index_executor.py | 37 ++------- evadb/optimizer/rules/rules.py | 5 ++ evadb/optimizer/statement_to_opr_converter.py | 75 ++++++++++++++++--- .../parser/lark_visitor/_create_statements.py | 5 +- .../test_create_index_executor.py | 4 + 7 files changed, 96 insertions(+), 56 deletions(-) diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 9315136f2..f13344eb2 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -82,15 +82,15 @@ def _bind_create_index_statement(self, node: CreateIndexStatement): # TODO: create index currently only works on TableInfo, but will extend later. assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo" + self.bind(node.col_list[0]) + if not node.udf_func: # Feature table type needs to be float32 numpy array. - col_def = node.col_list[0] - table_ref_obj = node.table_ref.table.table_obj - col = [col for col in table_ref_obj.columns if col.name == col_def.name][0] + col_entry = node.col_list[0].col_object assert ( - col.array_type == NdArrayType.FLOAT32 + col_entry.array_type == NdArrayType.FLOAT32 ), "Index input needs to be float32." - assert len(col.array_dimensions) == 2 + assert len(col_entry.array_dimensions) == 2 else: # Output of the UDF should be 2 dimension and float32 type. 
udf_obj = self._catalog().get_udf_catalog_entry_by_name(node.udf_func.name) diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 211d42605..8fac2bf61 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py @@ -120,14 +120,9 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]: def get_document_table_column_definitions() -> List[ColumnDefinition]: """ - name: file path -<<<<<<< HEAD - chunk_id: chunk id - data: file extracted data -======= - chunk_id: chunk id (0-indexed for each file) - data: text data associated with the chunk ->>>>>>> minor-fixes + name: file path + chunk_id: chunk id (0-indexed for each file) + data: text data associated with the chunk """ columns = [ ColumnDefinition( diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index c9fb87ce8..3ff4de2f5 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -16,13 +16,11 @@ import pandas as pd -from evadb.catalog.sql_config import IDENTIFIER_COLUMN from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params from evadb.models.storage.batch import Batch from evadb.plan_nodes.create_index_plan import CreateIndexPlan -from evadb.storage.storage_engine import StorageEngine from evadb.third_party.vector_stores.types import FeaturePayload from evadb.third_party.vector_stores.utils import VectorStoreFactory from evadb.utils.logging_manager import logger @@ -59,38 +57,17 @@ def _get_index_save_path(self) -> Path: def _create_index(self): try: - # Get feature tables. - feat_catalog_entry = self.node.table_ref.table.table_obj - # Get feature column.
- feat_col_name = self.node.col_list[0].name - feat_column = [ - col for col in feat_catalog_entry.columns if col.name == feat_col_name - ][0] + feat_column = self.node.col_list[0].col_object # Add features to index. - # TODO: batch size is hardcoded for now. input_dim = -1 - storage_engine = StorageEngine.factory(self.db, feat_catalog_entry) - for input_batch in storage_engine.read(feat_catalog_entry): - if self.node.udf_func: - # Create index through UDF expression. - # UDF(input column) -> 2 dimension feature vector. - input_batch.modify_column_alias(feat_catalog_entry.name.lower()) - feat_batch = self.node.udf_func.evaluate(input_batch) - feat_batch.drop_column_alias() - input_batch.drop_column_alias() - feat = feat_batch.column_as_numpy_array("features") - else: - # Create index on the feature table directly. - # Pandas wraps numpy array as an object inside a numpy - # array. Use zero index to get the actual numpy array. - feat = input_batch.column_as_numpy_array(feat_col_name) - - row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN) + for input_batch in self.children[0].exec(): + row_ids = input_batch.column_as_numpy_array(input_batch.columns[0]) + features = input_batch.column_as_numpy_array(input_batch.columns[1]) - for i in range(len(input_batch)): - row_feat = feat[i].reshape(1, -1) + for row_id, feat in zip(row_ids, features): + row_feat = feat.reshape(1, -1) if self.index is None: input_dim = row_feat.shape[1] self.index = VectorStoreFactory.init_vector_store( @@ -103,7 +80,7 @@ def _create_index(self): self.index.create(input_dim) # Row ID for mapping back to the row. - self.index.add([FeaturePayload(row_id[i], row_feat)]) + self.index.add([FeaturePayload(row_id, row_feat)]) # Persist index. 
self.index.persist() diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index 2b6fd718c..d024bc99a 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -486,8 +486,10 @@ def apply(self, before: LogicalJoin, context: OptimizerContext): tracker.append_child(detector) yield tracker + # Vector Index Queries + class CombineSimilarityOrderByAndLimitToVectorIndexScan(Rule): """ This rule currently rewrites Order By + Limit to a vector index scan. @@ -768,6 +770,7 @@ def apply(self, before: LogicalCreateUDF, context: OptimizerContext): class LogicalCreateIndexToVectorIndex(Rule): def __init__(self): pattern = Pattern(OperatorType.LOGICALCREATEINDEX) + pattern.append_child(Pattern(OperatorType.DUMMY)) super().__init__(RuleType.LOGICAL_CREATE_INDEX_TO_VECTOR_INDEX, pattern) def promise(self): @@ -784,6 +787,8 @@ def apply(self, before: LogicalCreateIndex, context: OptimizerContext): before.vector_store_type, before.udf_func, ) + for child in before.children: + after.append_child(child) yield after diff --git a/evadb/optimizer/statement_to_opr_converter.py b/evadb/optimizer/statement_to_opr_converter.py index 53cdbd838..3504ceb5f 100644 --- a/evadb/optimizer/statement_to_opr_converter.py +++ b/evadb/optimizer/statement_to_opr_converter.py @@ -12,8 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from evadb.expression.abstract_expression import AbstractExpression +from evadb.catalog.catalog_type import TableType +from evadb.catalog.catalog_utils import get_table_primary_columns +from evadb.constants import MAGIC_NUMBER +from evadb.expression.abstract_expression import AbstractExpression, ExpressionType from evadb.expression.arithmetic_expression import ArithmeticExpression +from evadb.expression.constant_value_expression import ConstantValueExpression +from evadb.expression.tuple_value_expression import TupleValueExpression from evadb.optimizer.operators import ( LogicalCreate, LogicalCreateIndex, @@ -320,27 +325,79 @@ def visit_create_index(self, statement: CreateIndexStatement): # LogicalProject # | # LogicalGet + table_ref = statement.table_ref catalog_entry = table_ref.table.table_obj logical_get = LogicalGet(table_ref, catalog_entry, table_ref.alias) project_exprs = statement.col_list # if there is a function expr, make col as its children if statement.udf_func: - statement.udf_func.children = statement.col_list - project_exprs = [statement.udf_func] + project_exprs = [statement.udf_func.copy()] + + # We need to also store the primary keys of the table in the index to later do + # the mapping. Typically, the vector indexes support setting an integer ID with + # the embedding. Unfortunately, we cannot support multi-column primary keys for + # cases like video table and document table. + # Hack: In such cases, we convert them into a unique ID using the following + # logic: We assume that the maximum number of files in the table is <= + # MAGIC_NUMBER and the number of frames or chunks for each video/document is <= + # MAGIC_NUMBER. Based on this assumption, we can safely say that + # `_row_id` * MAGIC_NUMBER + `chunk_id` for document table and + # `_row_id` * MAGIC_NUMBER + `id`) for video table + # `_row_id` * MAGIC_NUMBER + `paragraph`) for PDF table + # will be unique. 
+ def _build_expression(row_id_expr, id_expr): + left = ArithmeticExpression( + ExpressionType.ARITHMETIC_MULTIPLY, + row_id_expr, + ConstantValueExpression(MAGIC_NUMBER), + ) + right = id_expr + return ArithmeticExpression(ExpressionType.ARITHMETIC_ADD, left, right) + + primary_cols = get_table_primary_columns(catalog_entry) + + # primary_col to TupleValueExpressions + primary_exprs = [] + for col in primary_cols: + col_obj = next( + (obj for obj in catalog_entry.columns if obj.name == col.name), None + ) + assert ( + col_obj is not None + ), f"Invalid Primary Column {col.name} for table {catalog_entry.name}" + primary_exprs.append( + TupleValueExpression( + col_name=col.name, + table_alias=table_ref.alias, + col_object=col_obj, + col_alias=f"{table_ref.alias}.{col.name}", + ) + ) + + unique_col = primary_exprs[0] + + if catalog_entry.table_type in [ + TableType.VIDEO_DATA, + TableType.DOCUMENT_DATA, + TableType.PDF_DATA, + ]: + unique_col = _build_expression(primary_exprs[0], primary_exprs[1]) + + project_exprs = [unique_col] + project_exprs - # hack: we also project the unique columns to identify the tuple - - ArithmeticExpression() logical_project = LogicalProject(project_exprs) logical_project.append_child(logical_get) - + create_index_opr = LogicalCreateIndex( statement.name, + statement.table_ref, + statement.col_list, statement.vector_store_type, + statement.udf_func, ) - create_index_opr.append_child(logical_get) - + create_index_opr.append_child(logical_project) + self._plan = create_index_opr def visit_delete(self, statement: DeleteTableStatement): diff --git a/evadb/parser/lark_visitor/_create_statements.py b/evadb/parser/lark_visitor/_create_statements.py index 887a1323b..66114ef96 100644 --- a/evadb/parser/lark_visitor/_create_statements.py +++ b/evadb/parser/lark_visitor/_create_statements.py @@ -297,10 +297,7 @@ def create_index(self, tree): index_elem = index_elem.children[0] index_elem = [index_elem] - col_list = [ - 
ColumnDefinition(tv_expr.col_name, None, None, None) - for tv_expr in index_elem - ] + col_list = index_elem return CreateIndexStatement( index_name, table_ref, col_list, vector_store_type, udf_func diff --git a/test/integration_tests/test_create_index_executor.py b/test/integration_tests/test_create_index_executor.py index a64fb0e07..256dafe82 100644 --- a/test/integration_tests/test_create_index_executor.py +++ b/test/integration_tests/test_create_index_executor.py @@ -165,3 +165,7 @@ def test_should_create_index_with_udf(self): # Cleanup. self.evadb.catalog().drop_index_catalog_entry("testCreateIndexName") + + +if __name__ == "__main__": + unittest.main() From e2645439289003d55810df5ce3055b92029c7f5e Mon Sep 17 00:00:00 2001 From: Gaurav Date: Fri, 16 Jun 2023 08:59:37 -0400 Subject: [PATCH 06/11] checkpoint --- evadb/expression/arithmetic_expression.py | 3 ++ evadb/models/storage/batch.py | 13 ++++-- .../test_create_index_executor.py | 43 +++++++++++++++++-- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/evadb/expression/arithmetic_expression.py b/evadb/expression/arithmetic_expression.py index 37c2c5a53..b0143caab 100644 --- a/evadb/expression/arithmetic_expression.py +++ b/evadb/expression/arithmetic_expression.py @@ -46,3 +46,6 @@ def __eq__(self, other): if not isinstance(other, ArithmeticExpression): return False return is_subtree_equal and self.etype == other.etype + + def __hash__(self) -> int: + return super().__hash__() diff --git a/evadb/models/storage/batch.py b/evadb/models/storage/batch.py index 515273948..c93710893 100644 --- a/evadb/models/storage/batch.py +++ b/evadb/models/storage/batch.py @@ -341,14 +341,19 @@ def combine_batches( """ Creates Batch by combining two batches using some arithmetic expression. 
""" + assert ( + len(first.columns) == 1 and len(second.columns) == 1 + ), "Arithmatic operations only supported on Batch with one column" + lvalues = first._frames.to_numpy() + rvalues = second._frames.to_numpy() if expression == ExpressionType.ARITHMETIC_ADD: - return Batch(pd.DataFrame(first._frames + second._frames)) + return Batch(pd.DataFrame(lvalues + rvalues)) elif expression == ExpressionType.ARITHMETIC_SUBTRACT: - return Batch(pd.DataFrame(first._frames - second._frames)) + return Batch(pd.DataFrame(lvalues - rvalues)) elif expression == ExpressionType.ARITHMETIC_MULTIPLY: - return Batch(pd.DataFrame(first._frames * second._frames)) + return Batch(pd.DataFrame(lvalues * rvalues)) elif expression == ExpressionType.ARITHMETIC_DIVIDE: - return Batch(pd.DataFrame(first._frames / second._frames)) + return Batch(pd.DataFrame(lvalues / rvalues)) def reassign_indices_to_hash(self, indices) -> None: """ diff --git a/test/integration_tests/test_create_index_executor.py b/test/integration_tests/test_create_index_executor.py index 256dafe82..7c6549132 100644 --- a/test/integration_tests/test_create_index_executor.py +++ b/test/integration_tests/test_create_index_executor.py @@ -14,9 +14,10 @@ # limitations under the License. 
import unittest from pathlib import Path +from evadb.configuration.constants import EvaDB_ROOT_DIR from test.markers import macos_skip_marker from test.util import get_evadb_for_testing, load_udfs_for_testing - +from evadb.udfs.udf_bootstrap_queries import Text_feat_udf_query import faiss import numpy as np import pandas as pd @@ -30,10 +31,10 @@ @pytest.mark.notparallel class CreateIndexTest(unittest.TestCase): - def _index_save_path(self): + def _index_save_path(self, name="testCreateIndexName"): return str( Path(self.evadb.config.get_value("storage", "index_dir")) - / Path("{}_{}.index".format("FAISS", "testCreateIndexName")) + / Path("{}_{}.index".format("FAISS", name)) ) @classmethod @@ -166,6 +167,42 @@ def test_should_create_index_with_udf(self): # Cleanup. self.evadb.catalog().drop_index_catalog_entry("testCreateIndexName") + def test_aashould_create_index_with_udf_on_doc_table(self): + pdf_path = f"{EvaDB_ROOT_DIR}/data/documents/pdf_sample1.pdf" + execute_query_fetch_all(self.evadb, f"LOAD DOCUMENT '{pdf_path}' INTO MyPDFs;") + execute_query_fetch_all(self.evadb, Text_feat_udf_query) + + self.evadb.config.update_value("experimental", "ray", False) + query = "CREATE INDEX doc_index ON MyPDFs (SentenceFeatureExtractor(data)) USING FAISS;" + execute_query_fetch_all(self.evadb, query) + + # Test index udf signature. + index_catalog_entry = self.evadb.catalog().get_index_catalog_entry_by_name( + "doc_index" + ) + self.assertEqual(index_catalog_entry.type, VectorStoreType.FAISS) + self.assertEqual( + index_catalog_entry.save_file_path, + self._index_save_path("doc_index"), + ) + + # Test referenced column. + input_table_entry = self.evadb.catalog().get_table_catalog_entry("doc_index") + input_column = [col for col in input_table_entry.columns if col.name == "data"][ + 0 + ] + self.assertEqual(index_catalog_entry.feat_column_id, input_column.row_id) + self.assertEqual(index_catalog_entry.feat_column, input_column) + + # Test on disk index. 
+ # index = faiss.read_index(index_catalog_entry.save_file_path) + # distance, row_id = index.search(np.array([[0, 0, 0]]).astype(np.float32), 1) + # self.assertEqual(distance[0][0], 0) + # self.assertEqual(row_id[0][0], 1) + + # Cleanup. + self.evadb.catalog().drop_index_catalog_entry("doc_index") + if __name__ == "__main__": unittest.main() From 02ef4e3030c5db70e3de96b5e1096795b5f12fa8 Mon Sep 17 00:00:00 2001 From: Gaurav Date: Fri, 16 Jun 2023 15:20:02 -0400 Subject: [PATCH 07/11] create index story complete, now we support creating index directly on doc/video table --- evadb/executor/executor_utils.py | 40 ++++++++++------- evadb/executor/hash_join_executor.py | 6 +-- evadb/executor/lateral_join_executor.py | 4 +- evadb/executor/nested_loop_join_executor.py | 2 +- evadb/executor/predicate_executor.py | 2 +- evadb/executor/project_executor.py | 2 +- evadb/executor/seq_scan_executor.py | 4 +- .../test_create_index_executor.py | 45 ++++++++++++------- 8 files changed, 61 insertions(+), 44 deletions(-) diff --git a/evadb/executor/executor_utils.py b/evadb/executor/executor_utils.py index c635862a1..fda518dbb 100644 --- a/evadb/executor/executor_utils.py +++ b/evadb/executor/executor_utils.py @@ -15,7 +15,7 @@ import glob import os from pathlib import Path -from typing import TYPE_CHECKING, Generator, List +from typing import TYPE_CHECKING, Callable, Generator, List import cv2 @@ -36,38 +36,46 @@ class ExecutorError(Exception): pass +def _upsert_stats_related_to_function_expression( + expr: AbstractExpression, catalog: Callable +): + # try persisting stats for function expression + # avoiding raising error if it fails + try: + for func_expr in expr.find_all(FunctionExpression): + if func_expr.udf_obj and func_expr._stats: + udf_id = func_expr.udf_obj.row_id + catalog().upsert_udf_cost_catalog_entry( + udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost + ) + except Exception as e: + logger.warning(f"{str(e)} while upserting cost for {func_expr.name}") + + 
def apply_project( - batch: Batch, project_list: List[AbstractExpression], catalog: "CatalogManager" + batch: Batch, project_list: List[AbstractExpression], catalog: Callable ): if not batch.empty() and project_list: batches = [expr.evaluate(batch) for expr in project_list] batch = Batch.merge_column_wise(batches) - # persist stats of function expression + # try persisting stats for function expression for expr in project_list: - for func_expr in expr.find_all(FunctionExpression): - if func_expr.udf_obj and func_expr._stats: - udf_id = func_expr.udf_obj.row_id - catalog.upsert_udf_cost_catalog_entry( - udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost - ) + _upsert_stats_related_to_function_expression(expr, catalog) + return batch def apply_predicate( - batch: Batch, predicate: AbstractExpression, catalog: "CatalogManager" + batch: Batch, predicate: AbstractExpression, catalog: Callable ) -> Batch: if not batch.empty() and predicate is not None: outcomes = predicate.evaluate(batch) batch.drop_zero(outcomes) # persist stats of function expression - for func_expr in predicate.find_all(FunctionExpression): - if func_expr.udf_obj and func_expr._stats: - udf_id = func_expr.udf_obj.row_id - catalog.upsert_udf_cost_catalog_entry( - udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost - ) + _upsert_stats_related_to_function_expression(predicate, catalog) + return batch diff --git a/evadb/executor/hash_join_executor.py b/evadb/executor/hash_join_executor.py index d6edbc126..1ceda17df 100644 --- a/evadb/executor/hash_join_executor.py +++ b/evadb/executor/hash_join_executor.py @@ -38,8 +38,6 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: probe_batch.reassign_indices_to_hash(hash_keys) join_batch = Batch.join(probe_batch, build_batch) join_batch.reset_index() - join_batch = apply_predicate(join_batch, self.predicate, self.catalog()) - join_batch = apply_project( - join_batch, self.join_project, self.catalog() - ) + join_batch = 
apply_predicate(join_batch, self.predicate, self.catalog) + join_batch = apply_project(join_batch, self.join_project, self.catalog) yield join_batch diff --git a/evadb/executor/lateral_join_executor.py b/evadb/executor/lateral_join_executor.py index a17155251..bec5e3adb 100644 --- a/evadb/executor/lateral_join_executor.py +++ b/evadb/executor/lateral_join_executor.py @@ -35,10 +35,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: result_batch = Batch.join(outer_batch, result_batch) result_batch.reset_index() result_batch = apply_predicate( - result_batch, self.predicate, self.catalog() + result_batch, self.predicate, self.catalog ) result_batch = apply_project( - result_batch, self.join_project, self.catalog() + result_batch, self.join_project, self.catalog ) if not result_batch.empty(): yield result_batch diff --git a/evadb/executor/nested_loop_join_executor.py b/evadb/executor/nested_loop_join_executor.py index 7852b593d..18eda020d 100644 --- a/evadb/executor/nested_loop_join_executor.py +++ b/evadb/executor/nested_loop_join_executor.py @@ -34,7 +34,7 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: result_batch = Batch.join(row1, row2) result_batch.reset_index() result_batch = apply_predicate( - result_batch, self.predicate, self.catalog() + result_batch, self.predicate, self.catalog ) if not result_batch.empty(): yield result_batch diff --git a/evadb/executor/predicate_executor.py b/evadb/executor/predicate_executor.py index 7ac7a402e..7431fbfc3 100644 --- a/evadb/executor/predicate_executor.py +++ b/evadb/executor/predicate_executor.py @@ -31,6 +31,6 @@ def __init__(self, db: EvaDBDatabase, node: PredicatePlan): def exec(self, *args, **kwargs) -> Iterator[Batch]: child_executor = self.children[0] for batch in child_executor.exec(**kwargs): - batch = apply_predicate(batch, self.predicate, self.catalog()) + batch = apply_predicate(batch, self.predicate, self.catalog) if not batch.empty(): yield batch diff --git a/evadb/executor/project_executor.py 
b/evadb/executor/project_executor.py index 7382fd0f5..f43f0cdc5 100644 --- a/evadb/executor/project_executor.py +++ b/evadb/executor/project_executor.py @@ -31,7 +31,7 @@ def __init__(self, db: EvaDBDatabase, node: ProjectPlan): def exec(self, *args, **kwargs) -> Iterator[Batch]: child_executor = self.children[0] for batch in child_executor.exec(**kwargs): - batch = apply_project(batch, self.target_list, self.catalog()) + batch = apply_project(batch, self.target_list, self.catalog) if not batch.empty(): yield batch diff --git a/evadb/executor/seq_scan_executor.py b/evadb/executor/seq_scan_executor.py index e79b52207..c1d3624bd 100644 --- a/evadb/executor/seq_scan_executor.py +++ b/evadb/executor/seq_scan_executor.py @@ -44,9 +44,9 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: batch.modify_column_alias(self.alias) # We do the predicate first - batch = apply_predicate(batch, self.predicate, self.catalog()) + batch = apply_predicate(batch, self.predicate, self.catalog) # Then do project - batch = apply_project(batch, self.project_expr, self.catalog()) + batch = apply_project(batch, self.project_expr, self.catalog) if not batch.empty(): yield batch diff --git a/test/integration_tests/test_create_index_executor.py b/test/integration_tests/test_create_index_executor.py index 7c6549132..a7c2e2b84 100644 --- a/test/integration_tests/test_create_index_executor.py +++ b/test/integration_tests/test_create_index_executor.py @@ -14,19 +14,20 @@ # limitations under the License. 
import unittest from pathlib import Path -from evadb.configuration.constants import EvaDB_ROOT_DIR from test.markers import macos_skip_marker from test.util import get_evadb_for_testing, load_udfs_for_testing -from evadb.udfs.udf_bootstrap_queries import Text_feat_udf_query + import faiss import numpy as np import pandas as pd import pytest from evadb.catalog.catalog_type import VectorStoreType +from evadb.configuration.constants import EvaDB_ROOT_DIR from evadb.models.storage.batch import Batch from evadb.server.command_handler import execute_query_fetch_all from evadb.storage.storage_engine import StorageEngine +from evadb.udfs.udf_bootstrap_queries import Text_feat_udf_query @pytest.mark.notparallel @@ -168,8 +169,12 @@ def test_should_create_index_with_udf(self): self.evadb.catalog().drop_index_catalog_entry("testCreateIndexName") def test_aashould_create_index_with_udf_on_doc_table(self): - pdf_path = f"{EvaDB_ROOT_DIR}/data/documents/pdf_sample1.pdf" - execute_query_fetch_all(self.evadb, f"LOAD DOCUMENT '{pdf_path}' INTO MyPDFs;") + execute_query_fetch_all(self.evadb, "DROP TABLE IF EXISTS MYPDFs;") + pdf_path1 = f"{EvaDB_ROOT_DIR}/data/documents/pdf_sample1.pdf" + pdf_path2 = f"{EvaDB_ROOT_DIR}/data/documents/one_page.pdf" + execute_query_fetch_all(self.evadb, f"LOAD DOCUMENT '{pdf_path1}' INTO MyPDFs;") + execute_query_fetch_all(self.evadb, f"LOAD DOCUMENT '{pdf_path2}' INTO MyPDFs;") + execute_query_fetch_all(self.evadb, Text_feat_udf_query) self.evadb.config.update_value("experimental", "ray", False) @@ -186,22 +191,28 @@ def test_aashould_create_index_with_udf_on_doc_table(self): self._index_save_path("doc_index"), ) - # Test referenced column. 
- input_table_entry = self.evadb.catalog().get_table_catalog_entry("doc_index") - input_column = [col for col in input_table_entry.columns if col.name == "data"][ - 0 - ] - self.assertEqual(index_catalog_entry.feat_column_id, input_column.row_id) - self.assertEqual(index_catalog_entry.feat_column, input_column) + # get tuples in the MyPDFs + df = execute_query_fetch_all(self.evadb, "select * from MyPDFs;").frames + num_tuples_per_file = [len(df[df["mypdfs._row_id"] == i]) for i in range(2)] - # Test on disk index. - # index = faiss.read_index(index_catalog_entry.save_file_path) - # distance, row_id = index.search(np.array([[0, 0, 0]]).astype(np.float32), 1) - # self.assertEqual(distance[0][0], 0) - # self.assertEqual(row_id[0][0], 1) + # Test if index has correct number of tuples + index = faiss.read_index(index_catalog_entry.save_file_path) + self.assertEqual(index.ntotal, len(df)) + + # check if correct ids are added to the index + # Note: reconstruct might not work for all the FAISS index types + # https://github.com/facebookresearch/faiss/issues/1043 + from evadb.constants import MAGIC_NUMBER + + for i, num_tuples in enumerate(num_tuples_per_file): + for j in range(num_tuples): + idx = i * MAGIC_NUMBER + j + # this should not raise any exception + index.reconstruct(idx) # Cleanup. 
- self.evadb.catalog().drop_index_catalog_entry("doc_index") + execute_query_fetch_all(self.evadb, "DROP INDEX IF EXISTS doc_index;") + execute_query_fetch_all(self.evadb, "DROP TABLE IF EXISTS MYPDFs;") if __name__ == "__main__": From f5bf9c576a2cb9bcef8cb64a60255de9194fe28a Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Mon, 26 Jun 2023 22:54:38 -0700 Subject: [PATCH 08/11] fix paragraph id to be unique per PDF file --- evadb/readers/pdf_reader.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/evadb/readers/pdf_reader.py b/evadb/readers/pdf_reader.py index e1d805f47..95e99defe 100644 --- a/evadb/readers/pdf_reader.py +++ b/evadb/readers/pdf_reader.py @@ -34,10 +34,12 @@ def _read(self) -> Iterator[Dict]: doc = fitz.open(self.file_url) # PAGE ID, PARAGRAPH ID, STRING + # Maintain a global paragraph number per PDF + global_paragraph_no = 0 for page_no, page in enumerate(doc): blocks = page.get_text("dict")["blocks"] # iterate through the text blocks - for paragraph_no, b in enumerate(blocks): + for _, b in enumerate(blocks): # this block contains text if b["type"] == 0: # text found in block @@ -50,7 +52,8 @@ def _read(self) -> Iterator[Dict]: if span["text"].strip(): block_string += span["text"] yield { - "page": page_no + 1, - "paragraph": paragraph_no + 1, + "paragraph": global_paragraph_no, + "page": page_no, "data": block_string, } + global_paragraph_no += 1 From b10aab9bc67b26326d0f6f68a32e3aa3f86f084e Mon Sep 17 00:00:00 2001 From: Gaurav Date: Sun, 2 Jul 2023 14:57:41 -0400 Subject: [PATCH 09/11] updated design --- evadb/binder/statement_binder.py | 15 +--------- evadb/catalog/catalog_utils.py | 30 +++++++++---------- evadb/optimizer/statement_to_opr_converter.py | 30 +------------------ evadb/storage/document_storage_engine.py | 12 ++++++++ evadb/storage/pdf_storage_engine.py | 14 +++++++++ evadb/storage/video_storage_engine.py | 9 ++++++ 6 files changed, 51 insertions(+), 59 deletions(-) diff --git 
a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 76451f6e1..3dadcd3bd 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -85,20 +85,7 @@ def _bind_create_index_statement(self, node: CreateIndexStatement): if not node.udf_func: # Feature table type needs to be float32 numpy array. - assert ( - len(node.col_list) == 1 - ), f"Index can be only created on one column, but instead {len(node.col_list)} are provided" - col_def = node.col_list[0] - - table_ref_obj = node.table_ref.table.table_obj - col_list = [ - col for col in table_ref_obj.columns if col.name == col_def.name - ] - assert ( - len(col_list) == 1 - ), f"Index is created on non-existent column {col_def.name}" - - col = col_list[0] + col_entry = node.col_list[0].col_object assert ( col_entry.array_type == NdArrayType.FLOAT32 ), "Index input needs to be float32." diff --git a/evadb/catalog/catalog_utils.py b/evadb/catalog/catalog_utils.py index 496da35eb..0c2e38517 100644 --- a/evadb/catalog/catalog_utils.py +++ b/evadb/catalog/catalog_utils.py @@ -70,12 +70,14 @@ def is_string_col(col: ColumnCatalogEntry): def get_video_table_column_definitions() -> List[ColumnDefinition]: """ + _id: unique id name: video path id: frame id data: frame data audio: frame audio """ columns = [ + ColumnDefinition("_id", ColumnType.INTEGER, None, None), ColumnDefinition( VideoColumnName.name.name, ColumnType.TEXT, @@ -120,11 +122,13 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]: def get_document_table_column_definitions() -> List[ColumnDefinition]: """ + _id: unique id name: file path chunk_id: chunk id (0-indexed for each file) data: text data associated with the chunk """ columns = [ + ColumnDefinition("_id", ColumnType.INTEGER, None, None), ColumnDefinition( DocumentColumnName.name.name, ColumnType.TEXT, @@ -147,12 +151,14 @@ def get_document_table_column_definitions() -> List[ColumnDefinition]: def get_pdf_table_column_definitions() -> 
List[ColumnDefinition]: """ + _id: unique id name: pdf name page: page no paragraph: paragraph no data: pdf paragraph data """ columns = [ + ColumnDefinition("_id", ColumnType.INTEGER, None, None), ColumnDefinition(PDFColumnName.name.name, ColumnType.TEXT, None, None), ColumnDefinition(PDFColumnName.page.name, ColumnType.INTEGER, None, None), ColumnDefinition(PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None), @@ -183,26 +189,18 @@ def get_table_primary_columns( ] # _row_id for all the TableTypes, however for Video data and PDF data we also add frame_id (id) and paragraph as part of unique key if table_catalog_obj.table_type == TableType.VIDEO_DATA: - # _row_id, id - primary_columns.append( - ColumnDefinition(VideoColumnName.id.name, ColumnType.INTEGER, None, None), - ) + # _row_id * MAGIC_NUMBER + id + primary_columns = [ + ColumnDefinition("_id", ColumnType.INTEGER, None, None), + ] elif table_catalog_obj.table_type == TableType.PDF_DATA: - # _row_id, paragraph - primary_columns.append( - ColumnDefinition( - PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None - ) - ) + # _row_id * MAGIC_NUMBER + page * MAGIC_NUMBER + paragraph + primary_columns = [ColumnDefinition("_id", ColumnType.INTEGER, None, None)] elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA: - # _row_id, chunk_id - primary_columns.append( - ColumnDefinition( - DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None - ) - ) + # _row_id * MAGIC_NUMBER + chunk_id + primary_columns = [ColumnDefinition("_id", ColumnType.INTEGER, None, None)] return primary_columns diff --git a/evadb/optimizer/statement_to_opr_converter.py b/evadb/optimizer/statement_to_opr_converter.py index f13d595f4..cec8ecc60 100644 --- a/evadb/optimizer/statement_to_opr_converter.py +++ b/evadb/optimizer/statement_to_opr_converter.py @@ -319,31 +319,10 @@ def visit_create_index(self, statement: CreateIndexStatement): catalog_entry = table_ref.table.table_obj logical_get = LogicalGet(table_ref, 
catalog_entry, table_ref.alias) project_exprs = statement.col_list - # if there is a function expr, make col as its children + if statement.udf_func: project_exprs = [statement.udf_func.copy()] - # We need to also store the primary keys of the table in the index to later do - # the mapping. Typically, the vector indexes support setting an integer ID with - # the embedding. Unfortunately, we cannot support multi-column primary keys for - # cases like video table and document table. - # Hack: In such cases, we convert them into a unique ID using the following - # logic: We assume that the maximum number of files in the table is <= - # MAGIC_NUMBER and the number of frames or chunks for each video/document is <= - # MAGIC_NUMBER. Based on this assumption, we can safely say that - # `_row_id` * MAGIC_NUMBER + `chunk_id` for document table and - # `_row_id` * MAGIC_NUMBER + `id`) for video table - # `_row_id` * MAGIC_NUMBER + `paragraph`) for PDF table - # will be unique. - def _build_expression(row_id_expr, id_expr): - left = ArithmeticExpression( - ExpressionType.ARITHMETIC_MULTIPLY, - row_id_expr, - ConstantValueExpression(MAGIC_NUMBER), - ) - right = id_expr - return ArithmeticExpression(ExpressionType.ARITHMETIC_ADD, left, right) - primary_cols = get_table_primary_columns(catalog_entry) # primary_col to TupleValueExpressions @@ -366,13 +345,6 @@ def _build_expression(row_id_expr, id_expr): unique_col = primary_exprs[0] - if catalog_entry.table_type in [ - TableType.VIDEO_DATA, - TableType.DOCUMENT_DATA, - TableType.PDF_DATA, - ]: - unique_col = _build_expression(primary_exprs[0], primary_exprs[1]) - project_exprs = [unique_col] + project_exprs logical_project = LogicalProject(project_exprs) diff --git a/evadb/storage/document_storage_engine.py b/evadb/storage/document_storage_engine.py index 5f7ae0245..c28d8f7d1 100644 --- a/evadb/storage/document_storage_engine.py +++ b/evadb/storage/document_storage_engine.py @@ -16,6 +16,7 @@ from typing import Iterator from 
evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.constants import MAGIC_NUMBER from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.readers.document.document_reader import DocumentReader @@ -37,7 +38,18 @@ def read( reader = DocumentReader( str(doc_file), batch_mem_size=1, chunk_params=chunk_params ) + for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) + + # To create a distinct identifier, we use the following logic. + # Assuming the total number of entries in the table is less than a + # specified constant, referred to as MAGIC_NUMBER. Under this + # assumption, we can safely conclude that the expression + # `row_id * MAGIC_NUMBER + chunk_id` will yield a unique value. + + batch.frames["_id"] = ( + row_id * MAGIC_NUMBER + batch.frames["chunk_id"] + ) yield batch diff --git a/evadb/storage/pdf_storage_engine.py b/evadb/storage/pdf_storage_engine.py index 79f9db50a..771cc9652 100644 --- a/evadb/storage/pdf_storage_engine.py +++ b/evadb/storage/pdf_storage_engine.py @@ -16,6 +16,7 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.constants import MAGIC_NUMBER from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.readers.pdf_reader import PDFReader @@ -36,4 +37,17 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]: for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(file_name) + + # To create a distinct identifier, we use the following logic. + # Assuming the total number of entries in the table is less than a + # specified constant, referred to as MAGIC_NUMBER and the maximum + # number of paragraphs in a page is also less than or equal to + # MAGIC_NUMBER. 
Under this assumption, we can safely conclude that + # the expression `row_id * MAGIC_NUMBER + page_no * MAGIC_NUMBER + + # paragraph` will yield a unique value. + batch.frames["_id"] = ( + row_id * MAGIC_NUMBER + + batch.frames["page"] * MAGIC_NUMBER + + batch.frames["paragraph"] + ) yield batch diff --git a/evadb/storage/video_storage_engine.py b/evadb/storage/video_storage_engine.py index 142005f71..35be1cb2b 100644 --- a/evadb/storage/video_storage_engine.py +++ b/evadb/storage/video_storage_engine.py @@ -17,6 +17,7 @@ from typing import Iterator from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.constants import MAGIC_NUMBER from evadb.database import EvaDBDatabase from evadb.expression.abstract_expression import AbstractExpression from evadb.models.storage.batch import Batch @@ -58,4 +59,12 @@ def read( for batch in reader.read(): batch.frames[table.columns[0].name] = row_id batch.frames[table.columns[1].name] = str(video_file_name) + + # To create a distinct identifier, we use the following logic. + # Assuming the total number of entries in the table is less than a + # specified constant, referred to as MAGIC_NUMBER. Under this + # assumption, we can safely conclude that the expression + # `row_id * MAGIC_NUMBER + id` will yield a unique value. 
+ + batch.frames["_id"] = row_id * MAGIC_NUMBER + batch.frames["id"] yield batch From 3c3c002f2bd164313b10fd01f23c05c8fddf2735 Mon Sep 17 00:00:00 2001 From: Gaurav Date: Sun, 2 Jul 2023 14:58:52 -0400 Subject: [PATCH 10/11] revert changes --- evadb/readers/pdf_reader.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/evadb/readers/pdf_reader.py b/evadb/readers/pdf_reader.py index a7235abb5..e102e393f 100644 --- a/evadb/readers/pdf_reader.py +++ b/evadb/readers/pdf_reader.py @@ -35,12 +35,10 @@ def _read(self) -> Iterator[Dict]: doc = fitz.open(self.file_url) # PAGE ID, PARAGRAPH ID, STRING - # Maintain a global paragraph number per PDF - global_paragraph_no = 0 for page_no, page in enumerate(doc): blocks = page.get_text("dict")["blocks"] # iterate through the text blocks - for _, b in enumerate(blocks): + for paragraph_no, b in enumerate(blocks): # this block contains text if b["type"] == 0: # text found in block @@ -53,8 +51,7 @@ def _read(self) -> Iterator[Dict]: if span["text"].strip(): block_string += span["text"] yield { - "paragraph": global_paragraph_no, - "page": page_no, + "page": page_no + 1, + "paragraph": paragraph_no + 1, "data": block_string, } - global_paragraph_no += 1 From 64f7ed15da98ba3beeba177373b8f8982a70565f Mon Sep 17 00:00:00 2001 From: Gaurav Date: Sun, 2 Jul 2023 15:00:49 -0400 Subject: [PATCH 11/11] ran formatter --- evadb/optimizer/statement_to_opr_converter.py | 6 +----- test/integration_tests/test_create_index_executor.py | 2 ++ 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/evadb/optimizer/statement_to_opr_converter.py b/evadb/optimizer/statement_to_opr_converter.py index cec8ecc60..6a7dd15f8 100644 --- a/evadb/optimizer/statement_to_opr_converter.py +++ b/evadb/optimizer/statement_to_opr_converter.py @@ -12,12 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. -from evadb.catalog.catalog_type import TableType from evadb.catalog.catalog_utils import get_table_primary_columns -from evadb.constants import MAGIC_NUMBER -from evadb.expression.abstract_expression import AbstractExpression, ExpressionType -from evadb.expression.arithmetic_expression import ArithmeticExpression -from evadb.expression.constant_value_expression import ConstantValueExpression +from evadb.expression.abstract_expression import AbstractExpression from evadb.expression.tuple_value_expression import TupleValueExpression from evadb.optimizer.operators import ( LogicalCreate, diff --git a/test/integration_tests/test_create_index_executor.py b/test/integration_tests/test_create_index_executor.py index 8bf2bb7e6..edf31d3cc 100644 --- a/test/integration_tests/test_create_index_executor.py +++ b/test/integration_tests/test_create_index_executor.py @@ -17,6 +17,7 @@ from test.markers import macos_skip_marker from test.util import get_evadb_for_testing, load_udfs_for_testing +import faiss import numpy as np import pandas as pd import pytest @@ -26,6 +27,7 @@ from evadb.models.storage.batch import Batch from evadb.server.command_handler import execute_query_fetch_all from evadb.storage.storage_engine import StorageEngine +from evadb.udfs.udf_bootstrap_queries import Text_feat_udf_query from evadb.utils.generic_utils import try_to_import_faiss