Skip to content

Commit

Permalink
working version
Browse files (browse the repository at this point in the history)
  • Loading branch information
jeffnvidia committed Oct 20, 2024
1 parent 4e3bd61 commit 3735d6b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ def _prepare_environment(self, cmd_args: Dict[str, str], extra_env_vars: Dict[st
if "training.values" in self.final_cmd_args:
self.final_cmd_args["training"] = self.final_cmd_args.pop("training.values")

self.final_cmd_args["cluster.partition"] = self.system.default_partition
self._handle_reservation()

def _handle_reservation(self) -> None:
Expand Down Expand Up @@ -193,6 +192,7 @@ def _generate_cmd_args_str(self, args: Dict[str, str], nodes: List[str]) -> str:

if nodes:
nodes_str = ",".join(nodes)
cmd_arg_str_parts.append(f"cluster.partition={self.system.default_partition}")
cmd_arg_str_parts.append(f"+cluster.nodelist=\\'{nodes_str}\\'\n")

return " ".join(cmd_arg_str_parts + env_var_str_parts)
Expand Down
45 changes: 21 additions & 24 deletions src/cloudai/systems/slurm/slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,14 @@ def groups(self) -> Dict[str, Dict[str, List[SlurmNode]]]:
groups: Dict[str, Dict[str, List[SlurmNode]]] = {}
for part in self.partitions:
groups[part.name] = {}
for group in part.groups:
node_names = set()
for group_nodes in group.nodes:
node_names.update(set(parse_node_list(group_nodes)))
groups[part.name][group.name] = [node for node in part.slurm_nodes if node.name in node_names]
if part.groups:
for group in part.groups:
node_names = set()
for group_nodes in group.nodes:
node_names.update(set(parse_node_list(group_nodes)))
groups[part.name][group.name] = [node for node in part.slurm_nodes if node.name in node_names]
else:
groups[part.name][part.name] = [node for node in part.slurm_nodes]

return groups

Expand Down Expand Up @@ -474,23 +477,14 @@ def get_available_nodes_from_group(

grouped_nodes = self.group_nodes_by_state(partition_name, group_name)

try:
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, partition_name, group_name)

logging.info(
f"Allocated nodes from {group_print}partition '{partition_name}': "
f"{[node.name for node in allocated_nodes]}"
)

return allocated_nodes
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, partition_name, group_name)

except ValueError as e:
logging.error(
f"Error occurred while allocating nodes from group '{group_name}' in partition '{partition_name}': {e}",
exc_info=True,
)
logging.info(
f"Allocated nodes from {group_print}partition '{partition_name}': "
f"{[node.name for node in allocated_nodes]}"
)

return []
return allocated_nodes

def validate_partition_and_group(self, partition_name: str, group_name: Optional[str] = None) -> None:
"""
Expand Down Expand Up @@ -538,12 +532,14 @@ def group_nodes_by_state(
SlurmNodeState.COMPLETING: [],
SlurmNodeState.ALLOCATED: [],
}

if group_name:
nodes = self.groups[partition_name][group_name]
else:
nodes = []
for group_name in self.groups[partition_name]:
nodes.extend(self.groups[partition_name][group_name])

for node in nodes:
if node.state in grouped_nodes and (not reserved_nodes or node.name in reserved_nodes):
grouped_nodes[node.state].append(node)
Expand Down Expand Up @@ -594,12 +590,12 @@ def allocate_nodes(
for state in grouped_nodes:
while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes:
allocated_nodes.append(grouped_nodes[state].pop(0))

if len(allocated_nodes) < number_of_nodes:
raise ValueError(
f"CloudAI is requesting {number_of_nodes} nodes from the {group_or_partition}, but only "
f"{len(allocated_nodes)} nodes are available. Please review the available nodes in the system "
f"and ensure there are enough resources to meet the requested node count. Additionally, "
f"CloudAI is requesting {number_of_nodes} nodes from the {group_or_partition}, but there are only "
f"{len(allocated_nodes)} nodes in {group_or_partition}. Please review the available nodes in the "
f"system and ensure there are enough resources to meet the requested node count. Additionally, "
f"verify that the system can accommodate the number of nodes required by the test scenario."
)
else:
Expand Down Expand Up @@ -857,6 +853,7 @@ def parse_nodes(self, nodes: List[str]) -> List[str]:
if len(parts) == 2:
partition_name, num_nodes_spec = parts
group_name = None
self.default_partition = partition_name
elif len(parts) == 3:
partition_name, group_name, num_nodes_spec = parts
else:
Expand Down
25 changes: 6 additions & 19 deletions tests/test_slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,6 @@ def grouped_nodes() -> dict[SlurmNodeState, list[SlurmNode]]:
return grouped_nodes


def test_get_available_nodes_exceeding_limit_no_callstack(
    slurm_system: SlurmSystem, grouped_nodes: Dict[SlurmNodeState, List[SlurmNode]], caplog
):
    """Request more nodes than the group offers and check the logged shortage message.

    Asks ``slurm_system`` for 5 nodes from group 'group1' in partition 'main'
    and asserts that the captured log text contains the "only 0 nodes are
    available" message.

    NOTE(review): the test name mentions "no callstack", but only the presence
    of the message is asserted here; the absence of a traceback is not checked.
    """
    requested_count, partition, group = 5, "main", "group1"

    # The return value is intentionally ignored; only the log output matters.
    slurm_system.get_available_nodes_from_group(requested_count, partition, group)

    expected_fragment = "CloudAI is requesting 5 nodes from the group 'group1', but only 0 nodes are available."
    assert expected_fragment in caplog.text


def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]):
partition_name = "main"
group_name = "group_name"
Expand Down Expand Up @@ -216,16 +203,16 @@ def test_allocate_nodes_exceeding_limit(
slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]
):
partition_name = "main"
group_name = "group_name"
group_name = "group1"
num_nodes = 5
available_nodes = 4

total_nodes = 4
with pytest.raises(
ValueError,
match=re.escape(
f"CloudAI is requesting {num_nodes} nodes from the group '{group_name}', but only "
f"{available_nodes} nodes are available. Please review the available nodes in the system "
f"and ensure there are enough resources to meet the requested node count. Additionally, "
f"CloudAI is requesting {num_nodes} nodes from the group '{group_name}', but there are only "
f"{total_nodes} nodes in group '{group_name}'. Please review the available nodes in the "
f"system and ensure there are enough resources to meet the requested node count. Additionally, "
f"verify that the system can accommodate the number of nodes required by the test scenario."
),
):
Expand Down

0 comments on commit 3735d6b

Please sign in to comment.