Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
mharding-hpe committed Oct 4, 2024
1 parent 2c6e8b0 commit 557767d
Showing 1 changed file with 75 additions and 68 deletions.
143 changes: 75 additions & 68 deletions src/bos/server/controllers/v2/boot_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
from functools import partial
import logging
from bos.common.utils import exc_type_msg, requests_retry_session
from bos.operators.utils.boot_image_metadata import BootImageMetaData
from bos.operators.utils.boot_image_metadata.factory import BootImageMetaDataFactory
from bos.operators.utils.clients.ims import get_arch_from_image_data, get_image, \
get_ims_id_from_s3_url, ImageNotFound
from bos.operators.utils.clients.s3 import S3Object, ArtifactNotFound
from bos.operators.utils.clients.s3 import S3Object, S3Url, ArtifactNotFound
from bos.server.controllers.v2.options import OptionsData
from bos.server.utils import canonize_xname, ParsingException

Expand Down Expand Up @@ -157,75 +156,45 @@ def _validate_boot_set(bs: dict, operation: str, options_data: OptionsData) -> l
for field in HARDWARE_SPECIFIER_FIELDS]
if not any(specified):
raise BootSetError(f"No non-empty hardware specifier field {HARDWARE_SPECIFIER_FIELDS}")
try:
if any(node[:3] == "nid" for node in bs["node_list"]):
msg = "Has NID in 'node_list'"
if options_data.reject_nids:
raise BootSetError(msg)
# Otherwise, log this as a warning -- even if reject_nids is not set,
# BOS still doesn't support NIDs, so this is still undesirable
warning_msgs.append(msg)
except KeyError:
# If there is no node_list field, not a problem
pass

if "node_list" in bs and any(node[:3] == "nid" for node in bs["node_list"]):
msg = "Has NID in 'node_list'"
if options_data.reject_nids:
raise BootSetError(msg)
# Otherwise, log this as a warning -- even if reject_nids is not set,
# BOS still doesn't support NIDs, so this is still undesirable
warning_msgs.append(msg)

if operation in ['boot', 'reboot']:
# Verify that the boot artifacts exist
try:
image_metadata = BootImageMetaDataFactory(bs)()
except Exception as err:
raise BootSetError(f"Can't find boot artifacts. Error: {exc_type_msg(err)}") from err
validate_boot_artifacts(bs)
except BootSetWarning as err:
warning_msgs.append(str(err))

try:
validate_ims_boot_image(bs, image_metadata, options_data)
validate_ims_boot_image(bs, options_data)
except BootSetWarning as err:
warning_msgs.append(str(err))

# Check boot artifacts' S3 headers
for boot_artifact in ["kernel"]:
try:
artifact = getattr(image_metadata.boot_artifacts, boot_artifact)
path = artifact ['link']['path']
etag = artifact['link']['etag']
obj = S3Object(path, etag)
_ = obj.object_header
except Exception as err:
raise BootSetError(f"Can't find {boot_artifact} in "
f"{image_metadata.manifest_s3_url.url}. "
f"Error: {exc_type_msg(err)}") from err

for boot_artifact in ["initrd", "boot_parameters"]:
try:
artifact = getattr(image_metadata.boot_artifacts, boot_artifact)
if not artifact:
raise ArtifactNotFound()
path = artifact ['link']['path']
etag = artifact['link']['etag']
obj = S3Object(path, etag)
_ = obj.object_header
except ArtifactNotFound as err:
warning_msgs.append(
f"{image_metadata.manifest_s3_url.url} doesn't contain a {boot_artifact}")
except Exception as err:
warning_msgs.append(f"Can't find {boot_artifact} in "
f"{image_metadata.manifest_s3_url.url}. "
f"Warning: {exc_type_msg(err)}")

return warning_msgs


def validate_ims_boot_image(bs: dict, image_metadata: BootImageMetaData,
options_data: OptionsData) -> None:
def validate_ims_boot_image(bs: dict, options_data: OptionsData) -> None:
"""
If the boot set architecture is not set to Other, check that the IMS image
architecture matches the boot set architecture (treating a boot set architecture
of Unknown as X86)
Otherwise, at least validate whether the boot image is in IMS, if we expect it to be.
"""
try:
bs_path = bs["path"]
except KeyError as err:
raise BootSetError("Missing required 'path' field") from err

bs_arch = bs.get("arch", DEFAULT_ARCH)

ims_id = get_ims_image_id(image_metadata)
ims_id = get_ims_image_id(bs_path)

# If IMS being inaccessible is not a fatal error, then reduce the number
# of retries we make, to prevent a lengthy delay
Expand All @@ -235,11 +204,11 @@ def validate_ims_boot_image(bs: dict, image_metadata: BootImageMetaData,
image_data = get_ims_image_data(ims_id, num_retries)
except ImageNotFound as err:
if options_data.ims_images_must_exist:
raise err
raise BootSetError(str(err)) from err
raise BootSetWarning(str(err)) from err
except Exception as err:
if options_data.ims_errors_fatal:
raise err
raise BootSetError(exc_type_msg(err)) from err
if bs_arch != 'Other':
# This means that this error is preventing us from validating the
# boot set architecture
Expand All @@ -256,7 +225,7 @@ def validate_ims_boot_image(bs: dict, image_metadata: BootImageMetaData,
except Exception as err:
# This most likely indicates that the IMS image data we got wasn't even a dict
if options_data.ims_errors_fatal:
raise err
raise BootSetError(exc_type_msg(err)) from err
raise BootSetWarning(str(err)) from err

if EXPECTED_IMS_ARCH[bs_arch] != ims_image_arch:
Expand Down Expand Up @@ -307,20 +276,20 @@ def validate_sanitize_boot_set(bs_name: str, bs_data: dict, options_data: Option
raise ParsingException(f"boot_sets key ({bs_name}) does not match 'name' "
f"field of corresponding boot set ({bs_data['name']})")

# Get the boot image metadata
# Check the boot artifacts
try:
image_metadata = BootImageMetaDataFactory(bs_data)()
validate_boot_artifacts(bs_data)
except (BootSetError, BootSetWarning) as err:
LOGGER.warning(str(err))

# Validate the boot set IMS image
try:
validate_ims_boot_image(bs_data, options_data)
except BootSetWarning as err:
LOGGER.warning("Boot set '%s': %s", bs_name, err)
LOGGER.warning('Boot set contents: %s', bs_data)
except Exception as err:
LOGGER.warning("Can't find boot artifacts: %s", exc_type_msg(err))
else: # No exception was raised in try block
# Validate the boot set IMS image
try:
validate_ims_boot_image(bs_data, image_metadata, options_data)
except BootSetWarning as err:
LOGGER.warning("Boot set '%s': %s", bs_name, err)
LOGGER.warning('Boot set contents: %s', bs_data)
except Exception as err:
raise ParsingException(str(err)) from err
raise ParsingException(exc_type_msg(err)) from err

# Validate that the boot set has at least one of the HARDWARE_SPECIFIER_FIELDS
if not any(field_name in bs_data for field_name in HARDWARE_SPECIFIER_FIELDS):
Expand Down Expand Up @@ -361,14 +330,14 @@ def validate_sanitize_boot_set(bs_name: str, bs_data: dict, options_data: Option
bs_data["node_list"] = new_node_list


def get_ims_image_id(image_metadata: BootImageMetaData) -> str:
def get_ims_image_id(path: str) -> str:
"""
If the image is an IMS image, return its ID.
Raise NonImsImage otherwise,
Note that this does not actually check IMS to see if the ID
exists.
"""
s3_url = image_metadata.manifest_s3_url
s3_url = S3Url(path)
ims_id = get_ims_id_from_s3_url(s3_url)
if ims_id:
return ims_id
Expand All @@ -387,3 +356,41 @@ def get_ims_image_data(ims_id: str, num_retries: int|None=None) -> dict:
# https://github.com/pylint-dev/pylint/issues/2271
kwargs['session'] = requests_retry_session(retries=4) # pylint: disable=redundant-keyword-arg
return get_image(**kwargs)


def validate_boot_artifacts(bs: dict):
# Verify that the boot artifacts exist
try:
image_metadata = BootImageMetaDataFactory(bs)()
except Exception as err:
raise BootSetError(f"Can't find boot artifacts. Error: {exc_type_msg(err)}") from err

# Check boot artifacts' S3 headers
for boot_artifact in ["kernel"]:
try:
artifact = getattr(image_metadata.boot_artifacts, boot_artifact)
path = artifact ['link']['path']
etag = artifact['link']['etag']
obj = S3Object(path, etag)
_ = obj.object_header
except Exception as err:
raise BootSetError(f"Can't find {boot_artifact} in "
f"{image_metadata.manifest_s3_url.url}. "
f"Error: {exc_type_msg(err)}") from err

for boot_artifact in ["initrd", "boot_parameters"]:
try:
artifact = getattr(image_metadata.boot_artifacts, boot_artifact)
if not artifact:
raise ArtifactNotFound()
path = artifact ['link']['path']
etag = artifact['link']['etag']
obj = S3Object(path, etag)
_ = obj.object_header
except ArtifactNotFound as err:
raise BootSetWarning(
f"{image_metadata.manifest_s3_url.url} doesn't contain a {boot_artifact}") from err
except Exception as err:
raise BootSetWarning(
f"Unable to check {boot_artifact} in {image_metadata.manifest_s3_url.url}. "
f"Warning: {exc_type_msg(err)}")

0 comments on commit 557767d

Please sign in to comment.