From 3735d6bceb173b27eded865202ebf459d8202500 Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Sun, 20 Oct 2024 14:23:02 +0300 Subject: [PATCH] working version --- .../slurm_command_gen_strategy.py | 2 +- src/cloudai/systems/slurm/slurm_system.py | 45 +++++++++---------- tests/test_slurm_system.py | 25 +++-------- 3 files changed, 28 insertions(+), 44 deletions(-) diff --git a/src/cloudai/schema/test_template/nemo_launcher/slurm_command_gen_strategy.py b/src/cloudai/schema/test_template/nemo_launcher/slurm_command_gen_strategy.py index f8167d63..2eb7bf17 100644 --- a/src/cloudai/schema/test_template/nemo_launcher/slurm_command_gen_strategy.py +++ b/src/cloudai/schema/test_template/nemo_launcher/slurm_command_gen_strategy.py @@ -101,7 +101,6 @@ def _prepare_environment(self, cmd_args: Dict[str, str], extra_env_vars: Dict[st if "training.values" in self.final_cmd_args: self.final_cmd_args["training"] = self.final_cmd_args.pop("training.values") - self.final_cmd_args["cluster.partition"] = self.system.default_partition self._handle_reservation() def _handle_reservation(self) -> None: @@ -193,6 +192,7 @@ def _generate_cmd_args_str(self, args: Dict[str, str], nodes: List[str]) -> str: if nodes: nodes_str = ",".join(nodes) + cmd_arg_str_parts.append(f"cluster.partition={self.system.default_partition}") cmd_arg_str_parts.append(f"+cluster.nodelist=\\'{nodes_str}\\'\n") return " ".join(cmd_arg_str_parts + env_var_str_parts) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index e716f4a9..39d553b7 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -142,11 +142,14 @@ def groups(self) -> Dict[str, Dict[str, List[SlurmNode]]]: groups: Dict[str, Dict[str, List[SlurmNode]]] = {} for part in self.partitions: groups[part.name] = {} - for group in part.groups: - node_names = set() - for group_nodes in group.nodes: - node_names.update(set(parse_node_list(group_nodes))) - groups[part.name][group.name] = [node for node in part.slurm_nodes if node.name in node_names] + if part.groups: + for group in part.groups: + node_names = set() + for group_nodes in group.nodes: + node_names.update(set(parse_node_list(group_nodes))) + groups[part.name][group.name] = [node for node in part.slurm_nodes if node.name in node_names] + else: + groups[part.name][part.name] = [node for node in part.slurm_nodes] return groups @@ -474,23 +477,14 @@ def get_available_nodes_from_group( grouped_nodes = self.group_nodes_by_state(partition_name, group_name) - try: - allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, partition_name, group_name) - - logging.info( - f"Allocated nodes from {group_print}partition '{partition_name}': " - f"{[node.name for node in allocated_nodes]}" - ) - - return allocated_nodes + allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, partition_name, group_name) - except ValueError as e: - logging.error( - f"Error occurred while allocating nodes from group '{group_name}' in partition '{partition_name}': {e}", - exc_info=True, - ) + logging.info( + f"Allocated nodes from {group_print}partition '{partition_name}': " + f"{[node.name for node in allocated_nodes]}" + ) - return [] + return allocated_nodes def validate_partition_and_group(self, partition_name: str, group_name: Optional[str] = None) -> None: """ @@ -538,12 +532,14 @@ def group_nodes_by_state( SlurmNodeState.COMPLETING: [], SlurmNodeState.ALLOCATED: [], } + if group_name: nodes = self.groups[partition_name][group_name] else: nodes = [] for group_name in self.groups[partition_name]: nodes.extend(self.groups[partition_name][group_name]) + for node in nodes: if node.state in grouped_nodes and (not reserved_nodes or node.name in reserved_nodes): grouped_nodes[node.state].append(node) @@ -594,12 +590,12 @@ def allocate_nodes( for state in grouped_nodes: while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes: allocated_nodes.append(grouped_nodes[state].pop(0)) - + if len(allocated_nodes) < number_of_nodes: raise ValueError( - f"CloudAI is requesting {number_of_nodes} nodes from the {group_or_partition}, but only " - f"{len(allocated_nodes)} nodes are available. Please review the available nodes in the system " - f"and ensure there are enough resources to meet the requested node count. Additionally, " + f"CloudAI is requesting {number_of_nodes} nodes from the {group_or_partition}, but there are only " + f"{len(allocated_nodes)} nodes in {group_or_partition}. Please review the available nodes in the " + f"system and ensure there are enough resources to meet the requested node count. Additionally, " f"verify that the system can accommodate the number of nodes required by the test scenario." ) else: @@ -857,6 +853,7 @@ def parse_nodes(self, nodes: List[str]) -> List[str]: if len(parts) == 2: partition_name, num_nodes_spec = parts group_name = None + self.default_partition = partition_name elif len(parts) == 3: partition_name, group_name, num_nodes_spec = parts else: diff --git a/tests/test_slurm_system.py b/tests/test_slurm_system.py index 61fda9fd..790b9ad4 100644 --- a/tests/test_slurm_system.py +++ b/tests/test_slurm_system.py @@ -166,19 +166,6 @@ def grouped_nodes() -> dict[SlurmNodeState, list[SlurmNode]]: return grouped_nodes -def test_get_available_nodes_exceeding_limit_no_callstack( - slurm_system: SlurmSystem, grouped_nodes: Dict[SlurmNodeState, List[SlurmNode]], caplog -): - group_name = "group1" - partition_name = "main" - num_nodes = 5 - - slurm_system.get_available_nodes_from_group(num_nodes, partition_name, group_name) - - log_message = "CloudAI is requesting 5 nodes from the group 'group1', but only 0 nodes are available." - assert log_message in caplog.text - - def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]): partition_name = "main" group_name = "group_name" @@ -216,16 +203,16 @@ def test_allocate_nodes_exceeding_limit( slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]] ): partition_name = "main" - group_name = "group_name" + group_name = "group1" num_nodes = 5 - available_nodes = 4 - + total_nodes = 4 + with pytest.raises( ValueError, match=re.escape( - f"CloudAI is requesting {num_nodes} nodes from the group '{group_name}', but only " - f"{available_nodes} nodes are available. Please review the available nodes in the system " - f"and ensure there are enough resources to meet the requested node count. Additionally, " + f"CloudAI is requesting {num_nodes} nodes from the group '{group_name}', but there are only " + f"{total_nodes} nodes in group '{group_name}'. Please review the available nodes in the " + f"system and ensure there are enough resources to meet the requested node count. Additionally, " f"verify that the system can accommodate the number of nodes required by the test scenario." ), ):