diff --git a/docs/source/reference/evaql/insert.rst b/docs/source/reference/evaql/insert.rst index 4d0914e8f2..e29871f8be 100644 --- a/docs/source/reference/evaql/insert.rst +++ b/docs/source/reference/evaql/insert.rst @@ -15,7 +15,7 @@ MyVideo Table schema INSERT INTO TABLE ----------------- -Insert a tuple into a table. +Insert a tuple into a table. If there is an index built on the table, the index will be automatically updated. Currently, we only support index automatic update with FAISS and SQLite data. .. code:: text diff --git a/evadb.db b/evadb.db new file mode 100644 index 0000000000..e69de29bb2 diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index 16b581e61b..4aca203f24 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -44,22 +44,28 @@ class CreateIndexExecutor(AbstractExecutor): def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan): super().__init__(db, node) + self.name = self.node.name + self.if_not_exists = self.node.if_not_exists + self.table_ref = self.node.table_ref + self.col_list = self.node.col_list + self.vector_store_type = self.node.vector_store_type + self.project_expr_list = self.node.project_expr_list + self.index_def = self.node.index_def + def exec(self, *args, **kwargs): # Vector type specific creation. - if self.node.vector_store_type == VectorStoreType.PGVECTOR: + if self.vector_store_type == VectorStoreType.PGVECTOR: self._create_native_index() else: self._create_evadb_index() yield Batch( - pd.DataFrame( - [f"Index {self.node.name} successfully added to the database."] - ) + pd.DataFrame([f"Index {self.name} successfully added to the database."]) ) # Create index through the native storage engine. def _create_native_index(self): - table = self.node.table_ref.table + table = self.table_ref.table db_catalog_entry = self.catalog().get_database_catalog_entry( table.database_name ) @@ -69,8 +75,8 @@ def _create_native_index(self): ) as handler: # As other libraries, we default to HNSW and L2 distance. resp = handler.execute_native_query( - f"""CREATE INDEX {self.node.name} ON {table.table_name} - USING hnsw ({self.node.col_list[0].name} vector_l2_ops)""" + f"""CREATE INDEX {self.name} ON {table.table_name} + USING hnsw ({self.col_list[0].name} vector_l2_ops)""" ) if resp.error is not None: raise ExecutorError( @@ -83,48 +89,70 @@ def _get_evadb_index_save_path(self) -> Path: if not index_dir.exists(): index_dir.mkdir(parents=True, exist_ok=True) return str( - index_dir - / Path("{}_{}.index".format(self.node.vector_store_type, self.node.name)) + index_dir / Path("{}_{}.index".format(self.vector_store_type, self.name)) ) # Create EvaDB index. def _create_evadb_index(self): - if self.catalog().get_index_catalog_entry_by_name(self.node.name): - msg = f"Index {self.node.name} already exists." - if self.node.if_not_exists: - logger.warn(msg) - return + # Find function expression. + function_expression, function_expression_signature = None, None + for project_expr in self.project_expr_list: + if isinstance(project_expr, FunctionExpression): + function_expression = project_expr + function_expression_signature = project_expr.signature() + + # Get feature tables. + feat_tb_catalog_entry = self.table_ref.table.table_obj + + # Get feature column. + feat_col_name = self.col_list[0].name + feat_col_catalog_entry = [ + col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name + ][0] + + if function_expression is not None: + feat_col_name = function_expression.output_objs[0].name + + index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name) + index_path = self._get_evadb_index_save_path() + + if index_catalog_entry is not None: + msg = f"Index {self.name} already exists." + if self.if_not_exists: + if ( + index_catalog_entry.feat_column == feat_col_catalog_entry + and index_catalog_entry.function_signature + == function_expression_signature + and index_catalog_entry.type == self.node.vector_store_type + ): + # Only update index if everything matches. + logger.warn(msg + " It will be updated on existing table.") + index = VectorStoreFactory.init_vector_store( + self.vector_store_type, + self.name, + **handle_vector_store_params( + self.vector_store_type, index_path + ), + ) + else: + # Skip index update if CREATE INDEX runs on a different index. + logger.warn(msg) + return else: logger.error(msg) raise ExecutorError(msg) - - index = None - index_path = self._get_evadb_index_save_path() + else: + index = None try: - # Get feature tables. - feat_catalog_entry = self.node.table_ref.table.table_obj - - # Get feature column. - feat_col_name = self.node.col_list[0].name - feat_column = [ - col for col in feat_catalog_entry.columns if col.name == feat_col_name - ][0] - - # Find function expression. - function_expression = None - for project_expr in self.node.project_expr_list: - if isinstance(project_expr, FunctionExpression): - function_expression = project_expr - - if function_expression is not None: - feat_col_name = function_expression.output_objs[0].name - # Add features to index. +<<<<<<< HEAD # TODO: batch size is hardcoded for now. input_dim = -1 <<<<<<< HEAD <<<<<<< HEAD +======= +>>>>>>> a3f66ab5 (feat: insertion update index (#1246)) for input_batch in self.children[0].exec(): input_batch.drop_column_alias() feat = input_batch.column_as_numpy_array(feat_col_name) @@ -161,13 +189,15 @@ def _create_evadb_index(self): for i in range(len(input_batch)): row_feat = feat[i].reshape(1, -1) + + # Create new index if not exists. if index is None: input_dim = row_feat.shape[1] index = VectorStoreFactory.init_vector_store( - self.node.vector_store_type, - self.node.name, + self.vector_store_type, + self.name, **handle_vector_store_params( - self.node.vector_store_type, index_path + self.vector_store_type, index_path ), ) index.create(input_dim) @@ -179,16 +209,15 @@ def _create_evadb_index(self): index.persist() # Save to catalog. - self.catalog().insert_index_catalog_entry( - self.node.name, - index_path, - self.node.vector_store_type, - feat_column, - function_expression.signature() - if function_expression is not None - else None, - self.node.index_def, - ) + if index_catalog_entry is None: + self.catalog().insert_index_catalog_entry( + self.name, + index_path, + self.vector_store_type, + feat_col_catalog_entry, + function_expression_signature, + self.index_def, + ) except Exception as e: # Delete index. if index: diff --git a/evadb/executor/insert_executor.py b/evadb/executor/insert_executor.py index 9bbaa57480..8e38aea918 100644 --- a/evadb/executor/insert_executor.py +++ b/evadb/executor/insert_executor.py @@ -53,6 +53,25 @@ def exec(self, *args, **kwargs): storage_engine = StorageEngine.factory(self.db, table_catalog_entry) storage_engine.write(table_catalog_entry, batch) + # Index update if there is an index built on the table. + for index in self.db.catalog().get_all_index_catalog_entries(): + is_index_on_current_table = False + for column in table_catalog_entry.columns: + if column == index.feat_column: + is_index_on_current_table = True + if is_index_on_current_table: + create_index_query_list = index.index_def.split(" ") + if_not_exists = " ".join(create_index_query_list[2:5]).lower() + if if_not_exists != "if not exists": + create_index_query = ( + " ".join(create_index_query_list[:2]) + + " IF NOT EXISTS " + + " ".join(create_index_query_list[2:]) + ) + from evadb.server.command_handler import execute_query_fetch_all + + execute_query_fetch_all(self.db, create_index_query) + yield Batch( pd.DataFrame([f"Number of rows loaded: {str(len(values_to_insert))}"]) ) diff --git a/evadb/parser/create_index_statement.py b/evadb/parser/create_index_statement.py index c05783e965..b561094a42 100644 --- a/evadb/parser/create_index_statement.py +++ b/evadb/parser/create_index_statement.py @@ -17,6 +17,7 @@ from evadb.catalog.catalog_type import VectorStoreType from evadb.expression.abstract_expression import AbstractExpression from evadb.expression.function_expression import FunctionExpression +from evadb.expression.tuple_value_expression import TupleValueExpression from evadb.parser.create_statement import ColumnDefinition from evadb.parser.statement import AbstractStatement from evadb.parser.table_ref import TableRef @@ -74,7 +75,13 @@ def __str__(self) -> str: if function_expr is None: print_str += f" ({self.col_list[0].name})" else: - print_str += f" ({function_expr.name}({self.col_list[0].name}))" + + def traverse_create_function_expression_str(expr): + if isinstance(expr, TupleValueExpression): + return f"{self.col_list[0].name}" + return f"{expr.name}({traverse_create_function_expression_str(expr.children[0])})" + + print_str += f" ({traverse_create_function_expression_str(function_expr)})" print_str += f" USING {self._vector_store_type};" <<<<<<< HEAD <<<<<<< HEAD diff --git a/evadb/third_party/vector_stores/faiss.py b/evadb/third_party/vector_stores/faiss.py index 4ec566d678..4f31f568ff 100644 --- a/evadb/third_party/vector_stores/faiss.py +++ b/evadb/third_party/vector_stores/faiss.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import os from pathlib import Path from typing import List @@ -38,6 +39,16 @@ def __init__(self, index_name: str, index_path: str) -> None: self._index_path = index_path self._index = None + import faiss + + # Load index from disk if it exists. + self._existing_id_set = set([]) + if self._index is None and os.path.exists(self._index_path): + self._index = faiss.read_index(self._index_path) + # Get existing IDs. + for i in range(self._index.ntotal): + self._existing_id_set.add(self._index.id_map.at(i)) + def create(self, vector_dim: int): import faiss @@ -49,7 +60,8 @@ def add(self, payload: List[FeaturePayload]): embedding = np.array(row.embedding, dtype="float32") if len(embedding.shape) != 2: embedding = embedding.reshape(1, -1) - self._index.add_with_ids(embedding, np.array([row.id])) + if row.id not in self._existing_id_set: + self._index.add_with_ids(embedding, np.array([row.id])) def persist(self): assert self._index is not None, "Please create an index before calling persist." @@ -58,10 +70,6 @@ def persist(self): faiss.write_index(self._index, self._index_path) def query(self, query: VectorIndexQuery) -> VectorIndexQueryResult: - import faiss - - if self._index is None: - self._index = faiss.read_index(self._index_path) assert self._index is not None, "Cannot query as index does not exists." embedding = np.array(query.embedding, dtype="float32") if len(embedding.shape) != 2: diff --git a/test/integration_tests/long/test_create_index_executor.py b/test/integration_tests/long/test_create_index_executor.py index 804bb5b2b3..feabb5bff5 100644 --- a/test/integration_tests/long/test_create_index_executor.py +++ b/test/integration_tests/long/test_create_index_executor.py @@ -134,7 +134,6 @@ def test_index_already_exist(self): @macos_skip_marker def test_should_create_index_faiss(self): query = "CREATE INDEX testCreateIndexName ON testCreateIndexFeatTable (feat) USING FAISS;" - execute_query_fetch_all(self.evadb, query) # Test index catalog. diff --git a/test/integration_tests/long/test_similarity.py b/test/integration_tests/long/test_similarity.py index 81bbdc0080..8dc7a2e3cc 100644 --- a/test/integration_tests/long/test_similarity.py +++ b/test/integration_tests/long/test_similarity.py @@ -70,6 +70,10 @@ def setUp(self): feature_table_catalog_entry = self.evadb.catalog().get_table_catalog_entry( "testSimilarityFeatureTable" ) + + # Image list. + self.img_path_list = [] + storage_engine = StorageEngine.factory(self.evadb, base_table_catalog_entry) for i in range(5): storage_engine.write( @@ -115,6 +119,8 @@ def setUp(self): ) execute_query_fetch_all(self.evadb, load_image_query) + self.img_path_list.append(img_save_path) + base_img -= 1 # Set the env variables. @@ -402,6 +408,30 @@ def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_faiss(self drop_query = "DROP INDEX testFaissIndexImageDataset" execute_query_fetch_all(self.evadb, drop_query) + def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self): + create_query = "CREATE TABLE testIndexAutoUpdate (img_path TEXT(100))" + execute_query_fetch_all(self.evadb, create_query) + + for i, img_path in enumerate(self.img_path_list): + insert_query = ( + f"INSERT INTO testIndexAutoUpdate (img_path) VALUES ('{img_path}')" + ) + execute_query_fetch_all(self.evadb, insert_query) + if i == 0: + create_index_query = "CREATE INDEX testIndex ON testIndexAutoUpdate(DummyFeatureExtractor(Open(img_path))) USING FAISS" + execute_query_fetch_all(self.evadb, create_index_query) + + select_query = """SELECT _row_id FROM testIndexAutoUpdate + ORDER BY Similarity(DummyFeatureExtractor(Open("{}")), DummyFeatureExtractor(Open(img_path))) + LIMIT 1;""".format( + self.img_path + ) + explain_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {select_query}") + self.assertTrue("VectorIndexScan" in explain_batch.frames[0][0]) + + res_batch = execute_query_fetch_all(self.evadb, select_query) + self.assertEqual(res_batch.frames["testindexautoupdate._row_id"][0], 5) + @qdrant_skip_marker def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_qdrant(self): for _ in range(2):