Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean standalone run running under with context to automatically shutdown #833

Merged
merged 45 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
cc56f86
Clean standalone run running under with context to automatically shut…
jrobinAV Feb 14, 2024
7b59149
TO REMOVE
jrobinAV Feb 14, 2024
5dd3f03
Attempt to fix linter
jrobinAV Feb 14, 2024
006e08a
Attempt to fix linter
jrobinAV Feb 14, 2024
45d5a0a
Attempt to fix linter
jrobinAV Feb 14, 2024
be05c7e
Attempt to fix linter
jrobinAV Feb 14, 2024
dce2160
Attempt to fix linter
jrobinAV Feb 14, 2024
8b58289
Remove useless test
jrobinAV Feb 14, 2024
f7220f8
Close opened excel file in finally block
jrobinAV Feb 14, 2024
64f4150
:recycle: Close opened Excel file in finally block
jrobinAV Feb 14, 2024
37782b7
TO REVERT
jrobinAV Feb 15, 2024
6ce5b0f
Merge branch 'develop' into fix/cannot-submit-futures-after-shutdown
jrobinAV Feb 15, 2024
f4ee858
Attempt to isolate side effect
jrobinAV Feb 15, 2024
9688637
Attempt to isolate side effect
jrobinAV Feb 15, 2024
bdeba8e
Attempt to isolate side effect
jrobinAV Feb 15, 2024
5ccf466
Attempt to isolate side effect
jrobinAV Feb 15, 2024
7f780d3
Attempt to fix side effect
jrobinAV Feb 16, 2024
ef537b9
Attempt to fix side effect
jrobinAV Feb 16, 2024
87716f6
Attempt to fix side effect
jrobinAV Feb 16, 2024
29899fc
Attempt to fix linter
jrobinAV Feb 16, 2024
e92665a
Attempt to fix linter
jrobinAV Feb 16, 2024
b622570
Attempt to fix linter
jrobinAV Feb 16, 2024
90c6880
Rerun tests
jrobinAV Feb 16, 2024
5c0061a
Add traces
jrobinAV Feb 16, 2024
e3f21b4
fix linter
jrobinAV Feb 16, 2024
5d41268
attempt to delete file
jrobinAV Feb 16, 2024
a3352a6
attempt to reproduce minimal test
jrobinAV Feb 16, 2024
83c1d04
fix linter
jrobinAV Feb 16, 2024
91b7235
TRy without writing
jrobinAV Feb 16, 2024
0a2ecac
TRy without writing
jrobinAV Feb 16, 2024
6ee78ca
Add traces
jrobinAV Feb 16, 2024
fb5d515
Add traces
jrobinAV Feb 16, 2024
b14e577
Merge branch 'develop' into fix/cannot-submit-futures-after-shutdown
jrobinAV Feb 16, 2024
332590f
simplify
jrobinAV Feb 16, 2024
4667e17
simplify
jrobinAV Feb 19, 2024
24559da
simplify
jrobinAV Feb 19, 2024
54c58c3
back up file for testing
jrobinAV Feb 19, 2024
e0e0ab5
linter
jrobinAV Feb 19, 2024
2db0454
add openpyxl version
jrobinAV Feb 19, 2024
2a53077
Try work around
jrobinAV Feb 19, 2024
4fb5b9d
Try workaround
jrobinAV Feb 19, 2024
549846c
Try except
jrobinAV Feb 19, 2024
9a0489e
Attempt to make all the tests pass
jrobinAV Feb 19, 2024
3528fca
Attempt to make all the tests pass
jrobinAV Feb 19, 2024
28ecea4
fix linter
jrobinAV Feb 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
from typing import Optional

from ...job.job import Job
from .._abstract_orchestrator import _AbstractOrchestrator
from ._job_dispatcher import _JobDispatcher
Expand All @@ -19,7 +17,7 @@
class _DevelopmentJobDispatcher(_JobDispatcher):
"""Manages job dispatching (instances of `Job^` class) in a synchronous way."""

def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
super().__init__(orchestrator)

def start(self):
Expand Down
8 changes: 3 additions & 5 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import threading
from abc import abstractmethod
from queue import Empty
from typing import Dict, Optional
from typing import Dict

from taipy.config.config import Config
from taipy.logger._taipy_logger import _TaipyLogger
Expand All @@ -32,7 +32,7 @@ class _JobDispatcher(threading.Thread):
_logger = _TaipyLogger._get_logger()
_nb_available_workers: int = 1

def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
self.daemon = True
self.orchestrator = orchestrator
Expand Down Expand Up @@ -66,9 +66,7 @@ def run(self):
except Exception as e:
_TaipyLogger._get_logger().exception(e)
pass

# The dispatcher is now shutting down, let's shutdown its executor.
self._executor.shutdown(wait=True)
self._logger.info("Job dispatcher stopped.")

def _can_execute(self) -> bool:
"""Returns True if the dispatcher have resources to execute a new job."""
Expand Down
14 changes: 11 additions & 3 deletions taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,28 @@
class _StandaloneJobDispatcher(_JobDispatcher):
"""Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor."""

def __init__(self, orchestrator: Optional[_AbstractOrchestrator], subproc_initializer: Optional[Callable] = None):
def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None):
super().__init__(orchestrator)
max_workers = Config.job_config.max_nb_of_workers or 1
self._executor: Executor = ProcessPoolExecutor(max_workers=max_workers, initializer=subproc_initializer) # type: ignore
self._executor: Executor = ProcessPoolExecutor(
max_workers=max_workers,
initializer=subproc_initializer
) # type: ignore
self._nb_available_workers = self._executor._max_workers # type: ignore

def run(self):
with self._executor:
super().run()
self._logger.info("Standalone job dispatcher: Pool executor shut down")

jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
def _dispatch(self, job: Job):
"""Dispatches the given `Job^` on an available worker for execution.

Parameters:
job (Job^): The job to submit on an executor with an available worker.
"""
self._nb_available_workers -= 1

self._nb_available_workers -= 1
config_as_string = _TomlSerializer()._serialize(Config._applied_config) # type: ignore[attr-defined]
future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)

Expand Down
10 changes: 5 additions & 5 deletions taipy/core/_orchestrator/_orchestrator_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import typing
from importlib import util
from typing import Optional, Type

Expand All @@ -27,7 +27,7 @@ class _OrchestratorFactory:
_TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE = _TAIPY_ENTERPRISE_MODULE + ".core._orchestrator._dispatcher"
__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD = "_build_dispatcher"

_orchestrator: Optional[_Orchestrator] = None
_orchestrator: Optional[_AbstractOrchestrator] = None
_dispatcher: Optional[_JobDispatcher] = None

@classmethod
Expand Down Expand Up @@ -80,20 +80,20 @@ def __build_standalone_job_dispatcher(cls, force_restart=False):
cls._TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE, cls.__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD
)(cls._orchestrator)
else:
cls._dispatcher = _StandaloneJobDispatcher(cls._orchestrator) # type: ignore
cls._dispatcher = _StandaloneJobDispatcher(typing.cast(_AbstractOrchestrator, cls._orchestrator))
cls._dispatcher.start() # type: ignore

@classmethod
def __build_development_job_dispatcher(cls):
if isinstance(cls._dispatcher, _StandaloneJobDispatcher):
cls._dispatcher.stop()
cls._dispatcher = _DevelopmentJobDispatcher(cls._orchestrator) # type: ignore
cls._dispatcher = _DevelopmentJobDispatcher(typing.cast(_AbstractOrchestrator, cls._orchestrator))

@classmethod
def __build_enterprise_job_dispatcher(cls, force_restart=False):
cls._dispatcher = _load_fct(
cls._TAIPY_ENTERPRISE_CORE_DISPATCHER_MODULE, cls.__TAIPY_ENTERPRISE_BUILD_DISPATCHER_METHOD
)(cls._orchestrator, force_restart)
)(typing.cast(_AbstractOrchestrator, cls._orchestrator), force_restart)
if cls._dispatcher:
cls._dispatcher.start()
else:
Expand Down
105 changes: 53 additions & 52 deletions taipy/core/data/excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,58 +195,59 @@ def _read(self):
return self._read_as()

def _read_as(self):
excel_file = load_workbook(self._path)
exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
work_books = dict()
sheet_names = excel_file.sheetnames

user_provided_sheet_names = self.properties.get(self.__SHEET_NAME_PROPERTY) or []
if not isinstance(user_provided_sheet_names, (List, Set, Tuple)):
user_provided_sheet_names = [user_provided_sheet_names]

provided_sheet_names = user_provided_sheet_names or sheet_names

for sheet_name in provided_sheet_names:
if sheet_name not in sheet_names:
raise NonExistingExcelSheet(sheet_name, self._path)

if isinstance(exposed_type, List):
if len(provided_sheet_names) != len(self.properties[self._EXPOSED_TYPE_PROPERTY]):
raise ExposedTypeLengthMismatch(
f"Expected {len(provided_sheet_names)} exposed types, got "
f"{len(self.properties[self._EXPOSED_TYPE_PROPERTY])}"
)

for i, sheet_name in enumerate(provided_sheet_names):
work_sheet = excel_file[sheet_name]
sheet_exposed_type = exposed_type

if not isinstance(sheet_exposed_type, str):
if isinstance(exposed_type, dict):
sheet_exposed_type = exposed_type.get(sheet_name, self._EXPOSED_TYPE_PANDAS)
elif isinstance(exposed_type, List):
sheet_exposed_type = exposed_type[i]

if isinstance(sheet_exposed_type, str):
if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name).to_numpy()
elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name)
continue

res = list()
for row in work_sheet.rows:
res.append([col.value for col in row])
if self.properties[self._HAS_HEADER_PROPERTY] and res:
header = res.pop(0)
for i, row in enumerate(res):
res[i] = sheet_exposed_type(**dict([[h, r] for h, r in zip(header, row)]))
else:
for i, row in enumerate(res):
res[i] = sheet_exposed_type(*row)
work_books[sheet_name] = res

excel_file.close()
try:
excel_file = load_workbook(self._path)
exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
work_books = dict()
sheet_names = excel_file.sheetnames

user_provided_sheet_names = self.properties.get(self.__SHEET_NAME_PROPERTY) or []
if not isinstance(user_provided_sheet_names, (List, Set, Tuple)):
user_provided_sheet_names = [user_provided_sheet_names]

provided_sheet_names = user_provided_sheet_names or sheet_names

for sheet_name in provided_sheet_names:
if sheet_name not in sheet_names:
raise NonExistingExcelSheet(sheet_name, self._path)
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(exposed_type, List):
if len(provided_sheet_names) != len(self.properties[self._EXPOSED_TYPE_PROPERTY]):
raise ExposedTypeLengthMismatch(
f"Expected {len(provided_sheet_names)} exposed types, got "
f"{len(self.properties[self._EXPOSED_TYPE_PROPERTY])}"
)

for i, sheet_name in enumerate(provided_sheet_names):
work_sheet = excel_file[sheet_name]
sheet_exposed_type = exposed_type

if not isinstance(sheet_exposed_type, str):
if isinstance(exposed_type, dict):
sheet_exposed_type = exposed_type.get(sheet_name, self._EXPOSED_TYPE_PANDAS)
elif isinstance(exposed_type, List):
sheet_exposed_type = exposed_type[i]

if isinstance(sheet_exposed_type, str):
if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name).to_numpy()
elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
work_books[sheet_name] = self._read_as_pandas_dataframe(sheet_name)
continue

res = list()
for row in work_sheet.rows:
res.append([col.value for col in row])
if self.properties[self._HAS_HEADER_PROPERTY] and res:
header = res.pop(0)
for i, row in enumerate(res):
res[i] = sheet_exposed_type(**dict([[h, r] for h, r in zip(header, row)]))
else:
for i, row in enumerate(res):
res[i] = sheet_exposed_type(*row)
work_books[sheet_name] = res
finally:
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
excel_file.close()

if len(provided_sheet_names) == 1:
return work_books[provided_sheet_names[0]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# specific language governing permissions and limitations under the License.

from concurrent.futures import Executor, Future
from typing import List, Optional
from typing import List

from taipy.core import Job
from taipy.core._orchestrator._abstract_orchestrator import _AbstractOrchestrator
Expand All @@ -35,7 +35,7 @@ def submit(self, fn, *args, **kwargs):


class MockStandaloneDispatcher(_StandaloneJobDispatcher):
def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
super(_StandaloneJobDispatcher, self).__init__(orchestrator)
self._executor: Executor = MockProcessPoolExecutor()
self.dispatch_calls: List = []
Expand Down
4 changes: 2 additions & 2 deletions tests/core/job/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE):
_TaskManager._set(task)
_JobManager._set(job)
dispatcher: Union[_StandaloneJobDispatcher, _DevelopmentJobDispatcher] = _StandaloneJobDispatcher(
_OrchestratorFactory._orchestrator
_OrchestratorFactory._orchestrator # type: ignore
)
if mode == JobConfig._DEVELOPMENT_MODE:
dispatcher = _DevelopmentJobDispatcher(_OrchestratorFactory._orchestrator)
dispatcher = _DevelopmentJobDispatcher(_OrchestratorFactory._orchestrator) # type: ignore
dispatcher._dispatch(job)


Expand Down
55 changes: 1 addition & 54 deletions tests/core/scenario/test_scenario_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from taipy.core.task._task_manager import _TaskManager
from taipy.core.task.task import Task
from taipy.core.task.task_id import TaskId
from tests.core.utils import assert_true_after_time
from tests.core.utils.NotifyMock import NotifyMock


Expand Down Expand Up @@ -1166,9 +1165,7 @@ def addition(n1, n2):
return n1 + n2


def test_scenarios_comparison_development_mode():
Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)

def test_scenarios_comparison():
scenario_config = Config.configure_scenario(
"Awesome_scenario",
[
Expand Down Expand Up @@ -1215,56 +1212,6 @@ def test_scenarios_comparison_development_mode():
_ScenarioManager._compare(scenario_1, scenario_2, data_node_config_id="abc")


@pytest.mark.standalone
def test_scenarios_comparison_standalone_mode():
jrobinAV marked this conversation as resolved.
Show resolved Hide resolved
Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE)

scenario_config = Config.configure_scenario(
"Awesome_scenario",
[
Config.configure_task(
"mult_by_2",
mult_by_2,
[Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
)
],
comparators={"bar": [subtraction], "foo": [subtraction, addition]},
)

_OrchestratorFactory._build_dispatcher()

assert scenario_config.comparators is not None
scenario_1 = _ScenarioManager._create(scenario_config)
scenario_2 = _ScenarioManager._create(scenario_config)

with pytest.raises(InsufficientScenarioToCompare):
_ScenarioManager._compare(scenario_1, data_node_config_id="bar")

scenario_3 = Scenario("awesome_scenario_config", [], {})
with pytest.raises(DifferentScenarioConfigs):
_ScenarioManager._compare(scenario_1, scenario_3, data_node_config_id="bar")

_ScenarioManager._submit(scenario_1.id)
_ScenarioManager._submit(scenario_2.id)

bar_comparison = _ScenarioManager._compare(scenario_1, scenario_2, data_node_config_id="bar")["bar"]
assert_true_after_time(lambda: bar_comparison["subtraction"] == 0)

foo_comparison = _ScenarioManager._compare(scenario_1, scenario_2, data_node_config_id="foo")["foo"]
assert_true_after_time(lambda: len(foo_comparison.keys()) == 2)
assert_true_after_time(lambda: foo_comparison["addition"] == 2)
assert_true_after_time(lambda: foo_comparison["subtraction"] == 0)

assert_true_after_time(lambda: len(_ScenarioManager._compare(scenario_1, scenario_2).keys()) == 2)

with pytest.raises(NonExistingScenarioConfig):
_ScenarioManager._compare(scenario_3, scenario_3)

with pytest.raises(NonExistingComparator):
_ScenarioManager._compare(scenario_1, scenario_2, data_node_config_id="abc")


def test_tags():
Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
_OrchestratorFactory._build_dispatcher()
Expand Down
Loading