Skip to content

Commit

Permalink
Allow jobs to request nodes from a whole partition without naming a group (not tested)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffnvidia committed Nov 20, 2024
1 parent bc7f363 commit 0943339
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 19 deletions.
50 changes: 35 additions & 15 deletions src/cloudai/systems/slurm/slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ def get_group_node_names(self, partition_name: str, group_name: str) -> List[str
return [node.name for node in self.get_group_nodes(partition_name, group_name)]

def get_available_nodes_from_group(
self, partition_name: str, group_name: str, number_of_nodes: Union[int, str]
self, number_of_nodes: Union[int, str], partition_name: str, group_name: Optional[str] = None
) -> List[SlurmNode]:
"""
Retrieve a specific number of potentially available nodes from a group or, if no group is given, a whole partition.
Expand All @@ -466,17 +466,19 @@ def get_available_nodes_from_group(
ValueError: If the partition or group is not found, or if the requested number of nodes exceeds the
available nodes.
"""
group_print = f"group '{group_name}' in " if group_name else ""

self.validate_partition_and_group(partition_name, group_name)

self.update_node_states()

grouped_nodes = self.group_nodes_by_state(partition_name, group_name)

try:
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, group_name)
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, partition_name, group_name)

logging.info(
f"Allocated nodes from group '{group_name}' in partition '{partition_name}': "
f"Allocated nodes from {group_print}partition '{partition_name}': "
f"{[node.name for node in allocated_nodes]}"
)

Expand All @@ -490,7 +492,7 @@ def get_available_nodes_from_group(

return []

def validate_partition_and_group(self, partition_name: str, group_name: str) -> None:
def validate_partition_and_group(self, partition_name: str, group_name: Optional[str] = None) -> None:
"""
Validate that the partition exists and, if a group name is given, that the group exists in that partition.
Expand All @@ -504,10 +506,12 @@ def validate_partition_and_group(self, partition_name: str, group_name: str) ->
"""
if partition_name not in self.groups:
raise ValueError(f"Partition '{partition_name}' not found.")
if group_name not in self.groups[partition_name]:
if group_name and group_name not in self.groups[partition_name]:
raise ValueError(f"Group '{group_name}' not found in partition '{partition_name}'.")

def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[SlurmNodeState, List[SlurmNode]]:
def group_nodes_by_state(
self, partition_name: str, group_name: Optional[str]
) -> Dict[SlurmNodeState, List[SlurmNode]]:
"""
Group nodes by their states, excluding nodes allocated to the current user.
Expand All @@ -534,15 +538,24 @@ def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[Slu
SlurmNodeState.COMPLETING: [],
SlurmNodeState.ALLOCATED: [],
}

for node in self.groups[partition_name][group_name]:
if group_name:
nodes = self.groups[partition_name][group_name]
else:
nodes = []
for group_name in self.groups[partition_name]:
nodes.extend(self.groups[partition_name][group_name])
for node in nodes:
if node.state in grouped_nodes and (not reserved_nodes or node.name in reserved_nodes):
grouped_nodes[node.state].append(node)

return grouped_nodes

def allocate_nodes(
self, grouped_nodes: Dict[SlurmNodeState, List[SlurmNode]], number_of_nodes: Union[int, str], group_name: str
self,
grouped_nodes: Dict[SlurmNodeState, List[SlurmNode]],
number_of_nodes: Union[int, str],
partition_name: str,
group_name: Optional[str],
) -> List[SlurmNode]:
"""
Allocate nodes based on the requested number or maximum availability.
Expand All @@ -551,6 +564,7 @@ def allocate_nodes(
grouped_nodes (Dict[SlurmNodeState, List[SlurmNode]]): Nodes grouped by their state.
number_of_nodes (Union[int, str]): The number of nodes to allocate, or 'max_avail' to allocate
all available nodes.
partition_name (str): The name of the partition.
group_name (Optional[str]): The name of the group, or None if no group was specified (messages then name the partition).
Returns:
Expand All @@ -559,6 +573,8 @@ def allocate_nodes(
Raises:
ValueError: If the requested number of nodes exceeds the available nodes.
"""
# Allocate nodes based on priority: idle, then completing, then allocated
group_or_partition = f"group '{group_name}'" if group_name else f"partition '{partition_name}'"
allocated_nodes = []

if isinstance(number_of_nodes, str) and number_of_nodes == "max_avail":
Expand All @@ -567,7 +583,7 @@ def allocate_nodes(

if len(allocated_nodes) == 0:
raise ValueError(
f"CloudAI is requesting the maximum available nodes from the group '{group_name}', "
f"CloudAI is requesting the maximum available nodes from the {group_or_partition}, "
f"but no nodes are available. Please review the available nodes in the system and ensure "
f"there are sufficient resources to meet the requirements of the test scenario. Additionally, "
f"verify that the system is capable of hosting the maximum number of nodes specified in the test "
Expand All @@ -581,7 +597,7 @@ def allocate_nodes(

if len(allocated_nodes) < number_of_nodes:
raise ValueError(
f"CloudAI is requesting {number_of_nodes} nodes from the group '{group_name}', but only "
f"CloudAI is requesting {number_of_nodes} nodes from the {group_or_partition}, but only "
f"{len(allocated_nodes)} nodes are available. Please review the available nodes in the system "
f"and ensure there are enough resources to meet the requested node count. Additionally, "
f"verify that the system can accommodate the number of nodes required by the test scenario."
Expand Down Expand Up @@ -838,11 +854,15 @@ def parse_nodes(self, nodes: List[str]) -> List[str]:
for node_spec in nodes:
if ":" in node_spec:
parts = node_spec.split(":")
if len(parts) != 3:
raise ValueError("Format should be partition:group:num_nodes")
partition_name, group_name, num_nodes_spec = parts
if len(parts) == 2:
partition_name, num_nodes_spec = parts
group_name = None
elif len(parts) == 3:
partition_name, group_name, num_nodes_spec = parts
else:
raise ValueError("Format should be partition:group:num_nodes or partition:num_nodes")
num_nodes = int(num_nodes_spec) if num_nodes_spec != "max_avail" else num_nodes_spec
group_nodes = self.get_available_nodes_from_group(partition_name, group_name, num_nodes)
group_nodes = self.get_available_nodes_from_group(num_nodes, partition_name, group_name)
parsed_nodes += [node.name for node in group_nodes]
else:
# Handle both individual node names and ranges
Expand Down
11 changes: 7 additions & 4 deletions tests/test_slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,17 @@ def test_get_available_nodes_exceeding_limit_no_callstack(
partition_name = "main"
num_nodes = 5

slurm_system.get_available_nodes_from_group(partition_name, group_name, num_nodes)
slurm_system.get_available_nodes_from_group(num_nodes, partition_name, group_name)

log_message = "CloudAI is requesting 5 nodes from the group 'group1', but only 0 nodes are available."
assert log_message in caplog.text


def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]):
partition_name = "main"
group_name = "group_name"

available_nodes = slurm_system.allocate_nodes(grouped_nodes, "max_avail", group_name)
available_nodes = slurm_system.allocate_nodes(grouped_nodes, "max_avail", partition_name, group_name)
expected_node_names = [
grouped_nodes[SlurmNodeState.IDLE][0].name,
grouped_nodes[SlurmNodeState.IDLE][1].name,
Expand All @@ -202,9 +203,10 @@ def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict
def test_allocate_nodes_num_nodes_integers(
slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]
):
partition_name = "main"
group_name = "group_name"

available_nodes = slurm_system.allocate_nodes(grouped_nodes, 1, group_name)
available_nodes = slurm_system.allocate_nodes(grouped_nodes, 1, partition_name, group_name)
expected_node_names = [grouped_nodes[SlurmNodeState.IDLE][0].name]

returned_node_names = [node.name for node in available_nodes]
Expand All @@ -215,6 +217,7 @@ def test_allocate_nodes_num_nodes_integers(
def test_allocate_nodes_exceeding_limit(
slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]
):
partition_name = "main"
group_name = "group_name"
num_nodes = 5
available_nodes = 4
Expand All @@ -228,4 +231,4 @@ def test_allocate_nodes_exceeding_limit(
f"verify that the system can accommodate the number of nodes required by the test scenario."
),
):
slurm_system.allocate_nodes(grouped_nodes, num_nodes, group_name)
slurm_system.allocate_nodes(grouped_nodes, num_nodes, partition_name, group_name)

0 comments on commit 0943339

Please sign in to comment.