From 7cfa37d90d7bcdfe6d815d63c2fb92f7626643a7 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Tue, 3 Oct 2023 22:50:56 -0400 Subject: [PATCH] feat: insertion update index (#1246) Break the feature into multiple PRs. We can merge this PR after https://github.com/georgia-tech-db/evadb/pull/1244. --- docs/source/reference/evaql/insert.rst | 2 +- evadb.db | 0 evadb/executor/create_index_executor.py | 95 ++++++++++++++++++- evadb/executor/insert_executor.py | 5 + evadb/parser/create_index_statement.py | 8 +- .../long/test_create_index_executor.py | 1 - .../integration_tests/long/test_similarity.py | 24 +++++ 7 files changed, 127 insertions(+), 8 deletions(-) create mode 100644 evadb.db diff --git a/docs/source/reference/evaql/insert.rst b/docs/source/reference/evaql/insert.rst index 4d0914e8f..e29871f8b 100644 --- a/docs/source/reference/evaql/insert.rst +++ b/docs/source/reference/evaql/insert.rst @@ -15,7 +15,7 @@ MyVideo Table schema INSERT INTO TABLE ----------------- -Insert a tuple into a table. +Insert a tuple into a table. If there is an index built on the table, the index will be automatically updated. Currently, we only support index automatic update with FAISS and SQLite data. .. code:: text diff --git a/evadb.db b/evadb.db new file mode 100644 index 000000000..e69de29bb diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index ce640fe0a..05bc41884 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -49,6 +49,9 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan): super().__init__(db, node) <<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> da04707f (feat: insertion update index (#1246)) self.name = self.node.name self.if_not_exists = self.node.if_not_exists self.table_ref = self.node.table_ref @@ -57,6 +60,7 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan): self.project_expr_list = self.node.project_expr_list self.index_def = self.node.index_def +<<<<<<< HEAD def exec(self, *args, **kwargs): # Vector type specific creation. if self.vector_store_type == VectorStoreType.PGVECTOR: @@ -65,6 +69,11 @@ def exec(self, *args, **kwargs): # Vector type specific creation. if self.node.vector_store_type == VectorStoreType.PGVECTOR: >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +======= + def exec(self, *args, **kwargs): + # Vector type specific creation. + if self.vector_store_type == VectorStoreType.PGVECTOR: +>>>>>>> da04707f (feat: insertion update index (#1246)) self._create_native_index() else: self._create_evadb_index() @@ -75,11 +84,15 @@ def exec(self, *args, **kwargs): # Create index through the native storage engine. def _create_native_index(self): +<<<<<<< HEAD <<<<<<< HEAD table = self.table_ref.table ======= table = self.node.table_ref.table >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +======= + table = self.table_ref.table +>>>>>>> da04707f (feat: insertion update index (#1246)) db_catalog_entry = self.catalog().get_database_catalog_entry( table.database_name ) @@ -89,6 +102,7 @@ def _create_native_index(self): ) as handler: # As other libraries, we default to HNSW and L2 distance. resp = handler.execute_native_query( +<<<<<<< HEAD <<<<<<< HEAD f"""CREATE INDEX {self.name} ON {table.table_name} USING hnsw ({self.col_list[0].name} vector_l2_ops)""" @@ -96,6 +110,10 @@ def _create_native_index(self): f"""CREATE INDEX {self.node.name} ON {table.table_name} USING hnsw ({self.node.col_list[0].name} vector_l2_ops)""" >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +======= + f"""CREATE INDEX {self.name} ON {table.table_name} + USING hnsw ({self.col_list[0].name} vector_l2_ops)""" +>>>>>>> da04707f (feat: insertion update index (#1246)) ) if resp.error is not None: raise ExecutorError( @@ -118,26 +136,70 @@ def _get_evadb_index_save_path(self) -> Path: # Create EvaDB index. def _create_evadb_index(self): <<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> da04707f (feat: insertion update index (#1246)) # Find function expression. function_expression, function_expression_signature = None, None for project_expr in self.project_expr_list: if isinstance(project_expr, FunctionExpression): function_expression = project_expr function_expression_signature = project_expr.signature() +<<<<<<< HEAD ======= if self.catalog().get_index_catalog_entry_by_name(self.node.name): msg = f"Index {self.node.name} already exists." if self.node.if_not_exists: logger.warn(msg) return +======= + + # Get feature tables. + feat_tb_catalog_entry = self.table_ref.table.table_obj + + # Get feature column. + feat_col_name = self.col_list[0].name + feat_col_catalog_entry = [ + col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name + ][0] + + if function_expression is not None: + feat_col_name = function_expression.output_objs[0].name + + index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name) + index_path = self._get_evadb_index_save_path() + + if index_catalog_entry is not None: + msg = f"Index {self.name} already exists." + if self.if_not_exists: + if ( + index_catalog_entry.feat_column == feat_col_catalog_entry + and index_catalog_entry.function_signature + == function_expression_signature + and index_catalog_entry.type == self.node.vector_store_type + ): + # Only update index if everything matches. + logger.warn(msg + " It will be updated on existing table.") + index = VectorStoreFactory.init_vector_store( + self.vector_store_type, + self.name, + **handle_vector_store_params( + self.vector_store_type, index_path + ), + ) + else: + # Skip index update if CREATE INDEX runs on a different index. + logger.warn(msg) + return +>>>>>>> da04707f (feat: insertion update index (#1246)) else: logger.error(msg) raise ExecutorError(msg) - - index = None - index_path = self._get_evadb_index_save_path() + else: + index = None try: +<<<<<<< HEAD # Get feature tables. feat_catalog_entry = self.node.table_ref.table.table_obj >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) @@ -228,7 +290,10 @@ def _create_evadb_index(self): if function_expression is not None: feat_col_name = function_expression.output_objs[0].name +======= +>>>>>>> da04707f (feat: insertion update index (#1246)) # Add features to index. +<<<<<<< HEAD # TODO: batch size is hardcoded for now. input_dim = -1 <<<<<<< HEAD @@ -236,7 +301,12 @@ def _create_evadb_index(self): ======= <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD >>>>>>> a6ef863c (feat: create index from projection (#1244)) +======= +======= +>>>>>>> a3f66ab5 (feat: insertion update index (#1246)) +>>>>>>> da04707f (feat: insertion update index (#1246)) for input_batch in self.children[0].exec(): input_batch.drop_column_alias() feat = input_batch.column_as_numpy_array(feat_col_name) @@ -260,6 +330,7 @@ def _create_evadb_index(self): for i in range(len(input_batch)): row_feat = feat[i].reshape(1, -1) +<<<<<<< HEAD <<<<<<< HEAD # Create new index if not exists. @@ -271,14 +342,22 @@ def _create_evadb_index(self): **handle_vector_store_params( self.vector_store_type, index_path, self.catalog ======= +======= + + # Create new index if not exists. +>>>>>>> da04707f (feat: insertion update index (#1246)) if index is None: input_dim = row_feat.shape[1] index = VectorStoreFactory.init_vector_store( - self.node.vector_store_type, - self.node.name, + self.vector_store_type, + self.name, **handle_vector_store_params( +<<<<<<< HEAD self.node.vector_store_type, index_path >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) +======= + self.vector_store_type, index_path +>>>>>>> da04707f (feat: insertion update index (#1246)) ), ) index.create(input_dim) @@ -291,6 +370,9 @@ def _create_evadb_index(self): # Save to catalog. <<<<<<< HEAD +<<<<<<< HEAD +======= +>>>>>>> da04707f (feat: insertion update index (#1246)) if index_catalog_entry is None: self.catalog().insert_index_catalog_entry( self.name, @@ -300,6 +382,7 @@ def _create_evadb_index(self): function_expression_signature, self.index_def, ) +<<<<<<< HEAD ======= self.catalog().insert_index_catalog_entry( self.node.name, @@ -312,6 +395,8 @@ def _create_evadb_index(self): self.node.index_def, ) >>>>>>> 2dacff69 (feat: sync master staging (#1050)) +======= +>>>>>>> da04707f (feat: insertion update index (#1246)) except Exception as e: # Delete index. if index: diff --git a/evadb/executor/insert_executor.py b/evadb/executor/insert_executor.py index d2dccd96a..7a9805d0a 100644 --- a/evadb/executor/insert_executor.py +++ b/evadb/executor/insert_executor.py @@ -59,10 +59,15 @@ def exec(self, *args, **kwargs): for column in table_catalog_entry.columns: if column == index.feat_column: is_index_on_current_table = True +<<<<<<< HEAD break if is_index_on_current_table: create_index_query = index.index_def create_index_query_list = create_index_query.split(" ") +======= + if is_index_on_current_table: + create_index_query_list = index.index_def.split(" ") +>>>>>>> da04707f (feat: insertion update index (#1246)) if_not_exists = " ".join(create_index_query_list[2:5]).lower() if if_not_exists != "if not exists": create_index_query = ( diff --git a/evadb/parser/create_index_statement.py b/evadb/parser/create_index_statement.py index ed081b849..e73f99ea2 100644 --- a/evadb/parser/create_index_statement.py +++ b/evadb/parser/create_index_statement.py @@ -139,7 +139,13 @@ def __str__(self) -> str: if function_expr is None: print_str += f" ({self.col_list[0].name})" else: - print_str += f" ({function_expr.name}({self.col_list[0].name}))" + + def traverse_create_function_expression_str(expr): + if isinstance(expr, TupleValueExpression): + return f"{self.col_list[0].name}" + return f"{expr.name}({traverse_create_function_expression_str(expr.children[0])})" + + print_str += f" ({traverse_create_function_expression_str(function_expr)})" print_str += f" USING {self._vector_store_type};" <<<<<<< HEAD >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) diff --git a/test/integration_tests/long/test_create_index_executor.py b/test/integration_tests/long/test_create_index_executor.py index dd5d74d5c..f44ef8f82 100644 --- a/test/integration_tests/long/test_create_index_executor.py +++ b/test/integration_tests/long/test_create_index_executor.py @@ -134,7 +134,6 @@ def test_index_already_exist(self): @macos_skip_marker def test_should_create_index_faiss(self): query = "CREATE INDEX testCreateIndexName ON testCreateIndexFeatTable (feat) USING FAISS;" - execute_query_fetch_all(self.evadb, query) # Test index catalog. diff --git a/test/integration_tests/long/test_similarity.py b/test/integration_tests/long/test_similarity.py index e46719347..fc8d1f053 100644 --- a/test/integration_tests/long/test_similarity.py +++ b/test/integration_tests/long/test_similarity.py @@ -472,6 +472,30 @@ def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self) ======= >>>>>>> 40a10ce1 (Bump v0.3.4+ dev) + def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self): + create_query = "CREATE TABLE testIndexAutoUpdate (img_path TEXT(100))" + execute_query_fetch_all(self.evadb, create_query) + + for i, img_path in enumerate(self.img_path_list): + insert_query = ( + f"INSERT INTO testIndexAutoUpdate (img_path) VALUES ('{img_path}')" + ) + execute_query_fetch_all(self.evadb, insert_query) + if i == 0: + create_index_query = "CREATE INDEX testIndex ON testIndexAutoUpdate(DummyFeatureExtractor(Open(img_path))) USING FAISS" + execute_query_fetch_all(self.evadb, create_index_query) + + select_query = """SELECT _row_id FROM testIndexAutoUpdate + ORDER BY Similarity(DummyFeatureExtractor(Open("{}")), DummyFeatureExtractor(Open(img_path))) + LIMIT 1;""".format( + self.img_path + ) + explain_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {select_query}") + self.assertTrue("VectorIndexScan" in explain_batch.frames[0][0]) + + res_batch = execute_query_fetch_all(self.evadb, select_query) + self.assertEqual(res_batch.frames["testindexautoupdate._row_id"][0], 5) + @qdrant_skip_marker def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_qdrant(self): for _ in range(2):