Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create table in integration table from evadb select query #1125

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions evadb/executor/create_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
# limitations under the License.
from evadb.database import EvaDBDatabase
from evadb.executor.abstract_executor import AbstractExecutor
from evadb.executor.executor_utils import handle_if_not_exists
from evadb.executor.executor_utils import (
create_table_catalog_entry_for_native_table,
handle_if_not_exists,
)
from evadb.plan_nodes.create_plan import CreatePlan
from evadb.storage.storage_engine import StorageEngine
from evadb.utils.logging_manager import logger
Expand All @@ -25,16 +28,29 @@ def __init__(self, db: EvaDBDatabase, node: CreatePlan):
super().__init__(db, node)

def exec(self, *args, **kwargs):
if not handle_if_not_exists(
self.catalog(), self.node.table_info, self.node.if_not_exists
):
# create a table in the native database if set
is_native_table = self.node.table_info.database_name is not None
check_if_exists = False
# if exists only supported for evadb tables
if not is_native_table:
check_if_exists = handle_if_not_exists(
self.catalog(), self.node.table_info, self.node.if_not_exists
)

if not check_if_exists:
create_table_done = False
logger.debug(f"Creating table {self.node.table_info}")

catalog_entry = self.catalog().create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
)
if not is_native_table:
catalog_entry = self.catalog().create_and_insert_table_catalog_entry(
self.node.table_info, self.node.column_list
)
else:
catalog_entry = create_table_catalog_entry_for_native_table(
self.node.table_info, self.node.column_list
)
storage_engine = StorageEngine.factory(self.db, catalog_entry)

try:
storage_engine.create(table=catalog_entry)
create_table_done = True
Expand Down
2 changes: 1 addition & 1 deletion evadb/executor/use_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ def exec(self, *args, **kwargs) -> Iterator[Batch]:
resp = handler.execute_native_query(self._query_string)

if resp and resp.error is None:
return Batch(resp.data)
yield Batch(resp.data)
else:
raise ExecutorError(resp.error)
156 changes: 143 additions & 13 deletions evadb/storage/native_storage_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,146 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterator
from typing import Iterator, List

import numpy as np
import pandas as pd
from sqlalchemy import Column, MetaData, Table, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from evadb.catalog.catalog_type import ColumnType
from evadb.catalog.models.table_catalog import TableCatalogEntry
from evadb.catalog.models.utils import ColumnCatalogEntry
from evadb.catalog.schema_utils import SchemaUtils
from evadb.database import EvaDBDatabase
from evadb.models.storage.batch import Batch
from evadb.storage.abstract_storage_engine import AbstractStorageEngine
from evadb.third_party.databases.interface import get_database_handler
from evadb.utils.generic_utils import PickleSerializer
from evadb.utils.logging_manager import logger


# Define a function to create a table
def create_table(uri: str, table_name: str, columns: dict):
    """
    Create a table in the database using sqlalchemy.

    Parameters:
        uri (str): the sqlalchemy uri to connect to the database
        table_name (str): The name of the table to create.
        columns (dict): A dictionary where keys are column names and values
            are sqlalchemy ``Column`` objects (see
            ``SchemaUtils.xform_to_sqlalchemy_schema``).

    Raises:
        ValueError: if ``columns`` is empty (a table needs at least one column).
    """
    if not columns:
        raise ValueError(f"Cannot create table {table_name} without any columns")

    # Create a Base class for declarative models
    Base = declarative_base()

    attr_dict = {"__tablename__": table_name}

    # sqlalchemy's declarative mapping requires a primary key; promote the
    # first column.  NOTE: the previous implementation used popitem(), which
    # both mutated the caller's dict and picked the *last* column, contrary
    # to its own comment.  Iterate without mutating instead.
    column_items = list(columns.items())
    first_name, first_column = column_items[0]
    attr_dict[first_name] = Column(first_column.type, primary_key=True)
    attr_dict.update(dict(column_items[1:]))

    # dynamic schema generation
    # https://sparrigan.github.io/sql/sqla/2016/01/03/dynamic-tables.html
    _ = type(f"__placeholder_class_name__{table_name}", (Base,), attr_dict)()

    # Create a database engine and emit the CREATE TABLE statement.
    # DDL via metadata.create_all does not need a Session (the one the
    # original opened and committed was a no-op).
    engine = create_engine(uri)
    Base.metadata.create_all(engine)


def _dict_to_sql_row(dict_row: dict, columns: List[ColumnCatalogEntry]):
    """Prepare a row dict for insertion through sqlalchemy, in place.

    NDARRAY columns are pickled to bytes; numpy scalar values are converted
    to their plain Python equivalents (sqlalchemy does not accept numpy
    generics — https://stackoverflow.com/a/53067954). Returns the same dict.
    """
    for column in columns:
        value = dict_row[column.name]
        if column.type == ColumnType.NDARRAY:
            dict_row[column.name] = PickleSerializer.serialize(value)
        elif isinstance(value, np.generic):
            # e.g. np.int64 -> int via tolist()
            dict_row[column.name] = value.tolist()
    return dict_row


def _deserialize_sql_row(sql_row: tuple, columns: List[ColumnCatalogEntry]):
    """Convert a positional SQL result row into a {column_name: value} dict.

    NDARRAY columns stored as pickled bytes are deserialized; any other
    value is passed through unchanged (hack: the stored value may not be
    bytes, in which case we skip deserialization).
    """
    dict_row = {}
    for idx, col in enumerate(columns):
        # Fix: sql_row is positional (a tuple/Row), so index with idx —
        # the old `sql_row[col.name]` raised TypeError on plain tuples.
        value = sql_row[idx]
        if col.type == ColumnType.NDARRAY and isinstance(value, bytes):
            dict_row[col.name] = PickleSerializer.deserialize(value)
        else:
            dict_row[col.name] = value
    return dict_row


class NativeStorageEngine(AbstractStorageEngine):
def __init__(self, db: EvaDBDatabase):
    # Delegate to AbstractStorageEngine; the db handle is used by the
    # methods below (via self.db) to look up native data-source entries.
    super().__init__(db)

def create(self, table: TableCatalogEntry):
    """Create the table ``table.name`` inside its native data source.

    Resolves the data source's sqlalchemy URI through the registered
    database handler, converts the catalog columns to a sqlalchemy schema,
    and delegates the DDL to ``create_table``.

    Raises:
        Exception: wrapping any failure (catalog lookup, connection, DDL),
            with the original exception chained for debuggability.
    """
    try:
        db_catalog_entry = self.db.catalog().get_database_catalog_entry(
            table.database_name
        )
        uri = None
        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            # NOTE: "sqlalchmey" typo is part of the handler's public API.
            uri = handler.get_sqlalchmey_uri()
        sqlalchemy_schema = SchemaUtils.xform_to_sqlalchemy_schema(table.columns)
        create_table(uri, table.name, sqlalchemy_schema)
    except Exception as e:
        err_msg = f"Failed to create the table {table.name} in data source {table.database_name} with exception {str(e)}"
        logger.exception(err_msg)
        # Chain the cause so the original traceback is preserved.
        raise Exception(err_msg) from e

def write(self, table: TableCatalogEntry, rows: Batch):
    """Insert the rows of ``rows`` into the native table ``table.name``.

    Reflects the existing table definition from the data source and bulk
    inserts the batch's frames, serializing values via ``_dict_to_sql_row``.

    Raises:
        Exception: wrapping any failure, with the cause chained.
    """
    # Fix: removed a stray leftover `pass` statement that preceded the try.
    try:
        db_catalog_entry = self.db.catalog().get_database_catalog_entry(
            table.database_name
        )
        with get_database_handler(
            db_catalog_entry.engine, **db_catalog_entry.params
        ) as handler:
            uri = handler.get_sqlalchmey_uri()

        # Create a metadata object
        engine = create_engine(uri)
        metadata = MetaData()

        # Retrieve the SQLAlchemy table object for the existing table
        table_to_update = Table(table.name, metadata, autoload_with=engine)
        columns = rows.frames.keys()
        data = []
        # Todo: validate the data type before inserting into the table
        for record in rows.frames.values:
            row_data = {col: record[idx] for idx, col in enumerate(columns)}
            data.append(_dict_to_sql_row(row_data, table.columns))

        Session = sessionmaker(bind=engine)
        session = Session()
        try:
            session.execute(table_to_update.insert(), data)
            session.commit()
        finally:
            # Fix: close the session even if the insert/commit raises,
            # so a failed write does not leak the connection.
            session.close()

    except Exception as e:
        err_msg = f"Failed to write to the table {table.name} in data source {table.database_name} with exception {str(e)}"
        logger.exception(err_msg)
        # Chain the cause so the original traceback is preserved.
        raise Exception(err_msg) from e

def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
try:
Expand All @@ -39,18 +161,26 @@ def read(self, table: TableCatalogEntry) -> Iterator[Batch]:
with get_database_handler(
db_catalog_entry.engine, **db_catalog_entry.params
) as handler:
data_df = handler.execute_native_query(
f"SELECT * FROM {table.name}"
).data

# Handling case-sensitive databases like SQLite can be tricky.
# Currently, EvaDB converts all columns to lowercase, which may result
# in issues with these databases. As we move forward, we are actively
# working on improving this aspect within Binder. For more information,
# please refer to https://github.com/georgia-tech-db/evadb/issues/1079.
data_df.columns = data_df.columns.str.lower()
yield Batch(pd.DataFrame(data_df))
uri = handler.get_sqlalchmey_uri()

# Create a metadata object
engine = create_engine(uri)
metadata = MetaData()

Session = sessionmaker(bind=engine)
session = Session()
# Retrieve the SQLAlchemy table object for the existing table
table_to_read = Table(table.name, metadata, autoload_with=engine)
result = session.execute(table_to_read.select()).fetchall()
data_batch = []
# todo check if the column order is consistent
for row in result:
data_batch.append(_deserialize_sql_row(row, table.columns))

if data_batch:
yield Batch(pd.DataFrame(data_batch))

session.close()
except Exception as e:
err_msg = f"Failed to read the table {table.name} in data source {table.database_name} with exception {str(e)}"
logger.exception(err_msg)
Expand Down
47 changes: 47 additions & 0 deletions evadb/third_party/databases/sqlite/sqlite_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import sqlite3

import pandas as pd
Expand Down Expand Up @@ -105,6 +106,10 @@ def get_columns(self, table_name: str) -> DBHandlerResponse:
pragma_df = pd.read_sql_query(query, self.connection)
columns_df = pragma_df[["name", "type"]].copy()
columns_df.rename(columns={"type": "dtype"}, inplace=True)
columns_df["dtype"] = columns_df["dtype"].apply(
self._sqlite_to_python_types
)

return DBHandlerResponse(data=columns_df)
except sqlite3.Error as e:
return DBHandlerResponse(data=None, error=str(e))
Expand Down Expand Up @@ -146,3 +151,45 @@ def execute_native_query(self, query_string: str) -> DBHandlerResponse:
return DBHandlerResponse(data=self._fetch_results_as_df(cursor))
except sqlite3.Error as e:
return DBHandlerResponse(data=None, error=str(e))

def _sqlite_to_python_types(self, sqlite_type: str):
xzdandy marked this conversation as resolved.
Show resolved Hide resolved
mapping = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably already explained once, but why don't we directly map to ColumnType?

And, how do we handle types that are not part of our ColumnType (e.g., DATETIME)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Don't want to expose EvaDB types to third-party integrations.
  2. We don't support it right now.

"INT": int,
"INTEGER": int,
"TINYINT": int,
"SMALLINT": int,
"MEDIUMINT": int,
"BIGINT": int,
"UNSIGNED BIG INT": int,
"INT2": int,
"INT8": int,
"CHARACTER": str,
"VARCHAR": str,
"VARYING CHARACTER": str,
"NCHAR": str,
"NATIVE CHARACTER": str,
"NVARCHAR": str,
"TEXT": str,
"CLOB": str,
"BLOB": bytes,
"REAL": float,
"DOUBLE": float,
"DOUBLE PRECISION": float,
"FLOAT": float,
"NUMERIC": float,
"DECIMAL": float,
"BOOLEAN": bool,
"DATE": datetime.date,
"DATETIME": datetime.datetime,
}

def preprocess_sqlite_type(sqlite_type):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the format of sqlite_type passed in?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't need a separate local function anymore. I can directly call the split on the input of the outer function.

return sqlite_type.split("(")[0].strip().upper()

sqlite_type = preprocess_sqlite_type(sqlite_type)
if sqlite_type in mapping:
return mapping[sqlite_type]
else:
raise Exception(
f"Unsupported column {sqlite_type} encountered in the sqlite table. Please raise a feature request!"
)
28 changes: 27 additions & 1 deletion test/third_party_tests/test_native_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ def _create_evadb_table_using_select_query(self):
"DROP TABLE IF EXISTS eva_table;",
)

def _create_native_table_using_select_query(self):
    # Create a table directly inside the native data source from an EvaDB
    # SELECT query (CREATE TABLE ... AS SELECT), verify its contents, and
    # drop it again so the data source is left clean.
    execute_query_fetch_all(
        self.evadb,
        """CREATE TABLE test_data_source.derived_table AS SELECT name, age FROM test_data_source.test_table;""",
    )
    res_batch = execute_query_fetch_all(
        self.evadb,
        "SELECT * FROM test_data_source.derived_table",
    )
    # Expect the two rows previously inserted into test_table
    # ("aa", 1) and ("bb", 2) — see _insert_value_into_native_database.
    self.assertEqual(len(res_batch), 2)
    self.assertEqual(res_batch.frames["derived_table.name"][0], "aa")
    self.assertEqual(res_batch.frames["derived_table.age"][0], 1)
    self.assertEqual(res_batch.frames["derived_table.name"][1], "bb")
    self.assertEqual(res_batch.frames["derived_table.age"][1], 2)
    # Clean up via a native USE query (TODO from review: move to teardown).
    execute_query_fetch_all(
        self.evadb,
        """USE test_data_source {
            DROP TABLE IF EXISTS derived_table
        }""",
    )
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the clean up code? If so, shall we put it in the cls teardown?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. The test class is getting complicated and needs to be simplified.


def _execute_evadb_query(self):
self._create_table_in_native_database()
self._insert_value_into_native_database("aa", 1, "aaaa")
Expand All @@ -99,6 +120,7 @@ def _execute_evadb_query(self):
self.assertEqual(res_batch.frames["test_table.age"][1], 2)

self._create_evadb_table_using_select_query()
self._create_native_table_using_select_query()
self._drop_table_in_native_database()

def _execute_native_query(self):
Expand Down Expand Up @@ -187,8 +209,12 @@ def test_should_run_query_in_mariadb(self):

def test_should_run_query_in_sqlite(self):
# Create database.
import os

current_file_dir = os.path.dirname(os.path.abspath(__file__))

params = {
"database": "evadb.db",
"database": f"{current_file_dir}/evadb.db",
}
query = f"""CREATE DATABASE test_data_source
WITH ENGINE = "sqlite",
Expand Down