Skip to content

Commit

Permalink
feat: insertion update index (#1246)
Browse files Browse the repository at this point in the history
Break the feature into multiple PRs. 

We can merge this PR after
#1244.
  • Loading branch information
jiashenC authored Oct 4, 2023
1 parent e59092d commit a3f66ab
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 58 deletions.
2 changes: 1 addition & 1 deletion docs/source/reference/evaql/insert.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ MyVideo Table schema
INSERT INTO TABLE
-----------------

Insert a tuple into a table.
Insert a tuple into a table. If there is an index built on the table, the index will be updated automatically. Currently, automatic index updates are only supported with FAISS and SQLite data.

.. code:: text
Expand Down
Empty file added evadb.db
Empty file.
124 changes: 74 additions & 50 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,28 @@ class CreateIndexExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan):
    """Set up the executor and cache the plan fields it works with.

    Args:
        db: database handle, forwarded to the base executor.
        node: CREATE INDEX plan node, forwarded to the base executor.
    """
    super().__init__(db, node)

    # Copy the plan's fields onto the executor so the other methods can
    # refer to them directly instead of going through the plan node.
    plan = self.node
    self.name = plan.name
    self.if_not_exists = plan.if_not_exists
    self.table_ref = plan.table_ref
    self.col_list = plan.col_list
    self.vector_store_type = plan.vector_store_type
    self.project_expr_list = plan.project_expr_list
    self.index_def = plan.index_def

def exec(self, *args, **kwargs):
# Vector type specific creation.
if self.node.vector_store_type == VectorStoreType.PGVECTOR:
if self.vector_store_type == VectorStoreType.PGVECTOR:
self._create_native_index()
else:
self._create_evadb_index()

yield Batch(
pd.DataFrame(
[f"Index {self.node.name} successfully added to the database."]
)
pd.DataFrame([f"Index {self.name} successfully added to the database."])
)

# Create index through the native storage engine.
def _create_native_index(self):
table = self.node.table_ref.table
table = self.table_ref.table
db_catalog_entry = self.catalog().get_database_catalog_entry(
table.database_name
)
Expand All @@ -59,8 +65,8 @@ def _create_native_index(self):
) as handler:
# As other libraries, we default to HNSW and L2 distance.
resp = handler.execute_native_query(
f"""CREATE INDEX {self.node.name} ON {table.table_name}
USING hnsw ({self.node.col_list[0].name} vector_l2_ops)"""
f"""CREATE INDEX {self.name} ON {table.table_name}
USING hnsw ({self.col_list[0].name} vector_l2_ops)"""
)
if resp.error is not None:
raise ExecutorError(
Expand All @@ -73,60 +79,79 @@ def _get_evadb_index_save_path(self) -> Path:
if not index_dir.exists():
index_dir.mkdir(parents=True, exist_ok=True)
return str(
index_dir
/ Path("{}_{}.index".format(self.node.vector_store_type, self.node.name))
index_dir / Path("{}_{}.index".format(self.vector_store_type, self.name))
)

# Create EvaDB index.
def _create_evadb_index(self):
if self.catalog().get_index_catalog_entry_by_name(self.node.name):
msg = f"Index {self.node.name} already exists."
if self.node.if_not_exists:
logger.warn(msg)
return
# Find function expression.
function_expression, function_expression_signature = None, None
for project_expr in self.project_expr_list:
if isinstance(project_expr, FunctionExpression):
function_expression = project_expr
function_expression_signature = project_expr.signature()

# Get feature tables.
feat_tb_catalog_entry = self.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.col_list[0].name
feat_col_catalog_entry = [
col for col in feat_tb_catalog_entry.columns if col.name == feat_col_name
][0]

if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

index_catalog_entry = self.catalog().get_index_catalog_entry_by_name(self.name)
index_path = self._get_evadb_index_save_path()

if index_catalog_entry is not None:
msg = f"Index {self.name} already exists."
if self.if_not_exists:
if (
index_catalog_entry.feat_column == feat_col_catalog_entry
and index_catalog_entry.function_signature
== function_expression_signature
and index_catalog_entry.type == self.node.vector_store_type
):
# Only update index if everything matches.
logger.warn(msg + " It will be updated on existing table.")
index = VectorStoreFactory.init_vector_store(
self.vector_store_type,
self.name,
**handle_vector_store_params(
self.vector_store_type, index_path
),
)
else:
# Skip index update if CREATE INDEX runs on a different index.
logger.warn(msg)
return
else:
logger.error(msg)
raise ExecutorError(msg)

index = None
index_path = self._get_evadb_index_save_path()
else:
index = None

try:
# Get feature tables.
feat_catalog_entry = self.node.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.node.col_list[0].name
feat_column = [
col for col in feat_catalog_entry.columns if col.name == feat_col_name
][0]

# Find function expression.
function_expression = None
for project_expr in self.node.project_expr_list:
if isinstance(project_expr, FunctionExpression):
function_expression = project_expr

if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

# Add features to index.
# TODO: batch size is hardcoded for now.
input_dim = -1
for input_batch in self.children[0].exec():
input_batch.drop_column_alias()
feat = input_batch.column_as_numpy_array(feat_col_name)
row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)

# Create new index if not exists.
if index is None:
input_dim = row_feat.shape[1]
index = VectorStoreFactory.init_vector_store(
self.node.vector_store_type,
self.node.name,
self.vector_store_type,
self.name,
**handle_vector_store_params(
self.node.vector_store_type, index_path
self.vector_store_type, index_path
),
)
index.create(input_dim)
Expand All @@ -138,16 +163,15 @@ def _create_evadb_index(self):
index.persist()

# Save to catalog.
self.catalog().insert_index_catalog_entry(
self.node.name,
index_path,
self.node.vector_store_type,
feat_column,
function_expression.signature()
if function_expression is not None
else None,
self.node.index_def,
)
if index_catalog_entry is None:
self.catalog().insert_index_catalog_entry(
self.name,
index_path,
self.vector_store_type,
feat_col_catalog_entry,
function_expression_signature,
self.index_def,
)
except Exception as e:
# Delete index.
if index:
Expand Down
19 changes: 19 additions & 0 deletions evadb/executor/insert_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ def exec(self, *args, **kwargs):
storage_engine = StorageEngine.factory(self.db, table_catalog_entry)
storage_engine.write(table_catalog_entry, batch)

# Index update if there is an index built on the table.
# Each index whose feature column is one of this table's columns is refreshed
# by re-issuing its stored CREATE INDEX statement in IF NOT EXISTS form.
for index in self.db.catalog().get_all_index_catalog_entries():
    is_index_on_current_table = any(
        column == index.feat_column for column in table_catalog_entry.columns
    )
    if not is_index_on_current_table:
        continue

    # Start from the stored definition so the query is always bound for this
    # index. Previously, a definition that already contained IF NOT EXISTS
    # left `create_index_query` unassigned (or holding the query of an index
    # handled in an earlier iteration).
    create_index_query = index.index_def
    create_index_query_list = create_index_query.split(" ")
    if_not_exists = " ".join(create_index_query_list[2:5]).lower()
    if if_not_exists != "if not exists":
        # Splice IF NOT EXISTS in after the leading "CREATE INDEX" tokens.
        create_index_query = (
            " ".join(create_index_query_list[:2])
            + " IF NOT EXISTS "
            + " ".join(create_index_query_list[2:])
        )

    # Kept as a local import, as in the original code
    # (presumably to avoid a circular import -- TODO confirm).
    from evadb.server.command_handler import execute_query_fetch_all

    execute_query_fetch_all(self.db, create_index_query)

yield Batch(
pd.DataFrame([f"Number of rows loaded: {str(len(values_to_insert))}"])
)
9 changes: 8 additions & 1 deletion evadb/parser/create_index_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from evadb.catalog.catalog_type import VectorStoreType
from evadb.expression.abstract_expression import AbstractExpression
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
from evadb.parser.create_statement import ColumnDefinition
from evadb.parser.statement import AbstractStatement
from evadb.parser.table_ref import TableRef
Expand Down Expand Up @@ -59,7 +60,13 @@ def __str__(self) -> str:
if function_expr is None:
print_str += f" ({self.col_list[0].name})"
else:
print_str += f" ({function_expr.name}({self.col_list[0].name}))"

def traverse_create_function_expression_str(expr):
if isinstance(expr, TupleValueExpression):
return f"{self.col_list[0].name}"
return f"{expr.name}({traverse_create_function_expression_str(expr.children[0])})"

print_str += f" ({traverse_create_function_expression_str(function_expr)})"
print_str += f" USING {self._vector_store_type};"
return print_str

Expand Down
18 changes: 13 additions & 5 deletions evadb/third_party/vector_stores/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from pathlib import Path
from typing import List

Expand All @@ -38,6 +39,16 @@ def __init__(self, index_name: str, index_path: str) -> None:
self._index_path = index_path
self._index = None

import faiss

# Load index from disk if it exists.
self._existing_id_set = set([])
if self._index is None and os.path.exists(self._index_path):
self._index = faiss.read_index(self._index_path)
# Get existing IDs.
for i in range(self._index.ntotal):
self._existing_id_set.add(self._index.id_map.at(i))

def create(self, vector_dim: int):
import faiss

Expand All @@ -49,7 +60,8 @@ def add(self, payload: List[FeaturePayload]):
embedding = np.array(row.embedding, dtype="float32")
if len(embedding.shape) != 2:
embedding = embedding.reshape(1, -1)
self._index.add_with_ids(embedding, np.array([row.id]))
if row.id not in self._existing_id_set:
self._index.add_with_ids(embedding, np.array([row.id]))

def persist(self):
assert self._index is not None, "Please create an index before calling persist."
Expand All @@ -58,10 +70,6 @@ def persist(self):
faiss.write_index(self._index, self._index_path)

def query(self, query: VectorIndexQuery) -> VectorIndexQueryResult:
import faiss

if self._index is None:
self._index = faiss.read_index(self._index_path)
assert self._index is not None, "Cannot query as index does not exists."
embedding = np.array(query.embedding, dtype="float32")
if len(embedding.shape) != 2:
Expand Down
1 change: 0 additions & 1 deletion test/integration_tests/long/test_create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def test_index_already_exist(self):
@macos_skip_marker
def test_should_create_index_faiss(self):
query = "CREATE INDEX testCreateIndexName ON testCreateIndexFeatTable (feat) USING FAISS;"

execute_query_fetch_all(self.evadb, query)

# Test index catalog.
Expand Down
30 changes: 30 additions & 0 deletions test/integration_tests/long/test_similarity.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def setUp(self):
feature_table_catalog_entry = self.evadb.catalog().get_table_catalog_entry(
"testSimilarityFeatureTable"
)

# Image list.
self.img_path_list = []

storage_engine = StorageEngine.factory(self.evadb, base_table_catalog_entry)
for i in range(5):
storage_engine.write(
Expand Down Expand Up @@ -115,6 +119,8 @@ def setUp(self):
)
execute_query_fetch_all(self.evadb, load_image_query)

self.img_path_list.append(img_save_path)

base_img -= 1

# Set the env variables.
Expand Down Expand Up @@ -402,6 +408,30 @@ def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_faiss(self
drop_query = "DROP INDEX testFaissIndexImageDataset"
execute_query_fetch_all(self.evadb, drop_query)

def test_index_auto_update_on_structured_table_during_insertion_with_faiss(self):
    """Insert rows around a CREATE INDEX and query through the FAISS index.

    The index is created right after the first INSERT; the remaining rows
    are inserted afterwards. The similarity query must be planned as a
    VectorIndexScan and its top hit must be _row_id 5.
    """
    execute_query_fetch_all(
        self.evadb, "CREATE TABLE testIndexAutoUpdate (img_path TEXT(100))"
    )

    index_created = False
    for path in self.img_path_list:
        execute_query_fetch_all(
            self.evadb,
            f"INSERT INTO testIndexAutoUpdate (img_path) VALUES ('{path}')",
        )
        # Build the index once, immediately after the first row is inserted,
        # so every later INSERT runs against an already-indexed table.
        if not index_created:
            execute_query_fetch_all(
                self.evadb,
                "CREATE INDEX testIndex ON testIndexAutoUpdate(DummyFeatureExtractor(Open(img_path))) USING FAISS",
            )
            index_created = True

    query_template = """SELECT _row_id FROM testIndexAutoUpdate
ORDER BY Similarity(DummyFeatureExtractor(Open("{}")), DummyFeatureExtractor(Open(img_path)))
LIMIT 1;"""
    similarity_query = query_template.format(self.img_path)

    # The optimizer has to choose the vector index for this query.
    plan_batch = execute_query_fetch_all(self.evadb, f"EXPLAIN {similarity_query}")
    self.assertTrue("VectorIndexScan" in plan_batch.frames[0][0])

    result_batch = execute_query_fetch_all(self.evadb, similarity_query)
    self.assertEqual(result_batch.frames["testindexautoupdate._row_id"][0], 5)

@qdrant_skip_marker
def test_end_to_end_index_scan_should_work_correctly_on_image_dataset_qdrant(self):
for _ in range(2):
Expand Down

0 comments on commit a3f66ab

Please sign in to comment.