Skip to content

Commit

Permalink
fix: native db bugs (#1223)
Browse files Browse the repository at this point in the history
1. `IF NOT EXISTS` support added for native databases.
2. Better error messaging for #1219

---------

Co-authored-by: Jiashen Cao <[email protected]>
Co-authored-by: Andy Xu <[email protected]>
  • Loading branch information
3 people authored Sep 27, 2023
1 parent 40966de commit ee3a489
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 49 deletions.
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ sphinx-remove-toctrees==0.0.3
sphinx_design==0.5.0
sphinx_multiversion>=0.2.4
sphinxcontrib-redoc==1.6.0
sphinxcontrib-youtube==1.4.1
sphinxcontrib-youtube==1.2.0
furo>=2022.4.7

setuptools>=41.0.1
Expand All @@ -37,4 +37,4 @@ jupytext==1.15.2
urllib3 < 2

# eva
git+https://github.com/georgia-tech-db/eva.git@master
git+https://github.com/georgia-tech-db/eva.git@staging
36 changes: 10 additions & 26 deletions evadb/binder/binder_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,35 +55,19 @@ def check_data_source_and_table_are_valid(
Validate the database is valid and the requested table in database is
also valid.
"""
db_catalog_entry = catalog.get_database_catalog_entry(database_name)

if db_catalog_entry is not None:
with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
) as handler:
# Get table definition.
resp = handler.get_tables()

if resp.error is not None:
error = "There is no table in data source {}. Create the table using native query.".format(
database_name,
)
logger.error(error)
raise BinderError(error)

# Check table existence.
table_df = resp.data
if table_name not in table_df["table_name"].values:
error = "Table {} does not exist in data source {}. Create the table using native query.".format(
table_name,
database_name,
)
logger.error(error)
raise BinderError(error)
else:
error = None
if catalog.get_database_catalog_entry(database_name) is None:
error = "{} data source does not exist. Create the new database source using CREATE DATABASE.".format(
database_name,
)

if not catalog.check_table_exists(table_name, database_name):
error = "Table {} does not exist in data source {}. Create the table using native query.".format(
table_name,
database_name,
)

if error:
logger.error(error)
raise BinderError(error)

Expand Down
3 changes: 3 additions & 0 deletions evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def _bind_create_statement(self, node: CreateTableStatement):
node.query.target_list
)

# verify if the table to be created is valid.
# possible issues: the native database does not exists.

@bind.register(CreateIndexStatement)
def _bind_create_index_statement(self, node: CreateIndexStatement):
from evadb.binder.create_index_statement_binder import bind_create_index
Expand Down
41 changes: 35 additions & 6 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from evadb.parser.create_statement import ColumnDefinition
from evadb.parser.table_ref import TableInfo
from evadb.parser.types import FileFormatType
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.generic_utils import generate_file_path, get_file_checksum
from evadb.utils.logging_manager import logger

Expand Down Expand Up @@ -174,6 +175,32 @@ def drop_database_catalog_entry(self, database_entry: DatabaseCatalogEntry) -> b
# taken care by the underlying db
return self._db_catalog_service.delete_entry(database_entry)

def check_native_table_exists(self, table_name: str, database_name: str):
"""
Validate the database is valid and the requested table in database is
also valid.
"""
db_catalog_entry = self.get_database_catalog_entry(database_name)

if db_catalog_entry is None:
return False

with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
) as handler:
# Get table definition.
resp = handler.get_tables()

if resp.error is not None:
return False

# Check table existence.
table_df = resp.data
if table_name not in table_df["table_name"].values:
return False

return True

"Table catalog services"

def insert_table_catalog_entry(
Expand Down Expand Up @@ -249,13 +276,15 @@ def rename_table_catalog_entry(
return self._table_catalog_service.rename_entry(curr_table, new_name.table_name)

def check_table_exists(self, table_name: str, database_name: str = None):
table_entry = self._table_catalog_service.get_entry_by_name(
database_name, table_name
)
if table_entry is None:
return False
is_native_table = database_name is not None

if is_native_table:
return self.check_native_table_exists(table_name, database_name)
else:
return True
table_entry = self._table_catalog_service.get_entry_by_name(
database_name, table_name
)
return table_entry is not None

def get_all_table_catalog_entries(self):
return self._table_catalog_service.get_all_entries()
Expand Down
10 changes: 4 additions & 6 deletions evadb/executor/create_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ def __init__(self, db: EvaDBDatabase, node: CreatePlan):
def exec(self, *args, **kwargs):
# create a table in the ative database if set
is_native_table = self.node.table_info.database_name is not None
check_if_exists = False
# if exists only supported for evadb tables
if not is_native_table:
check_if_exists = handle_if_not_exists(
self.catalog(), self.node.table_info, self.node.if_not_exists
)

check_if_exists = handle_if_not_exists(
self.catalog(), self.node.table_info, self.node.if_not_exists
)

if not check_if_exists:
create_table_done = False
Expand Down
20 changes: 11 additions & 9 deletions evadb/storage/native_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,17 @@ class NativeStorageEngine(AbstractStorageEngine):
def __init__(self, db: EvaDBDatabase):
super().__init__(db)

def _get_database_catalog_entry(self, database_name):
db_catalog_entry = self.db.catalog().get_database_catalog_entry(database_name)
if db_catalog_entry is None:
raise Exception(
f"Could not find database with name {database_name}. Please register the database using the `CREATE DATABASE` command."
)
return db_catalog_entry

def create(self, table: TableCatalogEntry):
try:
db_catalog_entry = self.db.catalog().get_database_catalog_entry(
table.database_name
)
db_catalog_entry = self._get_database_catalog_entry(table.database_name)
uri = None
with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
Expand All @@ -122,9 +128,7 @@ def create(self, table: TableCatalogEntry):

def write(self, table: TableCatalogEntry, rows: Batch):
try:
db_catalog_entry = self.db.catalog().get_database_catalog_entry(
table.database_name
)
db_catalog_entry = self._get_database_catalog_entry(table.database_name)
with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
) as handler:
Expand Down Expand Up @@ -156,9 +160,7 @@ def write(self, table: TableCatalogEntry, rows: Batch):

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
try:
db_catalog_entry = self.db.catalog().get_database_catalog_entry(
table.database_name
)
db_catalog_entry = self._get_database_catalog_entry(table.database_name)
with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
) as handler:
Expand Down

0 comments on commit ee3a489

Please sign in to comment.