Skip to content

Commit

Permalink
feat: insertion update index (georgia-tech-db#1246)
Browse files Browse the repository at this point in the history
Break the feature into multiple PRs.

We can merge this PR after
georgia-tech-db#1244.
  • Loading branch information
jiashenC authored and a0x8o committed Nov 22, 2023
1 parent cd2afee commit fce3500
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docs/source/reference/evaql/insert.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ MyVideo Table schema
INSERT INTO TABLE
-----------------

Insert a tuple into a table.
Insert a tuple into a table. If there is an index built on the table, the index will be automatically updated. Currently, we only support index automatic update with FAISS and SQLite data.

.. code:: text
Expand Down
Empty file added evadb.db
Empty file.
121 changes: 116 additions & 5 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,17 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
self.name = self.node.name
self.if_not_exists = self.node.if_not_exists
self.table_ref = self.node.table_ref
Expand All @@ -105,15 +110,21 @@ def exec(self, *args, **kwargs):
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
=======
def exec(self, *args, **kwargs):
# Vector type specific creation.
if self.vector_store_type == VectorStoreType.PGVECTOR:
>>>>>>> da04707f (feat: insertion update index (#1246))
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
self._create_native_index()
else:
self._create_evadb_index()
Expand All @@ -128,23 +139,32 @@ def _create_native_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
table = self.table_ref.table
=======
table = self.node.table_ref.table
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
table = self.table_ref.table
>>>>>>> da04707f (feat: insertion update index (#1246))
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======
table = self.table_ref.table
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
db_catalog_entry = self.catalog().get_database_catalog_entry(
table.database_name
)
Expand All @@ -158,10 +178,13 @@ def _create_native_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
f"""CREATE INDEX {self.name} ON {table.table_name}
USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
=======
Expand All @@ -170,14 +193,20 @@ def _create_native_index(self):
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
=======
f"""CREATE INDEX {self.name} ON {table.table_name}
USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
>>>>>>> da04707f (feat: insertion update index (#1246))
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
)
if resp.error is not None:
raise ExecutorError(
Expand All @@ -203,12 +232,17 @@ def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
# Find function expression.
function_expression, function_expression_signature = None, None
for project_expr in self.project_expr_list:
Expand All @@ -218,10 +252,13 @@ def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
=======
if self.catalog().get_index_catalog_entry_by_name(self.node.name):
msg = f"Index {self.node.name} already exists."
Expand All @@ -230,18 +267,61 @@ def _create_evadb_index(self):
return
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
=======
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======

# Get feature tables.
feat_tb_catalog_entry = self.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.col_list[0].name
feat_col_catalog_entry = [
col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name
][0]

if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name)
index_path = self._get_evadb_index_save_path()

if index_catalog_entry is not None:
msg = f"Index {self.name} already exists."
if self.if_not_exists:
if (
index_catalog_entry.feat_column == feat_col_catalog_entry
and index_catalog_entry.function_signature
== function_expression_signature
and index_catalog_entry.type == self.node.vector_store_type
):
# Only update index if everything matches.
logger.warn(msg + " It will be updated on existing table.")
index = VectorStoreFactory.init_vector_store(
self.vector_store_type,
self.name,
**handle_vector_store_params(
self.vector_store_type, index_path
),
)
else:
# Skip index update if CREATE INDEX runs on a different index.
logger.warn(msg)
return
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
else:
logger.error(msg)
raise ExecutorError(msg)

index = None
index_path = self._get_evadb_index_save_path()
else:
index = None

try:
<<<<<<< HEAD
# Get feature tables.
feat_catalog_entry = self.node.table_ref.table.table_obj
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand Down Expand Up @@ -473,8 +553,11 @@ def _create_evadb_index(self):
if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
# Add features to index.
<<<<<<< HEAD
Expand All @@ -491,6 +574,7 @@ def _create_evadb_index(self):
=======
>>>>>>> a3f66ab5 (feat: insertion update index (#1246))
>>>>>>> da04707f (feat: insertion update index (#1246))
<<<<<<< HEAD
=======
# Add features to index.
# TODO: batch size is hardcoded for now.
Expand All @@ -505,6 +589,8 @@ def _create_evadb_index(self):
<<<<<<< HEAD
>>>>>>> a6ef863c (feat: create index from projection (#1244))
>>>>>>> a747c7e3 (feat: create index from projection (#1244))
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
for input_batch in self.children[0].exec():
input_batch.drop_column_alias()
feat = input_batch.column_as_numpy_array(feat_col_name)
Expand Down Expand Up @@ -559,10 +645,13 @@ def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))

# Create new index if not exists.
if index is None:
Expand Down Expand Up @@ -595,18 +684,29 @@ def _create_evadb_index(self):
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======

# Create new index if not exists.
>>>>>>> da04707f (feat: insertion update index (#1246))
if index is None:
input_dim = row_feat.shape[1]
index = VectorStoreFactory.init_vector_store(
self.node.vector_store_type,
self.node.name,
self.vector_store_type,
self.name,
**handle_vector_store_params(
<<<<<<< HEAD
self.node.vector_store_type, index_path
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
<<<<<<< HEAD
<<<<<<< HEAD
>>>>>>> 2170a7a9 (Bump v0.3.4+ dev)
=======
>>>>>>> c5f43c65 (Bump v0.3.4+ dev)
=======
=======
self.vector_store_type, index_path
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
),
)
index.create(input_dim)
Expand All @@ -622,12 +722,17 @@ def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
=======
>>>>>>> 9fe75f29 (feat: sync master staging (#1050))
=======
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
>>>>>>> a590a82c (feat: insertion update index (#1246))
if index_catalog_entry is None:
self.catalog().insert_index_catalog_entry(
self.name,
Expand All @@ -640,6 +745,9 @@ def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
=======
self.catalog().insert_index_catalog_entry(
self.node.name,
Expand All @@ -654,6 +762,7 @@ def _create_evadb_index(self):
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
<<<<<<< HEAD
=======
=======
self.catalog().insert_index_catalog_entry(
Expand Down Expand Up @@ -682,6 +791,8 @@ def _create_evadb_index(self):
)
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
>>>>>>> b87af508 (feat: sync master staging (#1050))
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
except Exception as e:
# Delete index.
if index:
Expand Down
3 changes: 3 additions & 0 deletions evadb/parser/create_index_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def traverse_create_function_expression_str(expr):
return f"{expr.name}({traverse_create_function_expression_str(expr.children[0])})"

print_str += f" ({traverse_create_function_expression_str(function_expr)})"
<<<<<<< HEAD
print_str += f" USING {self._vector_store_type};"
<<<<<<< HEAD
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand Down Expand Up @@ -330,6 +331,8 @@ def __str__(self) -> str:
print_str += f" ({self.col_list[0].name})"
else:
print_str += f" ({function_expr.name}({self.col_list[0].name}))"
=======
>>>>>>> a590a82c (feat: insertion update index (#1246))
print_str += f" USING {self._vector_store_type};"
<<<<<<< HEAD
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand Down
1 change: 0 additions & 1 deletion test/integration_tests/long/test_create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def test_index_already_exist(self):
@macos_skip_marker
def test_should_create_index_faiss(self):
query = "CREATE INDEX testCreateIndexName ON testCreateIndexFeatTable (feat) USING FAISS;"

execute_query_fetch_all(self.evadb, query)

# Test index catalog.
Expand Down
24 changes: 24 additions & 0 deletions test/integration_tests/long/test_similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,30 @@ def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self)
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)

def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self):
create_query = "CREATE TABLE testIndexAutoUpdate (img_path TEXT(100))"
execute_query_fetch_all(self.evadb, create_query)

for i, img_path in enumerate(self.img_path_list):
insert_query = (
f"INSERT INTO testIndexAutoUpdate (img_path) VALUES ('{img_path}')"
)
execute_query_fetch_all(self.evadb, insert_query)
if i == 0:
create_index_query = "CREATE INDEX testIndex ON testIndexAutoUpdate(DummyFeatureExtractor(Open(img_path))) USING FAISS"
execute_query_fetch_all(self.evadb, create_index_query)

select_query = """SELECT _row_id FROM testIndexAutoUpdate
ORDER BY Similarity(DummyFeatureExtractor(Open("{}")), DummyFeatureExtractor(Open(img_path)))
LIMIT 1;""".format(
self.img_path
)
explain_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {select_query}")
self.assertTrue("VectorIndexScan" in explain_batch.frames[0][0])

res_batch = execute_query_fetch_all(self.evadb, select_query)
self.assertEqual(res_batch.frames["testindexautoupdate._row_id"][0], 5)

@qdrant_skip_marker
def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_qdrant(self):
for _ in range(2):
Expand Down

0 comments on commit fce3500

Please sign in to comment.