Skip to content

Commit

Permalink
feat: insertion update index (georgia-tech-db#1246)
Browse files Browse the repository at this point in the history
Break the feature into multiple PRs.

We can merge this PR after
georgia-tech-db#1244.
  • Loading branch information
jiashenC authored and a0x8o committed Nov 22, 2023
1 parent 5981932 commit 7cfa37d
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 8 deletions.
2 changes: 1 addition & 1 deletion docs/source/reference/evaql/insert.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ MyVideo Table schema
INSERT INTO TABLE
-----------------

Insert a tuple into a table.
Insert a tuple into a table. If there is an index built on the table, the index will be automatically updated. Currently, we only support index automatic update with FAISS and SQLite data.

.. code:: text
Expand Down
Empty file added evadb.db
Empty file.
95 changes: 90 additions & 5 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
super().__init__(db, node)

<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
self.name = self.node.name
self.if_not_exists = self.node.if_not_exists
self.table_ref = self.node.table_ref
Expand All @@ -57,6 +60,7 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
self.project_expr_list = self.node.project_expr_list
self.index_def = self.node.index_def

<<<<<<< HEAD
def exec(self, *args, **kwargs):
# Vector type specific creation.
if self.vector_store_type == VectorStoreType.PGVECTOR:
Expand All @@ -65,6 +69,11 @@ def exec(self, *args, **kwargs):
# Vector type specific creation.
if self.node.vector_store_type == VectorStoreType.PGVECTOR:
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
def exec(self, *args, **kwargs):
# Vector type specific creation.
if self.vector_store_type == VectorStoreType.PGVECTOR:
>>>>>>> da04707f (feat: insertion update index (#1246))
self._create_native_index()
else:
self._create_evadb_index()
Expand All @@ -75,11 +84,15 @@ def exec(self, *args, **kwargs):

# Create index through the native storage engine.
def _create_native_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
table = self.table_ref.table
=======
table = self.node.table_ref.table
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
table = self.table_ref.table
>>>>>>> da04707f (feat: insertion update index (#1246))
db_catalog_entry = self.catalog().get_database_catalog_entry(
table.database_name
)
Expand All @@ -89,13 +102,18 @@ def _create_native_index(self):
) as handler:
# As other libraries, we default to HNSW and L2 distance.
resp = handler.execute_native_query(
<<<<<<< HEAD
<<<<<<< HEAD
f"""CREATE INDEX {self.name} ON {table.table_name}
USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
=======
f"""CREATE INDEX {self.node.name} ON {table.table_name}
USING hnsw ({self.node.col_list[0].name} vector_l2_ops)"""
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
f"""CREATE INDEX {self.name} ON {table.table_name}
USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
>>>>>>> da04707f (feat: insertion update index (#1246))
)
if resp.error is not None:
raise ExecutorError(
Expand All @@ -118,26 +136,70 @@ def _get_evadb_index_save_path(self) -> Path:
# Create EvaDB index.
def _create_evadb_index(self):
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
# Find function expression.
function_expression, function_expression_signature = None, None
for project_expr in self.project_expr_list:
if isinstance(project_expr, FunctionExpression):
function_expression = project_expr
function_expression_signature = project_expr.signature()
<<<<<<< HEAD
=======
if self.catalog().get_index_catalog_entry_by_name(self.node.name):
msg = f"Index {self.node.name} already exists."
if self.node.if_not_exists:
logger.warn(msg)
return
=======

# Get feature tables.
feat_tb_catalog_entry = self.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.col_list[0].name
feat_col_catalog_entry = [
col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name
][0]

if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name)
index_path = self._get_evadb_index_save_path()

if index_catalog_entry is not None:
msg = f"Index {self.name} already exists."
if self.if_not_exists:
if (
index_catalog_entry.feat_column == feat_col_catalog_entry
and index_catalog_entry.function_signature
== function_expression_signature
and index_catalog_entry.type == self.node.vector_store_type
):
# Only update index if everything matches.
logger.warn(msg + " It will be updated on existing table.")
index = VectorStoreFactory.init_vector_store(
self.vector_store_type,
self.name,
**handle_vector_store_params(
self.vector_store_type, index_path
),
)
else:
# Skip index update if CREATE INDEX runs on a different index.
logger.warn(msg)
return
>>>>>>> da04707f (feat: insertion update index (#1246))
else:
logger.error(msg)
raise ExecutorError(msg)

index = None
index_path = self._get_evadb_index_save_path()
else:
index = None

try:
<<<<<<< HEAD
# Get feature tables.
feat_catalog_entry = self.node.table_ref.table.table_obj
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand Down Expand Up @@ -228,15 +290,23 @@ def _create_evadb_index(self):
if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

=======
>>>>>>> da04707f (feat: insertion update index (#1246))
# Add features to index.
<<<<<<< HEAD
# TODO: batch size is hardcoded for now.
input_dim = -1
<<<<<<< HEAD
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
<<<<<<< HEAD
<<<<<<< HEAD
<<<<<<< HEAD
>>>>>>> a6ef863c (feat: create index from projection (#1244))
=======
=======
>>>>>>> a3f66ab5 (feat: insertion update index (#1246))
>>>>>>> da04707f (feat: insertion update index (#1246))
for input_batch in self.children[0].exec():
input_batch.drop_column_alias()
feat = input_batch.column_as_numpy_array(feat_col_name)
Expand All @@ -260,6 +330,7 @@ def _create_evadb_index(self):

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)
<<<<<<< HEAD
<<<<<<< HEAD

# Create new index if not exists.
Expand All @@ -271,14 +342,22 @@ def _create_evadb_index(self):
**handle_vector_store_params(
self.vector_store_type, index_path, self.catalog
=======
=======

# Create new index if not exists.
>>>>>>> da04707f (feat: insertion update index (#1246))
if index is None:
input_dim = row_feat.shape[1]
index = VectorStoreFactory.init_vector_store(
self.node.vector_store_type,
self.node.name,
self.vector_store_type,
self.name,
**handle_vector_store_params(
<<<<<<< HEAD
self.node.vector_store_type, index_path
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
=======
self.vector_store_type, index_path
>>>>>>> da04707f (feat: insertion update index (#1246))
),
)
index.create(input_dim)
Expand All @@ -291,6 +370,9 @@ def _create_evadb_index(self):

# Save to catalog.
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
if index_catalog_entry is None:
self.catalog().insert_index_catalog_entry(
self.name,
Expand All @@ -300,6 +382,7 @@ def _create_evadb_index(self):
function_expression_signature,
self.index_def,
)
<<<<<<< HEAD
=======
self.catalog().insert_index_catalog_entry(
self.node.name,
Expand All @@ -312,6 +395,8 @@ def _create_evadb_index(self):
self.node.index_def,
)
>>>>>>> 2dacff69 (feat: sync master staging (#1050))
=======
>>>>>>> da04707f (feat: insertion update index (#1246))
except Exception as e:
# Delete index.
if index:
Expand Down
5 changes: 5 additions & 0 deletions evadb/executor/insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,15 @@ def exec(self, *args, **kwargs):
for column in table_catalog_entry.columns:
if column == index.feat_column:
is_index_on_current_table = True
<<<<<<< HEAD
break
if is_index_on_current_table:
create_index_query = index.index_def
create_index_query_list = create_index_query.split(" ")
=======
if is_index_on_current_table:
create_index_query_list = index.index_def.split(" ")
>>>>>>> da04707f (feat: insertion update index (#1246))
if_not_exists = " ".join(create_index_query_list[2:5]).lower()
if if_not_exists != "if not exists":
create_index_query = (
Expand Down
8 changes: 7 additions & 1 deletion evadb/parser/create_index_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ def __str__(self) -> str:
if function_expr is None:
print_str += f" ({self.col_list[0].name})"
else:
print_str += f" ({function_expr.name}({self.col_list[0].name}))"

def traverse_create_function_expression_str(expr):
if isinstance(expr, TupleValueExpression):
return f"{self.col_list[0].name}"
return f"{expr.name}({traverse_create_function_expression_str(expr.children[0])})"

print_str += f" ({traverse_create_function_expression_str(function_expr)})"
print_str += f" USING {self._vector_store_type};"
<<<<<<< HEAD
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)
Expand Down
1 change: 0 additions & 1 deletion test/integration_tests/long/test_create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def test_index_already_exist(self):
@macos_skip_marker
def test_should_create_index_faiss(self):
query = "CREATE INDEX testCreateIndexName ON testCreateIndexFeatTable (feat) USING FAISS;"

execute_query_fetch_all(self.evadb, query)

# Test index catalog.
Expand Down
24 changes: 24 additions & 0 deletions test/integration_tests/long/test_similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,30 @@ def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self)
=======
>>>>>>> 40a10ce1 (Bump v0.3.4+ dev)

def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self):
create_query = "CREATE TABLE testIndexAutoUpdate (img_path TEXT(100))"
execute_query_fetch_all(self.evadb, create_query)

for i, img_path in enumerate(self.img_path_list):
insert_query = (
f"INSERT INTO testIndexAutoUpdate (img_path) VALUES ('{img_path}')"
)
execute_query_fetch_all(self.evadb, insert_query)
if i == 0:
create_index_query = "CREATE INDEX testIndex ON testIndexAutoUpdate(DummyFeatureExtractor(Open(img_path))) USING FAISS"
execute_query_fetch_all(self.evadb, create_index_query)

select_query = """SELECT _row_id FROM testIndexAutoUpdate
ORDER BY Similarity(DummyFeatureExtractor(Open("{}")), DummyFeatureExtractor(Open(img_path)))
LIMIT 1;""".format(
self.img_path
)
explain_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {select_query}")
self.assertTrue("VectorIndexScan" in explain_batch.frames[0][0])

res_batch = execute_query_fetch_all(self.evadb, select_query)
self.assertEqual(res_batch.frames["testindexautoupdate._row_id"][0], 5)

@qdrant_skip_marker
def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_qdrant(self):
for _ in range(2):
Expand Down

0 comments on commit 7cfa37d

Please sign in to comment.