Skip to content

Commit

Permalink
Merge branch 'main' into am/upd-jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
amaslenn committed Nov 18, 2024
2 parents a211f22 + d61a046 commit db69757
Show file tree
Hide file tree
Showing 23 changed files with 349 additions and 43 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ jobs:
run: pip install -r requirements-dev.txt

- name: Run ruff linter
run: ruff check .
run: ruff check

- name: Run ruff formatter
run: ruff format --check --diff .
run: ruff format --check --diff

- name: Run pyright
run: pyright .
run: pyright

- name: Run vulture check
run: vulture src/ tests/
Expand Down
2 changes: 1 addition & 1 deletion conf/common/system/example_slurm_cluster.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
name = "example-cluster"
scheduler = "slurm"

install_path = "./install"
install_path = "./install_dir"
output_path = "./results"
default_partition = "partition_1"

Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,6 @@ min_confidence = 100

[tool.coverage.report]
exclude_also = ["@abstractmethod"]

[tool.pyright]
include = ["src", "tests"]
22 changes: 20 additions & 2 deletions src/cloudai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@
from .schema.test_template.sleep.slurm_command_gen_strategy import SleepSlurmCommandGenStrategy
from .schema.test_template.sleep.standalone_command_gen_strategy import SleepStandaloneCommandGenStrategy
from .schema.test_template.sleep.template import Sleep
from .schema.test_template.slurm_container.report_generation_strategy import (
SlurmContainerReportGenerationStrategy,
)
from .schema.test_template.slurm_container.slurm_command_gen_strategy import (
SlurmContainerCommandGenStrategy,
)
from .schema.test_template.slurm_container.template import SlurmContainer
from .schema.test_template.ucc_test.grading_strategy import UCCTestGradingStrategy
from .schema.test_template.ucc_test.report_generation_strategy import UCCTestReportGenerationStrategy
from .schema.test_template.ucc_test.slurm_command_gen_strategy import UCCTestSlurmCommandGenStrategy
Expand All @@ -98,6 +105,7 @@
SleepTestDefinition,
UCCTestDefinition,
)
from .test_definitions.slurm_container import SlurmContainerTestDefinition

Registry().add_runner("slurm", SlurmRunner)
Registry().add_runner("kubernetes", KubernetesRunner)
Expand All @@ -121,14 +129,21 @@
Registry().add_strategy(JobIdRetrievalStrategy, [SlurmSystem], [NeMoLauncher], NeMoLauncherSlurmJobIdRetrievalStrategy)
Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [NeMoLauncher], NeMoLauncherSlurmCommandGenStrategy)
Registry().add_strategy(ReportGenerationStrategy, [SlurmSystem], [UCCTest], UCCTestReportGenerationStrategy)
Registry().add_strategy(
ReportGenerationStrategy,
[SlurmSystem],
[SlurmContainer],
SlurmContainerReportGenerationStrategy,
)
Registry().add_strategy(GradingStrategy, [SlurmSystem], [NeMoLauncher], NeMoLauncherGradingStrategy)

Registry().add_strategy(GradingStrategy, [SlurmSystem], [JaxToolbox], JaxToolboxGradingStrategy)
Registry().add_strategy(GradingStrategy, [SlurmSystem], [UCCTest], UCCTestGradingStrategy)
Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [JaxToolbox], JaxToolboxSlurmCommandGenStrategy)
Registry().add_strategy(
JobIdRetrievalStrategy,
[SlurmSystem],
[ChakraReplay, JaxToolbox, NcclTest, UCCTest, Sleep],
[ChakraReplay, JaxToolbox, NcclTest, UCCTest, Sleep, SlurmContainer],
SlurmJobIdRetrievalStrategy,
)
Registry().add_strategy(JobIdRetrievalStrategy, [StandaloneSystem], [Sleep], StandaloneJobIdRetrievalStrategy)
Expand All @@ -141,13 +156,14 @@
Registry().add_strategy(
JobStatusRetrievalStrategy,
[SlurmSystem],
[ChakraReplay, UCCTest, NeMoLauncher, Sleep],
[ChakraReplay, UCCTest, NeMoLauncher, Sleep, SlurmContainer],
DefaultJobStatusRetrievalStrategy,
)
Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [UCCTest], UCCTestSlurmCommandGenStrategy)
Registry().add_strategy(ReportGenerationStrategy, [SlurmSystem], [ChakraReplay], ChakraReplayReportGenerationStrategy)
Registry().add_strategy(GradingStrategy, [SlurmSystem], [ChakraReplay], ChakraReplayGradingStrategy)
Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [ChakraReplay], ChakraReplaySlurmCommandGenStrategy)
Registry().add_strategy(CommandGenStrategy, [SlurmSystem], [SlurmContainer], SlurmContainerCommandGenStrategy)

Registry().add_installer("slurm", SlurmInstaller)
Registry().add_installer("standalone", StandaloneInstaller)
Expand All @@ -165,6 +181,7 @@
Registry().add_test_definition("JaxToolboxGPT", GPTTestDefinition)
Registry().add_test_definition("JaxToolboxGrok", GrokTestDefinition)
Registry().add_test_definition("JaxToolboxNemotron", NemotronTestDefinition)
Registry().add_test_definition("SlurmContainer", SlurmContainerTestDefinition)

Registry().add_test_template("ChakraReplay", ChakraReplay)
Registry().add_test_template("NcclTest", NcclTest)
Expand All @@ -174,6 +191,7 @@
Registry().add_test_template("JaxToolboxGPT", JaxToolbox)
Registry().add_test_template("JaxToolboxGrok", JaxToolbox)
Registry().add_test_template("JaxToolboxNemotron", JaxToolbox)
Registry().add_test_template("SlurmContainer", SlurmContainer)

__all__ = [
"BaseInstaller",
Expand Down
10 changes: 10 additions & 0 deletions src/cloudai/installer/slurm_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ def install_one(self, item: Installable) -> InstallStatusResult:
if isinstance(item, DockerImage):
res = self._install_docker_image(item)
return InstallStatusResult(res.success, res.message)
elif isinstance(item, GitRepo):
return self._install_one_git_repo(item)
elif isinstance(item, PythonExecutable):
return self._install_python_executable(item)

Expand All @@ -139,6 +141,8 @@ def uninstall_one(self, item: Installable) -> InstallStatusResult:
return InstallStatusResult(res.success, res.message)
elif isinstance(item, PythonExecutable):
return self._uninstall_python_executable(item)
elif isinstance(item, GitRepo):
return self._uninstall_git_repo(item)

return InstallStatusResult(False, f"Unsupported item type: {type(item)}")

Expand All @@ -148,6 +152,12 @@ def is_installed_one(self, item: Installable) -> InstallStatusResult:
if res.success and res.docker_image_path:
item.installed_path = res.docker_image_path
return InstallStatusResult(res.success, res.message)
elif isinstance(item, GitRepo):
repo_path = self.system.install_path / item.repo_name
if repo_path.exists():
item.installed_path = repo_path
return InstallStatusResult(True)
return InstallStatusResult(False, f"Git repository {item.git_url} not cloned")
elif isinstance(item, PythonExecutable):
return self._is_python_executable_installed(item)

Expand Down
15 changes: 11 additions & 4 deletions src/cloudai/report_generator/report_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ def _generate_test_report(self, directory_path: Path, tr: TestRun) -> None:
tr (TestRun): The test run object.
"""
for subdir in directory_path.iterdir():
if subdir.is_dir() and tr.test.test_template.can_handle_directory(subdir):
tr.test.test_template.generate_report(tr.test.name, subdir, tr.sol)
else:
logging.warning(f"Skipping directory '{subdir}' for test '{tr.test.name}'")
if not subdir.is_dir():
logging.debug(f"Skipping file '{subdir}', not a directory.")
continue
if not tr.test.test_template.can_handle_directory(subdir):
logging.warning(
f"Skipping '{subdir}', can't hande with "
f"strategy={tr.test.test_template.report_generation_strategy}."
)
continue

tr.test.test_template.generate_report(tr.test.name, subdir, tr.sol)
1 change: 1 addition & 0 deletions src/cloudai/runner/slurm/slurm_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ def _submit_test(self, tr: TestRun) -> SlurmJob:
stderr=stderr,
message="Failed to retrieve job ID from command output.",
)
logging.info(f"Submitted slurm job: {job_id}")
return SlurmJob(tr, id=job_id)
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ def gen_exec_command(self, tr: TestRun) -> str:
)
self.final_cmd_args["cluster.gpus_per_node"] = self.system.gpus_per_node or "null"

repo_path = tdef.python_executable.git_repo.installed_path
repo_path = (
tdef.python_executable.git_repo.installed_path.absolute()
if tdef.python_executable.git_repo.installed_path is not None
else None
)
if not repo_path:
logging.warning(
f"Local clone of git repo {tdef.python_executable.git_repo} does not exist. "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
from pathlib import Path
from typing import Optional

from cloudai import ReportGenerationStrategy


class SlurmContainerReportGenerationStrategy(ReportGenerationStrategy):
    """
    Report generation strategy for a generic Slurm container test.

    Scans ``stdout.txt`` in a test output directory for training progress lines of the form
    ``Training epoch E, iteration I/N | lr: ... | global_batch_size: ... | global_step: ... |
    reduced_train_loss: ... | train_step_timing in s: ...[ | consumed_samples: ...]`` and writes the
    captured values to ``report.csv`` in the same directory.
    """

    # Column order mirrors the capture groups of _TRAINING_LINE_RE.
    _REPORT_COLUMNS = (
        "epoch",
        "iteration",
        "lr",
        "global_batch_size",
        "global_step",
        "reduced_train_loss",
        "train_step_timing",
        "consumed_samples",
    )

    # The "|" field separators are escaped on purpose: an unescaped "|" is regex alternation and would
    # make the pattern match any single fragment (e.g. just " lr: 1 ") instead of a whole metrics line.
    # The trailing "consumed_samples" field is optional because not every log line carries it.
    _TRAINING_LINE_RE = re.compile(
        r"Training epoch (\d+), iteration (\d+)/\d+ \| lr: ([\d.]+) \| global_batch_size: (\d+) \| "
        r"global_step: (\d+) \| reduced_train_loss: ([\d.]+) \| train_step_timing in s: ([\d.]+)"
        r"(?: \| consumed_samples: (\d+))?"
    )

    def can_handle_directory(self, directory_path: Path) -> bool:
        """
        Check whether the directory contains a log this strategy can report on.

        Args:
            directory_path (Path): Directory expected to contain ``stdout.txt``.

        Returns:
            bool: True if ``stdout.txt`` is a regular file with at least one training metrics line.
        """
        stdout_path = directory_path / "stdout.txt"
        if not stdout_path.is_file():
            return False

        # The pattern cannot span a newline, so scanning line by line is equivalent to searching the
        # whole file and avoids loading a large log into memory.
        with stdout_path.open("r") as file:
            return any(self._TRAINING_LINE_RE.search(line) for line in file)

    def generate_report(self, test_name: str, directory_path: Path, sol: Optional[float] = None) -> None:
        """
        Write ``report.csv`` with one row per training metrics line found in ``stdout.txt``.

        Does nothing if ``stdout.txt`` is missing. The CSV always starts with the header row; rows for
        lines without ``consumed_samples`` get an empty last field so every row has the same width.

        Args:
            test_name (str): Name of the test (not used by this strategy).
            directory_path (Path): Directory containing ``stdout.txt``; ``report.csv`` is written here.
            sol (Optional[float]): Speed-of-light value (not used by this strategy).
        """
        stdout_path = directory_path / "stdout.txt"
        if not stdout_path.is_file():
            return

        with stdout_path.open("r") as file, open(directory_path / "report.csv", "w") as csv_file:
            csv_file.write(",".join(self._REPORT_COLUMNS) + "\n")
            for line in file:
                # NOTE(review): anchored at the start of the line, while can_handle_directory searches
                # anywhere in the line - confirm whether prefixed log lines should also be reported.
                match = self._TRAINING_LINE_RE.match(line)
                if match:
                    csv_file.write(",".join(match.groups(default="")) + "\n")
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, cast

from cloudai import TestRun
from cloudai.systems.slurm.strategy import SlurmCommandGenStrategy
from cloudai.test_definitions.slurm_container import SlurmContainerTestDefinition


class SlurmContainerCommandGenStrategy(SlurmCommandGenStrategy):
    """Builds the srun command line for generic Slurm container tests."""

    def gen_srun_prefix(self, slurm_args: dict[str, Any], tr: TestRun) -> list[str]:
        """
        Extend the base srun prefix with the container settings of the test definition.

        Stores the installed docker image path and the comma-joined container mounts in ``slurm_args``
        (the dict is modified in place), delegates to the parent implementation and appends
        ``--no-container-mount-home``.

        Args:
            slurm_args (dict[str, Any]): Slurm arguments; updated with "image_path" and "container_mounts".
            tr (TestRun): Test run whose test definition is a SlurmContainerTestDefinition.

        Returns:
            list[str]: Parts of the srun prefix.
        """
        definition = cast(SlurmContainerTestDefinition, tr.test.test_definition)

        slurm_args["image_path"] = definition.docker_image.installed_path
        mounts = definition.container_mounts(self.system.install_path)
        slurm_args["container_mounts"] = ",".join(mounts)

        return [*super().gen_srun_prefix(slurm_args, tr), "--no-container-mount-home"]

    def generate_test_command(self, env_vars: dict[str, str], cmd_args: dict[str, str], tr: TestRun) -> list[str]:
        """
        Return the command to run inside the container.

        Only the test's ``extra_cmd_args`` are used; ``env_vars`` and ``cmd_args`` are ignored.

        Returns:
            list[str]: A single-element list with ``extra_cmd_args``, or an empty list if it is not set.
        """
        extra_args = tr.test.extra_cmd_args
        return [extra_args] if extra_args else []
23 changes: 23 additions & 0 deletions src/cloudai/schema/test_template/slurm_container/template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from cloudai import TestTemplate


class SlurmContainer(TestTemplate):
    """Test template for generic Slurm container tests; defines nothing beyond TestTemplate."""
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ def gen_post_test(self, post_test: TestScenario, base_output_path: Path) -> str:
def _gen_srun_command(
self, slurm_args: Dict[str, Any], env_vars: Dict[str, str], cmd_args: Dict[str, str], tr: TestRun
) -> str:
srun_command_parts = self.gen_srun_prefix(slurm_args)
srun_command_parts = self.gen_srun_prefix(slurm_args, tr)
test_command_parts = self.generate_test_command(env_vars, cmd_args, tr)
return " ".join(srun_command_parts + test_command_parts)

def gen_srun_prefix(self, slurm_args: Dict[str, Any]) -> List[str]:
def gen_srun_prefix(self, slurm_args: Dict[str, Any], tr: TestRun) -> List[str]:
srun_command_parts = ["srun", f"--mpi={self.system.mpi}"]
if slurm_args.get("image_path"):
srun_command_parts.append(f'--container-image={slurm_args["image_path"]}')
Expand Down
Loading

0 comments on commit db69757

Please sign in to comment.