Skip to content

Commit

Permalink
Merge pull request #619 from Avaiga/feature/clean_test_orchetrator
Browse files Browse the repository at this point in the history
Feature/clean test orchetrator
  • Loading branch information
jrobinAV authored Jan 3, 2024
2 parents 170a14b + 1f04779 commit ce642bf
Show file tree
Hide file tree
Showing 39 changed files with 2,502 additions and 1,250 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/overall-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: pipenv run playwright install chromium --with-deps

- name: Pytest
run: pipenv run pytest --cov=taipy --cov-append --cov-report="xml:overall-coverage.xml" --cov-report term-missing tests
run: pipenv run pytest -m "not orchestrator_dispatcher and not modin and not standalone" --cov=taipy --cov-append --cov-report="xml:overall-coverage.xml" --cov-report term-missing tests

- name: Coverage
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.11'
Expand Down
136 changes: 127 additions & 9 deletions .github/workflows/partial-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ jobs:
extra-pycodestyle-options: "--max-line-length=120 --exclude=tests/gui --ignore=E121,E123,E126,E226,E24,E704,W503,W504,E203"
extra-mypy-options: "--ignore-missing-imports --implicit-optional --no-namespace-packages --exclude (taipy/templates/|generate_pyi.py|tools) --follow-imports skip"
extra-isort-options: "--line-length=120 --force-grid-wrap=10 --multi-line=VERTICAL_HANGING_INDENT --trailing-comma"

tests:
needs: linter
timeout-minutes: 40
Expand Down Expand Up @@ -81,32 +82,149 @@ jobs:

- name: Pytest CLI
if: steps.changes.outputs.cli == 'true'
run: pipenv run pytest tests/cli
run: pipenv run pytest tests/cli

- name: Pytest Config
if: steps.changes.outputs.config == 'true'
run: pipenv run pytest tests/config
run: pipenv run pytest tests/config

- name: Pytest Core
if: steps.changes.outputs.core == 'true'
run: pipenv run pytest tests/core
run: pipenv run pytest -m "not orchestrator_dispatcher and not modin and not standalone" tests/core

- name: Pytest GUI
if: steps.changes.outputs.gui == 'true'
run: pipenv run pytest tests/gui
run: pipenv run pytest tests/gui

- name: Pytest GUI Core
if: steps.changes.outputs.gui-core == 'true'
run: pipenv run pytest tests/gui_core
run: pipenv run pytest tests/gui_core

- name: Pytest Logger
if: steps.changes.outputs.logger == 'true'
run: pipenv run pytest tests/logger
run: pipenv run pytest tests/logger

- name: Pytest Rest
if: steps.changes.outputs.rest == 'true'
run: pipenv run pytest tests/rest
run: pipenv run pytest tests/rest

- name: Pytest Rest
- name: Pytest Templates
if: steps.changes.outputs.templates == 'true'
run: pipenv run pytest tests/templates
run: pipenv run pytest tests/templates

submit_tests:
needs: linter
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4

- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
core:
- 'taipy/core/**'
- uses: actions/setup-python@v5
with:
python-version: ${{matrix.python-version}}

- name: Install pipenv
if: steps.changes.outputs.core == 'true'
run: curl https://raw.githubusercontent.com/pypa/pipenv/master/get-pipenv.py | python

- name: Install Dependencies
if: steps.changes.outputs.core == 'true'
run: pipenv install --dev --python=${{ matrix.python-version }}

- name: Setup LibMagic (MacOS)
if: matrix.os == 'macos-latest' && steps.changes.outputs.core == 'true'
run: brew install libmagic

- name: Pytest Core orchestrator_dispatcher
if: steps.changes.outputs.core == 'true'
run: pipenv run pytest -m "orchestrator_dispatcher" tests/core

standalone_tests:
needs: linter
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4

- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
core:
- 'taipy/core/**'
- uses: actions/setup-python@v5
with:
python-version: ${{matrix.python-version}}

- name: Install pipenv
if: steps.changes.outputs.core == 'true'
run: curl https://raw.githubusercontent.com/pypa/pipenv/master/get-pipenv.py | python

- name: Install Dependencies
if: steps.changes.outputs.core == 'true'
run: pipenv install --dev --python=${{ matrix.python-version }}

- name: Setup LibMagic (MacOS)
if: matrix.os == 'macos-latest' && steps.changes.outputs.core == 'true'
run: brew install libmagic

- name: Pytest Core standalone
if: steps.changes.outputs.core == 'true'
run: pipenv run pytest -m "standalone" tests/core

modin_tests:
needs: linter
timeout-minutes: 20
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11']
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4

- uses: dorny/paths-filter@v2
id: changes
with:
filters: |
core:
- 'taipy/core/**'
- uses: actions/setup-python@v5
with:
python-version: ${{matrix.python-version}}

- name: Install pipenv
if: steps.changes.outputs.core == 'true'
run: curl https://raw.githubusercontent.com/pypa/pipenv/master/get-pipenv.py | python

- name: Install Dependencies
if: steps.changes.outputs.core == 'true'
run: pipenv install --dev --python=${{ matrix.python-version }}

- name: Setup LibMagic (MacOS)
if: matrix.os == 'macos-latest' && steps.changes.outputs.core == 'true'
run: brew install libmagic

- name: Pytest Core modin
if: steps.changes.outputs.core == 'true'
run: pipenv run pytest -m "modin" tests/core
3 changes: 2 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ gitignore-parser = "==0.1.1"
kthread = "==0.2.3"
markdown = "==3.4.4"
marshmallow = "==3.20.1"
modin = {extras = ["dask"], version = "==0.23.0"}
modin = {extras = ["dask"], version = "==0.23.1"}
networkx = "==2.6"
openpyxl = "==3.1.2"
pandas = "==2.0.0"
Expand All @@ -41,6 +41,7 @@ autopep8 = "*"
black = "*"
flake8 = "*"
flake8-docstrings = "*"
freezegun = "*"
ipython = "*"
ipykernel = "*"
isort = "*"
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ filterwarnings =
ignore::FutureWarning:pyarrow
markers =
teste2e:End-to-end tests
orchestrator_dispatcher:Orchestrator dispatcher tests
modin:Tests using modin
standalone:Tests starting a standalone dispatcher thread
2 changes: 1 addition & 1 deletion taipy/core/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ verify_ssl = true
name = "pypi"

[packages]
modin = {extras = ["dask"], version = "==0.23.0"}
modin = {extras = ["dask"], version = "==0.23.1"}
networkx = "==2.6"
openpyxl = "==3.1.2"
pyarrow = "==10.0.1"
Expand Down
8 changes: 6 additions & 2 deletions taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import threading
from abc import abstractmethod
from queue import Empty
from typing import Dict, Optional

from taipy.config.config import Config
Expand All @@ -31,7 +32,7 @@ class _JobDispatcher(threading.Thread):
__logger = _TaipyLogger._get_logger()
_nb_available_workers: int = 1

def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
def __init__(self, orchestrator: _AbstractOrchestrator):
threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
self.daemon = True
self.orchestrator = orchestrator
Expand All @@ -58,7 +59,10 @@ def run(self):
with self.lock:
job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1)
self._execute_job(job)
except Exception: # In case the last job of the queue has been removed.
except Empty: # In case the last job of the queue has been removed.
pass
except Exception as e:
_TaipyLogger._get_logger().exception(e)
pass

def _can_execute(self) -> bool:
Expand Down
29 changes: 8 additions & 21 deletions taipy/core/_orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@

from taipy.config.config import Config
from taipy.logger._taipy_logger import _TaipyLogger

from ._abstract_orchestrator import _AbstractOrchestrator
from .._entity.submittable import Submittable
from ..data._data_manager_factory import _DataManagerFactory
from ..job._job_manager_factory import _JobManagerFactory
from ..job.job import Job
from ..job.job_id import JobId
from ..scenario.scenario import Scenario
from ..submission._submission_manager_factory import _SubmissionManagerFactory
from ..task.task import Task
from ._abstract_orchestrator import _AbstractOrchestrator


class _Orchestrator(_AbstractOrchestrator):
Expand Down Expand Up @@ -72,7 +70,6 @@ def submit(
submittable._ID_PREFIX, # type: ignore
getattr(submittable, "config_id", None),
)

jobs = []
tasks = submittable._get_sorted_tasks()
with cls.lock:
Expand All @@ -87,17 +84,13 @@ def submit(
force=force, # type: ignore
)
)

submission.jobs = jobs # type: ignore

cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
else:
if wait:
cls.__wait_until_job_finished(jobs, timeout=timeout)

cls._wait_until_job_finished(jobs, timeout=timeout)
return jobs

@classmethod
Expand All @@ -113,7 +106,6 @@ def submit_task(
Parameters:
task (Task^): The task to submit for execution.
submit_id (str): The optional id to differentiate each submission.
callbacks: The optional list of functions that should be executed on job status change.
force (bool): Enforce execution of the task even if its output data nodes are cached.
wait (bool): Wait for the orchestrated job created from the task submission to be finished
Expand All @@ -133,18 +125,14 @@ def submit_task(
itertools.chain([submission._update_submission_status], callbacks or []),
force,
)

jobs = [job]
submission.jobs = jobs # type: ignore

cls._orchestrate_job_to_run_or_block(jobs)

if Config.job_config.is_development:
cls._check_and_execute_jobs_if_development_mode()
else:
if wait:
cls.__wait_until_job_finished(job, timeout=timeout)

cls._wait_until_job_finished(job, timeout=timeout)
return job

@classmethod
Expand Down Expand Up @@ -182,23 +170,22 @@ def _orchestrate_job_to_run_or_block(cls, jobs: List[Job]):
cls.jobs_to_run.put(job)

@classmethod
def __wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: Optional[Union[float, int]] = None):
def __check_if_timeout(start, timeout):
if timeout:
return (datetime.now() - start).seconds < timeout
def _wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: Optional[Union[float, int]] = None):
# Note: this method should be prefixed by two underscores, but it has only one, so it can be mocked in tests.
def __check_if_timeout(st, to):
if to:
return (datetime.now() - st).seconds < to
return True

start = datetime.now()
jobs = jobs if isinstance(jobs, Iterable) else [jobs]
index = 0

while __check_if_timeout(start, timeout) and index < len(jobs):
try:
if jobs[index]._is_finished():
index = index + 1
else:
sleep(0.5) # Limit CPU usage

except Exception:
pass

Expand Down
2 changes: 1 addition & 1 deletion taipy/core/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"pyarrow>=10.0.1,<11.0",
"networkx>=2.6,<3.0",
"openpyxl>=3.1.2,<3.2",
"modin[dask]>=0.23.0,<1.0",
"modin[dask]>=0.23.1,<1.0",
"pymongo[srv]>=4.2.0,<5.0",
"sqlalchemy>=2.0.16,<2.1",
"toml>=0.10,<0.11",
Expand Down
Loading

0 comments on commit ce642bf

Please sign in to comment.