Add column matching for function pipelining #1417

Open
wants to merge 15 commits into base: staging
3 changes: 0 additions & 3 deletions evadb/catalog/catalog_manager.py
@@ -786,6 +786,3 @@ def get_configuration_catalog_value(self, key: str, default: Any = None) -> Any:
if table_entry:
return table_entry.value
return default

def get_all_configuration_catalog_entries(self) -> List:
return self._config_catalog_service.get_all_entries()
1 change: 0 additions & 1 deletion evadb/catalog/catalog_type.py
@@ -117,7 +117,6 @@ class VectorStoreType(EvaDBEnum):
PINECONE # noqa: F821
PGVECTOR # noqa: F821
CHROMADB # noqa: F821
WEAVIATE # noqa: F821
MILVUS # noqa: F821


8 changes: 0 additions & 8 deletions evadb/evadb_config.py
@@ -24,12 +24,6 @@
"evadb_installation_dir": "",
"datasets_dir": "",
"catalog_database_uri": "",
"index_dir": "",
"cache_dir": "",
"s3_download_dir": "",
"tmp_dir": "",
"function_dir": "",
"model_dir": "",
"application": "evadb",
"mode": "release",
"batch_mem_size": 30000000,
@@ -47,6 +41,4 @@
"MILVUS_PASSWORD": "",
"MILVUS_DB_NAME": "",
"MILVUS_TOKEN": "",
"WEAVIATE_API_KEY": "",
"WEAVIATE_API_URL": "",
}
11 changes: 0 additions & 11 deletions evadb/executor/executor_utils.py
@@ -185,17 +185,6 @@ def handle_vector_store_params(
),
"PINECONE_ENV": catalog().get_configuration_catalog_value("PINECONE_ENV"),
}
elif vector_store_type == VectorStoreType.WEAVIATE:
# Weaviate Configuration
# Weaviate API key and URL Can be obtained from cluster details on Weaviate Cloud Services (WCS) dashboard
return {
"WEAVIATE_API_KEY": catalog().get_configuration_catalog_value(
"WEAVIATE_API_KEY"
),
"WEAVIATE_API_URL": catalog().get_configuration_catalog_value(
"WEAVIATE_API_URL"
),
}
elif vector_store_type == VectorStoreType.MILVUS:
return {
"MILVUS_URI": catalog().get_configuration_catalog_value("MILVUS_URI"),
9 changes: 0 additions & 9 deletions evadb/executor/set_executor.py
@@ -16,8 +16,6 @@
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.parser.set_statement import SetStatement

RESERVED_CONFIG_KEYWORDS = ["CONFIG", "CONFIGS"]


class SetExecutor(AbstractExecutor):
def __init__(self, db: EvaDBDatabase, node: SetStatement):
@@ -39,13 +37,6 @@ def exec(self, *args, **kwargs):
will be replaced
"""

if self.node.config_name in RESERVED_CONFIG_KEYWORDS:
raise Exception(
"{} is a reserved keyword for configurations. Please use a word other than the following list: {}".format(
self.node.config_name, RESERVED_CONFIG_KEYWORDS
)
)

self.catalog().upsert_configuration_catalog_entry(
key=self.node.config_name,
value=self.node.config_value.value,
25 changes: 9 additions & 16 deletions evadb/executor/show_info_executor.py
@@ -33,7 +33,7 @@ def exec(self, *args, **kwargs):
self.node.show_type is ShowType.FUNCTIONS
or ShowType.TABLES
or ShowType.DATABASES
or ShowType.CONFIGS
or ShowType.CONFIG
), f"Show command does not support type {self.node.show_type}"

if self.node.show_type is ShowType.FUNCTIONS:
@@ -50,23 +50,16 @@ def exec(self, *args, **kwargs):
databases = self.catalog().get_all_database_catalog_entries()
for db in databases:
show_entries.append(db.display_format())
elif self.node.show_type is ShowType.CONFIGS:
elif self.node.show_type is ShowType.CONFIG:
value = self.catalog().get_configuration_catalog_value(
key=self.node.show_val.upper(),
)
show_entries = {}
# CONFIGS is a special word, which is used to display all the configurations
if self.node.show_val.upper() == ShowType.CONFIGS.name:
configs = self.catalog().get_all_configuration_catalog_entries()
for config in configs:
show_entries[config.key] = config.value
if value is not None:
show_entries = {self.node.show_val: [value]}
else:
value = self.catalog().get_configuration_catalog_value(
key=self.node.show_val.upper(),
raise Exception(
"No configuration found with key {}".format(self.node.show_val)
)
show_entries = {}
if value is not None:
show_entries = {self.node.show_val: [value]}
else:
raise Exception(
"No configuration found with key {}".format(self.node.show_val)
)

yield Batch(pd.DataFrame(show_entries))
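For reference, each SHOW variant handled above is reachable through the Python client. A minimal usage sketch, assuming the documented evadb.connect() API; whether a bare configuration key can be passed to SHOW depends on the grammar in this branch, so only the list-style variants are shown:

import evadb

cursor = evadb.connect().cursor()

# Each statement is routed through ShowInfoExecutor.exec() above.
print(cursor.query("SHOW FUNCTIONS;").df())
print(cursor.query("SHOW TABLES;").df())
print(cursor.query("SHOW DATABASES;").df())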
2 changes: 0 additions & 2 deletions evadb/expression/function_expression.py
@@ -127,7 +127,6 @@ def evaluate(self, batch: Batch, **kwargs) -> Batch:
with self._stats.timer:
# apply the function and project the required columns
outcomes = self._apply_function_expression(func, batch, **kwargs)

# process outcomes only if output is not empty
if outcomes.frames.empty is False:
outcomes = outcomes.project(self.projection_columns)
@@ -183,7 +182,6 @@ def _apply_function_expression(self, func: Callable, batch: Batch, **kwargs):
func_args = Batch.merge_column_wise(
[child.evaluate(batch, **kwargs) for child in self.children]
)

if not self._cache:
return func_args.apply_function_expression(func)

3 changes: 0 additions & 3 deletions evadb/functions/function_bootstrap_queries.py
@@ -217,7 +217,6 @@
Upper_function_query = """CREATE FUNCTION IF NOT EXISTS UPPER
INPUT (input ANYTYPE)
OUTPUT (output NDARRAY STR(ANYDIM))
TYPE HelperFunction
IMPL '{}/functions/helpers/upper.py';
""".format(
EvaDB_INSTALLATION_DIR
@@ -226,7 +225,6 @@
Lower_function_query = """CREATE FUNCTION IF NOT EXISTS LOWER
INPUT (input ANYTYPE)
OUTPUT (output NDARRAY STR(ANYDIM))
TYPE HelperFunction
IMPL '{}/functions/helpers/lower.py';
""".format(
EvaDB_INSTALLATION_DIR
@@ -235,7 +233,6 @@
Concat_function_query = """CREATE FUNCTION IF NOT EXISTS CONCAT
INPUT (input ANYTYPE)
OUTPUT (output NDARRAY STR(ANYDIM))
TYPE HelperFunction
IMPL '{}/functions/helpers/concat.py';
""".format(
EvaDB_INSTALLATION_DIR
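The bootstrap queries above use the same CREATE FUNCTION form that user code can issue through the client. A minimal sketch, assuming the documented cursor.query(...).df() API; the implementation path mirrors the UPPER query above and assumes the query is run from the repository root, and TYPE metadata is omitted to match this branch's version:

import evadb

cursor = evadb.connect().cursor()

# Register the UPPER helper manually, mirroring Upper_function_query above.
cursor.query(
    """
    CREATE FUNCTION IF NOT EXISTS UPPER
    INPUT (input ANYTYPE)
    OUTPUT (output NDARRAY STR(ANYDIM))
    IMPL 'evadb/functions/helpers/upper.py';
    """
).df()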
3 changes: 1 addition & 2 deletions evadb/interfaces/relational/db.py
@@ -268,8 +268,7 @@ def create_vector_index(
index_name (str): Name of the index.
table_name (str): Name of the table.
expr (str): Expression used to build the vector index.

using (str): Method used for indexing, can be `FAISS` or `QDRANT` or `PINECONE` or `CHROMADB` or `WEAVIATE` or `MILVUS`.
using (str): Method used for indexing, can be `FAISS` or `QDRANT` or `PINECONE` or `CHROMADB` or `MILVUS`.

Returns:
EvaDBCursor: The EvaDBCursor object.
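The create_vector_index docstring above maps directly onto a client call. A minimal sketch, assuming the documented relational API; the index, table, and function names are illustrative, and FAISS is used because it requires no external service:

import evadb

cursor = evadb.connect().cursor()

# Build a FAISS index over the output of a feature extractor, as described
# in the docstring above. The returned object is an EvaDBCursor.
cursor.create_vector_index(
    "img_index",
    table_name="images",
    expr="FeatureExtractor(data)",
    using="FAISS",
)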
177 changes: 174 additions & 3 deletions evadb/models/storage/batch.py
@@ -12,11 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from typing import Callable, Iterable, List, TypeVar, Union

import numpy as np
import pandas as pd

from evadb.catalog.catalog_type import NdArrayType
from evadb.expression.abstract_expression import ExpressionType
from evadb.parser.alias import Alias
from evadb.utils.generic_utils import PickleSerializer
@@ -170,7 +172,20 @@ def apply_function_expression(self, expr: Callable) -> Batch:
"""
Execute function expression on frames.
"""
self.drop_column_alias()
try:
if (
hasattr(expr, "forward")
and hasattr(expr.forward, "tags")
and (len(expr.forward.tags["input"]) != 0)
):
input_tags = expr.forward.tags["input"][0]
output_tags = expr.forward.tags["output"][0]
self.drop_column_alias(metadata=(input_tags, output_tags))
else:
self.drop_column_alias()
except (TypeError, KeyError):
self.drop_column_alias()

return Batch(expr(self._frames))

def iterrows(self):
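For context on the tags lookup added above: expr.forward.tags is expected to be populated by EvaDB's forward decorator, which attaches the declared input and output signatures to the forward method. Below is a minimal sketch of a function declaring such signatures, assuming the standard decorator interface (forward, PandasDataframe, NdArrayType) and that the descriptor exposes the columns, column_types, and column_shapes it was constructed with; the class name, column names, and shapes are illustrative only.

import pandas as pd

from evadb.catalog.catalog_type import NdArrayType
from evadb.functions.abstract.abstract_function import AbstractFunction
from evadb.functions.decorators.decorators import forward
from evadb.functions.decorators.io_descriptors.data_types import PandasDataframe


class DummyDoubler(AbstractFunction):
    """Illustrative function (not part of this PR): doubles a float column."""

    @property
    def name(self) -> str:
        return "DummyDoubler"

    def setup(self):
        pass

    @forward(
        input_signatures=[
            PandasDataframe(
                columns=["value"],
                column_types=[NdArrayType.FLOAT32],
                column_shapes=[(None,)],
            )
        ],
        output_signatures=[
            PandasDataframe(
                columns=["doubled"],
                column_types=[NdArrayType.FLOAT32],
                column_shapes=[(None,)],
            )
        ],
    )
    def forward(self, frames: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"doubled": frames["value"] * 2})


# The decorator stores the declared signatures on the method object; this is
# what the new branch reads as expr.forward.tags["input"][0] and ["output"][0].
print(DummyDoubler.forward.tags["input"][0].columns)   # expected: ['value']
print(DummyDoubler.forward.tags["output"][0].columns)  # expected: ['doubled']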
@@ -433,16 +448,172 @@ def modify_column_alias(self, alias: Union[Alias, str]) -> None:

self._frames.columns = new_col_names

def drop_column_alias(self) -> None:
def drop_column_alias(self, metadata=None) -> None:
# table1.a, table1.b, table1.c -> a, b, c

new_col_names = []
for col_name in self.columns:
if isinstance(col_name, str) and "." in col_name:
new_col_names.append(col_name.split(".")[1])
else:
new_col_names.append(col_name)

# Iterate over each column in the dataframe
self._frames.columns = new_col_names
if metadata is not None and isinstance(metadata[0], pd.DataFrame):
input_meta, output_meta = metadata
defined_column_names = [entry for entry in input_meta.columns]
defined_column_types = [entry for entry in input_meta.column_types]
defined_column_shapes = [entry for entry in input_meta.column_shapes]
column_rename_map = {}

def is_shape_matching(data, expected_shape):
"""
Check if the shape of the data matches the expected shape.
"""

data_shape = data.shape
if len(data_shape) != len(expected_shape):
return False

for data_dim, expected_dim in zip(data_shape, expected_shape):
if expected_dim is not None and data_dim != expected_dim:
return False

return True

def get_basic_element(data):
# Check if the data is iterable (but not a string, as strings are also iterable)
if isinstance(data, Iterable) and not isinstance(data, (str, bytes)):
# If the data is empty, return None
if len(data) == 0:
return None
# Recursively get the first element
return get_basic_element(data[0])
else:
# If the data is not iterable, return it as is
return data

def deduce_and_map_type(element, check_type):
python_type_to_ndarray_type = {
int: NdArrayType.INT64, # Python's int is commonly mapped to NumPy's np.int64
float: NdArrayType.FLOAT64, # Python's float maps to np.float64
bool: NdArrayType.BOOL, # Python's bool maps to np.bool_
str: NdArrayType.STR, # Python's str maps to np.str_
bytes: NdArrayType.UINT8, # Python's bytes type maps to np.uint8 (common for byte data)
complex: NdArrayType.FLOAT64, # Python's complex type maps to np.float64 (real part)
datetime: NdArrayType.DATETIME, # datetime maps to np.datetime64
np.int8: NdArrayType.INT8,
np.uint8: NdArrayType.UINT8,
np.int16: NdArrayType.INT16,
np.int32: NdArrayType.INT32,
np.int64: NdArrayType.INT64,
np.float32: NdArrayType.FLOAT32,
np.float64: NdArrayType.FLOAT64,
np.unicode_: NdArrayType.UNICODE,
np.str_: NdArrayType.STR,
np.bool_: NdArrayType.BOOL,
np.datetime64: NdArrayType.DATETIME,
}
flexible_type_mapping = {
NdArrayType.INT8: [
NdArrayType.INT8,
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
],
NdArrayType.UINT8: [
NdArrayType.UINT8,
NdArrayType.INT8,
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
],
NdArrayType.INT16: [
NdArrayType.INT8,
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
],
NdArrayType.INT32: [
NdArrayType.INT8,
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
],
NdArrayType.INT64: [
NdArrayType.INT8,
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
],
NdArrayType.FLOAT32: [NdArrayType.FLOAT64, NdArrayType.FLOAT32],
NdArrayType.FLOAT64: [NdArrayType.FLOAT64, NdArrayType.FLOAT32],
}
element_type = type(element)
if isinstance(element, int):
return check_type in [
NdArrayType.INT16,
NdArrayType.INT32,
NdArrayType.INT64,
NdArrayType.FLOAT32,
NdArrayType.FLOAT64,
]
if isinstance(element, float):
return check_type in [NdArrayType.FLOAT32, NdArrayType.FLOAT64]

# Special handling for numpy types
if isinstance(element, np.generic):
element_type = np.dtype(type(element)).type
deduced_type = python_type_to_ndarray_type.get(element_type)
if deduced_type == check_type:
return True
if (
deduced_type is not None
and check_type in flexible_type_mapping[deduced_type]
):
return True
return False

for col_name in self.columns:
match = False
for i, def_name in enumerate(list(defined_column_names)):
# If the column name matches, keep it as is
if def_name == col_name:
column_rename_map[col_name] = col_name
defined_column_names.remove(col_name)
defined_column_types.pop(i)
defined_column_shapes.pop(i)
match = True
# If the column name doesn't match, fall back to shape and type matching
if not match:
for i, def_name in enumerate(list(defined_column_names)):
# Check if the shape matches the declared column shape
sample_data = self._frames[col_name].iloc[0]
if hasattr(sample_data, "shape") and is_shape_matching(
sample_data, defined_column_shapes[i]
):
basic_element = get_basic_element(sample_data)
if deduce_and_map_type(
basic_element, defined_column_types[i]
):
column_rename_map[col_name] = def_name
defined_column_names.remove(def_name)
defined_column_types.pop(i)
defined_column_shapes.pop(i)
break

# Rename columns in the dataframe
self._frames.rename(columns=column_rename_map, inplace=True)

def to_numpy(self):
return self._frames.to_numpy()
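To summarize the matching logic added above in isolation: when a column produced by one function does not match the next function's declared input names, it can still be bound to a declared input whose shape and element type it satisfies, which is what is_shape_matching, get_basic_element, and deduce_and_map_type implement together. A simplified, self-contained sketch of that idea using only pandas and numpy follows; the names and the dtype test are illustrative, and the PR uses NdArrayType enums rather than numpy abstract dtypes.

import numpy as np
import pandas as pd


def match_columns_by_shape_and_type(frames, expected):
    """Rename frame columns to a function's declared input columns.

    `expected` maps declared name -> (expected_shape, expected_dtype);
    a None dimension in expected_shape means "any size".
    """
    rename_map = {}
    unmatched = dict(expected)
    for col in frames.columns:
        if col in unmatched:  # exact name match wins, as in the PR
            rename_map[col] = col
            del unmatched[col]
            continue
        sample = frames[col].iloc[0]
        for name, (shape, dtype) in list(unmatched.items()):
            shape_ok = (
                hasattr(sample, "shape")
                and len(sample.shape) == len(shape)
                and all(e is None or d == e for d, e in zip(sample.shape, shape))
            )
            if shape_ok and np.issubdtype(np.asarray(sample).dtype, dtype):
                rename_map[col] = name  # bind by shape and element type
                del unmatched[name]
                break
    return frames.rename(columns=rename_map)


# The upstream column is named "feat", but the downstream function declares
# an input column "data" of per-row shape (3,) and float type; the column is
# renamed so the pipelined call lines up.
df = pd.DataFrame({"feat": [np.zeros(3, dtype=np.float32) for _ in range(2)]})
print(match_columns_by_shape_and_type(df, {"data": ((3,), np.floating)}).columns)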
3 changes: 1 addition & 2 deletions evadb/parser/evadb.lark
@@ -71,7 +71,7 @@ function_metadata_key: uid

function_metadata_value: constant

vector_store_type: USING (FAISS | QDRANT | PINECONE | PGVECTOR | CHROMADB | WEAVIATE | MILVUS)
vector_store_type: USING (FAISS | QDRANT | PINECONE | PGVECTOR | CHROMADB | MILVUS)

index_elem: ("(" uid_list ")"
| "(" function_call ")")
@@ -448,7 +448,6 @@ QDRANT: "QDRANT"i
PINECONE: "PINECONE"i
PGVECTOR: "PGVECTOR"i
CHROMADB: "CHROMADB"i
WEAVIATE: "WEAVIATE"i
MILVUS: "MILVUS"i

// Computer vision tasks