Skip to content

Commit

Permalink
feat: create index from projection (#1244)
Browse files Browse the repository at this point in the history
The first step to do automatic index updates on insertions. 

Replace the old version of creating an index, which directly reads data
from the storage engine.

It now reads data from the children's plans: SeqScan and Storage.
  • Loading branch information
jiashenC authored Oct 2, 2023
1 parent 495ce7d commit 277161e
Show file tree
Hide file tree
Showing 19 changed files with 241 additions and 84 deletions.
12 changes: 12 additions & 0 deletions evadb/binder/binder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
if TYPE_CHECKING:
from evadb.binder.statement_binder_context import StatementBinderContext
from evadb.catalog.catalog_manager import CatalogManager

from evadb.catalog.sql_config import ROW_NUM_COLUMN
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
from evadb.expression.function_expression import FunctionExpression
from evadb.expression.tuple_value_expression import TupleValueExpression
Expand Down Expand Up @@ -171,6 +173,16 @@ def extend_star(
return target_list


def create_row_num_tv_expr(table_alias):
tv_expr = TupleValueExpression(name=ROW_NUM_COLUMN)
tv_expr.table_alias = table_alias
tv_expr.col_alias = f"{table_alias}.{ROW_NUM_COLUMN.lower()}"
tv_expr.col_object = ColumnCatalogEntry(
name=ROW_NUM_COLUMN, type=ColumnType.INTEGER
)
return tv_expr


def check_groupby_pattern(table_ref: TableRef, groupby_string: str) -> None:
# match the pattern of group by clause (e.g., 16 frames or 8 samples)
pattern = re.search(r"^\d+\s*(?:frames|samples|paragraphs)$", groupby_string)
Expand Down
30 changes: 20 additions & 10 deletions evadb/binder/create_index_statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from evadb.binder.binder_utils import BinderError
from evadb.binder.binder_utils import BinderError, create_row_num_tv_expr
from evadb.binder.statement_binder import StatementBinder
from evadb.catalog.catalog_type import NdArrayType, VectorStoreType
from evadb.expression.function_expression import FunctionExpression
from evadb.parser.create_index_statement import CreateIndexStatement
from evadb.third_party.databases.interface import get_database_handler


def bind_create_index(binder: StatementBinder, node: CreateIndexStatement):
binder.bind(node.table_ref)
if node.function:
binder.bind(node.function)

# Bind all projection expressions.
func_project_expr = None
for project_expr in node.project_expr_list:
binder.bind(project_expr)
if isinstance(project_expr, FunctionExpression):
func_project_expr = project_expr

# Append ROW_NUM_COLUMN.
node.project_expr_list += [create_row_num_tv_expr(node.table_ref.alias)]

# TODO: create index currently only supports single numpy column.
assert len(node.col_list) == 1, "Index cannot be created on more than 1 column"
Expand Down Expand Up @@ -54,13 +63,14 @@ def bind_create_index(binder: StatementBinder, node: CreateIndexStatement):
# underlying native storage engine.
return

if not node.function:
# Feature table type needs to be float32 numpy array.
assert (
len(node.col_list) == 1
), f"Index can be only created on one column, but instead {len(node.col_list)} are provided"
col_def = node.col_list[0]
# Index can be only created on single column.
assert (
len(node.col_list) == 1
), f"Index can be only created on one column, but instead {len(node.col_list)} are provided"
col_def = node.col_list[0]

if func_project_expr is None:
# Feature table type needs to be float32 numpy array.
table_ref_obj = node.table_ref.table.table_obj
col_list = [col for col in table_ref_obj.columns if col.name == col_def.name]
assert (
Expand All @@ -78,7 +88,7 @@ def bind_create_index(binder: StatementBinder, node: CreateIndexStatement):
else:
# Output of the function should be 2 dimension and float32 type.
function_obj = binder._catalog().get_function_catalog_entry_by_name(
node.function.name
func_project_expr.name
)
for output in function_obj.outputs:
assert (
Expand Down
15 changes: 4 additions & 11 deletions evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
resolve_alias_table_value_expression,
)
from evadb.binder.statement_binder_context import StatementBinderContext
from evadb.catalog.catalog_type import ColumnType, TableType, VideoColumnName
from evadb.catalog.catalog_type import ColumnType, TableType
from evadb.catalog.catalog_utils import get_metadata_properties, is_document_table
from evadb.configuration.constants import EvaDB_INSTALLATION_DIR
from evadb.expression.abstract_expression import AbstractExpression, ExpressionType
Expand Down Expand Up @@ -258,16 +258,9 @@ def _bind_tableref(self, node: TableRef):

@bind.register(TupleValueExpression)
def _bind_tuple_expr(self, node: TupleValueExpression):
table_alias, col_obj = self._binder_context.get_binded_column(
node.name, node.table_alias
)
node.table_alias = table_alias
if node.name == VideoColumnName.audio:
self._binder_context.enable_audio_retrieval()
if node.name == VideoColumnName.data:
self._binder_context.enable_video_retrieval()
node.col_alias = "{}.{}".format(table_alias, node.name.lower())
node.col_object = col_obj
from evadb.binder.tuple_value_expression_binder import bind_tuple_expr

bind_tuple_expr(self, node)

@bind.register(FunctionExpression)
def _bind_func_expr(self, node: FunctionExpression):
Expand Down
30 changes: 30 additions & 0 deletions evadb/binder/tuple_value_expression_binder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from evadb.binder.statement_binder import StatementBinder
from evadb.catalog.catalog_type import VideoColumnName
from evadb.expression.tuple_value_expression import TupleValueExpression


def bind_tuple_expr(binder: StatementBinder, node: TupleValueExpression):
table_alias, col_obj = binder._binder_context.get_binded_column(
node.name, node.table_alias
)
node.table_alias = table_alias
if node.name == VideoColumnName.audio:
binder._binder_context.enable_audio_retrieval()
if node.name == VideoColumnName.data:
binder._binder_context.enable_video_retrieval()
node.col_alias = "{}.{}".format(table_alias, node.name.lower())
node.col_object = col_obj
8 changes: 7 additions & 1 deletion evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,15 @@ def insert_index_catalog_entry(
vector_store_type: VectorStoreType,
feat_column: ColumnCatalogEntry,
function_signature: str,
index_def: str,
) -> IndexCatalogEntry:
index_catalog_entry = self._index_service.insert_entry(
name, save_file_path, vector_store_type, feat_column, function_signature
name,
save_file_path,
vector_store_type,
feat_column,
function_signature,
index_def,
)
return index_catalog_entry

Expand Down
6 changes: 6 additions & 0 deletions evadb/catalog/models/index_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class IndexCatalog(BaseModel):
`_feat_column_id:` the `_row_id` of the `ColumnCatalog` entry for the column on which the index is built.
`_function_signature:` if the index is created by running function expression on input column, this will store
the function signature of the used function. Otherwise, this field is None.
`_index_def:` the original SQL statement that is used to create this index. We record this to rerun create index
on updated table.
"""

__tablename__ = "index_catalog"
Expand All @@ -42,6 +44,7 @@ class IndexCatalog(BaseModel):
"column_id", Integer, ForeignKey("column_catalog._row_id", ondelete="CASCADE")
)
_function_signature = Column("function", String, default=None)
_index_def = Column("index_def", String, default=None)

_feat_column = relationship(
"ColumnCatalog",
Expand All @@ -55,12 +58,14 @@ def __init__(
type: VectorStoreType,
feat_column_id: int = None,
function_signature: str = None,
index_def: str = None,
):
self._name = name
self._save_file_path = save_file_path
self._type = type
self._feat_column_id = feat_column_id
self._function_signature = function_signature
self._index_def = index_def

def as_dataclass(self) -> "IndexCatalogEntry":
feat_column = self._feat_column.as_dataclass() if self._feat_column else None
Expand All @@ -71,5 +76,6 @@ def as_dataclass(self) -> "IndexCatalogEntry":
type=self._type,
feat_column_id=self._feat_column_id,
function_signature=self._function_signature,
index_def=self._index_def,
feat_column=feat_column,
)
1 change: 1 addition & 0 deletions evadb/catalog/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ class IndexCatalogEntry:
row_id: int = None
feat_column_id: int = None
function_signature: str = None
index_def: str = None
feat_column: ColumnCatalogEntry = None


Expand Down
8 changes: 7 additions & 1 deletion evadb/catalog/services/index_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ def insert_entry(
type: VectorStoreType,
feat_column: ColumnCatalogEntry,
function_signature: str,
index_def: str,
) -> IndexCatalogEntry:
index_entry = IndexCatalog(
name, save_file_path, type, feat_column.row_id, function_signature
name,
save_file_path,
type,
feat_column.row_id,
function_signature,
index_def,
)
index_entry = index_entry.save(self.session)
return index_entry.as_dataclass()
Expand Down
35 changes: 17 additions & 18 deletions evadb/executor/create_index_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
from evadb.expression.function_expression import FunctionExpression
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
from evadb.storage.storage_engine import StorageEngine
from evadb.third_party.databases.interface import get_database_handler
from evadb.third_party.vector_stores.types import FeaturePayload
from evadb.third_party.vector_stores.utils import VectorStoreFactory
Expand Down Expand Up @@ -101,25 +101,21 @@ def _create_evadb_index(self):
col for col in feat_catalog_entry.columns if col.name == feat_col_name
][0]

# Find function expression.
function_expression = None
for project_expr in self.node.project_expr_list:
if isinstance(project_expr, FunctionExpression):
function_expression = project_expr

if function_expression is not None:
feat_col_name = function_expression.output_objs[0].name

# Add features to index.
# TODO: batch size is hardcoded for now.
input_dim = -1
storage_engine = StorageEngine.factory(self.db, feat_catalog_entry)
for input_batch in storage_engine.read(feat_catalog_entry):
if self.node.function:
# Create index through function expression.
# Function(input column) -> 2 dimension feature vector.
input_batch.modify_column_alias(feat_catalog_entry.name.lower())
feat_batch = self.node.function.evaluate(input_batch)
feat_batch.drop_column_alias()
input_batch.drop_column_alias()
feat = feat_batch.column_as_numpy_array("features")
else:
# Create index on the feature table directly.
# Pandas wraps numpy array as an object inside a numpy
# array. Use zero index to get the actual numpy array.
feat = input_batch.column_as_numpy_array(feat_col_name)

for input_batch in self.children[0].exec():
input_batch.drop_column_alias()
feat = input_batch.column_as_numpy_array(feat_col_name)
row_num = input_batch.column_as_numpy_array(ROW_NUM_COLUMN)

for i in range(len(input_batch)):
Expand Down Expand Up @@ -147,7 +143,10 @@ def _create_evadb_index(self):
index_path,
self.node.vector_store_type,
feat_column,
self.node.function.signature() if self.node.function else None,
function_expression.signature()
if function_expression is not None
else None,
self.node.index_def,
)
except Exception as e:
# Delete index.
Expand Down
20 changes: 14 additions & 6 deletions evadb/optimizer/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1084,7 +1084,8 @@ def __init__(
table_ref: TableRef,
col_list: List[ColumnDefinition],
vector_store_type: VectorStoreType,
function: FunctionExpression = None,
project_expr_list: List[AbstractExpression],
index_def: str,
children: List = None,
):
super().__init__(OperatorType.LOGICALCREATEINDEX, children)
Expand All @@ -1093,7 +1094,8 @@ def __init__(
self._table_ref = table_ref
self._col_list = col_list
self._vector_store_type = vector_store_type
self._function = function
self._project_expr_list = project_expr_list
self._index_def = index_def

@property
def name(self):
Expand All @@ -1116,8 +1118,12 @@ def vector_store_type(self):
return self._vector_store_type

@property
def function(self):
return self._function
def project_expr_list(self):
return self._project_expr_list

@property
def index_def(self):
return self._index_def

def __eq__(self, other):
is_subtree_equal = super().__eq__(other)
Expand All @@ -1130,7 +1136,8 @@ def __eq__(self, other):
and self.table_ref == other.table_ref
and self.col_list == other.col_list
and self.vector_store_type == other.vector_store_type
and self.function == other.function
and self.project_expr_list == other.project_expr_list
and self.index_def == other.index_def
)

def __hash__(self) -> int:
Expand All @@ -1142,7 +1149,8 @@ def __hash__(self) -> int:
self.table_ref,
tuple(self.col_list),
self.vector_store_type,
self.function,
tuple(self.project_expr_list),
self.index_def,
)
)

Expand Down
13 changes: 12 additions & 1 deletion evadb/optimizer/rules/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,19 @@ def apply(self, before: LogicalCreateIndex, context: OptimizerContext):
before.table_ref,
before.col_list,
before.vector_store_type,
before.function,
before.project_expr_list,
before.index_def,
)
child = SeqScanPlan(None, before.project_expr_list, before.table_ref.alias)
batch_mem_size = context.db.config.get_value("executor", "batch_mem_size")
child.append_child(
StoragePlan(
before.table_ref.table.table_obj,
before.table_ref,
batch_mem_size=batch_mem_size,
)
)
after.append_child(child)
yield after


Expand Down
3 changes: 2 additions & 1 deletion evadb/optimizer/statement_to_opr_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ def visit_create_index(self, statement: CreateIndexStatement):
statement.table_ref,
statement.col_list,
statement.vector_store_type,
statement.function,
statement.project_expr_list,
statement.index_def,
)
self._plan = create_index_opr

Expand Down
Loading

0 comments on commit 277161e

Please sign in to comment.