From fae27d045e713b420a34afe619670ff9e988efea Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Sun, 17 Sep 2023 11:23:03 -0400 Subject: [PATCH 1/9] create pgvector index on table --- evadb/binder/statement_binder.py | 49 ++++++++++-- evadb/catalog/catalog_type.py | 1 + evadb/executor/create_index_executor.py | 75 +++++++++++++------ evadb/optimizer/rules/rules.py | 2 +- evadb/parser/evadb.lark | 4 +- evadb/parser/lark_visitor/__init__.py | 7 +- .../parser/lark_visitor/_create_statements.py | 28 ++++--- .../databases/postgres/postgres_handler.py | 23 ++++-- 8 files changed, 137 insertions(+), 52 deletions(-) diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 4d33034973..b2af1a6446 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -34,6 +34,7 @@ ColumnType, NdArrayType, TableType, + VectorStoreType, VideoColumnName, ) from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table @@ -51,6 +52,7 @@ from evadb.parser.statement import AbstractStatement from evadb.parser.table_ref import TableRef from evadb.parser.types import FunctionType +from evadb.third_party.databases.interface import get_database_handler from evadb.third_party.huggingface.binder import assign_hf_function from evadb.utils.generic_utils import ( load_function_class_from_file, @@ -146,7 +148,34 @@ def _bind_create_index_statement(self, node: CreateIndexStatement): assert len(node.col_list) == 1, "Index cannot be created on more than 1 column" # TODO: create index currently only works on TableInfo, but will extend later. - assert node.table_ref.is_table_atom(), "Index can only be created on Tableinfo" + assert ( + node.table_ref.is_table_atom() + ), "Index can only be created on an existing table" + + # Vector type specific check. 
+ catalog = self._catalog() + if node.vector_store_type == VectorStoreType.PGVECTOR: + db_catalog_entry = catalog.get_database_catalog_entry( + node.table_ref.table.database_name + ) + if db_catalog_entry.engine != "postgres": + raise BinderError( + "PGVECTOR index works only with Postgres data source." + ) + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + # Check if vector extension is enabled, which is required for PGVECTOR. + df = handler.execute_native_query( + "SELECT * FROM pg_extension WHERE extname = 'vector'" + ).data + if len(df) == 0: + raise BinderError("PGVECTOR extension is not enabled.") + + # Skip the rest of checking, because it will be taken care of anyway by the + # underlying native storage engine. + return + if not node.function: # Feature table type needs to be float32 numpy array. assert ( @@ -163,23 +192,29 @@ def _bind_create_index_statement(self, node: CreateIndexStatement): ), f"Index is created on non-existent column {col_def.name}" col = col_list[0] - assert ( - col.array_type == NdArrayType.FLOAT32 - ), "Index input needs to be float32." assert len(col.array_dimensions) == 2 + + # Vector type specific check. + if node.vector_store_type == VectorStoreType.FAISS: + assert ( + col.array_type == NdArrayType.FLOAT32 + ), "Index input needs to be float32." else: # Output of the function should be 2 dimension and float32 type. function_obj = self._catalog().get_function_catalog_entry_by_name( node.function.name ) for output in function_obj.outputs: - assert ( - output.array_type == NdArrayType.FLOAT32 - ), "Index input needs to be float32." assert ( len(output.array_dimensions) == 2 ), "Index input needs to be 2 dimensional." + # Vector type specific check. + if node.vector_store_type == VectorStoreType.FAISS: + assert ( + output.array_type == NdArrayType.FLOAT32 + ), "Index input needs to be float32." 
+ @bind.register(SelectStatement) def _bind_select_statement(self, node: SelectStatement): if node.from_table: diff --git a/evadb/catalog/catalog_type.py b/evadb/catalog/catalog_type.py index 73a2d5728a..50a096c322 100644 --- a/evadb/catalog/catalog_type.py +++ b/evadb/catalog/catalog_type.py @@ -115,6 +115,7 @@ class VectorStoreType(EvaDBEnum): FAISS # noqa: F821 QDRANT # noqa: F821 PINECONE # noqa: F821 + PGVECTOR # noqa: F821 class VideoColumnName(EvaDBEnum): diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index 6fce741697..d9a53f3690 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -16,6 +16,7 @@ import pandas as pd +from evadb.catalog.catalog_type import VectorStoreType from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor @@ -23,6 +24,7 @@ from evadb.models.storage.batch import Batch from evadb.plan_nodes.create_index_plan import CreateIndexPlan from evadb.storage.storage_engine import StorageEngine +from evadb.third_party.databases.interface import get_database_handler from evadb.third_party.vector_stores.types import FeaturePayload from evadb.third_party.vector_stores.utils import VectorStoreFactory from evadb.utils.logging_manager import logger @@ -33,18 +35,11 @@ def __init__(self, db: EvaDBDatabase, node: CreateIndexPlan): super().__init__(db, node) def exec(self, *args, **kwargs): - if self.catalog().get_index_catalog_entry_by_name(self.node.name): - msg = f"Index {self.node.name} already exists." - if self.node.if_not_exists: - logger.warn(msg) - return - else: - logger.error(msg) - raise ExecutorError(msg) - - self.index_path = self._get_index_save_path() - self.index = None - self._create_index() + # Vector type specific creation. 
+ if self.node.vector_store_type == VectorStoreType.PGVECTOR: + self._create_native_index() + else: + self._create_evadb_index() yield Batch( pd.DataFrame( @@ -52,7 +47,28 @@ def exec(self, *args, **kwargs): ) ) - def _get_index_save_path(self) -> Path: + # Create index through the native storage engine. + def _create_native_index(self): + table = self.node.table_ref.table + db_catalog_entry = self.catalog().get_database_catalog_entry( + table.database_name + ) + + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + columns = table.table_obj.columns + # As other libraries, we default to HNSW and L2 distance. + resp = handler.execute_native_query( + f"""CREATE INDEX {self.node.name} ON {table.table_name} USING hnsw ({columns[0].name} vector_l2_ops)""" + ) + if resp.error is not None: + raise ExecutorError( + f"Native engine create index encounters error: {resp.error}" + ) + + # On-disk saving path for EvaDB index. + def _get_evadb_index_save_path(self) -> Path: index_dir = Path(self.config.get_value("storage", "index_dir")) if not index_dir.exists(): index_dir.mkdir(parents=True, exist_ok=True) @@ -61,7 +77,20 @@ def _get_index_save_path(self) -> Path: / Path("{}_{}.index".format(self.node.vector_store_type, self.node.name)) ) - def _create_index(self): + # Create EvaDB index. + def _create_evadb_index(self): + if self.catalog().get_index_catalog_entry_by_name(self.node.name): + msg = f"Index {self.node.name} already exists." + if self.node.if_not_exists: + logger.warn(msg) + return + else: + logger.error(msg) + raise ExecutorError(msg) + + index = None + index_path = self._get_index_save_path() + try: # Get feature tables. 
feat_catalog_entry = self.node.table_ref.table.table_obj @@ -95,35 +124,35 @@ def _create_index(self): for i in range(len(input_batch)): row_feat = feat[i].reshape(1, -1) - if self.index is None: + if index is None: input_dim = row_feat.shape[1] - self.index = VectorStoreFactory.init_vector_store( + index = VectorStoreFactory.init_vector_store( self.node.vector_store_type, self.node.name, **handle_vector_store_params( - self.node.vector_store_type, self.index_path + self.node.vector_store_type, index_path ), ) - self.index.create(input_dim) + index.create(input_dim) # Row ID for mapping back to the row. - self.index.add([FeaturePayload(row_num[i], row_feat)]) + index.add([FeaturePayload(row_num[i], row_feat)]) # Persist index. - self.index.persist() + index.persist() # Save to catalog. self.catalog().insert_index_catalog_entry( self.node.name, - self.index_path, + index_path, self.node.vector_store_type, feat_column, self.node.function.signature() if self.node.function else None, ) except Exception as e: # Delete index. - if self.index: - self.index.delete() + if index: + index.delete() # Throw exception back to user. 
raise ExecutorError(str(e)) diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index 6ab84b1172..f7eea13e9f 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -16,7 +16,7 @@ from typing import TYPE_CHECKING -from evadb.catalog.catalog_type import TableType +from evadb.catalog.catalog_type import TableType, VectorStoreType from evadb.catalog.catalog_utils import is_video_table from evadb.constants import CACHEABLE_FUNCTIONS from evadb.executor.execution_context import Context diff --git a/evadb/parser/evadb.lark b/evadb/parser/evadb.lark index 0682f4a583..635dff94fa 100644 --- a/evadb/parser/evadb.lark +++ b/evadb/parser/evadb.lark @@ -53,7 +53,7 @@ function_metadata_key: uid function_metadata_value: string_literal | decimal_literal -vector_store_type: USING (FAISS | QDRANT | PINECONE) +vector_store_type: USING (FAISS | QDRANT | PINECONE | PGVECTOR ) index_elem: ("(" uid_list ")" | "(" function_call ")") @@ -412,10 +412,10 @@ DOCUMENT: "DOCUMENT"i PDF: "PDF"i // Index types -HNSW: "HNSW"i FAISS: "FAISS"i QDRANT: "QDRANT"i PINECONE: "PINECONE"i +PGVECTOR: "PGVECTOR"i // Computer vision tasks diff --git a/evadb/parser/lark_visitor/__init__.py b/evadb/parser/lark_visitor/__init__.py index 220f2a6d50..2aa7e78373 100644 --- a/evadb/parser/lark_visitor/__init__.py +++ b/evadb/parser/lark_visitor/__init__.py @@ -17,7 +17,11 @@ from lark import Tree, visitors from evadb.parser.lark_visitor._common_clauses_ids import CommonClauses -from evadb.parser.lark_visitor._create_statements import CreateDatabase, CreateTable +from evadb.parser.lark_visitor._create_statements import ( + CreateDatabase, + CreateIndex, + CreateTable, +) from evadb.parser.lark_visitor._delete_statement import Delete from evadb.parser.lark_visitor._drop_statement import DropObject from evadb.parser.lark_visitor._explain_statement import Explain @@ -59,6 +63,7 @@ class LarkInterpreter( LarkBaseInterpreter, CommonClauses, CreateTable, + CreateIndex, 
CreateDatabase, Expressions, Functions, diff --git a/evadb/parser/lark_visitor/_create_statements.py b/evadb/parser/lark_visitor/_create_statements.py index 5b7c4376a5..8287ed113d 100644 --- a/evadb/parser/lark_visitor/_create_statements.py +++ b/evadb/parser/lark_visitor/_create_statements.py @@ -232,19 +232,9 @@ def length_dimension_list(self, tree): dimensions = self.dimension_helper(tree) return dimensions - def vector_store_type(self, tree): - vector_store_type = None - token = tree.children[1] - - if str.upper(token) == "FAISS": - vector_store_type = VectorStoreType.FAISS - elif str.upper(token) == "QDRANT": - vector_store_type = VectorStoreType.QDRANT - elif str.upper(token) == "PINECONE": - vector_store_type = VectorStoreType.PINECONE - return vector_store_type - # INDEX CREATION +# INDEX CREATION +class CreateIndex: def create_index(self, tree): index_name = None if_not_exists = False @@ -284,6 +274,20 @@ def create_index(self, tree): index_name, if_not_exists, table_ref, col_list, vector_store_type, function ) + def vector_store_type(self, tree): + vector_store_type = None + token = tree.children[1] + + if str.upper(token) == "FAISS": + vector_store_type = VectorStoreType.FAISS + elif str.upper(token) == "QDRANT": + vector_store_type = VectorStoreType.QDRANT + elif str.upper(token) == "PINECONE": + vector_store_type = VectorStoreType.PINECONE + elif str.upper(token) == "PGVECTOR": + vector_store_type = VectorStoreType.PGVECTOR + return vector_store_type + class CreateDatabase: def create_database(self, tree): diff --git a/evadb/third_party/databases/postgres/postgres_handler.py b/evadb/third_party/databases/postgres/postgres_handler.py index 0f591c1338..dc0c3388df 100644 --- a/evadb/third_party/databases/postgres/postgres_handler.py +++ b/evadb/third_party/databases/postgres/postgres_handler.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. +import numpy as np import pandas as pd import psycopg2 @@ -106,9 +107,11 @@ def get_columns(self, table_name: str) -> DBHandlerResponse: return DBHandlerResponse(data=None, error="Not connected to the database.") try: - query = f"SELECT column_name as name, data_type as dtype FROM information_schema.columns WHERE table_name='{table_name}'" + query = f"SELECT column_name as name, data_type as dtype, udt_name FROM information_schema.columns WHERE table_name='{table_name}'" columns_df = pd.read_sql_query(query, self.connection) - columns_df["dtype"] = columns_df["dtype"].apply(self._pg_to_python_types) + columns_df["dtype"] = columns_df.apply( + lambda x: self._pg_to_python_types(x["dtype"], x["udt_name"]), axis=1 + ) return DBHandlerResponse(data=columns_df) except psycopg2.Error as e: return DBHandlerResponse(data=None, error=str(e)) @@ -154,8 +157,8 @@ def execute_native_query(self, query_string: str) -> DBHandlerResponse: except psycopg2.Error as e: return DBHandlerResponse(data=None, error=str(e)) - def _pg_to_python_types(self, pg_type: str): - mapping = { + def _pg_to_python_types(self, pg_type: str, udt_name: str): + primitive_type_mapping = { "integer": int, "bigint": int, "smallint": int, @@ -169,8 +172,16 @@ def _pg_to_python_types(self, pg_type: str): # Add more mappings as needed } - if pg_type in mapping: - return mapping[pg_type] + user_defined_type_mapping = { + "vector": np.ndarray + # Handle user defined types constructed by Postgres extension. + } + + print("Type conversion", pg_type, udt_name) + if pg_type in primitive_type_mapping: + return primitive_type_mapping[pg_type] + elif pg_type == "USER-DEFINED" and udt_name in user_defined_type_mapping: + return user_defined_type_mapping[udt_name] else: raise Exception( f"Unsupported column {pg_type} encountered in the postgres table. Please raise a feature request!" 
From 9d8869fff1f1b222f23ea30812ecb94811f5ddba Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Sun, 17 Sep 2023 11:23:21 -0400 Subject: [PATCH 2/9] add create index test case --- .../test_native_create_index.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 test/third_party_tests/test_native_create_index.py diff --git a/test/third_party_tests/test_native_create_index.py b/test/third_party_tests/test_native_create_index.py new file mode 100644 index 0000000000..f34bce61de --- /dev/null +++ b/test/third_party_tests/test_native_create_index.py @@ -0,0 +1,93 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest +from pathlib import Path +from test.markers import macos_skip_marker +from test.util import get_evadb_for_testing, load_functions_for_testing + +import numpy as np +import pandas as pd +import pytest + +from evadb.catalog.catalog_type import VectorStoreType +from evadb.executor.executor_utils import ExecutorError +from evadb.models.storage.batch import Batch +from evadb.server.command_handler import execute_query_fetch_all +from evadb.storage.storage_engine import StorageEngine +from evadb.utils.generic_utils import try_to_import_faiss + + +@pytest.mark.notparallel +class CreateIndexTest(unittest.TestCase): + def setUp(self): + self.evadb = get_evadb_for_testing() + self.evadb.catalog().reset() + + def test_native_engine_should_create_index(self): + # Create database. 
+ params = { + "user": "eva", + "password": "password", + "host": "localhost", + "port": "5432", + "database": "evadb", + } + query = f"""CREATE DATABASE test_data_source + WITH ENGINE = "postgres", + PARAMETERS = {params};""" + execute_query_fetch_all(self.evadb, query) + + # Create table. + query = """USE test_data_source { + CREATE TABLE test_vector (embedding vector(3)) + }""" + execute_query_fetch_all(self.evadb, query) + + # Insert data. + vector_list = [ + [0,0,0], + [1,1,1], + [2,2,2], + ] + for vector in vector_list: + query = f"""USE test_data_source {{ + INSERT INTO test_vector (embedding) VALUES ('{vector}') + }}""" + execute_query_fetch_all(self.evadb, query) + + # Create index. + query = """CREATE INDEX test_index + ON test_data_source.test_vector (embedding) + USING PGVECTOR""" + execute_query_fetch_all(self.evadb, query) + + # Check the existence of index. + query = """USE test_data_source { + SELECT indexname, indexdef FROM pg_indexes WHERE tablename = 'test_vector' + }""" + df = execute_query_fetch_all(self.evadb, query).frames + + self.assertEqual(len(df), 1) + self.assertEqual(df["indexname"][0], "test_index") + self.assertEqual( + df["indexdef"][0], + """CREATE INDEX test_index ON public.test_vector USING hnsw (embedding vector_l2_ops)""", + ) + + # Clean up. 
+ query = "USE test_data_source { DROP INDEX test_index }" + execute_query_fetch_all(self.evadb, query) + query = "USE test_data_source { DROP TABLE test_vector }" + execute_query_fetch_all(self.evadb, query) From 8c412500790c756cd0a8c47ff5e13ebf5bcb7edd Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Sun, 17 Sep 2023 11:26:39 -0400 Subject: [PATCH 3/9] fix lint error --- evadb/optimizer/rules/rules.py | 2 +- .../databases/postgres/postgres_handler.py | 1 - .../test_native_create_index.py | 21 ++++++------------- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index f7eea13e9f..6ab84b1172 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -16,7 +16,7 @@ from typing import TYPE_CHECKING -from evadb.catalog.catalog_type import TableType, VectorStoreType +from evadb.catalog.catalog_type import TableType from evadb.catalog.catalog_utils import is_video_table from evadb.constants import CACHEABLE_FUNCTIONS from evadb.executor.execution_context import Context diff --git a/evadb/third_party/databases/postgres/postgres_handler.py b/evadb/third_party/databases/postgres/postgres_handler.py index dc0c3388df..4eb92f3d4d 100644 --- a/evadb/third_party/databases/postgres/postgres_handler.py +++ b/evadb/third_party/databases/postgres/postgres_handler.py @@ -177,7 +177,6 @@ def _pg_to_python_types(self, pg_type: str, udt_name: str): # Handle user defined types constructed by Postgres extension. 
} - print("Type conversion", pg_type, udt_name) if pg_type in primitive_type_mapping: return primitive_type_mapping[pg_type] elif pg_type == "USER-DEFINED" and udt_name in user_defined_type_mapping: diff --git a/test/third_party_tests/test_native_create_index.py b/test/third_party_tests/test_native_create_index.py index f34bce61de..dc88b43755 100644 --- a/test/third_party_tests/test_native_create_index.py +++ b/test/third_party_tests/test_native_create_index.py @@ -13,20 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import unittest -from pathlib import Path -from test.markers import macos_skip_marker -from test.util import get_evadb_for_testing, load_functions_for_testing +from test.util import get_evadb_for_testing -import numpy as np -import pandas as pd import pytest -from evadb.catalog.catalog_type import VectorStoreType -from evadb.executor.executor_utils import ExecutorError -from evadb.models.storage.batch import Batch from evadb.server.command_handler import execute_query_fetch_all -from evadb.storage.storage_engine import StorageEngine -from evadb.utils.generic_utils import try_to_import_faiss @pytest.mark.notparallel @@ -57,9 +48,9 @@ def test_native_engine_should_create_index(self): # Insert data. vector_list = [ - [0,0,0], - [1,1,1], - [2,2,2], + [0, 0, 0], + [1, 1, 1], + [2, 2, 2], ] for vector in vector_list: query = f"""USE test_data_source {{ @@ -68,7 +59,7 @@ def test_native_engine_should_create_index(self): execute_query_fetch_all(self.evadb, query) # Create index. 
- query = """CREATE INDEX test_index + query = """CREATE INDEX test_index ON test_data_source.test_vector (embedding) USING PGVECTOR""" execute_query_fetch_all(self.evadb, query) @@ -82,7 +73,7 @@ def test_native_engine_should_create_index(self): self.assertEqual(len(df), 1) self.assertEqual(df["indexname"][0], "test_index") self.assertEqual( - df["indexdef"][0], + df["indexdef"][0], """CREATE INDEX test_index ON public.test_vector USING hnsw (embedding vector_l2_ops)""", ) From d5c54f76c543364a98f1cc79a8dd88e83f857b87 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Sun, 17 Sep 2023 14:14:26 -0400 Subject: [PATCH 4/9] vector index scan --- evadb/executor/vector_index_scan_executor.py | 39 +++++++++++++++----- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/evadb/executor/vector_index_scan_executor.py b/evadb/executor/vector_index_scan_executor.py index 6c74fef534..568d9297e4 100644 --- a/evadb/executor/vector_index_scan_executor.py +++ b/evadb/executor/vector_index_scan_executor.py @@ -44,17 +44,13 @@ def __init__(self, db: EvaDBDatabase, node: VectorIndexScanPlan): self.search_query_expr = node.search_query_expr def exec(self, *args, **kwargs) -> Iterator[Batch]: - # Fetch the index from disk. - index_catalog_entry = self.catalog().get_index_catalog_entry_by_name( - self.index_name - ) - self.index_path = index_catalog_entry.save_file_path - self.index = VectorStoreFactory.init_vector_store( - self.node.vector_store_type, - self.index_name, - **handle_vector_store_params(self.node.vector_store_type, self.index_path), - ) + if self.node.vector_store_type == VectorStoreFactory.PGVECTOR: + self._native_vector_index_scan() + else: + self._evadb_vector_index_scan(*args, **kwargs) + + def _get_search_query_results(self): # Get the query feature vector. Create a dummy # batch to retreat a single file path. 
dummy_batch = Batch( @@ -69,6 +65,29 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: search_batch.drop_column_alias() search_feat = search_batch.column_as_numpy_array(feature_col_name)[0] search_feat = search_feat.reshape(1, -1) + return search_feat + + + def _native_vector_index_scan(self): + search_feat = self._get_search_query_results() + search_feat = search_feat.reshape(-1) + + # TODO: we need to access the Postgres handler here, but some parameters are only stored in children. + + + def _evadb_vector_index_scan(self, *args, **kwargs): + # Fetch the index from disk. + index_catalog_entry = self.catalog().get_index_catalog_entry_by_name( + self.index_name + ) + self.index_path = index_catalog_entry.save_file_path + self.index = VectorStoreFactory.init_vector_store( + self.node.vector_store_type, + self.index_name, + **handle_vector_store_params(self.node.vector_store_type, self.index_path), + ) + + search_feat = self._get_search_query_results() index_result = self.index.query( VectorIndexQuery(search_feat, self.limit_count.value) ) From ecc89395a3466ecfaf48404aa134f9896cc15873 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Mon, 18 Sep 2023 21:35:50 -0400 Subject: [PATCH 5/9] add index scan for native pgvector --- evadb/executor/create_index_executor.py | 3 +- evadb/executor/vector_index_scan_executor.py | 35 ++++++++++---- evadb/optimizer/operators.py | 23 ++++----- evadb/optimizer/rules/rules.py | 48 ++++++++++++------- evadb/plan_nodes/abstract_plan.py | 2 +- evadb/plan_nodes/vector_index_scan_plan.py | 23 ++++----- ...dex.py => test_native_similarity_index.py} | 33 ++++++++++--- 7 files changed, 102 insertions(+), 65 deletions(-) rename test/third_party_tests/{test_native_create_index.py => test_native_similarity_index.py} (64%) diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index d9a53f3690..12f5eef65c 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py 
@@ -60,7 +60,8 @@ def _create_native_index(self): columns = table.table_obj.columns # As other libraries, we default to HNSW and L2 distance. resp = handler.execute_native_query( - f"""CREATE INDEX {self.node.name} ON {table.table_name} USING hnsw ({columns[0].name} vector_l2_ops)""" + f"""CREATE INDEX {self.node.name} ON {table.table_name} + USING hnsw ({self.node.col_list[0].name} vector_l2_ops)""" ) if resp.error is not None: raise ExecutorError( diff --git a/evadb/executor/vector_index_scan_executor.py b/evadb/executor/vector_index_scan_executor.py index 568d9297e4..b5c9ad347a 100644 --- a/evadb/executor/vector_index_scan_executor.py +++ b/evadb/executor/vector_index_scan_executor.py @@ -22,9 +22,13 @@ from evadb.executor.executor_utils import handle_vector_store_params from evadb.models.storage.batch import Batch from evadb.plan_nodes.vector_index_scan_plan import VectorIndexScanPlan +from evadb.plan_nodes.storage_plan import StoragePlan from evadb.third_party.vector_stores.types import VectorIndexQuery from evadb.third_party.vector_stores.utils import VectorStoreFactory +from evadb.third_party.databases.interface import get_database_handler +from evadb.executor.executor_utils import ExecutorError from evadb.utils.logging_manager import logger +from evadb.catalog.models.utils import VectorStoreType # Helper function for getting row_num column alias. 
@@ -39,15 +43,17 @@ class VectorIndexScanExecutor(AbstractExecutor): def __init__(self, db: EvaDBDatabase, node: VectorIndexScanPlan): super().__init__(db, node) - self.index_name = node.index_name + self.index_name = node.index.name + self.vector_store_type = node.index.type + self.feat_column = node.index.feat_column self.limit_count = node.limit_count self.search_query_expr = node.search_query_expr def exec(self, *args, **kwargs) -> Iterator[Batch]: - if self.node.vector_store_type == VectorStoreFactory.PGVECTOR: - self._native_vector_index_scan() + if self.vector_store_type == VectorStoreType.PGVECTOR: + return self._native_vector_index_scan() else: - self._evadb_vector_index_scan(*args, **kwargs) + return self._evadb_vector_index_scan(*args, **kwargs) def _get_search_query_results(self): @@ -70,10 +76,19 @@ def _get_search_query_results(self): def _native_vector_index_scan(self): search_feat = self._get_search_query_results() - search_feat = search_feat.reshape(-1) - - # TODO: we need to access the Postgres handler here, but some parameters are only stored in children. - + search_feat = search_feat.reshape(-1).tolist() + + tb_catalog_entry = list(self.node.find_all(StoragePlan))[0].table + db_catalog_entry = self.db.catalog().get_database_catalog_entry(tb_catalog_entry.database_name) + with get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params) as handler: + resp = handler.execute_native_query(f"""SELECT * FROM {tb_catalog_entry.name} + ORDER BY {self.feat_column.name} <-> '{search_feat}' + LIMIT {self.limit_count}""") + if resp.error is not None: + raise ExecutorError(f"Native index can encounters {resp.error}") + res = Batch(frames=resp.data) + res.modify_column_alias(tb_catalog_entry.name) + yield res def _evadb_vector_index_scan(self, *args, **kwargs): # Fetch the index from disk. 
@@ -82,9 +97,9 @@ def _evadb_vector_index_scan(self, *args, **kwargs): ) self.index_path = index_catalog_entry.save_file_path self.index = VectorStoreFactory.init_vector_store( - self.node.vector_store_type, + self.vector_store_type, self.index_name, - **handle_vector_store_params(self.node.vector_store_type, self.index_path), + **handle_vector_store_params(self.vector_store_type, self.index_path), ) search_feat = self._get_search_query_results() diff --git a/evadb/optimizer/operators.py b/evadb/optimizer/operators.py index cc9346409e..696ca234b7 100644 --- a/evadb/optimizer/operators.py +++ b/evadb/optimizer/operators.py @@ -29,6 +29,7 @@ from evadb.parser.create_statement import ColumnDefinition from evadb.parser.table_ref import TableInfo, TableRef from evadb.parser.types import JoinType, ObjectType, ShowType +from evadb.catalog.models.utils import IndexCatalogEntry class OperatorType(IntEnum): @@ -154,7 +155,7 @@ def find_all(self, operator_type: Any): """ for node in self.bfs(): - if isinstance(node, operator_type): + if isinstance(node, operator_type) or self.opr_type == operator_type: yield node @@ -1207,25 +1208,19 @@ def __hash__(self) -> int: class LogicalVectorIndexScan(Operator): def __init__( self, - index_name: str, - vector_store_type: VectorStoreType, + index: IndexCatalogEntry, limit_count: ConstantValueExpression, search_query_expr: FunctionExpression, children: List = None, ): super().__init__(OperatorType.LOGICAL_VECTOR_INDEX_SCAN, children) - self._index_name = index_name - self._vector_store_type = vector_store_type + self._index = index self._limit_count = limit_count self._search_query_expr = search_query_expr @property - def index_name(self): - return self._index_name - - @property - def vector_store_type(self): - return self._vector_store_type + def index(self): + return self._index @property def limit_count(self): @@ -1241,8 +1236,7 @@ def __eq__(self, other): return False return ( is_subtree_equal - and self.index_name == 
other.index_name - and self.vector_store_type == other.vector_store_type + and self.index == other.index and self.limit_count == other.limit_count and self.search_query_expr == other.search_query_expr ) @@ -1251,8 +1245,7 @@ def __hash__(self) -> int: return hash( ( super().__hash__(), - self.index_name, - self.vector_store_type, + self.index, self.limit_count, self.search_query_expr, ) diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index 6ab84b1172..93b298dd52 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -47,6 +47,8 @@ from evadb.plan_nodes.predicate_plan import PredicatePlan from evadb.plan_nodes.project_plan import ProjectPlan from evadb.plan_nodes.show_info_plan import ShowInfoPlan +from evadb.catalog.models.utils import IndexCatalogEntry +from evadb.catalog.catalog_type import VectorStoreType if TYPE_CHECKING: from evadb.optimizer.optimizer_context import OptimizerContext @@ -551,6 +553,11 @@ def _exists_predicate(opr): if not func_orderby_expr or func_orderby_expr.name != "Similarity": return + # Traverse to the LogicalGet operator. + tb_catalog_entry = list(sub_tree_root.opr.find_all(LogicalGet))[0].table_obj + db_catalog_entry = catalog_manager().get_database_catalog_entry(tb_catalog_entry.database_name) + is_postgres_data_source = db_catalog_entry is not None and db_catalog_entry.engine == "postgres" + # Check if there exists an index on table and column. query_func_expr, base_func_expr = func_orderby_expr.children @@ -561,26 +568,34 @@ def _exists_predicate(opr): # Get column catalog entry and function_signature. column_catalog_entry = tv_expr.col_object - function_signature = ( - None - if isinstance(base_func_expr, TupleValueExpression) - else base_func_expr.signature() - ) - # Get index catalog. Check if an index exists for matching - # function signature and table columns. 
- index_catalog_entry = ( - catalog_manager().get_index_catalog_entry_by_column_and_function_signature( - column_catalog_entry, function_signature + # Only check the index existence when building on EvaDB data. + if not is_postgres_data_source: + + # Get function_signature. + function_signature = ( + None + if isinstance(base_func_expr, TupleValueExpression) + else base_func_expr.signature() + ) + + # Get index catalog. Check if an index exists for matching + # function signature and table columns. + index_catalog_entry = ( + catalog_manager().get_index_catalog_entry_by_column_and_function_signature( + column_catalog_entry, function_signature + ) + ) + if not index_catalog_entry: + return + else: + index_catalog_entry = IndexCatalogEntry( + name="", save_file_path="", type=VectorStoreType.PGVECTOR, feat_column=column_catalog_entry, ) - ) - if not index_catalog_entry: - return # Construct the Vector index scan plan. vector_index_scan_node = LogicalVectorIndexScan( - index_catalog_entry.name, - index_catalog_entry.type, + index_catalog_entry, limit_node.limit_count, query_func_expr, ) @@ -1263,8 +1278,7 @@ def check(self, grp_id: int, context: OptimizerContext): def apply(self, before: LogicalVectorIndexScan, context: OptimizerContext): after = VectorIndexScanPlan( - before.index_name, - before.vector_store_type, + before.index, before.limit_count, before.search_query_expr, ) diff --git a/evadb/plan_nodes/abstract_plan.py b/evadb/plan_nodes/abstract_plan.py index e70c5b14cc..0f6830041a 100644 --- a/evadb/plan_nodes/abstract_plan.py +++ b/evadb/plan_nodes/abstract_plan.py @@ -126,5 +126,5 @@ def find_all(self, plan_type: Any): """ for node in self.bfs(): - if isinstance(node, plan_type): + if isinstance(node, plan_type) or self.opr_type == plan_type: yield node diff --git a/evadb/plan_nodes/vector_index_scan_plan.py b/evadb/plan_nodes/vector_index_scan_plan.py index f396327b0e..9f0d75421c 100644 --- a/evadb/plan_nodes/vector_index_scan_plan.py +++ 
b/evadb/plan_nodes/vector_index_scan_plan.py @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from evadb.catalog.catalog_type import VectorStoreType from evadb.expression.constant_value_expression import ConstantValueExpression from evadb.expression.function_expression import FunctionExpression from evadb.plan_nodes.abstract_plan import AbstractPlan from evadb.plan_nodes.types import PlanOprType +from evadb.catalog.models.utils import IndexCatalogEntry class VectorIndexScanPlan(AbstractPlan): @@ -33,24 +33,18 @@ class VectorIndexScanPlan(AbstractPlan): def __init__( self, - index_name: str, - vector_store_type: VectorStoreType, + index: IndexCatalogEntry, limit_count: ConstantValueExpression, search_query_expr: FunctionExpression, ): super().__init__(PlanOprType.VECTOR_INDEX_SCAN) - self._index_name = index_name - self._vector_store_type = vector_store_type + self._index = index self._limit_count = limit_count self._search_query_expr = search_query_expr @property - def index_name(self): - return self._index_name - - @property - def vector_store_type(self): - return self._vector_store_type + def index(self): + return self._index @property def limit_count(self): @@ -62,8 +56,8 @@ def search_query_expr(self): def __str__(self): return "VectorIndexScan(index_name={}, vector_store_type={}, limit_count={}, search_query_expr={})".format( - self._index_name, - self.vector_store_type, + self._index.name, + self._index.vector_store_type, self._limit_count, self._search_query_expr, ) @@ -72,8 +66,7 @@ def __hash__(self) -> int: return hash( ( super().__hash__(), - self.index_name, - self.vector_store_type, + self.index, self.limit_count, self.search_query_expr, ) diff --git a/test/third_party_tests/test_native_create_index.py b/test/third_party_tests/test_native_similarity_index.py similarity index 64% rename from 
test/third_party_tests/test_native_create_index.py rename to test/third_party_tests/test_native_similarity_index.py index dc88b43755..26e25498a2 100644 --- a/test/third_party_tests/test_native_create_index.py +++ b/test/third_party_tests/test_native_similarity_index.py @@ -18,6 +18,7 @@ import pytest from evadb.server.command_handler import execute_query_fetch_all +from test.util import create_sample_image, load_functions_for_testing @pytest.mark.notparallel @@ -26,6 +27,12 @@ def setUp(self): self.evadb = get_evadb_for_testing() self.evadb.catalog().reset() + # Get sample image. + self.img_path = create_sample_image() + + # Load functions. + load_functions_for_testing(self.evadb, mode="debug") + def test_native_engine_should_create_index(self): # Create database. params = { @@ -42,19 +49,21 @@ def test_native_engine_should_create_index(self): # Create table. query = """USE test_data_source { - CREATE TABLE test_vector (embedding vector(3)) + CREATE TABLE test_vector (idx INTEGER, dummy INTEGER, embedding vector(27)) }""" execute_query_fetch_all(self.evadb, query) # Insert data. 
vector_list = [ - [0, 0, 0], - [1, 1, 1], - [2, 2, 2], + [0.0 for _ in range(9)] + [1.0 for _ in range(9)] + [2.0 for _ in range(9)], + [1.0 for _ in range(9)] + [2.0 for _ in range(9)] + [3.0 for _ in range(9)], + [2.0 for _ in range(9)] + [3.0 for _ in range(9)] + [4.0 for _ in range(9)], + [3.0 for _ in range(9)] + [4.0 for _ in range(9)] + [5.0 for _ in range(9)], + [4.0 for _ in range(9)] + [5.0 for _ in range(9)] + [6.0 for _ in range(9)], ] - for vector in vector_list: + for idx, vector in enumerate(vector_list): query = f"""USE test_data_source {{ - INSERT INTO test_vector (embedding) VALUES ('{vector}') + INSERT INTO test_vector (idx, dummy, embedding) VALUES ({idx}, {idx}, '{vector}') }}""" execute_query_fetch_all(self.evadb, query) @@ -77,6 +86,18 @@ def test_native_engine_should_create_index(self): """CREATE INDEX test_index ON public.test_vector USING hnsw (embedding vector_l2_ops)""", ) + # Check the index scan plan. + query = f"""SELECT idx, embedding FROM test_data_source.test_vector + ORDER BY Similarity(DummyFeatureExtractor(Open('{self.img_path}')), embedding) + LIMIT 1""" + df = execute_query_fetch_all(self.evadb, f"EXPLAIN {query}").frames + self.assertIn("VectorIndexScan", df[0][0]) + + # Check results. + df = execute_query_fetch_all(self.evadb, query).frames + self.assertEqual(df["test_vector.idx"][0], 0) + self.assertNotIn("test_vector.dummy", df.columns) + # Clean up. 
query = "USE test_data_source { DROP INDEX test_index }" execute_query_fetch_all(self.evadb, query) From 4131125ad39e19f4ca3f97c67da7ae1dd3f9848e Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Tue, 19 Sep 2023 11:07:46 -0400 Subject: [PATCH 6/9] fix lint issue --- evadb/executor/create_index_executor.py | 5 ++-- evadb/executor/vector_index_scan_executor.py | 27 ++++++++++--------- evadb/optimizer/operators.py | 2 +- evadb/optimizer/rules/rules.py | 25 +++++++++-------- evadb/plan_nodes/vector_index_scan_plan.py | 2 +- .../test_native_similarity_index.py | 11 +++++--- 6 files changed, 40 insertions(+), 32 deletions(-) diff --git a/evadb/executor/create_index_executor.py b/evadb/executor/create_index_executor.py index 12f5eef65c..8e9ff56c92 100644 --- a/evadb/executor/create_index_executor.py +++ b/evadb/executor/create_index_executor.py @@ -57,10 +57,9 @@ def _create_native_index(self): with get_database_handler( db_catalog_entry.engine, **db_catalog_entry.params ) as handler: - columns = table.table_obj.columns # As other libraries, we default to HNSW and L2 distance. resp = handler.execute_native_query( - f"""CREATE INDEX {self.node.name} ON {table.table_name} + f"""CREATE INDEX {self.node.name} ON {table.table_name} USING hnsw ({self.node.col_list[0].name} vector_l2_ops)""" ) if resp.error is not None: @@ -90,7 +89,7 @@ def _create_evadb_index(self): raise ExecutorError(msg) index = None - index_path = self._get_index_save_path() + index_path = self._get_evadb_index_save_path() try: # Get feature tables. 
diff --git a/evadb/executor/vector_index_scan_executor.py b/evadb/executor/vector_index_scan_executor.py index b5c9ad347a..2b58f5c337 100644 --- a/evadb/executor/vector_index_scan_executor.py +++ b/evadb/executor/vector_index_scan_executor.py @@ -16,19 +16,18 @@ import pandas as pd +from evadb.catalog.models.utils import VectorStoreType from evadb.catalog.sql_config import ROW_NUM_COLUMN from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor -from evadb.executor.executor_utils import handle_vector_store_params +from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params from evadb.models.storage.batch import Batch -from evadb.plan_nodes.vector_index_scan_plan import VectorIndexScanPlan from evadb.plan_nodes.storage_plan import StoragePlan +from evadb.plan_nodes.vector_index_scan_plan import VectorIndexScanPlan +from evadb.third_party.databases.interface import get_database_handler from evadb.third_party.vector_stores.types import VectorIndexQuery from evadb.third_party.vector_stores.utils import VectorStoreFactory -from evadb.third_party.databases.interface import get_database_handler -from evadb.executor.executor_utils import ExecutorError from evadb.utils.logging_manager import logger -from evadb.catalog.models.utils import VectorStoreType # Helper function for getting row_num column alias. @@ -55,7 +54,6 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]: else: return self._evadb_vector_index_scan(*args, **kwargs) - def _get_search_query_results(self): # Get the query feature vector. Create a dummy # batch to retreat a single file path. 
@@ -73,17 +71,22 @@ def _get_search_query_results(self): search_feat = search_feat.reshape(1, -1) return search_feat - def _native_vector_index_scan(self): search_feat = self._get_search_query_results() search_feat = search_feat.reshape(-1).tolist() tb_catalog_entry = list(self.node.find_all(StoragePlan))[0].table - db_catalog_entry = self.db.catalog().get_database_catalog_entry(tb_catalog_entry.database_name) - with get_database_handler(db_catalog_entry.engine, **db_catalog_entry.params) as handler: - resp = handler.execute_native_query(f"""SELECT * FROM {tb_catalog_entry.name} - ORDER BY {self.feat_column.name} <-> '{search_feat}' - LIMIT {self.limit_count}""") + db_catalog_entry = self.db.catalog().get_database_catalog_entry( + tb_catalog_entry.database_name + ) + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + resp = handler.execute_native_query( + f"""SELECT * FROM {tb_catalog_entry.name} + ORDER BY {self.feat_column.name} <-> '{search_feat}' + LIMIT {self.limit_count}""" + ) if resp.error is not None: raise ExecutorError(f"Native index can encounters {resp.error}") res = Batch(frames=resp.data) diff --git a/evadb/optimizer/operators.py b/evadb/optimizer/operators.py index 696ca234b7..59d6fa1c09 100644 --- a/evadb/optimizer/operators.py +++ b/evadb/optimizer/operators.py @@ -22,6 +22,7 @@ from evadb.catalog.models.function_io_catalog import FunctionIOCatalogEntry from evadb.catalog.models.function_metadata_catalog import FunctionMetadataCatalogEntry from evadb.catalog.models.table_catalog import TableCatalogEntry +from evadb.catalog.models.utils import IndexCatalogEntry from evadb.expression.abstract_expression import AbstractExpression from evadb.expression.constant_value_expression import ConstantValueExpression from evadb.expression.function_expression import FunctionExpression @@ -29,7 +30,6 @@ from evadb.parser.create_statement import ColumnDefinition from evadb.parser.table_ref import TableInfo, TableRef 
from evadb.parser.types import JoinType, ObjectType, ShowType -from evadb.catalog.models.utils import IndexCatalogEntry class OperatorType(IntEnum): diff --git a/evadb/optimizer/rules/rules.py b/evadb/optimizer/rules/rules.py index 93b298dd52..5481321f40 100644 --- a/evadb/optimizer/rules/rules.py +++ b/evadb/optimizer/rules/rules.py @@ -16,8 +16,9 @@ from typing import TYPE_CHECKING -from evadb.catalog.catalog_type import TableType +from evadb.catalog.catalog_type import TableType, VectorStoreType from evadb.catalog.catalog_utils import is_video_table +from evadb.catalog.models.utils import IndexCatalogEntry from evadb.constants import CACHEABLE_FUNCTIONS from evadb.executor.execution_context import Context from evadb.expression.expression_utils import ( @@ -47,8 +48,6 @@ from evadb.plan_nodes.predicate_plan import PredicatePlan from evadb.plan_nodes.project_plan import ProjectPlan from evadb.plan_nodes.show_info_plan import ShowInfoPlan -from evadb.catalog.models.utils import IndexCatalogEntry -from evadb.catalog.catalog_type import VectorStoreType if TYPE_CHECKING: from evadb.optimizer.optimizer_context import OptimizerContext @@ -555,8 +554,12 @@ def _exists_predicate(opr): # Traverse to the LogicalGet operator. tb_catalog_entry = list(sub_tree_root.opr.find_all(LogicalGet))[0].table_obj - db_catalog_entry = catalog_manager().get_database_catalog_entry(tb_catalog_entry.database_name) - is_postgres_data_source = db_catalog_entry is not None and db_catalog_entry.engine == "postgres" + db_catalog_entry = catalog_manager().get_database_catalog_entry( + tb_catalog_entry.database_name + ) + is_postgres_data_source = ( + db_catalog_entry is not None and db_catalog_entry.engine == "postgres" + ) # Check if there exists an index on table and column. query_func_expr, base_func_expr = func_orderby_expr.children @@ -571,7 +574,6 @@ def _exists_predicate(opr): # Only check the index existence when building on EvaDB data. 
if not is_postgres_data_source: - # Get function_signature. function_signature = ( None @@ -581,16 +583,17 @@ def _exists_predicate(opr): # Get index catalog. Check if an index exists for matching # function signature and table columns. - index_catalog_entry = ( - catalog_manager().get_index_catalog_entry_by_column_and_function_signature( - column_catalog_entry, function_signature - ) + index_catalog_entry = catalog_manager().get_index_catalog_entry_by_column_and_function_signature( + column_catalog_entry, function_signature ) if not index_catalog_entry: return else: index_catalog_entry = IndexCatalogEntry( - name="", save_file_path="", type=VectorStoreType.PGVECTOR, feat_column=column_catalog_entry, + name="", + save_file_path="", + type=VectorStoreType.PGVECTOR, + feat_column=column_catalog_entry, ) # Construct the Vector index scan plan. diff --git a/evadb/plan_nodes/vector_index_scan_plan.py b/evadb/plan_nodes/vector_index_scan_plan.py index 9f0d75421c..585c442ee9 100644 --- a/evadb/plan_nodes/vector_index_scan_plan.py +++ b/evadb/plan_nodes/vector_index_scan_plan.py @@ -12,11 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+from evadb.catalog.models.utils import IndexCatalogEntry from evadb.expression.constant_value_expression import ConstantValueExpression from evadb.expression.function_expression import FunctionExpression from evadb.plan_nodes.abstract_plan import AbstractPlan from evadb.plan_nodes.types import PlanOprType -from evadb.catalog.models.utils import IndexCatalogEntry class VectorIndexScanPlan(AbstractPlan): diff --git a/test/third_party_tests/test_native_similarity_index.py b/test/third_party_tests/test_native_similarity_index.py index 26e25498a2..74c8775845 100644 --- a/test/third_party_tests/test_native_similarity_index.py +++ b/test/third_party_tests/test_native_similarity_index.py @@ -13,12 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import unittest -from test.util import get_evadb_for_testing +from test.util import ( + create_sample_image, + get_evadb_for_testing, + load_functions_for_testing, +) import pytest from evadb.server.command_handler import execute_query_fetch_all -from test.util import create_sample_image, load_functions_for_testing @pytest.mark.notparallel @@ -87,8 +90,8 @@ def test_native_engine_should_create_index(self): ) # Check the index scan plan. 
- query = f"""SELECT idx, embedding FROM test_data_source.test_vector - ORDER BY Similarity(DummyFeatureExtractor(Open('{self.img_path}')), embedding) + query = f"""SELECT idx, embedding FROM test_data_source.test_vector + ORDER BY Similarity(DummyFeatureExtractor(Open('{self.img_path}')), embedding) LIMIT 1""" df = execute_query_fetch_all(self.evadb, f"EXPLAIN {query}").frames self.assertIn("VectorIndexScan", df[0][0]) From 0951e22d832f11e2a70e04624c6be6213f407ba8 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Wed, 20 Sep 2023 14:02:01 -0400 Subject: [PATCH 7/9] clean up test case --- .../test_native_similarity_index.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/test/third_party_tests/test_native_similarity_index.py b/test/third_party_tests/test_native_similarity_index.py index 74c8775845..f13699dd38 100644 --- a/test/third_party_tests/test_native_similarity_index.py +++ b/test/third_party_tests/test_native_similarity_index.py @@ -26,7 +26,7 @@ @pytest.mark.notparallel class CreateIndexTest(unittest.TestCase): - def setUp(self): + def setUpClass(self): self.evadb = get_evadb_for_testing() self.evadb.catalog().reset() @@ -36,7 +36,6 @@ def setUp(self): # Load functions. load_functions_for_testing(self.evadb, mode="debug") - def test_native_engine_should_create_index(self): # Create database. params = { "user": "eva", @@ -50,6 +49,14 @@ def test_native_engine_should_create_index(self): PARAMETERS = {params};""" execute_query_fetch_all(self.evadb, query) + def tearDownClass(self): + # Clean up. + query = "USE test_data_source { DROP INDEX test_index }" + execute_query_fetch_all(self.evadb, query) + query = "USE test_data_source { DROP TABLE test_vector }" + execute_query_fetch_all(self.evadb, query) + + def test_native_engine_should_create_index(self): # Create table. 
query = """USE test_data_source { CREATE TABLE test_vector (idx INTEGER, dummy INTEGER, embedding vector(27)) @@ -100,9 +107,3 @@ def test_native_engine_should_create_index(self): df = execute_query_fetch_all(self.evadb, query).frames self.assertEqual(df["test_vector.idx"][0], 0) self.assertNotIn("test_vector.dummy", df.columns) - - # Clean up. - query = "USE test_data_source { DROP INDEX test_index }" - execute_query_fetch_all(self.evadb, query) - query = "USE test_data_source { DROP TABLE test_vector }" - execute_query_fetch_all(self.evadb, query) From 35640f833782b3aeb7900f01458f6e01dca8086b Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Wed, 20 Sep 2023 15:12:07 -0400 Subject: [PATCH 8/9] separate binder for index --- evadb/binder/create_index_statement_binder.py | 101 ++++++++++++++++++ evadb/binder/statement_binder.py | 92 ++-------------- .../test_native_similarity_index.py | 20 ++-- 3 files changed, 119 insertions(+), 94 deletions(-) create mode 100644 evadb/binder/create_index_statement_binder.py diff --git a/evadb/binder/create_index_statement_binder.py b/evadb/binder/create_index_statement_binder.py new file mode 100644 index 0000000000..d59fc96534 --- /dev/null +++ b/evadb/binder/create_index_statement_binder.py @@ -0,0 +1,101 @@ +# coding=utf-8 +# Copyright 2018-2023 EvaDB +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from evadb.binder.binder_utils import ( + BinderError, +) +from evadb.catalog.catalog_type import ( + NdArrayType, + VectorStoreType, +) +from evadb.parser.create_index_statement import CreateIndexStatement +from evadb.third_party.databases.interface import get_database_handler + +from evadb.binder.statement_binder import StatementBinder + +def bind_create_index(binder: StatementBinder, node: CreateIndexStatement): + binder.bind(node.table_ref) + if node.function: + binder.bind(node.function) + + # TODO: create index currently only supports single numpy column. + assert len(node.col_list) == 1, "Index cannot be created on more than 1 column" + + # TODO: create index currently only works on TableInfo, but will extend later. + assert ( + node.table_ref.is_table_atom() + ), "Index can only be created on an existing table" + + # Vector type specific check. + catalog = binder._catalog() + if node.vector_store_type == VectorStoreType.PGVECTOR: + db_catalog_entry = catalog.get_database_catalog_entry( + node.table_ref.table.database_name + ) + if db_catalog_entry.engine != "postgres": + raise BinderError( + "PGVECTOR index works only with Postgres data source." + ) + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + # Check if vector extension is enabled, which is required for PGVECTOR. + df = handler.execute_native_query( + "SELECT * FROM pg_extension WHERE extname = 'vector'" + ).data + if len(df) == 0: + raise BinderError("PGVECTOR extension is not enabled.") + + # Skip the rest of checking, because it will be anyway taken care by the + # underlying native storage engine. + return + + if not node.function: + # Feature table type needs to be float32 numpy array. 
+ assert ( + len(node.col_list) == 1 + ), f"Index can be only created on one column, but instead {len(node.col_list)} are provided" + col_def = node.col_list[0] + + table_ref_obj = node.table_ref.table.table_obj + col_list = [ + col for col in table_ref_obj.columns if col.name == col_def.name + ] + assert ( + len(col_list) == 1 + ), f"Index is created on non-existent column {col_def.name}" + + col = col_list[0] + assert len(col.array_dimensions) == 2 + + # Vector type specific check. + if node.vector_store_type == VectorStoreType.FAISS: + assert ( + col.array_type == NdArrayType.FLOAT32 + ), "Index input needs to be float32." + else: + # Output of the function should be 2 dimension and float32 type. + function_obj = binder._catalog().get_function_catalog_entry_by_name( + node.function.name + ) + for output in function_obj.outputs: + assert ( + len(output.array_dimensions) == 2 + ), "Index input needs to be 2 dimensional." + + # Vector type speciic check. + if node.vector_store_type == VectorStoreType.FAISS: + assert ( + output.array_type == NdArrayType.FLOAT32 + ), "Index input needs to be float32." 
diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index b2af1a6446..1c6d5abe2c 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -30,13 +30,7 @@ resolve_alias_table_value_expression, ) from evadb.binder.statement_binder_context import StatementBinderContext -from evadb.catalog.catalog_type import ( - ColumnType, - NdArrayType, - TableType, - VectorStoreType, - VideoColumnName, -) +from evadb.catalog.catalog_type import ColumnType, TableType, VideoColumnName from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table from evadb.configuration.constants import EvaDB_INSTALLATION_DIR from evadb.expression.abstract_expression import AbstractExpression, ExpressionType @@ -52,7 +46,6 @@ from evadb.parser.statement import AbstractStatement from evadb.parser.table_ref import TableRef from evadb.parser.types import FunctionType -from evadb.third_party.databases.interface import get_database_handler from evadb.third_party.huggingface.binder import assign_hf_function from evadb.utils.generic_utils import ( load_function_class_from_file, @@ -138,83 +131,6 @@ def _bind_create_function_statement(self, node: CreateFunctionStatement): ), f"{node.function_type} functions' input and output are auto assigned" node.inputs, node.outputs = inputs, outputs - @bind.register(CreateIndexStatement) - def _bind_create_index_statement(self, node: CreateIndexStatement): - self.bind(node.table_ref) - if node.function: - self.bind(node.function) - - # TODO: create index currently only supports single numpy column. - assert len(node.col_list) == 1, "Index cannot be created on more than 1 column" - - # TODO: create index currently only works on TableInfo, but will extend later. - assert ( - node.table_ref.is_table_atom() - ), "Index can only be created on an existing table" - - # Vector type specific check. 
- catalog = self._catalog() - if node.vector_store_type == VectorStoreType.PGVECTOR: - db_catalog_entry = catalog.get_database_catalog_entry( - node.table_ref.table.database_name - ) - if db_catalog_entry.engine != "postgres": - raise BinderError( - "PGVECTOR index works only with Postgres data source." - ) - with get_database_handler( - db_catalog_entry.engine, **db_catalog_entry.params - ) as handler: - # Check if vector extension is enabled, which is required for PGVECTOR. - df = handler.execute_native_query( - "SELECT * FROM pg_extension WHERE extname = 'vector'" - ).data - if len(df) == 0: - raise BinderError("PGVECTOR extension is not enabled.") - - # Skip the rest of checking, because it will be anyway taken care by the - # underlying native storage engine. - return - - if not node.function: - # Feature table type needs to be float32 numpy array. - assert ( - len(node.col_list) == 1 - ), f"Index can be only created on one column, but instead {len(node.col_list)} are provided" - col_def = node.col_list[0] - - table_ref_obj = node.table_ref.table.table_obj - col_list = [ - col for col in table_ref_obj.columns if col.name == col_def.name - ] - assert ( - len(col_list) == 1 - ), f"Index is created on non-existent column {col_def.name}" - - col = col_list[0] - assert len(col.array_dimensions) == 2 - - # Vector type specific check. - if node.vector_store_type == VectorStoreType.FAISS: - assert ( - col.array_type == NdArrayType.FLOAT32 - ), "Index input needs to be float32." - else: - # Output of the function should be 2 dimension and float32 type. - function_obj = self._catalog().get_function_catalog_entry_by_name( - node.function.name - ) - for output in function_obj.outputs: - assert ( - len(output.array_dimensions) == 2 - ), "Index input needs to be 2 dimensional." - - # Vector type speciic check. - if node.vector_store_type == VectorStoreType.FAISS: - assert ( - output.array_type == NdArrayType.FLOAT32 - ), "Index input needs to be float32." 
- @bind.register(SelectStatement) def _bind_select_statement(self, node: SelectStatement): if node.from_table: @@ -285,6 +201,12 @@ def _bind_create_statement(self, node: CreateTableStatement): node.query.target_list ) + @bind.register(CreateIndexStatement) + def _bind_create_index_statement(self, node: CreateIndexStatement): + from evadb.binder.create_index_statement_binder import bind_create_index + + bind_create_index(self, node) + @bind.register(RenameTableStatement) def _bind_rename_table_statement(self, node: RenameTableStatement): self.bind(node.old_table_ref) diff --git a/test/third_party_tests/test_native_similarity_index.py b/test/third_party_tests/test_native_similarity_index.py index f13699dd38..22065ce7dc 100644 --- a/test/third_party_tests/test_native_similarity_index.py +++ b/test/third_party_tests/test_native_similarity_index.py @@ -26,15 +26,16 @@ @pytest.mark.notparallel class CreateIndexTest(unittest.TestCase): - def setUpClass(self): - self.evadb = get_evadb_for_testing() - self.evadb.catalog().reset() + @classmethod + def setUpClass(cls): + cls.evadb = get_evadb_for_testing() + cls.evadb.catalog().reset() # Get sample image. - self.img_path = create_sample_image() + cls.img_path = create_sample_image() # Load functions. - load_functions_for_testing(self.evadb, mode="debug") + load_functions_for_testing(cls.evadb, mode="debug") # Create database. params = { @@ -47,14 +48,15 @@ def setUpClass(self): query = f"""CREATE DATABASE test_data_source WITH ENGINE = "postgres", PARAMETERS = {params};""" - execute_query_fetch_all(self.evadb, query) + execute_query_fetch_all(cls.evadb, query) - def tearDownClass(self): + @classmethod + def tearDownClass(cls): # Clean up. 
query = "USE test_data_source { DROP INDEX test_index }" - execute_query_fetch_all(self.evadb, query) + execute_query_fetch_all(cls.evadb, query) query = "USE test_data_source { DROP TABLE test_vector }" - execute_query_fetch_all(self.evadb, query) + execute_query_fetch_all(cls.evadb, query) def test_native_engine_should_create_index(self): # Create table. From d7c2f143929d072fa85901c388e86583d4a80889 Mon Sep 17 00:00:00 2001 From: Jiashen Cao Date: Wed, 20 Sep 2023 15:12:25 -0400 Subject: [PATCH 9/9] fix lint --- evadb/binder/create_index_statement_binder.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/evadb/binder/create_index_statement_binder.py b/evadb/binder/create_index_statement_binder.py index d59fc96534..ea14c49020 100644 --- a/evadb/binder/create_index_statement_binder.py +++ b/evadb/binder/create_index_statement_binder.py @@ -12,17 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from evadb.binder.binder_utils import ( - BinderError, -) -from evadb.catalog.catalog_type import ( - NdArrayType, - VectorStoreType, -) +from evadb.binder.binder_utils import BinderError +from evadb.binder.statement_binder import StatementBinder +from evadb.catalog.catalog_type import NdArrayType, VectorStoreType from evadb.parser.create_index_statement import CreateIndexStatement from evadb.third_party.databases.interface import get_database_handler -from evadb.binder.statement_binder import StatementBinder def bind_create_index(binder: StatementBinder, node: CreateIndexStatement): binder.bind(node.table_ref) @@ -44,9 +39,7 @@ def bind_create_index(binder: StatementBinder, node: CreateIndexStatement): node.table_ref.table.database_name ) if db_catalog_entry.engine != "postgres": - raise BinderError( - "PGVECTOR index works only with Postgres data source." 
- ) + raise BinderError("PGVECTOR index works only with Postgres data source.") with get_database_handler( db_catalog_entry.engine, **db_catalog_entry.params ) as handler: @@ -69,9 +62,7 @@ def bind_create_index(binder: StatementBinder, node: CreateIndexStatement): col_def = node.col_list[0] table_ref_obj = node.table_ref.table.table_obj - col_list = [ - col for col in table_ref_obj.columns if col.name == col_def.name - ] + col_list = [col for col in table_ref_obj.columns if col.name == col_def.name] assert ( len(col_list) == 1 ), f"Index is created on non-existent column {col_def.name}"