Improve create index #868

Closed
wants to merge 16 commits
22 changes: 6 additions & 16 deletions evadb/binder/statement_binder.py
@@ -80,26 +80,16 @@ def _bind_create_index_statement(self, node: CreateIndexStatement):

# TODO: create index currently only works on TableInfo, but will extend later.
assert node.table_ref.is_table_atom(), "Index can only be created on TableInfo"
if not node.udf_func:
# Feature table type needs to be float32 numpy array.
assert (
len(node.col_list) == 1
), f"Index can be only created on one column, but instead {len(node.col_list)} are provided"
col_def = node.col_list[0]

table_ref_obj = node.table_ref.table.table_obj
col_list = [
col for col in table_ref_obj.columns if col.name == col_def.name
]
assert (
len(col_list) == 1
), f"Index is created on non-existent column {col_def.name}"
self.bind(node.col_list[0])

col = col_list[0]
if not node.udf_func:
# Feature table type needs to be float32 numpy array.
col_entry = node.col_list[0].col_object
assert (
col.array_type == NdArrayType.FLOAT32
col_entry.array_type == NdArrayType.FLOAT32
), "Index input needs to be float32."
assert len(col.array_dimensions) == 2
assert len(col_entry.array_dimensions) == 2
else:
# Output of the UDF should be 2 dimension and float32 type.
udf_obj = self._catalog().get_udf_catalog_entry_by_name(node.udf_func.name)
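Note: the binder now resolves the indexed column once via self.bind(node.col_list[0]) and reads metadata from the bound col_object, instead of re-scanning the table's column list. A condensed, standalone sketch of the validation it performs, assuming a bound entry exposing array_type and array_dimensions (the ColEntry stub and dimensions below are illustrative):

```python
from enum import Enum

class NdArrayType(Enum):
    FLOAT32 = "FLOAT32"

class ColEntry:
    # stand-in for the bound catalog entry exposed as col_object
    array_type = NdArrayType.FLOAT32
    array_dimensions = (None, 512)  # 2-D: (num_rows, feature_dim)

def validate_feature_column(col_entry):
    # feature column must be a 2-D float32 numpy array
    assert col_entry.array_type == NdArrayType.FLOAT32, "Index input needs to be float32."
    assert len(col_entry.array_dimensions) == 2

validate_feature_column(ColEntry())
```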
30 changes: 14 additions & 16 deletions evadb/catalog/catalog_utils.py
@@ -70,12 +70,14 @@ def is_string_col(col: ColumnCatalogEntry):

def get_video_table_column_definitions() -> List[ColumnDefinition]:
"""
_id: unique id
name: video path
id: frame id
data: frame data
audio: frame audio
"""
columns = [
ColumnDefinition("_id", ColumnType.INTEGER, None, None),
ColumnDefinition(
VideoColumnName.name.name,
ColumnType.TEXT,
@@ -120,11 +122,13 @@ def get_image_table_column_definitions() -> List[ColumnDefinition]:

def get_document_table_column_definitions() -> List[ColumnDefinition]:
"""
_id: unique id
name: file path
chunk_id: chunk id (0-indexed for each file)
data: text data associated with the chunk
"""
columns = [
ColumnDefinition("_id", ColumnType.INTEGER, None, None),
ColumnDefinition(
DocumentColumnName.name.name,
ColumnType.TEXT,
@@ -147,12 +151,14 @@ def get_document_table_column_definitions() -> List[ColumnDefinition]:

def get_pdf_table_column_definitions() -> List[ColumnDefinition]:
"""
_id: unique id
name: pdf name
page: page no
paragraph: paragraph no
data: pdf paragraph data
"""
columns = [
ColumnDefinition("_id", ColumnType.INTEGER, None, None),
ColumnDefinition(PDFColumnName.name.name, ColumnType.TEXT, None, None),
ColumnDefinition(PDFColumnName.page.name, ColumnType.INTEGER, None, None),
ColumnDefinition(PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None),
@@ -183,26 +189,18 @@ def get_table_primary_columns(
]
# _row_id for all the TableTypes, however for Video data and PDF data we also add frame_id (id) and paragraph as part of unique key
if table_catalog_obj.table_type == TableType.VIDEO_DATA:
# _row_id, id
primary_columns.append(
ColumnDefinition(VideoColumnName.id.name, ColumnType.INTEGER, None, None),
)
# _row_id * MAGIC_NUMBER + id
primary_columns = [
ColumnDefinition("_id", ColumnType.INTEGER, None, None),
]

elif table_catalog_obj.table_type == TableType.PDF_DATA:
# _row_id, paragraph
primary_columns.append(
ColumnDefinition(
PDFColumnName.paragraph.name, ColumnType.INTEGER, None, None
)
)
# _row_id * MAGIC_NUMBER + page * MAGIC_NUMBER + paragraph
primary_columns = [ColumnDefinition("_id", ColumnType.INTEGER, None, None)]

elif table_catalog_obj.table_type == TableType.DOCUMENT_DATA:
# _row_id, chunk_id
primary_columns.append(
ColumnDefinition(
DocumentColumnName.chunk_id.name, ColumnType.INTEGER, None, None
)
)
# _row_id * MAGIC_NUMBER + chunk_id
primary_columns = [ColumnDefinition("_id", ColumnType.INTEGER, None, None)]

return primary_columns

1 change: 1 addition & 0 deletions evadb/constants.py
@@ -21,3 +21,4 @@
IFRAMES = "IFRAMES"
AUDIORATE = "AUDIORATE"
DEFAULT_FUNCTION_EXPRESSION_COST = 100
MAGIC_NUMBER = 32000000000000 # 3.2e+13
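For context, a minimal sketch of the composite-key packing that the comments in catalog_utils.py hint at (`_row_id * MAGIC_NUMBER + id`). The encode/decode helper names are hypothetical; the real packing lives in EvaDB's storage layer:

```python
# Hypothetical illustration of the single-column _id scheme suggested above.
MAGIC_NUMBER = 32000000000000  # 3.2e+13, must exceed any per-video frame id

def encode_id(row_id: int, frame_id: int) -> int:
    # _row_id * MAGIC_NUMBER + id packs both keys into one integer
    return row_id * MAGIC_NUMBER + frame_id

def decode_id(packed: int) -> tuple:
    # invert the packing to recover (row_id, frame_id)
    return divmod(packed, MAGIC_NUMBER)

assert decode_id(encode_id(7, 42)) == (7, 42)
```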
37 changes: 7 additions & 30 deletions evadb/executor/create_index_executor.py
@@ -16,13 +16,11 @@

import pandas as pd

from evadb.catalog.sql_config import IDENTIFIER_COLUMN
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import ExecutorError, handle_vector_store_params
from evadb.models.storage.batch import Batch
from evadb.plan_nodes.create_index_plan import CreateIndexPlan
from evadb.storage.storage_engine import StorageEngine
from evadb.third_party.vector_stores.types import FeaturePayload
from evadb.third_party.vector_stores.utils import VectorStoreFactory
from evadb.utils.logging_manager import logger
@@ -59,38 +57,17 @@ def _get_index_save_path(self) -> Path:

def _create_index(self):
try:
# Get feature tables.
feat_catalog_entry = self.node.table_ref.table.table_obj

# Get feature column.
feat_col_name = self.node.col_list[0].name
feat_column = [
col for col in feat_catalog_entry.columns if col.name == feat_col_name
][0]
feat_column = self.node.col_list[0].col_object

# Add features to index.
# TODO: batch size is hardcoded for now.
input_dim = -1
storage_engine = StorageEngine.factory(self.db, feat_catalog_entry)
for input_batch in storage_engine.read(feat_catalog_entry):
if self.node.udf_func:
# Create index through UDF expression.
# UDF(input column) -> 2 dimension feature vector.
input_batch.modify_column_alias(feat_catalog_entry.name.lower())
feat_batch = self.node.udf_func.evaluate(input_batch)
feat_batch.drop_column_alias()
input_batch.drop_column_alias()
feat = feat_batch.column_as_numpy_array("features")
else:
# Create index on the feature table directly.
# Pandas wraps numpy array as an object inside a numpy
# array. Use zero index to get the actual numpy array.
feat = input_batch.column_as_numpy_array(feat_col_name)

row_id = input_batch.column_as_numpy_array(IDENTIFIER_COLUMN)
for input_batch in self.children[0].exec():
row_ids = input_batch.column_as_numpy_array(input_batch.columns[0])
features = input_batch.column_as_numpy_array(input_batch.columns[1])

for i in range(len(input_batch)):
row_feat = feat[i].reshape(1, -1)
for row_id, feat in zip(row_ids, features):
row_feat = feat.reshape(1, -1)
if self.index is None:
input_dim = row_feat.shape[1]
self.index = VectorStoreFactory.init_vector_store(
@@ -103,7 +80,7 @@ def _create_index(self):
self.index.create(input_dim)

# Row ID for mapping back to the row.
self.index.add([FeaturePayload(row_id[i], row_feat)])
self.index.add([FeaturePayload(row_id, row_feat)])

# Persist index.
self.index.persist()
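The executor now consumes its child plan's output instead of reading the storage engine directly. A simplified, self-contained sketch of the new build loop, with a stub standing in for the vector store (StubIndex and build_index are illustrative names, not EvaDB APIs):

```python
# Sketch: each child batch carries row ids in its first column and feature
# vectors in its second; the index is created lazily once the feature
# dimension is known.
import numpy as np

class StubIndex:
    def __init__(self, dim):
        self.dim, self.entries = dim, []

    def add(self, row_id, feat):
        self.entries.append((row_id, feat))

def build_index(batches):
    index = None
    for row_ids, features in batches:
        for row_id, feat in zip(row_ids, features):
            row_feat = np.asarray(feat).reshape(1, -1)
            if index is None:
                index = StubIndex(row_feat.shape[1])
            index.add(row_id, row_feat)  # row id maps back to the row
    return index

index = build_index([(np.array([1, 2]), np.array([[0.1, 0.2], [0.3, 0.4]]))])
assert index.dim == 2 and len(index.entries) == 2
```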
40 changes: 24 additions & 16 deletions evadb/executor/executor_utils.py
@@ -15,7 +15,7 @@
import glob
import os
from pathlib import Path
from typing import TYPE_CHECKING, Generator, List
from typing import TYPE_CHECKING, Callable, Generator, List

if TYPE_CHECKING:
from evadb.catalog.catalog_manager import CatalogManager
@@ -35,38 +35,46 @@ class ExecutorError(Exception):
pass


def _upsert_stats_related_to_function_expression(
expr: AbstractExpression, catalog: Callable
):
# try persisting stats for function expression
# avoiding raising error if it fails
try:
for func_expr in expr.find_all(FunctionExpression):
if func_expr.udf_obj and func_expr._stats:
udf_id = func_expr.udf_obj.row_id
catalog().upsert_udf_cost_catalog_entry(
udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost
)
except Exception as e:
logger.warning(f"{str(e)} while upserting cost for {func_expr.name}")


def apply_project(
batch: Batch, project_list: List[AbstractExpression], catalog: "CatalogManager"
batch: Batch, project_list: List[AbstractExpression], catalog: Callable
):
if not batch.empty() and project_list:
batches = [expr.evaluate(batch) for expr in project_list]
batch = Batch.merge_column_wise(batches)

# persist stats of function expression
# try persisting stats for function expression
for expr in project_list:
for func_expr in expr.find_all(FunctionExpression):
if func_expr.udf_obj and func_expr._stats:
udf_id = func_expr.udf_obj.row_id
catalog.upsert_udf_cost_catalog_entry(
udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost
)
_upsert_stats_related_to_function_expression(expr, catalog)

return batch


def apply_predicate(
batch: Batch, predicate: AbstractExpression, catalog: "CatalogManager"
batch: Batch, predicate: AbstractExpression, catalog: Callable
) -> Batch:
if not batch.empty() and predicate is not None:
outcomes = predicate.evaluate(batch)
batch.drop_zero(outcomes)

# persist stats of function expression
for func_expr in predicate.find_all(FunctionExpression):
if func_expr.udf_obj and func_expr._stats:
udf_id = func_expr.udf_obj.row_id
catalog.upsert_udf_cost_catalog_entry(
udf_id, func_expr.udf_obj.name, func_expr._stats.prev_cost
)
_upsert_stats_related_to_function_expression(predicate, catalog)

return batch
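Note the signature change: the helpers now take the catalog factory (a Callable) rather than a constructed CatalogManager, so call sites pass self.catalog instead of self.catalog(). A minimal sketch of the pattern, with a stub in place of the real manager (StubCatalog, persist_stats, and the UDF name are illustrative):

```python
from typing import Callable

class StubCatalog:
    # stand-in for CatalogManager; only the method used here is stubbed
    def upsert_udf_cost_catalog_entry(self, udf_id, name, cost):
        print(f"persisted cost={cost} for {name} (id={udf_id})")

def persist_stats(catalog: Callable):
    # the catalog instance is built lazily, only on the path that needs it,
    # and failures are downgraded to a warning instead of failing the query
    try:
        catalog().upsert_udf_cost_catalog_entry(1, "ObjectDetector", 4.2)
    except Exception as e:
        print(f"{e} while upserting cost")

persist_stats(StubCatalog)  # pass the factory itself, not StubCatalog()
```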


6 changes: 2 additions & 4 deletions evadb/executor/hash_join_executor.py
@@ -38,8 +38,6 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
probe_batch.reassign_indices_to_hash(hash_keys)
join_batch = Batch.join(probe_batch, build_batch)
join_batch.reset_index()
join_batch = apply_predicate(join_batch, self.predicate, self.catalog())
join_batch = apply_project(
join_batch, self.join_project, self.catalog()
)
join_batch = apply_predicate(join_batch, self.predicate, self.catalog)
join_batch = apply_project(join_batch, self.join_project, self.catalog)
yield join_batch
4 changes: 2 additions & 2 deletions evadb/executor/lateral_join_executor.py
@@ -35,10 +35,10 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
result_batch = Batch.join(outer_batch, result_batch)
result_batch.reset_index()
result_batch = apply_predicate(
result_batch, self.predicate, self.catalog()
result_batch, self.predicate, self.catalog
)
result_batch = apply_project(
result_batch, self.join_project, self.catalog()
result_batch, self.join_project, self.catalog
)
if not result_batch.empty():
yield result_batch
2 changes: 1 addition & 1 deletion evadb/executor/nested_loop_join_executor.py
@@ -34,7 +34,7 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
result_batch = Batch.join(row1, row2)
result_batch.reset_index()
result_batch = apply_predicate(
result_batch, self.predicate, self.catalog()
result_batch, self.predicate, self.catalog
)
if not result_batch.empty():
yield result_batch
2 changes: 1 addition & 1 deletion evadb/executor/predicate_executor.py
@@ -31,6 +31,6 @@ def __init__(self, db: EvaDBDatabase, node: PredicatePlan):
def exec(self, *args, **kwargs) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec(**kwargs):
batch = apply_predicate(batch, self.predicate, self.catalog())
batch = apply_predicate(batch, self.predicate, self.catalog)
if not batch.empty():
yield batch
2 changes: 1 addition & 1 deletion evadb/executor/project_executor.py
@@ -31,7 +31,7 @@ def __init__(self, db: EvaDBDatabase, node: ProjectPlan):
def exec(self, *args, **kwargs) -> Iterator[Batch]:
child_executor = self.children[0]
for batch in child_executor.exec(**kwargs):
batch = apply_project(batch, self.target_list, self.catalog())
batch = apply_project(batch, self.target_list, self.catalog)

if not batch.empty():
yield batch
4 changes: 2 additions & 2 deletions evadb/executor/seq_scan_executor.py
@@ -44,9 +44,9 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
batch.modify_column_alias(self.alias)

# We do the predicate first
batch = apply_predicate(batch, self.predicate, self.catalog())
batch = apply_predicate(batch, self.predicate, self.catalog)
# Then do project
batch = apply_project(batch, self.project_expr, self.catalog())
batch = apply_project(batch, self.project_expr, self.catalog)

if not batch.empty():
yield batch
3 changes: 3 additions & 0 deletions evadb/expression/arithmetic_expression.py
@@ -46,3 +46,6 @@ def __eq__(self, other):
if not isinstance(other, ArithmeticExpression):
return False
return is_subtree_equal and self.etype == other.etype

def __hash__(self) -> int:
return super().__hash__()
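Background on why the explicit __hash__ is needed: a class that defines __eq__ gets an implicit __hash__ = None, so a subclass overriding __eq__ must restore hashing itself. A standalone illustration (Base and Derived are toy classes, not EvaDB's):

```python
class Base:
    def __eq__(self, other):
        return type(self) is type(other)

    def __hash__(self) -> int:
        return hash(type(self))

class Derived(Base):
    def __eq__(self, other):
        # overriding __eq__ would silently set Derived.__hash__ to None...
        return super().__eq__(other)

    def __hash__(self) -> int:
        # ...so hashing is restored explicitly, as in the diff above
        return super().__hash__()

assert hash(Derived()) == hash(Derived())  # TypeError without __hash__
```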
13 changes: 9 additions & 4 deletions evadb/models/storage/batch.py
@@ -341,14 +341,19 @@ def combine_batches(
"""
Creates Batch by combining two batches using some arithmetic expression.
"""
assert (
len(first.columns) == 1 and len(second.columns) == 1
), "Arithmetic operations only supported on Batch with one column"
lvalues = first._frames.to_numpy()
rvalues = second._frames.to_numpy()
if expression == ExpressionType.ARITHMETIC_ADD:
return Batch(pd.DataFrame(first._frames + second._frames))
return Batch(pd.DataFrame(lvalues + rvalues))
elif expression == ExpressionType.ARITHMETIC_SUBTRACT:
return Batch(pd.DataFrame(first._frames - second._frames))
return Batch(pd.DataFrame(lvalues - rvalues))
elif expression == ExpressionType.ARITHMETIC_MULTIPLY:
return Batch(pd.DataFrame(first._frames * second._frames))
return Batch(pd.DataFrame(lvalues * rvalues))
elif expression == ExpressionType.ARITHMETIC_DIVIDE:
return Batch(pd.DataFrame(first._frames / second._frames))
return Batch(pd.DataFrame(lvalues / rvalues))

def reassign_indices_to_hash(self, indices) -> None:
"""
6 changes: 6 additions & 0 deletions evadb/optimizer/rules/rules.py
@@ -486,6 +486,9 @@ def apply(self, before: LogicalJoin, context: OptimizerContext):
yield tracker


# Vector Index Queries


class CombineSimilarityOrderByAndLimitToVectorIndexScan(Rule):
"""
This rule currently rewrites Order By + Limit to a vector index scan.
@@ -768,6 +771,7 @@ def apply(self, before: LogicalCreateUDF, context: OptimizerContext):
class LogicalCreateIndexToVectorIndex(Rule):
def __init__(self):
pattern = Pattern(OperatorType.LOGICALCREATEINDEX)
pattern.append_child(Pattern(OperatorType.DUMMY))
super().__init__(RuleType.LOGICAL_CREATE_INDEX_TO_VECTOR_INDEX, pattern)

def promise(self):
@@ -784,6 +788,8 @@ def apply(self, before: LogicalCreateIndex, context: OptimizerContext):
before.vector_store_type,
before.udf_func,
)
for child in before.children:
after.append_child(child)
yield after
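The pattern now requires a child beneath the logical CREATE INDEX node, and apply() carries the children over to the physical operator so the executor can pull (row id, feature) batches from the scan below it. A toy illustration of that hand-off (Node is a stand-in for the real plan classes):

```python
class Node:
    # stand-in for the optimizer's plan operators
    def __init__(self, name):
        self.name, self.children = name, []

    def append_child(self, child):
        self.children.append(child)

before = Node("LogicalCreateIndex")
before.append_child(Node("SeqScan"))  # matched by the DUMMY child pattern

after = Node("CreateIndexPlan")
for child in before.children:         # same hand-off as in apply() above
    after.append_child(child)

assert [c.name for c in after.children] == ["SeqScan"]
```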

