Skip to content

Commit

Permalink
Merge pull request #341 from Cray-HPE/casmtriage-7147-csm-1.6
Browse files Browse the repository at this point in the history
CASMTRIAGE-7147: Create max_component_batch_size option
  • Loading branch information
mharding-hpe authored Jul 23, 2024
2 parents 0ab9107 + 5bec250 commit 2bfa9c0
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 10 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add request timeouts to BOS reporter API calls

### Changed
- Create new BOS v2 `max_component_batch_size` option to limit the number of components a BOS operator
will work on at once.
- Code linting (no functional changes)

### Fixed
Expand Down
6 changes: 6 additions & 0 deletions api/openapi.yaml.in
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,12 @@ components:
example: 1
minimum: 0
maximum: 1048576
max_component_batch_size:
type: integer
description: The maximum number of components that a BOS operator will process at once. 0 means no limit.
example: 1000
minimum: 0
maximum: 131071
additionalProperties: true

requestBodies:
Expand Down
41 changes: 40 additions & 1 deletion src/bos/operators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
"""

from abc import ABC, abstractmethod
import itertools
import logging
import threading
import os
import time
from typing import List, NoReturn, Type
from typing import Generator, List, NoReturn, Type

from bos.common.utils import exc_type_msg
from bos.common.values import Status
Expand Down Expand Up @@ -74,6 +75,7 @@ class BaseOperator(ABC):

def __init__(self) -> None:
    """
    Create the BOS client used by this operator and initialize the
    remembered max batch size.
    """
    self.bos_client = BOSClient()
    # Last max_component_batch_size value seen by the max_batch_size property.
    # Starts at 0, so the first different value that is read gets logged.
    self.__max_batch_size = 0

@property
@abstractmethod
Expand Down Expand Up @@ -108,13 +110,38 @@ def run(self) -> NoReturn:
LOGGER.exception('Unhandled exception getting polling frequency: %s', e)
time.sleep(5) # A small sleep for when exceptions getting the polling frequency

@property
def max_batch_size(self) -> int:
    """
    Return the current value of the max_component_batch_size option.

    The option is re-read on every access. When the value differs from the
    one remembered from the previous access, the new value is logged and
    remembered.
    """
    current = options.max_component_batch_size
    if current == self.__max_batch_size:
        # Unchanged since the last access: nothing to log or record
        return current
    LOGGER.info("max_component_batch_size option set to %d", current)
    self.__max_batch_size = current
    return current

def _chunk_components(self, components: List[dict]) -> Generator[List[dict], None, None]:
    """
    Yield the components in successive groups, each holding at most
    self.max_batch_size entries.
    A max size of 0 means the whole list is yielded as one group.
    """
    # The splitting itself is done by the module-level chunk_components helper
    yield from chunk_components(components, self.max_batch_size)

def _run(self) -> None:
    """
    Perform one pass: fetch the components needing action and process
    them one chunk at a time.
    """
    pending = self._get_components()
    if pending:
        LOGGER.info('Found %d components that require action', len(pending))
        for batch in self._chunk_components(pending):
            self._run_on_chunk(batch)
    else:
        LOGGER.debug('Found 0 components that require action')

def _run_on_chunk(self, components: List[dict]) -> None:
"""
Acts on a chunk of components
"""
LOGGER.debug("Processing %d components", len(components))
# Only check for failed components if we track retries for this operator
if self.retry_attempt_field:
components = self._handle_failed_components(components)
Expand Down Expand Up @@ -259,6 +286,18 @@ def _update_database_for_failure(self, components: List[dict]) -> None:
self.bos_client.components.update_components(data)


def chunk_components(components: List[dict],
                     max_batch_size: int) -> Generator[List[dict], None, None]:
    """
    Break up the components into groups of no more than max_batch_size nodes,
    and yield each group in turn.

    If the max size is 0 (or negative), the entire list is yielded as a
    single group. If components is empty, nothing is yielded.

    Each yielded group is a list, matching the declared return type.
    """
    if not components:
        # Nothing to chunk. Returning here also avoids computing a chunk size
        # of 0 (len of an empty list) when no batch limit is configured.
        return
    chunk_size = max_batch_size if max_batch_size > 0 else len(components)
    remaining = iter(components)
    # Pull chunk_size items at a time until the iterator is exhausted; an
    # empty list from islice signals the end.
    while chunk := list(itertools.islice(remaining, chunk_size)):
        yield chunk


def _update_log_level() -> None:
""" Updates the current logging level base on the value in the options database """
try:
Expand Down
5 changes: 3 additions & 2 deletions src/bos/operators/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ def _run(self) -> None:
LOGGER.info("No new component(s) discovered.")
return
LOGGER.info("%s new component(s) from HSM.", len(components_to_add))
self.bos_client.components.put_components(components_to_add)
LOGGER.info("%s new component(s) added to BOS!", len(components_to_add))
for chunk in self._chunk_components(components_to_add):
self.bos_client.components.put_components(chunk)
LOGGER.info("%s new component(s) added to BOS!", len(chunk))

@property
def bos_components(self) -> Set[str]:
Expand Down
15 changes: 8 additions & 7 deletions src/bos/operators/session_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from typing import Set
from botocore.exceptions import ClientError

from bos.operators.base import BaseOperator, main
from bos.operators.base import BaseOperator, main, chunk_components
from bos.operators.filters.filters import HSMState
from bos.operators.utils.clients.hsm import Inventory
from bos.operators.utils.clients.s3 import S3Object, S3ObjectNotFound
Expand Down Expand Up @@ -74,7 +74,7 @@ def _run(self) -> None:
inventory_cache = Inventory()
for data in sessions:
session = Session(data, inventory_cache, self.bos_client)
session.setup()
session.setup(self.max_batch_size)

def _get_pending_sessions(self):
    """
    Ask the BOS sessions client for the sessions whose status is 'pending'
    and return its response unchanged.
    """
    pending_sessions = self.bos_client.sessions.get_sessions(status='pending')
    return pending_sessions
Expand Down Expand Up @@ -108,15 +108,15 @@ def template(self):
self.tenant)
return self._template

def setup(self):
def setup(self, max_batch_size: int):
try:
component_ids = self._setup_components()
component_ids = self._setup_components(max_batch_size)
except SessionSetupException as err:
self._mark_failed(str(err))
else:
self._mark_running(component_ids)

def _setup_components(self):
def _setup_components(self, max_batch_size: int):
all_component_ids = []
data = []
stage = self.session_data.get("stage", False)
Expand All @@ -138,8 +138,9 @@ def _setup_components(self):
raise SessionSetupException(err) from err
# No exception raised by previous block
self._log(LOGGER.info, 'Found %d components that require updates', len(data))
self._log(LOGGER.debug, f'Updated components: {data}')
self.bos_client.components.update_components(data)
for chunk in chunk_components(data, max_batch_size):
self._log(LOGGER.debug, f'Updated components: {chunk}')
self.bos_client.components.update_components(chunk)
return list(set(all_component_ids))

def _get_boot_set_component_list(self, boot_set) -> Set[str]:
Expand Down
9 changes: 9 additions & 0 deletions src/bos/operators/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ def _run(self) -> None:
if not components:
LOGGER.debug('No enabled components found')
return
LOGGER.debug('Found %d components that require action', len(components))
for chunk in self._chunk_components(components):
self._run_on_chunk(chunk)

def _run_on_chunk(self, components) -> None:
"""
Acts on a chunk of components
"""
LOGGER.debug("Processing %d components", len(components))
component_ids = [component['id'] for component in components]
power_states = node_to_powerstate(component_ids)
cfs_states = self._get_cfs_components()
Expand Down
3 changes: 3 additions & 0 deletions src/bos/operators/utils/clients/bos/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def component_actual_state_ttl(self):
def default_retry_policy(self):
return self.get_option('default_retry_policy', int, 3)

@property
def max_component_batch_size(self):
    """
    Look up the 'max_component_batch_size' option as an int.

    2800 is passed as the third argument to get_option -- presumably the
    fallback used when the option is not set (confirm against get_option).
    """
    batch_size = self.get_option('max_component_batch_size', int, 2800)
    return batch_size


options = Options()
1 change: 1 addition & 0 deletions src/bos/server/controllers/v2/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
'max_power_off_wait_time': 300,
'polling_frequency': 15,
'default_retry_policy': 3,
'max_component_batch_size': 2800
}


Expand Down

0 comments on commit 2bfa9c0

Please sign in to comment.