diff --git a/CHANGELOG.md b/CHANGELOG.md
index 64fb6b9a..fbd228b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Add request timeouts to BOS reporter API calls
 
 ### Changed
+- Create new BOS v2 `max_component_batch_size` option to limit the number of components a BOS operator
+  will work on at once.
 - Code linting (no functional changes)
 
 ### Fixed
diff --git a/api/openapi.yaml.in b/api/openapi.yaml.in
index 5dfed717..5be2f42d 100644
--- a/api/openapi.yaml.in
+++ b/api/openapi.yaml.in
@@ -1047,6 +1047,12 @@ components:
           example: 1
           minimum: 0
           maximum: 1048576
+        max_component_batch_size:
+          type: integer
+          description: The maximum number of components that a BOS operator will process at once. 0 means no limit.
+          example: 1000
+          minimum: 0
+          maximum: 131071
       additionalProperties: true
 
   requestBodies:
diff --git a/src/bos/operators/base.py b/src/bos/operators/base.py
index 248dbf6f..dc530cb2 100644
--- a/src/bos/operators/base.py
+++ b/src/bos/operators/base.py
@@ -27,11 +27,12 @@
 """
 
 from abc import ABC, abstractmethod
+import itertools
 import logging
 import threading
 import os
 import time
-from typing import List, NoReturn, Type
+from typing import Generator, List, NoReturn, Type
 
 from bos.common.utils import exc_type_msg
 from bos.common.values import Status
@@ -74,6 +75,7 @@ class BaseOperator(ABC):
 
     def __init__(self) -> NoReturn:
         self.bos_client = BOSClient()
+        self.__max_batch_size = 0
 
     @property
     @abstractmethod
@@ -108,6 +110,23 @@ def run(self) -> NoReturn:
                 LOGGER.exception('Unhandled exception getting polling frequency: %s', e)
                 time.sleep(5)  # A small sleep for when exceptions getting the polling frequency
 
+    @property
+    def max_batch_size(self) -> int:
+        max_batch_size = options.max_component_batch_size
+        if max_batch_size != self.__max_batch_size:
+            LOGGER.info("max_component_batch_size option set to %d", max_batch_size)
+            self.__max_batch_size = max_batch_size
+        return max_batch_size
+
+    def _chunk_components(self, components: List[dict]) -> Generator[List[dict], None, None]:
+        """
+        Break up the components into groups of no more than max_batch_size nodes,
+        and yield each group in turn.
+        If the max size is set to 0, just yield the entire list.
+        """
+        for chunk in chunk_components(components, self.max_batch_size):
+            yield chunk
+
     def _run(self) -> None:
         """ A single pass of detecting and acting on components """
         components = self._get_components()
@@ -115,6 +134,14 @@ def _run(self) -> None:
             LOGGER.debug('Found 0 components that require action')
             return
         LOGGER.info('Found %d components that require action', len(components))
+        for chunk in self._chunk_components(components):
+            self._run_on_chunk(chunk)
+
+    def _run_on_chunk(self, components: List[dict]) -> None:
+        """
+        Acts on a chunk of components
+        """
+        LOGGER.debug("Processing %d components", len(components))
         # Only check for failed components if we track retries for this operator
         if self.retry_attempt_field:
             components = self._handle_failed_components(components)
@@ -259,6 +286,18 @@ def _update_database_for_failure(self, components: List[dict]) -> None:
         self.bos_client.components.update_components(data)
 
 
+def chunk_components(components: List[dict],
+                     max_batch_size: int) -> Generator[List[dict], None, None]:
+    """
+    Break up the components into groups of no more than max_batch_size nodes,
+    and yield each group in turn.
+    If the max size is set to 0, just yield the entire list.
+ """ + chunk_size = max_batch_size if max_batch_size > 0 else len(components) + for chunk in itertools.batched(components, chunk_size): + yield chunk + + def _update_log_level() -> None: """ Updates the current logging level base on the value in the options database """ try: diff --git a/src/bos/operators/discovery.py b/src/bos/operators/discovery.py index 6f5ec77a..14f1c7eb 100644 --- a/src/bos/operators/discovery.py +++ b/src/bos/operators/discovery.py @@ -78,8 +78,9 @@ def _run(self) -> None: LOGGER.info("No new component(s) discovered.") return LOGGER.info("%s new component(s) from HSM.", len(components_to_add)) - self.bos_client.components.put_components(components_to_add) - LOGGER.info("%s new component(s) added to BOS!", len(components_to_add)) + for chunk in self._chunk_components(components_to_add): + self.bos_client.components.put_components(chunk) + LOGGER.info("%s new component(s) added to BOS!", len(chunk)) @property def bos_components(self) -> Set[str]: diff --git a/src/bos/operators/session_setup.py b/src/bos/operators/session_setup.py index 748e98d9..fe59c9d1 100644 --- a/src/bos/operators/session_setup.py +++ b/src/bos/operators/session_setup.py @@ -27,7 +27,7 @@ from typing import Set from botocore.exceptions import ClientError -from bos.operators.base import BaseOperator, main +from bos.operators.base import BaseOperator, main, chunk_components from bos.operators.filters.filters import HSMState from bos.operators.utils.clients.hsm import Inventory from bos.operators.utils.clients.s3 import S3Object, S3ObjectNotFound @@ -74,7 +74,7 @@ def _run(self) -> None: inventory_cache = Inventory() for data in sessions: session = Session(data, inventory_cache, self.bos_client) - session.setup() + session.setup(self.max_batch_size) def _get_pending_sessions(self): return self.bos_client.sessions.get_sessions(status='pending') @@ -108,15 +108,15 @@ def template(self): self.tenant) return self._template - def setup(self): + def setup(self, max_batch_size: int): try: - component_ids = self._setup_components() + component_ids = self._setup_components(max_batch_size) except SessionSetupException as err: self._mark_failed(str(err)) else: self._mark_running(component_ids) - def _setup_components(self): + def _setup_components(self, max_batch_size: int): all_component_ids = [] data = [] stage = self.session_data.get("stage", False) @@ -138,8 +138,9 @@ def _setup_components(self): raise SessionSetupException(err) from err # No exception raised by previous block self._log(LOGGER.info, 'Found %d components that require updates', len(data)) - self._log(LOGGER.debug, f'Updated components: {data}') - self.bos_client.components.update_components(data) + for chunk in chunk_components(data, max_batch_size): + self._log(LOGGER.debug, f'Updated components: {chunk}') + self.bos_client.components.update_components(chunk) return list(set(all_component_ids)) def _get_boot_set_component_list(self, boot_set) -> Set[str]: diff --git a/src/bos/operators/status.py b/src/bos/operators/status.py index 529be818..e60ffd7a 100644 --- a/src/bos/operators/status.py +++ b/src/bos/operators/status.py @@ -74,6 +74,15 @@ def _run(self) -> None: if not components: LOGGER.debug('No enabled components found') return + LOGGER.debug('Found %d components that require action', len(components)) + for chunk in self._chunk_components(components): + self._run_on_chunk(chunk) + + def _run_on_chunk(self, components) -> None: + """ + Acts on a chunk of components + """ + LOGGER.debug("Processing %d components", len(components)) 
         component_ids = [component['id'] for component in components]
         power_states = node_to_powerstate(component_ids)
         cfs_states = self._get_cfs_components()
diff --git a/src/bos/operators/utils/clients/bos/options.py b/src/bos/operators/utils/clients/bos/options.py
index ed15522d..3440facb 100644
--- a/src/bos/operators/utils/clients/bos/options.py
+++ b/src/bos/operators/utils/clients/bos/options.py
@@ -116,6 +116,9 @@ def component_actual_state_ttl(self):
     def default_retry_policy(self):
         return self.get_option('default_retry_policy', int, 3)
 
+    @property
+    def max_component_batch_size(self):
+        return self.get_option('max_component_batch_size', int, 2800)
 
 
 options = Options()
diff --git a/src/bos/server/controllers/v2/options.py b/src/bos/server/controllers/v2/options.py
index fab768b1..35d98fae 100644
--- a/src/bos/server/controllers/v2/options.py
+++ b/src/bos/server/controllers/v2/options.py
@@ -49,6 +49,7 @@
     'max_power_off_wait_time': 300,
     'polling_frequency': 15,
     'default_retry_policy': 3,
+    'max_component_batch_size': 2800
 }
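
Reviewer note: below is a minimal, standalone sketch of the batching behaviour this patch introduces, mirroring the chunk_components helper added to src/bos/operators/base.py. It assumes Python 3.12+ (for itertools.batched) and uses made-up component IDs; it is illustrative only and not part of the patch.

import itertools
from typing import Generator, List


def chunk_components(components: List[dict],
                     max_batch_size: int) -> Generator[List[dict], None, None]:
    # Same logic as the new module-level helper in base.py: a max_batch_size of 0
    # disables batching, so the whole list comes back as a single chunk.
    # Note that itertools.batched yields tuples; callers only iterate and len() them.
    chunk_size = max_batch_size if max_batch_size > 0 else len(components)
    for chunk in itertools.batched(components, chunk_size):
        yield chunk


# Hypothetical component IDs, for illustration only.
components = [{"id": f"x3000c0s{n}b0n0"} for n in range(7)]

print([len(chunk) for chunk in chunk_components(components, 3)])  # [3, 3, 1]
print([len(chunk) for chunk in chunk_components(components, 0)])  # [7]  (0 means no limit)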