Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring/#221 fixed mypy warnings #225

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/changes_0.1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ Code name:
* #203: Cleaned-up package names and directory structure
* #217: Rename dataflow abstraction files
* #219: Applied PTB checks and fixes
* #221: Fixed mypy warnings
Empty file added exasol/analytics/py.typed
Empty file.
6 changes: 1 addition & 5 deletions exasol/analytics/query_handler/context/connection_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
class ConnectionName(DBObjectName):
"""A DBObjectName class which represents the name of a connection object"""

@typechecked
def __init__(self, connection_name: str):
super().__init__(connection_name.upper())


class ConnectionNameImpl(DBObjectNameImpl, ConnectionName):

Expand All @@ -19,4 +15,4 @@ def fully_qualified(self) -> str:

@typechecked
def __init__(self, connection_name: str):
super().__init__(connection_name)
super().__init__(connection_name.upper())
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Generic, TypeVar
from typing import Generic, Optional, TypeVar

from exasol.analytics.query_handler.context.proxy.db_object_name_proxy import (
DBObjectNameProxy,
Expand All @@ -15,6 +15,6 @@ def __init__(self, db_object_name_with_schema: NameType, global_counter_value: i
super().__init__(db_object_name_with_schema, global_counter_value)

@property
def schema_name(self) -> SchemaName:
def schema_name(self) -> Optional[SchemaName]:
self._check_if_released()
return self._db_object_name.schema_name
1 change: 1 addition & 0 deletions exasol/analytics/query_handler/context/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,6 @@ def transfer_object_to(
"""
pass

@abstractmethod
def get_connection(self, name: str) -> Connection:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def get_aaf_query_loop_lua_script_generator() -> ExasolLuaScriptGenerator:
]
jinja_template_location = JinjaTemplateLocation(
package_name=constants.BASE_PACKAGE,
package_path=constants.TEMPLATES_DIR,
package_path=str(constants.TEMPLATES_DIR),
template_file_name=constants.LUA_SCRIPT_TEMPLATE,
)
generator = ExasolLuaScriptGenerator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@ def __init__(

def bundle_lua_scripts(self, output_buffer: IO):
    """Amalgamate all Lua sources into a single script written to output_buffer.

    Stages the source files in a temporary directory, runs the bundler
    there, and lets the directory be removed again on exit.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        work_dir = Path(tmp_dir)
        self.copy_lua_source_files(work_dir)
        self.run_lua_amlg(work_dir, output_buffer)

def copy_lua_source_files(self, tmp_dir: Path):
    """Copy every Lua module plus the main entry file into tmp_dir."""
    sources = list(self.lua_source_files)
    sources.append(self.lua_main_file)
    for src in sources:
        dst = tmp_dir / src.name
        logger.debug(f"Copy {src} to {tmp_dir}")
        shutil.copy(str(src), dst)

def run_lua_amlg(self, tmp_dir: Path, output_buffer: IO):
output_file = tmp_dir / f"bundle_{time.time()}.lua"
bash_command = "amalg.lua -o {out_path} -s {main_file} {modules}".format(
tmp_dir=tmp_dir,
out_path=output_file,
main_file=self.lua_main_file.name,
modules=" ".join(self.lua_modules),
Expand Down
2 changes: 1 addition & 1 deletion exasol/analytics/query_handler/deployment/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_password(pwd: str, user: str, env_var: str, descr: str) -> str:

def load_and_render_statement(template_name, **kwargs) -> str:
env = Environment(
loader=PackageLoader(constants.BASE_PACKAGE, constants.TEMPLATES_DIR),
loader=PackageLoader(constants.BASE_PACKAGE, str(constants.TEMPLATES_DIR)),
autoescape=select_autoescape(),
)
template = env.get_template(template_name)
Expand Down
10 changes: 5 additions & 5 deletions exasol/analytics/query_handler/graph/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ def __new__(cls, name: str, bases: Tuple[Type, ...], attrs: Any):
"""
result_type = type(name, bases, attrs)

def _configured_new(cls: Type[cls]):
def _configured_new(cls: Type[_T]):
"""This function is called for subclasses of classes that declare _Meta as their metaclass."""
return _new(cls, result_type)

result_type.__new__ = _configured_new
result_type.__init__ = _init
result_type.__setattr__ = _setattr
result_type.__delattr__ = _delattr
result_type.__new__ = _configured_new # type: ignore
ckunki marked this conversation as resolved.
Show resolved Hide resolved
result_type.__init__ = _init # type: ignore
result_type.__setattr__ = _setattr # type: ignore
result_type.__delattr__ = _delattr # type: ignore
return result_type


Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import dataclasses

from exasol_bucketfs_utils_python.abstract_bucketfs_location import (
AbstractBucketFSLocation,
)
import exasol.bucketfs as bfs

from exasol.analytics.query_handler.graph.stage.sql.input_output import (
SQLStageInputOutput,
Expand All @@ -13,5 +11,5 @@
@dataclasses.dataclass(frozen=True, eq=True)
class SQLStageGraphExecutionInput:
input: SQLStageInputOutput
result_bucketfs_location: AbstractBucketFSLocation
result_bucketfs_location: bfs.path.PathLike
sql_stage_graph: SQLStageGraph
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from exasol.analytics.query_handler.query_handler import QueryHandler
from exasol.analytics.query_handler.result import Continue, Finish
from exasol.analytics.utils.errors import UninitializedAttributeError


class ResultHandlerReturnValue(enum.Enum):
Expand Down Expand Up @@ -55,23 +56,38 @@ def __init__(
self._current_query_handler: Optional[
QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]
] = None
self._current_query_handler_context: Optional[ScopeQueryHandlerContext] = None
self._current_qh_context: Optional[ScopeQueryHandlerContext] = None
self._create_current_query_handler()

def _check_is_valid(self):
    # Existence check kept for callers that need only the guard.
    if self._current_query_handler is None:
        raise RuntimeError("No current query handler set.")

def get_current_query_handler(
    self,
) -> QueryHandler[List[SQLStageInputOutput], SQLStageInputOutput]:
    """Return the active query handler.

    Raises:
        RuntimeError: if no handler has been created yet or it was
            invalidated in the meantime.
    """
    handler = self._current_query_handler
    if handler is None:
        raise RuntimeError("No current query handler set.")
    return handler

@property
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Property method current_query_handler_context checks the private attribute _current_query_handler_context and raises an exception when it is None to avoid code duplication.

Questions:

  • Q1) The names of the private attribute and the property differ only in the prefix _ and potentially could be mixed up. Should we use different names that can be distinguished more easily?
  • Q2) The property is public. Should it be private?

Copy link
Collaborator

@tkilias tkilias Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the original is private, the new one needs to be private, too. This means it needs another name. Good question, what the name should be. The same is true for current_stage property. Thinking about a prefix like _checked_*.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like the proposal _checked_*.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I now used

  • @property _checked_current_qh_context(self)
  • attribute self._current_qh_context

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See last push

def _checked_current_qh_context(self) -> ScopeQueryHandlerContext:
    """Return the child query handler context, raising if it is unset."""
    context = self._current_qh_context
    if context is None:
        raise UninitializedAttributeError(
            "Current query handler context is undefined."
        )
    return context

@property
def _checked_current_stage(self) -> SQLStage:
    """Return the current stage, raising if it has been invalidated."""
    stage = self._current_stage
    if stage is None:
        raise UninitializedAttributeError("Current stage is None.")
    return stage

def handle_result(
self, result: Union[Continue, Finish[SQLStageInputOutput]]
) -> ResultHandlerReturnValue:
self._check_is_valid()
# check if current query handler is set
self.get_current_query_handler()
if isinstance(result, Finish):
return self._handle_finished_result(result)
elif isinstance(result, Continue):
Expand All @@ -90,7 +106,7 @@ def _handle_finished_result(
return self._try_to_move_to_next_stage()

def _try_to_move_to_next_stage(self) -> ResultHandlerReturnValue:
self._current_query_handler_context.release()
self._checked_current_qh_context.release()
if self._is_not_last_stage():
self._move_to_next_stage()
return ResultHandlerReturnValue.CONTINUE_PROCESSING
Expand All @@ -101,7 +117,7 @@ def _try_to_move_to_next_stage(self) -> ResultHandlerReturnValue:
def invalidate(self):
self._current_stage = None
self._current_query_handler = None
self._current_query_handler_context = None
self._current_qh_context = None

def _is_not_last_stage(self):
return self._current_stage_index < len(self._stages_in_execution_order) - 1
Expand All @@ -113,7 +129,7 @@ def _move_to_next_stage(self):

def _create_current_query_handler(self):
stage_inputs = self._stage_inputs_map[self._current_stage]
self._current_query_handler_context = (
self._current_qh_context = (
self._query_handler_context.get_child_query_handler_context()
)
result_bucketfs_location = self._result_bucketfs_location.joinpath(
Expand All @@ -123,12 +139,12 @@ def _create_current_query_handler(self):
result_bucketfs_location=result_bucketfs_location,
sql_stage_inputs=stage_inputs,
)
self._current_query_handler = self._current_stage.create_train_query_handler(
stage_input, self._current_query_handler_context
self._current_query_handler = self._checked_current_stage.create_train_query_handler(
stage_input, self._current_qh_context
)

def _add_result_to_successors(self, result: SQLStageInputOutput):
successors = self._sql_stage_graph.successors(self._current_stage)
successors = self._sql_stage_graph.successors(self._checked_current_stage)
if len(successors) == 0:
raise RuntimeError("Programming error")
self._add_result_to_inputs_of_successors(result, successors)
Expand All @@ -146,7 +162,7 @@ def _add_result_to_reference_counting_bag(
object_proxies = find_object_proxies(result)
for object_proxy in object_proxies:
if object_proxy not in self._reference_counting_bag:
self._current_query_handler_context.transfer_object_to(
self._checked_current_qh_context.transfer_object_to(
object_proxy, self._query_handler_context
)
for _ in successors:
Expand All @@ -160,7 +176,7 @@ def _transfer_ownership_of_result_to_query_result_handler(self, result):
object_proxy
)
else:
self._current_query_handler_context.transfer_object_to(
self._checked_current_qh_context.transfer_object_to(
object_proxy, self._query_handler_context
)

Expand Down
23 changes: 14 additions & 9 deletions exasol/analytics/query_handler/python_query_handler_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from exasol.analytics.query_handler.result import Continue, Finish
from exasol.analytics.query_handler.udf.runner.state import QueryHandlerRunnerState
from exasol.analytics.sql_executor.interface import SQLExecutor
from exasol.analytics.utils.errors import UninitializedAttributeError

LOGGER = logging.getLogger(__file__)

Expand Down Expand Up @@ -72,8 +73,8 @@ def _handle_continue(self, result: Continue) -> Union[Continue, Finish[ResultTyp
self._cleanup_query_handler_context()
self._execute_queries(result.query_list)
input_query_result = self._run_input_query(result)
result = self._state.query_handler.handle_query_result(input_query_result)
return result
_result = self._state.query_handler.handle_query_result(input_query_result)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this rename?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because result was already a parameter of the method and mypy complained about potentially different types being assigned to the same variable.

return _result

def _run_input_query(self, result: Continue) -> PythonQueryResult:
input_query_view, input_query = self._wrap_return_query(result.input_query)
Expand Down Expand Up @@ -116,24 +117,28 @@ def _release_and_create_query_handler_context_of_input_query(self):
def _wrap_return_query(
self, input_query: SelectQueryWithColumnDefinition
) -> Tuple[str, str]:
if self._state.input_query_query_handler_context is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs investigation

raise UninitializedAttributeError(
"Current state's input query query handler context is not set."
)
temporary_view_name = (
self._state.input_query_query_handler_context.get_temporary_view_name()
)
input_query_create_view_string = cleandoc(
f"""
CREATE OR REPLACE VIEW {temporary_view_name.fully_qualified} AS
{input_query.query_string};
"""
CREATE OR REPLACE VIEW {temporary_view_name.fully_qualified} AS
{input_query.query_string};
"""
)
full_qualified_columns = [
col.name.fully_qualified for col in input_query.output_columns
]
columns_str = ",\n".join(full_qualified_columns)
input_query_string = cleandoc(
f"""
SELECT
{textwrap.indent(columns_str, " " * 4)}
FROM {temporary_view_name.fully_qualified};
"""
SELECT
{textwrap.indent(columns_str, " " * 4)}
FROM {temporary_view_name.fully_qualified};
"""
)
return input_query_create_view_string, input_query_string
2 changes: 1 addition & 1 deletion exasol/analytics/query_handler/query/drop/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def query_string(self) -> str:
return f"DROP VIEW IF EXISTS {self._view_name.fully_qualified};"

@property
def view_name(self) -> TableName:
def view_name(self) -> ViewName:
return self._view_name
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,20 @@ def __init__(self, data: List[Tuple[Any, ...]], columns: List[Column]):
}
self._next()

def _range(self, num_rows: Union[int, str]) -> range:
if isinstance(num_rows, int):
return range(num_rows - 1)
if num_rows == "all":
return range(len(self._data) - 1)
raise ValueError(f'num_rows must be an int or str "all" but is {num_rows}')

def fetch_as_dataframe(
self, num_rows: Union[int, str], start_col=0
) -> Optional[pd.DataFrame]:
batch_list = []
if num_rows == "all":
num_rows = len(self._data)
if self._current_row is not None:
batch_list.append(self._current_row)
for i in range(num_rows - 1):
for i in self._range(num_rows):
self._next()
if self._current_row is not None:
batch_list.append(self._current_row)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import collections
from typing import Any, Iterator, List, OrderedDict, Union
from typing import Any, ForwardRef, Iterator, List, Optional, OrderedDict, Union

from exasol.analytics.query_handler.query.result.interface import QueryResult, Row
from exasol.analytics.schema.column import Column
from exasol.analytics.schema.column_name import ColumnName
from exasol.analytics.schema.column_type import ColumnType


class UDFQueryResult(QueryResult):

def __init__(
Expand Down Expand Up @@ -51,7 +50,9 @@ def rowcount(self) -> int:

def fetch_as_dataframe(
self, num_rows: Union[str, int], start_col: int = 0
) -> "pandas.DataFrame":
) -> Optional[ForwardRef("pandas.DataFrame")]: # type: ignore[valid-type]
# This place intentionally uses a forward reference to avoid importing
# pandas which might take several seconds.
df = self._ctx.get_dataframe(num_rows, start_col=self._start_col)
self._initialized = True
if df is None:
Expand Down
2 changes: 1 addition & 1 deletion exasol/analytics/query_handler/udf/runner/call_udf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from exasol.analytics.query_handler.udf.runner.udf import QueryHandlerRunnerUDF

udf = QueryHandlerRunnerUDF(exa)
udf = QueryHandlerRunnerUDF(exa) # type: ignore


def run(ctx):
Expand Down
2 changes: 1 addition & 1 deletion exasol/analytics/query_handler/udf/runner/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
class QueryHandlerRunnerState:
top_level_query_handler_context: TopLevelQueryHandlerContext
query_handler: QueryHandler
connection_lookup: UDFConnectionLookup
connection_lookup: Optional[UDFConnectionLookup] = None
input_query_query_handler_context: Optional[ScopeQueryHandlerContext] = None
input_query_output_columns: Optional[List[Column]] = None
Loading