diff --git a/src/bos/common/clients/cfs/__init__.py b/src/bos/common/clients/cfs/__init__.py new file mode 100644 index 00000000..bb123f0b --- /dev/null +++ b/src/bos/common/clients/cfs/__init__.py @@ -0,0 +1,24 @@ +# +# MIT License +# +# (C) Copyright 2024 Hewlett Packard Enterprise Development LP +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +from .client import CFSClient diff --git a/src/bos/common/clients/cfs/base.py b/src/bos/common/clients/cfs/base.py new file mode 100644 index 00000000..a4e11b6d --- /dev/null +++ b/src/bos/common/clients/cfs/base.py @@ -0,0 +1,69 @@ +# +# MIT License +# +# (C) Copyright 2021-2024 Hewlett Packard Enterprise Development LP +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +from abc import ABC +import logging + +from bos.common.clients.endpoints import BaseEndpoint +from bos.common.utils import PROTOCOL + +LOGGER = logging.getLogger(__name__) + +SERVICE_NAME = 'cray-cfs-api' +BASE_CFS_ENDPOINT = f"{PROTOCOL}://{SERVICE_NAME}/v3" + + +class BaseCfsEndpoint(BaseEndpoint, ABC): + """ + This base class provides generic access to the CFS API. + """ + BASE_ENDPOINT = BASE_CFS_ENDPOINT + + def get_items(self, **kwargs): + """Get information for all CFS items""" + return self.get(params=kwargs) + + def update_items(self, data): + """Update information for multiple CFS items""" + return self.patch(json=data) + + +class BasePagedCfsEndpoint(BaseCfsEndpoint, ABC): + """ + This base class provides generic access to the CFS API, for endpoints that support paging. + """ + ITEM_FIELD_NAME = '' + + def get_items(self, **kwargs): + """Get information for all CFS items""" + item_list = [] + while kwargs is not None: + response_json = super().get_items(**kwargs) + new_items = response_json[self.ITEM_FIELD_NAME] + LOGGER.debug("Query returned %d %ss", len(new_items), + self.ITEM_FIELD_NAME) + item_list.extend(new_items) + kwargs = response_json["next"] + LOGGER.debug("Returning %d %ss from CFS", len(item_list), + self.ITEM_FIELD_NAME) + return item_list diff --git a/src/bos/common/clients/cfs/client.py b/src/bos/common/clients/cfs/client.py new file mode 100644 index 00000000..2c2f4508 --- /dev/null +++ b/src/bos/common/clients/cfs/client.py @@ -0,0 +1,37 @@ +# +# MIT License +# +# (C) Copyright 2024 Hewlett Packard Enterprise Development LP +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +from bos.common.clients.api_client import APIClient +from bos.operators.utils.clients.bos.options import options + +from .components import ComponentEndpoint + + +class CFSClient(APIClient): + + def __init__(self): + super().__init__(read_timeout=options.cfs_read_timeout) + + @property + def components(self) -> ComponentEndpoint: + return self.get_endpoint(ComponentEndpoint) diff --git a/src/bos/common/clients/cfs/components.py b/src/bos/common/clients/cfs/components.py new file mode 100644 index 00000000..6b3a16f3 --- /dev/null +++ b/src/bos/common/clients/cfs/components.py @@ -0,0 +1,113 @@ +# +# MIT License +# +# (C) Copyright 2021-2022, 2024 Hewlett Packard Enterprise Development LP +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +from collections import defaultdict +import logging + +from .base import BasePagedCfsEndpoint + +LOGGER = logging.getLogger(__name__) + +GET_BATCH_SIZE = 200 +PATCH_BATCH_SIZE = 1000 + + +class ComponentEndpoint(BasePagedCfsEndpoint): + ENDPOINT = 'components' + ITEM_FIELD_NAME = 'components' + + def get_components(self, **kwargs): + return self.get_items(**kwargs) + + def patch_components(self, data): + return self.update_items(data) + + def get_components_from_id_list(self, id_list): + if not id_list: + LOGGER.warning( + "get_components_from_id_list called without IDs; returning without action." + ) + return [] + LOGGER.debug("get_components_from_id_list called with %d IDs", + len(id_list)) + component_list = [] + while id_list: + next_batch = id_list[:GET_BATCH_SIZE] + next_comps = self.get_components(ids=','.join(next_batch)) + component_list.extend(next_comps) + id_list = id_list[GET_BATCH_SIZE:] + LOGGER.debug( + "get_components_from_id_list returning a total of %d components from CFS", + len(component_list)) + return component_list + + def patch_desired_config(self, + node_ids, + desired_config, + enabled: bool = False, + tags=None, + clear_state: bool = False): + if not node_ids: + LOGGER.warning( + "patch_desired_config called without IDs; returning without action." + ) + return + LOGGER.debug( + "patch_desired_config called on %d IDs with desired_config=%s enabled=%s tags=%s" + " clear_state=%s", len(node_ids), desired_config, enabled, tags, + clear_state) + node_patch = { + 'enabled': enabled, + 'desired_config': desired_config, + 'tags': tags if tags else {} + } + data = {"patch": node_patch, "filters": {}} + if clear_state: + node_patch['state'] = [] + while node_ids: + data["filters"]["ids"] = ','.join(node_ids[:PATCH_BATCH_SIZE]) + self.patch_components(data) + node_ids = node_ids[PATCH_BATCH_SIZE:] + + def set_cfs(self, components, enabled: bool, clear_state: bool = False): + if not components: + LOGGER.warning( + "set_cfs called without components; returning without action.") + return + LOGGER.debug( + "set_cfs called on %d components with enabled=%s clear_state=%s", + len(components), enabled, clear_state) + configurations = defaultdict(list) + for component in components: + config_name = component.get('desired_state', + {}).get('configuration', '') + bos_session = component.get('session') + key = (config_name, bos_session) + configurations[key].append(component['id']) + for key, ids in configurations.items(): + config_name, bos_session = key + self.patch_desired_config(ids, + config_name, + enabled=enabled, + tags={'bos_session': bos_session}, + clear_state=clear_state) diff --git a/src/bos/operators/base.py b/src/bos/operators/base.py index 90129084..0bfc8ebc 100644 --- a/src/bos/operators/base.py +++ b/src/bos/operators/base.py @@ -36,9 +36,11 @@ from typing import Generator, List, NoReturn, Type from bos.common.clients.bss import BSSClient +from bos.common.clients.cfs import CFSClient from bos.common.clients.pcs import PCSClient from bos.common.utils import exc_type_msg from bos.common.values import Status +from bos.operators.filters import DesiredConfigurationSetInCFS from bos.operators.filters.base import BaseFilter from bos.operators.utils.clients.bos.options import options from bos.operators.utils.clients.bos import BOSClient @@ -67,7 +69,7 @@ class ApiClients: def __init__(self): #self.bos = BOSClient() self.bss = BSSClient() - #self.cfs = CFSClient() + self.cfs = CFSClient() #self.hsm = HSMClient() #self.ims = IMSClient() self.pcs = PCSClient() @@ -79,7 +81,7 @@ def __enter__(self): """ #self._stack.enter_context(self.bos) self._stack.enter_context(self.bss) - #self._stack.enter_context(self.cfs) + self._stack.enter_context(self.cfs) #self._stack.enter_context(self.hsm) #self._stack.enter_context(self.ims) self._stack.enter_context(self.pcs) @@ -120,7 +122,7 @@ def __init__(self) -> NoReturn: def client(self) -> ApiClients: """ Return the ApiClients object for this operator. - If it is not initialized, raise a ValueError (this should never be the case). + If it is not initialized, raise a ValueError (this should never be the case). """ if self._client is None: raise ValueError("Attempted to access uninitialized API client") @@ -136,6 +138,13 @@ def name(self) -> str: def filters(self) -> List[Type[BaseFilter]]: return [] + @property + def DesiredConfigurationSetInCFS(self) -> DesiredConfigurationSetInCFS: + """ + Shortcut to get a DesiredConfigurationSetInCFS filter with the cfs_client for this operator + """ + return DesiredConfigurationSetInCFS(self.client.cfs) + def run(self) -> NoReturn: """ The core method of the operator that periodically detects and acts on components. diff --git a/src/bos/operators/configuration.py b/src/bos/operators/configuration.py index 8e63d74d..816ce240 100644 --- a/src/bos/operators/configuration.py +++ b/src/bos/operators/configuration.py @@ -25,9 +25,8 @@ import logging from bos.common.values import Status -from bos.operators.utils.clients.cfs import set_cfs from bos.operators.base import BaseOperator, main -from bos.operators.filters import BOSQuery, DesiredConfigurationSetInCFS, NOT +from bos.operators.filters import BOSQuery, NOT LOGGER = logging.getLogger(__name__) @@ -51,12 +50,12 @@ def name(self): def filters(self): return [ BOSQuery(enabled=True, status=Status.configuring), - NOT(DesiredConfigurationSetInCFS()) + NOT(self.DesiredConfigurationSetInCFS) ] def _act(self, components): if components: - set_cfs(components, enabled=True) + self.client.cfs.components.set_cfs(components, enabled=True) return components diff --git a/src/bos/operators/filters/filters.py b/src/bos/operators/filters/filters.py index 0d2e909b..882971f2 100644 --- a/src/bos/operators/filters/filters.py +++ b/src/bos/operators/filters/filters.py @@ -28,11 +28,10 @@ import re from typing import List, Type +from bos.common.clients.cfs import CFSClient from bos.common.utils import get_current_time, load_timestamp from bos.operators.filters.base import BaseFilter, DetailsFilter, IDFilter, LocalFilter from bos.operators.utils.clients.bos import BOSClient -from bos.operators.utils.clients.cfs import get_components_from_id_list as \ - get_cfs_components_from_id_list from bos.operators.utils.clients.hsm import get_components as get_hsm_components LOGGER = logging.getLogger(__name__) @@ -213,13 +212,15 @@ def _sanitize_kernel_parameters(self, parameter_string): class DesiredConfigurationSetInCFS(LocalFilter): """ Returns when desired configuration is set in CFS """ - def __init__(self): - self.cfs_components_dict = {} + def __init__(self, cfs_client: CFSClient): super().__init__() + self.cfs_components_dict = {} + self.cfs_client = cfs_client def _filter(self, components: List[dict]) -> List[dict]: component_ids = [component['id'] for component in components] - cfs_components = get_cfs_components_from_id_list(id_list=component_ids) + cfs_components = self.cfs_client.components.get_components_from_id_list( + id_list=component_ids) self.cfs_components_dict = { component['id']: component for component in cfs_components diff --git a/src/bos/operators/power_on.py b/src/bos/operators/power_on.py index 9dc50af8..2f2e57bc 100644 --- a/src/bos/operators/power_on.py +++ b/src/bos/operators/power_on.py @@ -36,7 +36,6 @@ using_sbps_check_kernel_parameters, components_by_id from bos.common.values import Action, Status from bos.operators.utils.clients.ims import tag_image -from bos.operators.utils.clients.cfs import set_cfs from bos.operators.base import BaseOperator, main from bos.operators.filters import BOSQuery, HSMState from bos.server.dbs.boot_artifacts import record_boot_artifacts @@ -83,7 +82,7 @@ def _act(self, components: Union[List[dict], None]): raise Exception( f"Error encountered setting BSS information: {e}") from e try: - set_cfs(components, enabled=False, clear_state=True) + self.client.cfs.components.set_cfs(components, enabled=False, clear_state=True) except Exception as e: raise Exception( f"Error encountered setting CFS information: {e}") from e diff --git a/src/bos/operators/status.py b/src/bos/operators/status.py index 964fa829..874ede67 100644 --- a/src/bos/operators/status.py +++ b/src/bos/operators/status.py @@ -27,9 +27,8 @@ from bos.common.values import Phase, Status, Action, EMPTY_ACTUAL_STATE from bos.operators.base import BaseOperator, main from bos.operators.filters import DesiredBootStateIsOff, BootArtifactStatesMatch, \ - DesiredConfigurationIsNone, DesiredConfigurationSetInCFS, LastActionIs, TimeSinceLastAction + DesiredConfigurationIsNone, LastActionIs, TimeSinceLastAction from bos.operators.utils.clients.bos.options import options -from bos.operators.utils.clients.cfs import get_components as get_cfs_components LOGGER = logging.getLogger(__name__) @@ -47,14 +46,18 @@ def __init__(self): self.boot_artifact_states_match = BootArtifactStatesMatch()._match self.desired_configuration_is_none = DesiredConfigurationIsNone( )._match - self.desired_configuration_set_in_cfs = DesiredConfigurationSetInCFS( - )._match self.last_action_is_power_on = LastActionIs(Action.power_on)._match self.boot_wait_time_elapsed = TimeSinceLastAction( seconds=options.max_boot_wait_time)._match self.power_on_wait_time_elapsed = TimeSinceLastAction( seconds=options.max_power_on_wait_time)._match + def desired_configuration_set_in_cfs(self, *args, **kwargs): + """ + Shortcut to DesiredConfigurationSetInCFS._match method + """ + return self.DesiredConfigurationSetInCFS._match(*args, **kwargs) + @property def name(self): """ Unused for the status operator """ @@ -109,15 +112,14 @@ def _run_on_chunk(self, components) -> None: LOGGER.debug('Updated components: %s', updated_components) self.bos_client.components.update_components(updated_components) - @staticmethod - def _get_cfs_components(): + def _get_cfs_components(self): """ Gets all the components from CFS. We used to get only the components of interest, but that caused an HTTP request that was longer than uwsgi could handle when the number of nodes was very large. Requesting all components means none need to be specified in the request. """ - cfs_data = get_cfs_components() + cfs_data = self.client.cfs.components.get_components() cfs_states = {} for component in cfs_data: cfs_states[component['id']] = component diff --git a/src/bos/operators/utils/clients/cfs.py b/src/bos/operators/utils/clients/cfs.py deleted file mode 100644 index c3221ccd..00000000 --- a/src/bos/operators/utils/clients/cfs.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# MIT License -# -# (C) Copyright 2021-2024 Hewlett Packard Enterprise Development LP -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the "Software"), -# to deal in the Software without restriction, including without limitation -# the rights to use, copy, modify, merge, publish, distribute, sublicense, -# and/or sell copies of the Software, and to permit persons to whom the -# Software is furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL -# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR -# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, -# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR -# OTHER DEALINGS IN THE SOFTWARE. -# -from collections import defaultdict -import logging -from bos.operators.utils.clients.bos.options import options -from requests.exceptions import HTTPError - -from bos.common.utils import compact_response_text, exc_type_msg, requests_retry_session, PROTOCOL - -SERVICE_NAME = 'cray-cfs-api' -BASE_ENDPOINT = f"{PROTOCOL}://{SERVICE_NAME}/v3" -COMPONENTS_ENDPOINT = f"{BASE_ENDPOINT}/components" - -LOGGER = logging.getLogger(__name__) - -GET_BATCH_SIZE = 200 -PATCH_BATCH_SIZE = 1000 - - -def get_components(session=None, **params): - """ - Makes GET request for CFS components. - Performs additional requests to get additional pages of components, if - needed. - Returns the list of CFS components - """ - if not session: - session = requests_retry_session(read_timeout=options.cfs_read_timeout) # pylint: disable=redundant-keyword-arg - component_list = [] - while params is not None: - LOGGER.debug("GET %s with params=%s", COMPONENTS_ENDPOINT, params) - response = session.get(COMPONENTS_ENDPOINT, params=params) - LOGGER.debug("Response status code=%d, reason=%s, body=%s", - response.status_code, response.reason, - compact_response_text(response.text)) - try: - response.raise_for_status() - except HTTPError as err: - LOGGER.error("Failed getting nodes from CFS: %s", - exc_type_msg(err)) - raise - response_json = response.json() - new_components = response_json["components"] - LOGGER.debug("Query returned %d components", len(new_components)) - component_list.extend(new_components) - params = response_json["next"] - LOGGER.debug("Returning %d components from CFS", len(component_list)) - return component_list - - -def patch_components(data, session=None): - if not data: - LOGGER.warning( - "patch_components called without data; returning without action.") - return - if not session: - session = requests_retry_session(read_timeout=options.cfs_read_timeout) # pylint: disable=redundant-keyword-arg - LOGGER.debug("PATCH %s with body=%s", COMPONENTS_ENDPOINT, data) - response = session.patch(COMPONENTS_ENDPOINT, json=data) - LOGGER.debug("Response status code=%d, reason=%s, body=%s", - response.status_code, response.reason, - compact_response_text(response.text)) - try: - response.raise_for_status() - except HTTPError as err: - LOGGER.error("Failed asking CFS to configure nodes: %s", - exc_type_msg(err)) - raise - - -def get_components_from_id_list(id_list): - if not id_list: - LOGGER.warning( - "get_components_from_id_list called without IDs; returning without action." - ) - return [] - LOGGER.debug("get_components_from_id_list called with %d IDs", - len(id_list)) - session = requests_retry_session(read_timeout=options.cfs_read_timeout) # pylint: disable=redundant-keyword-arg - component_list = [] - while id_list: - next_batch = id_list[:GET_BATCH_SIZE] - next_comps = get_components(session=session, ids=','.join(next_batch)) - component_list.extend(next_comps) - id_list = id_list[GET_BATCH_SIZE:] - LOGGER.debug( - "get_components_from_id_list returning a total of %d components from CFS", - len(component_list)) - return component_list - - -def patch_desired_config(node_ids, - desired_config, - enabled=False, - tags=None, - clear_state=False): - if not node_ids: - LOGGER.warning( - "patch_desired_config called without IDs; returning without action." - ) - return - LOGGER.debug( - "patch_desired_config called on %d IDs with desired_config=%s enabled=%s tags=%s" - " clear_state=%s", len(node_ids), desired_config, enabled, tags, - clear_state) - session = requests_retry_session(read_timeout=options.cfs_read_timeout) # pylint: disable=redundant-keyword-arg - node_patch = { - 'enabled': enabled, - 'desired_config': desired_config, - 'tags': tags if tags else {} - } - data = {"patch": node_patch, "filters": {}} - if clear_state: - node_patch['state'] = [] - while node_ids: - data["filters"]["ids"] = ','.join(node_ids[:PATCH_BATCH_SIZE]) - patch_components(data=data, session=session) - node_ids = node_ids[PATCH_BATCH_SIZE:] - - -def set_cfs(components, enabled, clear_state=False): - if not components: - LOGGER.warning( - "set_cfs called without components; returning without action.") - return - LOGGER.debug( - "set_cfs called on %d components with enabled=%s clear_state=%s", - len(components), enabled, clear_state) - configurations = defaultdict(list) - for component in components: - config_name = component.get('desired_state', - {}).get('configuration', '') - bos_session = component.get('session') - key = (config_name, bos_session) - configurations[key].append(component['id']) - for key, ids in configurations.items(): - config_name, bos_session = key - patch_desired_config(ids, - config_name, - enabled=enabled, - tags={'bos_session': bos_session}, - clear_state=clear_state)