diff --git a/evadb/binder/binder_utils.py b/evadb/binder/binder_utils.py index bb1b36edb..bab97a9ca 100644 --- a/evadb/binder/binder_utils.py +++ b/evadb/binder/binder_utils.py @@ -48,6 +48,18 @@ class BinderError(Exception): pass +def check_data_source_is_valid(catalog: CatalogManager, database_name: str): + db_catalog_entry = catalog.get_database_catalog_entry(database_name) + + if db_catalog_entry is None: + error = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format( + database_name, + ) + logger.error(error) + raise BinderError(error) + return True + + def check_data_source_and_table_are_valid( catalog: CatalogManager, database_name: str, table_name: str ): @@ -90,7 +102,7 @@ def check_data_source_and_table_are_valid( def create_table_catalog_entry_for_data_source( - table_name: str, column_info: pd.DataFrame + table_name: str, database_name: str, column_info: pd.DataFrame ): column_name_list = list(column_info["name"]) column_type_list = [ @@ -99,7 +111,7 @@ def create_table_catalog_entry_for_data_source( ] column_list = [] for name, dtype in zip(column_name_list, column_type_list): - column_list.append(ColumnCatalogEntry(name, dtype)) + column_list.append(ColumnCatalogEntry(name.lower(), dtype)) # Assemble table. table_catalog_entry = TableCatalogEntry( @@ -107,6 +119,7 @@ def create_table_catalog_entry_for_data_source( file_url=None, table_type=TableType.NATIVE_DATA, columns=column_list, + database_name=database_name, ) return table_catalog_entry @@ -140,7 +153,7 @@ def bind_native_table_info(catalog: CatalogManager, table_info: TableInfo): # Assemble columns. column_df = handler.get_columns(table_info.table_name).data table_info.table_obj = create_table_catalog_entry_for_data_source( - table_info.table_name, column_df + table_info.table_name, table_info.database_name, column_df ) @@ -339,7 +352,7 @@ def get_column_definition_from_select_target_list( for col_name, output_obj in output_objs: binded_col_list.append( ColumnDefinition( - col_name, + col_name.lower(), output_obj.type, output_obj.array_type, output_obj.array_dimensions, diff --git a/evadb/binder/statement_binder.py b/evadb/binder/statement_binder.py index 002ea0cfb..d067a6e56 100644 --- a/evadb/binder/statement_binder.py +++ b/evadb/binder/statement_binder.py @@ -20,6 +20,7 @@ BinderError, bind_table_info, check_column_name_is_string, + check_data_source_is_valid, check_groupby_pattern, check_table_object_is_groupable, drop_row_id_from_target_list, diff --git a/evadb/binder/statement_binder_context.py b/evadb/binder/statement_binder_context.py index b1101a2b3..32dc12c7d 100644 --- a/evadb/binder/statement_binder_context.py +++ b/evadb/binder/statement_binder_context.py @@ -139,6 +139,9 @@ def get_binded_column( A tuple of alias and column object """ + # binder is case insensitive + col_name = col_name.lower() + def raise_error(): err_msg = f"Found invalid column {col_name}" logger.error(err_msg) diff --git a/evadb/catalog/models/utils.py b/evadb/catalog/models/utils.py index 30773af12..cdcd6c8ec 100644 --- a/evadb/catalog/models/utils.py +++ b/evadb/catalog/models/utils.py @@ -137,6 +137,7 @@ class TableCatalogEntry: identifier_column: str = "id" columns: List[ColumnCatalogEntry] = field(compare=False, default_factory=list) row_id: int = None + database_name: str = "EvaDB" @dataclass(unsafe_hash=True) diff --git a/evadb/executor/create_executor.py b/evadb/executor/create_executor.py index 39541d971..ab59be6e0 100644 --- a/evadb/executor/create_executor.py +++ b/evadb/executor/create_executor.py @@ -12,9 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from evadb.binder.binder_utils import create_table_catalog_entry_for_data_source from evadb.database import EvaDBDatabase from evadb.executor.abstract_executor import AbstractExecutor -from evadb.executor.executor_utils import handle_if_not_exists +from evadb.executor.executor_utils import ( + create_table_catalog_entry_for_native_table, + handle_if_not_exists, +) from evadb.plan_nodes.create_plan import CreatePlan from evadb.storage.storage_engine import StorageEngine from evadb.utils.logging_manager import logger @@ -25,15 +29,27 @@ def __init__(self, db: EvaDBDatabase, node: CreatePlan): super().__init__(db, node) def exec(self, *args, **kwargs): - if not handle_if_not_exists( - self.catalog(), self.node.table_info, self.node.if_not_exists - ): + # create a table in the ative database if set + is_native_table = self.node.table_info.database_name is not None + check_if_exists = False + # if exists only supported for evadb tables + if not is_native_table: + check_if_exists = handle_if_not_exists( + self.catalog(), self.node.table_info, self.node.if_not_exists + ) + + if not check_if_exists: create_table_done = False logger.debug(f"Creating table {self.node.table_info}") - catalog_entry = self.catalog().create_and_insert_table_catalog_entry( - self.node.table_info, self.node.column_list - ) + if not is_native_table: + catalog_entry = self.catalog().create_and_insert_table_catalog_entry( + self.node.table_info, self.node.column_list + ) + else: + catalog_entry = create_table_catalog_entry_for_native_table( + self.node.table_info, self.node.column_list + ) storage_engine = StorageEngine.factory(self.db, catalog_entry) try: storage_engine.create(table=catalog_entry) diff --git a/evadb/executor/executor_utils.py b/evadb/executor/executor_utils.py index 89cf1e473..cc4471dfe 100644 --- a/evadb/executor/executor_utils.py +++ b/evadb/executor/executor_utils.py @@ -17,10 +17,14 @@ from pathlib import Path from typing import TYPE_CHECKING, Generator, List +from evadb.catalog.catalog_utils import xform_column_definitions_to_catalog_entries +from evadb.catalog.models.utils import ColumnCatalogEntry, TableCatalogEntry +from evadb.parser.create_statement import ColumnDefinition + if TYPE_CHECKING: from evadb.catalog.catalog_manager import CatalogManager -from evadb.catalog.catalog_type import VectorStoreType +from evadb.catalog.catalog_type import TableType, VectorStoreType from evadb.expression.abstract_expression import AbstractExpression from evadb.expression.function_expression import FunctionExpression from evadb.models.storage.batch import Batch @@ -169,3 +173,19 @@ def handle_vector_store_params( return {"index_db": str(Path(index_path).parent)} else: raise ValueError("Unsupported vector store type: {}".format(vector_store_type)) + + +def create_table_catalog_entry_for_native_table( + table_info: TableInfo, column_list: List[ColumnDefinition] +): + column_catalog_entires = xform_column_definitions_to_catalog_entries(column_list) + + # Assemble table. + table_catalog_entry = TableCatalogEntry( + name=table_info.table_name, + file_url=None, + table_type=TableType.NATIVE_DATA, + columns=column_catalog_entires, + database_name=table_info.database_name, + ) + return table_catalog_entry diff --git a/evadb/storage/native_storage_engine.py b/evadb/storage/native_storage_engine.py index d56557ed9..5130a8e70 100644 --- a/evadb/storage/native_storage_engine.py +++ b/evadb/storage/native_storage_engine.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from contextlib import contextmanager from typing import Iterator import pandas as pd @@ -20,7 +21,10 @@ from evadb.database import EvaDBDatabase from evadb.models.storage.batch import Batch from evadb.storage.abstract_storage_engine import AbstractStorageEngine -from evadb.third_party.databases.interface import get_database_handler +from evadb.third_party.databases.interface import ( + get_database_connection, + get_database_handler, +) from evadb.utils.logging_manager import logger @@ -29,25 +33,41 @@ def __init__(self, db: EvaDBDatabase): super().__init__(db) def create(self, table: TableCatalogEntry): - pass + db_catalog_entry = self.db.catalog().get_database_catalog_entry( + table.database_name + ) + with get_database_handler( + db_catalog_entry.engine, **db_catalog_entry.params + ) as handler: + col_definitions = + handler.execute_native_query( + f"CREATE TABLE {table.name} ({col_definitions})" + ) def write(self, table: TableCatalogEntry, rows: Batch): pass - def read(self, database_name: str, table: TableCatalogEntry) -> Iterator[Batch]: + def read(self, table: TableCatalogEntry) -> Iterator[Batch]: try: db_catalog_entry = self.db.catalog().get_database_catalog_entry( - database_name + table.database_name ) - handler = get_database_handler( + with get_database_handler( db_catalog_entry.engine, **db_catalog_entry.params - ) - handler.connect() + ) as handler: + data_df = handler.execute_native_query( + f"SELECT * FROM {table.name}" + ).data - data_df = handler.execute_native_query(f"SELECT * FROM {table.name}").data - yield Batch(pd.DataFrame(data_df)) + # Handling case-sensitive databases like SQLite can be tricky. + # Currently, EvaDB converts all columns to lowercase, which may result + # in issues with these databases. As we move forward, we are actively + # working on improving this aspect within Binder. For more information, + # please refer to https://github.com/georgia-tech-db/evadb/issues/1079. + data_df.columns = data_df.columns.str.lower() + yield Batch(pd.DataFrame(data_df)) except Exception as e: - err_msg = f"Failed to read the table {table.name} in data source {database_name} with exception {str(e)}" + err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}" logger.exception(err_msg) raise Exception(err_msg) diff --git a/evadb/third_party/databases/interface.py b/evadb/third_party/databases/interface.py index f49403b92..f58e5dc48 100644 --- a/evadb/third_party/databases/interface.py +++ b/evadb/third_party/databases/interface.py @@ -14,9 +14,10 @@ # limitations under the License. import importlib import os +from contextlib import contextmanager -def get_database_handler(engine: str, **kwargs): +def _get_database_handler(engine: str, **kwargs): """ Return the database handler. User should modify this function for their new integrated handlers. @@ -41,6 +42,16 @@ def get_database_handler(engine: str, **kwargs): raise NotImplementedError(f"Engine {engine} is not supported") +@contextmanager +def get_database_handler(engine: str, **kwargs): + handler = _get_database_handler(engine, **kwargs) + handler.connect() + try: + yield handler + finally: + handler.disconnect() + + def dynamic_import(handler_dir): import_path = f"evadb.third_party.databases.{handler_dir}.{handler_dir}_handler" return importlib.import_module(import_path) diff --git a/evadb/third_party/databases/sqlite/sqlite_handler.py b/evadb/third_party/databases/sqlite/sqlite_handler.py index 204db36d2..7256280ad 100644 --- a/evadb/third_party/databases/sqlite/sqlite_handler.py +++ b/evadb/third_party/databases/sqlite/sqlite_handler.py @@ -108,10 +108,16 @@ def get_columns(self, table_name: str) -> DBHandlerResponse: def _fetch_results_as_df(self, cursor): try: + # Handling case-sensitive databases like SQLite can be tricky. Currently, + # EvaDB converts all columns to lowercase, which may result in issues with + # these databases. As we move forward, we are actively working on improving + # this aspect within Binder. + # For more information, please refer to https://github.com/georgia-tech-db/evadb/issues/1079. + res = cursor.fetchall() res_df = pd.DataFrame( res, - columns=[desc[0] for desc in cursor.description] + columns=[desc[0].lower() for desc in cursor.description] if cursor.description else [], ) diff --git a/test/third_party_tests/test_native_executor.py b/test/third_party_tests/test_native_executor.py index 7259f4ef0..834f43492 100644 --- a/test/third_party_tests/test_native_executor.py +++ b/test/third_party_tests/test_native_executor.py @@ -38,7 +38,7 @@ def _create_table_in_native_database(self): """USE test_data_source { CREATE TABLE test_table ( name VARCHAR(10), - age INT, + Age INT, comment VARCHAR (100) ) }""", @@ -49,7 +49,7 @@ def _insert_value_into_native_database(self, col1, col2, col3): self.evadb, f"""USE test_data_source {{ INSERT INTO test_table ( - name, age, comment + name, Age, comment ) VALUES ( '{col1}', {col2}, '{col3}' ) @@ -67,7 +67,7 @@ def _drop_table_in_native_database(self): def _create_evadb_table_using_select_query(self): execute_query_fetch_all( self.evadb, - """CREATE TABLE eva_table AS SELECT name, age FROM test_data_source.test_table;""", + """CREATE TABLE eva_table AS SELECT name, Age FROM test_data_source.test_table;""", ) # check if the create table is successful