From a7cb2b8b296e80ea37107f6cc26f671c69023f21 Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Tue, 10 Sep 2024 10:37:53 +0100 Subject: [PATCH 1/8] added lazy loading dir --- custom_model_runner/datarobot_drum/lazy_loading/__init__.py | 0 custom_model_runner/requirements.txt | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/__init__.py diff --git a/custom_model_runner/datarobot_drum/lazy_loading/__init__.py b/custom_model_runner/datarobot_drum/lazy_loading/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/custom_model_runner/requirements.txt b/custom_model_runner/requirements.txt index 152daf4d2..9f9a95731 100644 --- a/custom_model_runner/requirements.txt +++ b/custom_model_runner/requirements.txt @@ -24,4 +24,4 @@ Pillow<=10.3.0 julia<=0.5.7 termcolor packaging -markupsafe<=2.1.3 +markupsafe<=2.1.3 \ No newline at end of file From 3f2c044569ac5e77dbb22ce9059f2a2501b05afa Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Wed, 11 Sep 2024 14:42:54 +0100 Subject: [PATCH 2/8] added layer-to-extract-lazy-loading-info --- .../datarobot_drum/lazy_loading/constants.py | 19 ++ .../lazy_loading/progress_percentage.py | 75 ++++++ .../lazy_loading/remote_file.py | 53 ++++ .../lazy_loading/remote_files.py | 250 +++++++++++++++++ .../lazy_loading/remote_repos.py | 251 ++++++++++++++++++ .../lazy_loading/runtime_params_helper.py | 38 +++ .../lazy_loading/s3_file_download_helper.py | 96 +++++++ .../storage_file_download_helper.py | 46 ++++ .../lazy_loading/storage_utils.py | 222 ++++++++++++++++ .../drum/lazy_loading/__init__.py | 0 .../lazy_loading/test_lazy_loading_info.py | 108 ++++++++ 11 files changed, 1158 insertions(+) create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/constants.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/progress_percentage.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/remote_file.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/remote_files.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/storage_file_download_helper.py create mode 100644 custom_model_runner/datarobot_drum/lazy_loading/storage_utils.py create mode 100644 tests/unit/datarobot_drum/drum/lazy_loading/__init__.py create mode 100644 tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py diff --git a/custom_model_runner/datarobot_drum/lazy_loading/constants.py b/custom_model_runner/datarobot_drum/lazy_loading/constants.py new file mode 100644 index 000000000..db541e9f4 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/constants.py @@ -0,0 +1,19 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. 
+import os + +MLOPS_RUNTIME_PARAM_PREFIX = "MLOPS_RUNTIME_PARAM_" +MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE = "MLOPS_LAZY_LOADING_DATA" +MLOPS_REPOSITORY_SECRET_PREFIX = "MLOPS_REPOSITORY_SECRET_PREFIX_" +AWS_DEFAULT_REGION = 'us-east-1' +REMOTE_FILE_SUFFIX = ".remote" +METADATA_FILE = "model-metadata.yaml" diff --git a/custom_model_runner/datarobot_drum/lazy_loading/progress_percentage.py b/custom_model_runner/datarobot_drum/lazy_loading/progress_percentage.py new file mode 100644 index 000000000..b65b5514e --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/progress_percentage.py @@ -0,0 +1,75 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. +import logging +import time +from multiprocessing import Lock + +from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile + +logger = logging.getLogger(__name__) + + +class ProgressPercentage: + def __init__( + self, + update_interval_secs=10, + update_interval_mb=100, + remote_file: RemoteFile = None, + ): + self._remote_file = remote_file + self._update_interval_secs = update_interval_secs + self._update_interval_bytes = update_interval_mb * 1024 * 1024 + + self._seen_so_far = 0 + self._lock = Lock() + self._last_update_time = time.time() + self._last_update_size_bytes = 0 + + def _reset(self): + self._seen_so_far = 0 + self._lock = Lock() + self._last_update_time = time.time() + self._last_update_size_bytes = 0 + + def set_file(self, remote_file: RemoteFile): + self._remote_file = remote_file + self._reset() + + def __call__(self, bytes_amount): + if self._remote_file is None: + raise Exception("remote_file attribute is None.") + with self._lock: + self._seen_so_far += bytes_amount + current_time = time.time() + if ( + current_time - self._last_update_time >= self._update_interval_secs + or self._seen_so_far >= (self._last_update_size_bytes + self._update_interval_bytes) + ): + self._print_progress() + self._last_update_time = current_time + self._last_update_size_bytes = self._seen_so_far + + def _print_progress(self): + seen_so_far_mb = self._seen_so_far / (1024**2) + percentage = (seen_so_far_mb / self._remote_file.size_mb) * 100 + # logger.info( #TODO: fix logging + print( + f"{self._remote_file.remote_path} {percentage:.2f}% ({seen_so_far_mb:.1f}/{self._remote_file.size_mb:.1f} MB)\n", + end="", + flush=True, + ) + + @staticmethod + def done_downloading_file(remote_file: RemoteFile): + logger.info( + f"Done downloading file: Total Time: {remote_file.download_time:.1f} sec, rate: {remote_file.download_rate_mb_sec:.1f} MB/sec" + ) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_file.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_file.py new file mode 100644 index 000000000..2c2d3a107 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_file.py @@ -0,0 +1,53 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. 
+# from datarobot_custom_code.utils import calculate_rate + + +class RemoteFile: + def __init__(self, remote_path, local_path, repository_id): + self.remote_path = remote_path + self.local_path = local_path + self.repository_id = repository_id + self.size_bytes = None + self.download_time = None + self.download_start_time = None + self.download_status = None + self.error_msg = None + + @property + def size_mb(self): + if self.size_bytes is None: + return None + return self.size_bytes / 1024 / 1024 + + @property + def download_rate_mb_sec(self): + if self.download_time > 0: + return (self.size_bytes / self.download_time) / (1024 * 1024) + else: + return None + + def __str__(self): + s = f" free_mb: + err_msg = f"Error not enough disk space to download: Required: {self._total_size_mb:.1f} MB > Free: {free_mb:.1f} MB" + raise Exception(err_msg) + + def _prepare_dir_structure(self): + for remote_file in self._remote_files: + dir_name = os.path.dirname(remote_file.local_path) + print("Directory name: ", dir_name) + os.makedirs(dir_name, exist_ok=True) + + def _update_local_path(self): + for remote_file in self._remote_files: + remote_file.local_path = os.path.join(self._local_dir, remote_file.local_path) + + def _download_file(self, remote_file: RemoteFile, progress: ProgressPercentage): + repo = self._remote_repos.get_remote_repo(remote_file.repository_id) + progress.set_file(remote_file) + download_ok = repo.download_file(remote_file, progress) + if not download_ok: + return download_ok + + # Verify size: + if not os.path.exists(remote_file.local_path): + remote_file.error_msg = "File does not exist on local path: {}".format( + remote_file.local_path + ) + return False + local_size_bytes = os.path.getsize(remote_file.local_path) + if local_size_bytes != remote_file.size_bytes: + remote_file.error_msg = ( + "Size of local file {} is not equal to remote file size: {}".format( + local_size_bytes, remote_file.size_bytes + ) + ) + return False + + return True + + def download(self, progress: ProgressPercentage): + """ + Download all remote files from remote repository. + :return: + """ + + print("Downloading remote files ...") + self.validate() + update_status, error_list = self.update_files_info() + if update_status is False: + return False, error_list + + self._update_local_path() + self._check_disk_space() + self._prepare_dir_structure() + + self._download_start_time = time.time() + overall_download_ok = True + error_list = [] + for remote_file in self._remote_files: + download_ok = self._download_file(remote_file, progress) + if download_ok is False: + overall_download_ok = False + error_list.append(f"{remote_file.remote_path}: {remote_file.error_msg}") + progress.done_downloading_file(remote_file) + print(remote_file) + self._download_end_time = time.time() + + return overall_download_ok, error_list \ No newline at end of file diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py new file mode 100644 index 000000000..3ab5f3889 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py @@ -0,0 +1,251 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. 
+import logging +import os +import time +from abc import ABC +from abc import abstractmethod + +import boto3 +from botocore import client +from botocore.exceptions import ClientError + +from custom_model_runner.datarobot_drum.lazy_loading.constants import AWS_DEFAULT_REGION +from custom_model_runner.datarobot_drum.lazy_loading.progress_percentage import ProgressPercentage +from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile +from custom_model_runner.datarobot_drum.lazy_loading.runtime_params_helper import handle_credentials_param +from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import calculate_rate +from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import has_mandatory_keys + +logger = logging.getLogger(__name__) + + +class FileRepo(ABC): + + def __init__(self, name): + self._name = name + + def __str__(self): + return self._name + + def name(self): + return self._name + + @abstractmethod + def is_file_exist(self, file_path): + pass + + @abstractmethod + def get_file_size(self, file_path): + pass + + def download_file(self, remote_file: RemoteFile, progress): + pass + + +def build_s3_uri(bucket_name: str, file_path: str) -> str: + return f"s3://{bucket_name}/{file_path}" + + +class S3FileRepo(FileRepo): + mandatory_keys = {"type", "bucket_name", "credential_id", "repository_id", "bucket_name"} + + def __init__(self, name, bucket_name, credentials_id=None): + super().__init__(name) + self._bucket_name = bucket_name + self._credentials_id = credentials_id + storage_credentials = handle_credentials_param(credentials_id) + # TODO: Add credentials type check + self._s3 = boto3.client( + 's3', + endpoint_url=storage_credentials['endpointUrl'] if 'endpointUrl' in storage_credentials else None, + aws_access_key_id=storage_credentials['awsAccessKeyId'], + aws_secret_access_key=storage_credentials['awsSecretAccessKey'], + aws_session_token=( + storage_credentials['sessionToken'] + if 'sessionToken' in storage_credentials + else None + ), + region_name=( + storage_credentials['region'] if 'region' in storage_credentials else AWS_DEFAULT_REGION + ), + config=client.Config(signature_version='s3v4'), + ) + + def __str__(self): + return f"{self._name} [s3]: bucket: {self._bucket_name}, credentials_env: {self._credentials_id}" + + def is_file_exist(self, file_path) -> bool: + # Head the object to get its metadata, including content length + try: + self._s3.head_object(Bucket=self._bucket_name, Key=file_path) + return True + except ClientError as e: + # Check if the exception is a 404 error + if e.response["Error"]["Code"] == "404": + return False + else: + # Re-raise the exception if it's not a 404 error + raise + + def get_file_size(self, file_path): + """ + Get the file size in bytes. + :param file_path: + :return: Size in bytes if file exists, None if not exists. 
Raise Exception otherwise + """ + # Head the object to get its metadata, including content length + try: + response = self._s3.head_object(Bucket=self._bucket_name, Key=file_path) + # Extract and return the content length (file size) + file_size = response["ContentLength"] + return file_size + except ClientError as e: + # Check if the exception is a 404 error + if e.response["Error"]["Code"] == "404": + return None + else: + # Re-raise the exception if it's not a 404 error + raise + + def download_file(self, remote_file: RemoteFile, progress: ProgressPercentage): + # TODO: handle buffer_size + # def download_file(self, result_list, file_info, output_dir, lock, buffer_size, verify_checksum): + + # print("Bucket: {}, .Object: {}, Output Dir: {}". + # format(file_info["bucket_name"], file_info["object_key"], output_dir)) + # + # result_info = {} + try: + logger.debug("Downloading file: {}".format(remote_file.local_path)) + with open(remote_file.local_path, "wb") as file_handle: + start_time = time.time() + self._s3.download_fileobj( + self._bucket_name, + remote_file.remote_path, + file_handle, + Callback=progress, + Config=boto3.s3.transfer.TransferConfig( + max_io_queue=1, io_chunksize=1024 * 1024 + ), + ) + + # # Calculate elapsed time and bandwidth + end_time = time.time() + elapsed_time = end_time - start_time + + remote_file.download_status = True + local_file_size = os.path.getsize(remote_file.local_path) + + remote_file.download_start_time = start_time + remote_file.download_time = elapsed_time + + logger.debug( + "Downloaded: {}. Bandwidth: {:.1f}".format( + remote_file.remote_path, + calculate_rate(local_file_size, elapsed_time), + ) + ) + return True + except Exception as e: + err_msg = "Error downloading {}: {}".format(remote_file.remote_path, e) + logger.error(err_msg) + remote_file.error = err_msg + return False + + +class RemoteRepos: + def __init__(self): + self._repos = {} + + def from_dict_v2(self, repos_dict: object) -> object: + """ + Build the RemoteRepos instance from a dictionary. + :param repos_dict: + :return: + """ + for repo_dict in repos_dict: + #repo_dict = repos_dict[repo_name] + if "type" not in repo_dict: + raise KeyError(f"Missing 'type' key in {repo_dict}") + + if repo_dict["type"] == "s3": + is_valid, missing_fields = has_mandatory_keys(repo_dict, S3FileRepo.mandatory_keys) + if is_valid: + repository_id = repo_dict["repository_id"] + repo_obj = S3FileRepo( + repo_dict["repository_id"], repo_dict["bucket_name"], repo_dict["credential_id"] + ) + else: + raise Exception( + f"Repo has missing fields for S3 Repo: {missing_fields}" + ) + + else: + raise Exception( + f"Type {repo_dict['type']} is not supported, only S3 repos are currently supported" + ) + # From This stage on all repo objects are generic + self._repos[repository_id] = repo_obj + return self + + def from_dict(self, repos_dict): + """ + Build the RemoteRepos instance from a dictionary. 
+ :param repo_dict: + :return: + """ + for repo_name in repos_dict: + repo_dict = repos_dict[repo_name] + if "type" not in repo_dict: + raise KeyError(f"Missing 'type' key in {repo_dict}") + + if repo_dict["type"] == "s3": + is_valid, missing_fields = has_mandatory_keys(repo_dict, S3FileRepo.mandatory_keys) + if is_valid: + repo_obj = S3FileRepo( + repo_name, repo_dict["bucket"], repo_dict["credentials_env"] + ) + else: + raise Exception( + f"Repo {repo_name} has missing fields for S3 Repo: {missing_fields}" + ) + + else: + raise Exception( + f"Type {repo_dict['type']} is not supported, only S3 repos are currently supported" + ) + # From This stage on all repo objects are generic + self._repos[repo_name] = repo_obj + return self + + def get_remote_repos(self): + """ + Get all the names of the remote repos. + :return: list of repo names + """ + return self._repos.keys() + + def get_remote_repo(self, repo_name) -> FileRepo: + """ + Get a RemoteRepo instance by name. + :param repo_name: + :return: RemoteRepo instance + """ + if repo_name not in self._repos: + raise KeyError(f"Repo {repo_name} not found.") + return self._repos[repo_name] + + def exists(self, repo_name): + if repo_name in self._repos: + return True + return False diff --git a/custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py new file mode 100644 index 000000000..4ac60b570 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py @@ -0,0 +1,38 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. +import json +import logging +import os + +from custom_model_runner.datarobot_drum.lazy_loading.constants import MLOPS_REPOSITORY_SECRET_PREFIX + +logger = logging.getLogger(__name__) + +def handle_credentials_param(credential_id): + """ + Take a credential_id and create corresponding credentials object from env variable + so the client that requires the credentials can use that object. + + :param credential_id: + """ + credentials_env_variable = MLOPS_REPOSITORY_SECRET_PREFIX + credential_id.upper() + param_json = os.environ.get(credentials_env_variable, None) + if param_json is None: + raise EnvironmentError("expected environment variable '{}' to be set".format(credentials_env_variable)) + # logger.debug(f"param_json: {param_json}") TODO: mask credentials for logging + + json_content = json.loads(param_json) + if param_json is None: + raise EnvironmentError("expected environment variable '{}' to be json".format(credentials_env_variable)) + + logger.debug("Successfully loaded JSON content") + return json_content["payload"] diff --git a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py new file mode 100644 index 000000000..70f2d7dd4 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py @@ -0,0 +1,96 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. 
+import logging +import os +import time +import urllib + +from progress_percentage import ProgressPercentage +from storage_file_download_helper import StorageFileDownloadHelper +#from utils import calculate_rate +from storage_utils import urlparse +#from datarobot_custom_code.utils import verify_file_integrity + +logger = logging.getLogger(__name__) + + +class S3FileDownloadHelper(StorageFileDownloadHelper): + def __init__(self): + + self._s3 = None + # TODO: implement dr-storage client + + def get_file_size(self, file_uri): + # TODO: implement dr-storage get_file_size + file_size = 1000 + return file_size + + def download_file(self, result_list, file_info, output_dir, lock, buffer_size, verify_checksum): + logger.debug( + "Bucket: {}, .Object: {}, Output Dir: {}".format( + file_info["bucket_name"], file_info["object_key"], output_dir + ) + ) + result_info = {} + try: + logger.debug("Downloading file: {}".format(file_info["local_file"])) + # Initialize variables for bandwidth calculation + start_time = time.time() + + # TODO: Implement Downloading the file + end_time = time.time() + elapsed_time = end_time - start_time + + #TODO: implement file file integrity check + #result_info["download_ok"] = verify_file_integrity(file_info, verify_checksum) + local_file_size = os.path.getsize(file_info["local_file"]) + logger.debug(f"Elapsed time: {elapsed_time}") + logger.debug(f"File size: {local_file_size}") + + result_info["index"] = file_info["index"] + result_info["elapsed_time"] = elapsed_time + result_info["total_time_sec"] = elapsed_time + #TODO: Implement Downloading rate calc + download_rate = 100.0 + #download_rate = calculate_rate(local_file_size, elapsed_time) + #result_info["rate_mb_sec"] = download_rate + logger.debug( + "Downloaded: {}. Bandwidth: {:.1f}".format( + file_info["object_key"], + download_rate, + ) + ) + result_list.append(result_info) + return 0 + except Exception as e: + logger.error("Error downloading {}: {}".format(file_info["object_key"], e)) + return 0 + + def is_uri_directory(self, file_uri): + parsed_uri = urllib.parse.urlparse(file_uri) + bucket_name = parsed_uri.netloc + object_key = parsed_uri.path.lstrip("/") + # Use head_object to check if the S3 URI points to an object or a directory + try: + self._s3.head_object(Bucket=bucket_name, Key=object_key) + return False # File exists, it's not a directory + except self._s3.exceptions.ClientError as e: + if e.response["Error"]["Code"] == "404": + logger.error("Directory does not exist") + return True # Directory does not exist, it's a directory + else: + raise # Other errors should be raised + + def list_uris_in_directory(self, dir_uri): + # TODO: implement Parse the S3 directory URI to extract bucket name and prefix (directory path) + file_uris = [] + return file_uris diff --git a/custom_model_runner/datarobot_drum/lazy_loading/storage_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/storage_file_download_helper.py new file mode 100644 index 000000000..a9c63400c --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/storage_file_download_helper.py @@ -0,0 +1,46 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. 
+from abc import ABC +from abc import abstractmethod + + +class StorageFileDownloadHelper(ABC): + def __init__(self, storage_credentials): + self.storage_credentials = storage_credentials + + @abstractmethod + def get_file_size(self, file_uri): + """ + Get the file size in bytes for the given file URI. + """ + pass + + @abstractmethod + def download_file(self, result_list, file_info, output_dir, lock, buffer_size, verify_checksum): + """ + Download the file specified by the file URI. + """ + pass + + @abstractmethod + def is_uri_directory(self, file_uri): + """ + Check if the URI is a directory or a file + """ + pass + + @abstractmethod + def list_uris_in_directory(self, dir_uri): + """ + Return a list of all the URIs in the given directory + """ + pass diff --git a/custom_model_runner/datarobot_drum/lazy_loading/storage_utils.py b/custom_model_runner/datarobot_drum/lazy_loading/storage_utils.py new file mode 100644 index 000000000..48c89d173 --- /dev/null +++ b/custom_model_runner/datarobot_drum/lazy_loading/storage_utils.py @@ -0,0 +1,222 @@ +# +# Copyright 2024 DataRobot, Inc. and its affiliates. +# +# All rights reserved. +# +# DataRobot, Inc. +# +# This is proprietary source code of DataRobot, Inc. and its +# affiliates. +# +# Released under the terms of DataRobot Tool and Utility Agreement. +import hashlib +import os +import sys +import tarfile +import zipfile +from urllib.parse import urlparse + +import psutil +import yaml + + +def has_mandatory_keys(d: dict, required_keys: set) -> (bool, set): + """ + Check if the dictionary `d` contains all the keys in `required_keys`. + + Args: + d (dict): The dictionary to check. + required_keys (set): A set of keys that must be present in the dictionary. + + Returns: + (bool, set): A tuple containing a boolean indicating if all required keys are present, + and a set of missing keys if any. 
+ """ + # Find the missing keys by subtracting the keys in the dictionary from the required keys + missing_keys = required_keys - d.keys() + + # If there are no missing keys, return True and an empty set + if not missing_keys: + return True, set() + else: + return False, missing_keys + + +def calculate_sha256(file_path): + # Open the file in binary mode + with open(file_path, "rb") as f: + # Initialize the hash object with SHA256 + sha256_hash = hashlib.sha256() + + # Read the file in chunks to avoid loading the entire file into memory + chunk_size = 4096 # You can adjust the chunk size as needed + while True: + # Read a chunk of data from the file + data = f.read(chunk_size) + if not data: + break # End of file + + # Update the hash object with the data read from the file + sha256_hash.update(data) + + # Get the hexadecimal digest of the hash + hex_digest = sha256_hash.hexdigest() + return hex_digest + + +def get_disk_space(dir_to_check): + # Get disk usage statistics + disk_usage = psutil.disk_usage(dir_to_check) + # Convert bytes to megabytes + total_mb = disk_usage.total / (1024 * 1024) + used_mb = disk_usage.used / (1024 * 1024) + free_mb = disk_usage.free / (1024 * 1024) + # Return disk space size in MB + return total_mb, used_mb, free_mb + + +def bytes_to_mb_str(bytes_value): + mb_value = bytes_value / (1024 * 1024) # Convert bytes to megabytes + return f"{mb_value:.2f} MB" + + +def calculate_rate(size_bytes, time_seconds): + if time_seconds == 0: + return "Infinity MB/sec" + rate_mb_sec = size_bytes / ( + time_seconds * 1024 * 1024 + ) # Convert bytes to MB and divide by time in seconds + return rate_mb_sec + + +def calculate_rate_str(size_bytes, time_seconds): + if time_seconds == 0: + return "Infinity MB/sec" + return "{:.2f} MB/sec".format(calculate_rate(size_bytes, time_seconds)) + + +def parse_s3_uri(s3_uri): + parsed_uri = urlparse(s3_uri) + if parsed_uri.scheme != "s3": + raise ValueError("Not an S3 URI") + + bucket_name = parsed_uri.netloc + object_key = parsed_uri.path.lstrip("/") + return bucket_name, object_key + + +def list_zip_contents(zip_file): + try: + # Open the zip file for reading + with zipfile.ZipFile(zip_file, "r") as zip_ref: + # Extract all contents to the current working directory + # zip_ref.extractall() + # Alternatively, you can extract specific files by passing their names to extract(): + # zip_ref.extract('file_name.txt') + # zip_ref.extract('directory_name') + + # Get a list of the names of all contents in the zip file + zip_contents = zip_ref.namelist() + return zip_contents + except zipfile.BadZipFile as e: + print(f"Error: {e}") + return None + + +def extract_zip_content(zip_file, file_to_extract=None, dest_dir=None): + try: + # Open the zip file for reading + with zipfile.ZipFile(zip_file, "r") as zip_ref: + # Extract all contents to the current working directory + if file_to_extract: + print(f"Extracting {file_to_extract}") + zip_ref.extract(file_to_extract) + else: + print(f"Extracting {zip_file} to {dest_dir}") + zip_ref.extractall(dest_dir) + # Alternatively, you can extract specific files by passing their names to extract(): + # zip_ref.extract('file_name.txt') + # zip_ref.extract('directory_name') + + # Get a list of the names of all contents in the zip file + zip_contents = zip_ref.namelist() + return zip_contents + except zipfile.BadZipFile as e: + print(f"Error: {e}") + return None + + +def list_tar_contents(tar_file): + try: + # Open the tar file for reading + with tarfile.open(tar_file, "r") as tar: + # List the contents of the tar file 
+ tar_contents = tar.getnames() + return tar_contents + except tarfile.TarError as e: + print(f"Error: {e}") + return None + + +def load_yaml_from_file(file_path): + try: + with open(file_path, "r") as file: + yaml_data = yaml.safe_load(file) + return yaml_data + except FileNotFoundError: + print(f"File '{file_path}' not found.") + return None + except yaml.YAMLError as e: + print(f"Error loading YAML from file: {e}") + return None + + +def sum_file_sizes(list_of_dicts): + total_size = 0 + for d in list_of_dicts: + total_size += d.get("file_size", 0) + return total_size + + +def verify_file_checksum(file_path, expected_sha256): + sha256 = calculate_sha256(file_path) + if sha256 != expected_sha256: + print("Error checksum does not match expected checksum", file=sys.stderr) + return False + else: + print("Success checksum matches expected checksum") + return True + + +def merge_lists(list1, list2): + merged_list = [] + index_dict = {item["index"]: item for item in list2} + for item1 in list1: + index = item1.get("index") + if index is not None and index in index_dict: + item2 = index_dict[index] + merged_item = {**item1, **item2} + merged_list.append(merged_item) + return merged_list + + +def verify_file_integrity(file_info, verify_checksum): + local_file_size = os.path.getsize(file_info["local_file"]) + + if local_file_size == file_info["file_size"]: + print("All is good file sizes match") + if verify_checksum: + if file_info["checksum"] is None: + print("Checksum not found in file info - skipping") + else: + print("Verifying checksum") + return verify_file_checksum(file_info["local_file"], file_info["checksum"]) + + return True + else: + print( + "Error local_file_size != file_size {} {}".format( + local_file_size, file_info["file_size"] + ) + ) + return False diff --git a/tests/unit/datarobot_drum/drum/lazy_loading/__init__.py b/tests/unit/datarobot_drum/drum/lazy_loading/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py new file mode 100644 index 000000000..8380d4727 --- /dev/null +++ b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py @@ -0,0 +1,108 @@ +import json +import os +import shutil +from pathlib import Path + +import pytest +from datarobot_drum.lazy_loading.constants import MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE + +from custom_model_runner.datarobot_drum.lazy_loading.constants import MLOPS_REPOSITORY_SECRET_PREFIX +from custom_model_runner.datarobot_drum.lazy_loading.remote_files import RemoteFiles + +@pytest.fixture +def code_root_dir(): + return "/tmp/code" + +# Fetch credentials form Env variables and create single runtime +# env variable MLOPS_REPOSITORY_SECRET to emulate DRUM env +def set_aws_credentials( + credential_id, + mode="regular", + cred_runtime_param_name=None, + aws_access_key_id=None, + aws_secret_access_key=None, + aws_session_token=None, + endpoint_url=None, +): + credential_payload = { + "credentialType": "s3", + "awsAccessKeyId": aws_access_key_id, + "awsSecretAccessKey": aws_secret_access_key, + } + + if endpoint_url is not None: + credential_payload["endpointUrl"] = endpoint_url + + if aws_session_token is not None: + credential_payload["sessionToken"] = aws_session_token + + mlops_credential_secret = {"type": "credential", "payload": credential_payload} + os.environ[cred_runtime_param_name] = json.dumps(mlops_credential_secret) + +@pytest.fixture +def get_credential(): + 
credential_id = "669d2623485e94b838e637bb" + return credential_id + +@pytest.fixture +def lazy_loading_config(get_credential): + + lazy_loading_config_dict = { + "repositories": [ + { + "type": "s3", + "repository_id": "669d2623485e94b838e637bf", + "bucket_name": "llm-artifacts-dev", + "credential_id": get_credential + } + ], + "files": [ + { + "remote_path": "llm-artifacts/artifact_1.bin", + "local_path": "/tmp/artifact_1.bin", + "repository_id": "669d2623485e94b838e637bf" + }, + { + "remote_path": "llm-artifacts/artifact_2.bin", + "local_path": "/tmp/artifact_2.bin", + "repository_id": "669d2623485e94b838e637bf" + }, + ] + } + return lazy_loading_config_dict + + +@pytest.fixture +def set_lazy_loading_env_config(lazy_loading_config, get_credential): + + # Fetch credentials form Env variables and create single runtime + # env variable MLOPS_REPOSITORY_SECRET to emulate DRUM env + # TODO: Credentials storing logic not finalized yet + mlops_credential_env_variable = MLOPS_REPOSITORY_SECRET_PREFIX + get_credential.upper() + set_aws_credentials( + credential_id=get_credential, + mode="runtime", + cred_runtime_param_name=mlops_credential_env_variable, + aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'], + aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'], + endpoint_url=( + os.environ['STORAGE_ENDPOINT_URL'] if 'STORAGE_ENDPOINT_URL' in os.environ else None + ), + aws_session_token=( + os.environ['AWS_SESSION_TOKEN'] if 'AWS_SESSION_TOKEN' in os.environ else None + ), + ) + os.environ[MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE] = json.dumps(lazy_loading_config) + + +class TestLazyLoadingConfig(object): + + def test_model_downloader(self, code_root_dir, lazy_loading_config, set_lazy_loading_env_config): + + remote_files = RemoteFiles(local_dir=code_root_dir).from_env_config() + + for remote_file in remote_files.get_remote_files(): + # TODO: add assert statements + assert remote_file.repository_id == lazy_loading_config["repositories"][0]["repository_id"] + + From 34249f31dd927aee206c57d7a5dbf66affcdef79 Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Wed, 11 Sep 2024 15:18:07 +0100 Subject: [PATCH 3/8] clean --- .../lazy_loading/remote_files.py | 2 +- .../lazy_loading/remote_repos.py | 33 +------------------ .../lazy_loading/s3_file_download_helper.py | 4 --- .../lazy_loading/test_lazy_loading_info.py | 2 +- 4 files changed, 3 insertions(+), 38 deletions(-) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py index 49addf5e5..164c102a8 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py @@ -113,7 +113,7 @@ def from_env_config(self): repo_dict = remote_files_config["repositories"] - self._remote_repos = RemoteRepos().from_dict_v2(repo_dict) + self._remote_repos = RemoteRepos().from_dict(repo_dict) remote_files = remote_files_config["files"] diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py index 3ab5f3889..521938bec 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py @@ -167,14 +167,13 @@ class RemoteRepos: def __init__(self): self._repos = {} - def from_dict_v2(self, repos_dict: object) -> object: + def from_dict(self, repos_dict): """ Build the RemoteRepos instance from a dictionary. 
:param repos_dict: :return: """ for repo_dict in repos_dict: - #repo_dict = repos_dict[repo_name] if "type" not in repo_dict: raise KeyError(f"Missing 'type' key in {repo_dict}") @@ -198,36 +197,6 @@ def from_dict_v2(self, repos_dict: object) -> object: self._repos[repository_id] = repo_obj return self - def from_dict(self, repos_dict): - """ - Build the RemoteRepos instance from a dictionary. - :param repo_dict: - :return: - """ - for repo_name in repos_dict: - repo_dict = repos_dict[repo_name] - if "type" not in repo_dict: - raise KeyError(f"Missing 'type' key in {repo_dict}") - - if repo_dict["type"] == "s3": - is_valid, missing_fields = has_mandatory_keys(repo_dict, S3FileRepo.mandatory_keys) - if is_valid: - repo_obj = S3FileRepo( - repo_name, repo_dict["bucket"], repo_dict["credentials_env"] - ) - else: - raise Exception( - f"Repo {repo_name} has missing fields for S3 Repo: {missing_fields}" - ) - - else: - raise Exception( - f"Type {repo_dict['type']} is not supported, only S3 repos are currently supported" - ) - # From This stage on all repo objects are generic - self._repos[repo_name] = repo_obj - return self - def get_remote_repos(self): """ Get all the names of the remote repos. diff --git a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py index 70f2d7dd4..79a152a0e 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py @@ -14,11 +14,7 @@ import time import urllib -from progress_percentage import ProgressPercentage from storage_file_download_helper import StorageFileDownloadHelper -#from utils import calculate_rate -from storage_utils import urlparse -#from datarobot_custom_code.utils import verify_file_integrity logger = logging.getLogger(__name__) diff --git a/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py index 8380d4727..6d6f00dc3 100644 --- a/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py +++ b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py @@ -54,7 +54,7 @@ def lazy_loading_config(get_credential): "repository_id": "669d2623485e94b838e637bf", "bucket_name": "llm-artifacts-dev", "credential_id": get_credential - } + }, ], "files": [ { From 1b49f0fe454bf6c807ea8c011310dd588925eea9 Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Wed, 11 Sep 2024 15:28:25 +0100 Subject: [PATCH 4/8] cleaned boto3 client --- .../lazy_loading/remote_files.py | 28 +-- .../lazy_loading/remote_repos.py | 172 +++++++++--------- 2 files changed, 91 insertions(+), 109 deletions(-) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py index 164c102a8..3667a8fa9 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py @@ -85,32 +85,16 @@ def set_remote_repos(self, remote_repos): def get_remote_files(self): return self._remote_files - # def from_list(self, remote_files): - # for remote_file in remote_files: - # self._remote_files.append( - # RemoteFile( - # remote_path=remote_file["remote_path"], - # local_path=remote_file["local_path"], - # repo_name=remote_file["repo"], - # ) - # ) - # return self - - # def from_list_v2(self, remote_files): - # for remote_file in 
remote_files: - # self._remote_files.append( - # RemoteFile( - # remote_path=remote_file["remote_path"], - # local_path=remote_file["local_path"], - # repository_id=remote_file["repository_id"], - # ) - # ) - # return self - def from_env_config(self): + if MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE not in os.environ: + raise Exception("Cant find lazy loading environment variable") + remote_files_config = json.loads(os.environ[MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE]) + if remote_files_config is None: + raise Exception("Cant load lazy loading config from environment variable") + repo_dict = remote_files_config["repositories"] self._remote_repos = RemoteRepos().from_dict(repo_dict) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py index 521938bec..d7aaca646 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py @@ -65,37 +65,39 @@ def __init__(self, name, bucket_name, credentials_id=None): self._credentials_id = credentials_id storage_credentials = handle_credentials_param(credentials_id) # TODO: Add credentials type check - self._s3 = boto3.client( - 's3', - endpoint_url=storage_credentials['endpointUrl'] if 'endpointUrl' in storage_credentials else None, - aws_access_key_id=storage_credentials['awsAccessKeyId'], - aws_secret_access_key=storage_credentials['awsSecretAccessKey'], - aws_session_token=( - storage_credentials['sessionToken'] - if 'sessionToken' in storage_credentials - else None - ), - region_name=( - storage_credentials['region'] if 'region' in storage_credentials else AWS_DEFAULT_REGION - ), - config=client.Config(signature_version='s3v4'), - ) + # TODO: Implement dr-storage client initialization + # self._s3 = boto3.client( + # 's3', + # endpoint_url=storage_credentials['endpointUrl'] if 'endpointUrl' in storage_credentials else None, + # aws_access_key_id=storage_credentials['awsAccessKeyId'], + # aws_secret_access_key=storage_credentials['awsSecretAccessKey'], + # aws_session_token=( + # storage_credentials['sessionToken'] + # if 'sessionToken' in storage_credentials + # else None + # ), + # region_name=( + # storage_credentials['region'] if 'region' in storage_credentials else AWS_DEFAULT_REGION + # ), + # config=client.Config(signature_version='s3v4'), + # ) def __str__(self): return f"{self._name} [s3]: bucket: {self._bucket_name}, credentials_env: {self._credentials_id}" def is_file_exist(self, file_path) -> bool: - # Head the object to get its metadata, including content length - try: - self._s3.head_object(Bucket=self._bucket_name, Key=file_path) - return True - except ClientError as e: - # Check if the exception is a 404 error - if e.response["Error"]["Code"] == "404": - return False - else: - # Re-raise the exception if it's not a 404 error - raise + # TODO: implement Head the object to get its metadata, including content length + # try: + # self._s3.head_object(Bucket=self._bucket_name, Key=file_path) + # return True + # except ClientError as e: + # # Check if the exception is a 404 error + # if e.response["Error"]["Code"] == "404": + # return False + # else: + # # Re-raise the exception if it's not a 404 error + # raise + return True def get_file_size(self, file_path): """ @@ -103,64 +105,60 @@ def get_file_size(self, file_path): :param file_path: :return: Size in bytes if file exists, None if not exists. 
Raise Exception otherwise """ - # Head the object to get its metadata, including content length - try: - response = self._s3.head_object(Bucket=self._bucket_name, Key=file_path) - # Extract and return the content length (file size) - file_size = response["ContentLength"] - return file_size - except ClientError as e: - # Check if the exception is a 404 error - if e.response["Error"]["Code"] == "404": - return None - else: - # Re-raise the exception if it's not a 404 error - raise + #TODO: implement Head the object to get its metadata, including content length + # try: + # response = self._s3.head_object(Bucket=self._bucket_name, Key=file_path) + # # Extract and return the content length (file size) + # file_size = response["ContentLength"] + # return file_size + # except ClientError as e: + # # Check if the exception is a 404 error + # if e.response["Error"]["Code"] == "404": + # return None + # else: + # # Re-raise the exception if it's not a 404 error + # raise + return 100 def download_file(self, remote_file: RemoteFile, progress: ProgressPercentage): - # TODO: handle buffer_size - # def download_file(self, result_list, file_info, output_dir, lock, buffer_size, verify_checksum): - - # print("Bucket: {}, .Object: {}, Output Dir: {}". - # format(file_info["bucket_name"], file_info["object_key"], output_dir)) + # TODO: implement file download login using dr-storage + # try: + # logger.debug("Downloading file: {}".format(remote_file.local_path)) + # with open(remote_file.local_path, "wb") as file_handle: + # start_time = time.time() + # self._s3.download_fileobj( + # self._bucket_name, + # remote_file.remote_path, + # file_handle, + # Callback=progress, + # Config=boto3.s3.transfer.TransferConfig( + # max_io_queue=1, io_chunksize=1024 * 1024 + # ), + # ) # - # result_info = {} - try: - logger.debug("Downloading file: {}".format(remote_file.local_path)) - with open(remote_file.local_path, "wb") as file_handle: - start_time = time.time() - self._s3.download_fileobj( - self._bucket_name, - remote_file.remote_path, - file_handle, - Callback=progress, - Config=boto3.s3.transfer.TransferConfig( - max_io_queue=1, io_chunksize=1024 * 1024 - ), - ) - - # # Calculate elapsed time and bandwidth - end_time = time.time() - elapsed_time = end_time - start_time - - remote_file.download_status = True - local_file_size = os.path.getsize(remote_file.local_path) - - remote_file.download_start_time = start_time - remote_file.download_time = elapsed_time - - logger.debug( - "Downloaded: {}. Bandwidth: {:.1f}".format( - remote_file.remote_path, - calculate_rate(local_file_size, elapsed_time), - ) - ) - return True - except Exception as e: - err_msg = "Error downloading {}: {}".format(remote_file.remote_path, e) - logger.error(err_msg) - remote_file.error = err_msg - return False + # # # Calculate elapsed time and bandwidth + # end_time = time.time() + # elapsed_time = end_time - start_time + # + # remote_file.download_status = True + # local_file_size = os.path.getsize(remote_file.local_path) + # + # remote_file.download_start_time = start_time + # remote_file.download_time = elapsed_time + # + # logger.debug( + # "Downloaded: {}. 
Bandwidth: {:.1f}".format( + # remote_file.remote_path, + # calculate_rate(local_file_size, elapsed_time), + # ) + # ) + # return True + # except Exception as e: + # err_msg = "Error downloading {}: {}".format(remote_file.remote_path, e) + # logger.error(err_msg) + # remote_file.error = err_msg + # return False + return True class RemoteRepos: @@ -204,15 +202,15 @@ def get_remote_repos(self): """ return self._repos.keys() - def get_remote_repo(self, repo_name) -> FileRepo: + def get_remote_repo(self, repo_id) -> FileRepo: """ - Get a RemoteRepo instance by name. - :param repo_name: + Get a RemoteRepo instance by id. + :param repo_id: :return: RemoteRepo instance """ - if repo_name not in self._repos: - raise KeyError(f"Repo {repo_name} not found.") - return self._repos[repo_name] + if repo_id not in self._repos: + raise KeyError(f"Repo {repo_id} not found.") + return self._repos[repo_id] def exists(self, repo_name): if repo_name in self._repos: From f2cce489c736e54bb73b829a839ab8dfcb569aad Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Wed, 11 Sep 2024 15:37:16 +0100 Subject: [PATCH 5/8] cleaned --- .../lazy_loading/remote_repos.py | 65 +------------------ .../lazy_loading/s3_file_download_helper.py | 3 +- 2 files changed, 5 insertions(+), 63 deletions(-) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py index d7aaca646..44f139796 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py @@ -10,20 +10,12 @@ # # Released under the terms of DataRobot Tool and Utility Agreement. import logging -import os -import time from abc import ABC from abc import abstractmethod -import boto3 -from botocore import client -from botocore.exceptions import ClientError - -from custom_model_runner.datarobot_drum.lazy_loading.constants import AWS_DEFAULT_REGION from custom_model_runner.datarobot_drum.lazy_loading.progress_percentage import ProgressPercentage from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile from custom_model_runner.datarobot_drum.lazy_loading.runtime_params_helper import handle_credentials_param -from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import calculate_rate from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import has_mandatory_keys logger = logging.getLogger(__name__) @@ -66,21 +58,6 @@ def __init__(self, name, bucket_name, credentials_id=None): storage_credentials = handle_credentials_param(credentials_id) # TODO: Add credentials type check # TODO: Implement dr-storage client initialization - # self._s3 = boto3.client( - # 's3', - # endpoint_url=storage_credentials['endpointUrl'] if 'endpointUrl' in storage_credentials else None, - # aws_access_key_id=storage_credentials['awsAccessKeyId'], - # aws_secret_access_key=storage_credentials['awsSecretAccessKey'], - # aws_session_token=( - # storage_credentials['sessionToken'] - # if 'sessionToken' in storage_credentials - # else None - # ), - # region_name=( - # storage_credentials['region'] if 'region' in storage_credentials else AWS_DEFAULT_REGION - # ), - # config=client.Config(signature_version='s3v4'), - # ) def __str__(self): return f"{self._name} [s3]: bucket: {self._bucket_name}, credentials_env: {self._credentials_id}" @@ -97,7 +74,7 @@ def is_file_exist(self, file_path) -> bool: # else: # # Re-raise the exception if it's not a 404 error # raise - return True + raise 
NotImplementedError def get_file_size(self, file_path): """ @@ -118,47 +95,11 @@ def get_file_size(self, file_path): # else: # # Re-raise the exception if it's not a 404 error # raise - return 100 + raise NotImplementedError def download_file(self, remote_file: RemoteFile, progress: ProgressPercentage): # TODO: implement file download login using dr-storage - # try: - # logger.debug("Downloading file: {}".format(remote_file.local_path)) - # with open(remote_file.local_path, "wb") as file_handle: - # start_time = time.time() - # self._s3.download_fileobj( - # self._bucket_name, - # remote_file.remote_path, - # file_handle, - # Callback=progress, - # Config=boto3.s3.transfer.TransferConfig( - # max_io_queue=1, io_chunksize=1024 * 1024 - # ), - # ) - # - # # # Calculate elapsed time and bandwidth - # end_time = time.time() - # elapsed_time = end_time - start_time - # - # remote_file.download_status = True - # local_file_size = os.path.getsize(remote_file.local_path) - # - # remote_file.download_start_time = start_time - # remote_file.download_time = elapsed_time - # - # logger.debug( - # "Downloaded: {}. Bandwidth: {:.1f}".format( - # remote_file.remote_path, - # calculate_rate(local_file_size, elapsed_time), - # ) - # ) - # return True - # except Exception as e: - # err_msg = "Error downloading {}: {}".format(remote_file.remote_path, e) - # logger.error(err_msg) - # remote_file.error = err_msg - # return False - return True + raise NotImplementedError class RemoteRepos: diff --git a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py index 79a152a0e..80fca1a91 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py @@ -89,4 +89,5 @@ def is_uri_directory(self, file_uri): def list_uris_in_directory(self, dir_uri): # TODO: implement Parse the S3 directory URI to extract bucket name and prefix (directory path) file_uris = [] - return file_uris + #return file_uris + raise NotImplementedError From 50a0d3e10a1b03e9b62fff8079a9e01068299e2e Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Wed, 11 Sep 2024 16:01:43 +0100 Subject: [PATCH 6/8] cleaning --- custom_model_runner/datarobot_drum/lazy_loading/constants.py | 1 - 1 file changed, 1 deletion(-) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/constants.py b/custom_model_runner/datarobot_drum/lazy_loading/constants.py index db541e9f4..0366fc331 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/constants.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/constants.py @@ -9,7 +9,6 @@ # affiliates. # # Released under the terms of DataRobot Tool and Utility Agreement. 
-import os
 
 MLOPS_RUNTIME_PARAM_PREFIX = "MLOPS_RUNTIME_PARAM_"
 MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE = "MLOPS_LAZY_LOADING_DATA"
 MLOPS_REPOSITORY_SECRET_PREFIX = "MLOPS_REPOSITORY_SECRET_PREFIX_"

From 41fe98947eaf10ac37b97339e4269792c4a755f0 Mon Sep 17 00:00:00 2001
From: Oleksandr Saienko
Date: Wed, 11 Sep 2024 16:52:54 +0100
Subject: [PATCH 7/8] cleaned

---
 ...helper.py => environment_config_helper.py} |  0
 .../lazy_loading/remote_repos.py              | 22 ------------------
 .../lazy_loading/s3_file_download_helper.py   | 23 +++++--------------
 3 files changed, 6 insertions(+), 39 deletions(-)
 rename custom_model_runner/datarobot_drum/lazy_loading/{runtime_params_helper.py => environment_config_helper.py} (100%)

diff --git a/custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py
similarity index 100%
rename from custom_model_runner/datarobot_drum/lazy_loading/runtime_params_helper.py
rename to custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py
diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
index 44f139796..c11fad1e6 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
@@ -64,16 +64,6 @@ def __str__(self):
 
     def is_file_exist(self, file_path) -> bool:
         # TODO: implement Head the object to get its metadata, including content length
-        # try:
-        #     self._s3.head_object(Bucket=self._bucket_name, Key=file_path)
-        #     return True
-        # except ClientError as e:
-        #     # Check if the exception is a 404 error
-        #     if e.response["Error"]["Code"] == "404":
-        #         return False
-        #     else:
-        #         # Re-raise the exception if it's not a 404 error
-        #         raise
         raise NotImplementedError
 
     def get_file_size(self, file_path):
@@ -83,18 +73,6 @@ def get_file_size(self, file_path):
         :param file_path:
         :return: Size in bytes if file exists, None if not exists. 
Raise Exception otherwise """ #TODO: implement Head the object to get its metadata, including content length - # try: - # response = self._s3.head_object(Bucket=self._bucket_name, Key=file_path) - # # Extract and return the content length (file size) - # file_size = response["ContentLength"] - # return file_size - # except ClientError as e: - # # Check if the exception is a 404 error - # if e.response["Error"]["Code"] == "404": - # return None - # else: - # # Re-raise the exception if it's not a 404 error - # raise raise NotImplementedError def download_file(self, remote_file: RemoteFile, progress: ProgressPercentage): diff --git a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py index 80fca1a91..0b2af3bf7 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py @@ -20,15 +20,15 @@ class S3FileDownloadHelper(StorageFileDownloadHelper): - def __init__(self): + def __init__(self, storage_credentials): - self._s3 = None + super().__init__(storage_credentials) + self._storage_client = None # TODO: implement dr-storage client def get_file_size(self, file_uri): # TODO: implement dr-storage get_file_size - file_size = 1000 - return file_size + raise NotImplementedError def download_file(self, result_list, file_info, output_dir, lock, buffer_size, verify_checksum): logger.debug( @@ -72,19 +72,8 @@ def download_file(self, result_list, file_info, output_dir, lock, buffer_size, v return 0 def is_uri_directory(self, file_uri): - parsed_uri = urllib.parse.urlparse(file_uri) - bucket_name = parsed_uri.netloc - object_key = parsed_uri.path.lstrip("/") - # Use head_object to check if the S3 URI points to an object or a directory - try: - self._s3.head_object(Bucket=bucket_name, Key=object_key) - return False # File exists, it's not a directory - except self._s3.exceptions.ClientError as e: - if e.response["Error"]["Code"] == "404": - logger.error("Directory does not exist") - return True # Directory does not exist, it's a directory - else: - raise # Other errors should be raised + # TODO: Use head_object to check if the S3 URI points to an object or a directory + raise NotImplementedError def list_uris_in_directory(self, dir_uri): # TODO: implement Parse the S3 directory URI to extract bucket name and prefix (directory path) From 0e79f5c509cf8fd57769ff34357916a886f159de Mon Sep 17 00:00:00 2001 From: Oleksandr Saienko Date: Thu, 12 Sep 2024 10:19:14 +0100 Subject: [PATCH 8/8] style fix --- .../datarobot_drum/lazy_loading/constants.py | 2 +- .../lazy_loading/environment_config_helper.py | 9 +++- .../lazy_loading/remote_files.py | 17 +++---- .../lazy_loading/remote_repos.py | 15 +++--- .../lazy_loading/s3_file_download_helper.py | 13 +++-- .../lazy_loading/test_lazy_loading_info.py | 48 ++++++++++--------- 6 files changed, 56 insertions(+), 48 deletions(-) diff --git a/custom_model_runner/datarobot_drum/lazy_loading/constants.py b/custom_model_runner/datarobot_drum/lazy_loading/constants.py index 0366fc331..e33fd5946 100644 --- a/custom_model_runner/datarobot_drum/lazy_loading/constants.py +++ b/custom_model_runner/datarobot_drum/lazy_loading/constants.py @@ -13,6 +13,6 @@ MLOPS_RUNTIME_PARAM_PREFIX = "MLOPS_RUNTIME_PARAM_" MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE = "MLOPS_LAZY_LOADING_DATA" MLOPS_REPOSITORY_SECRET_PREFIX = "MLOPS_REPOSITORY_SECRET_PREFIX_" -AWS_DEFAULT_REGION = 'us-east-1' 

From 0e79f5c509fd57769ff34357916a886f159de Mon Sep 17 00:00:00 2001
From: Oleksandr Saienko
Date: Thu, 12 Sep 2024 10:19:14 +0100
Subject: [PATCH 8/8] style fix

---
 .../datarobot_drum/lazy_loading/constants.py  |  2 +-
 .../lazy_loading/environment_config_helper.py |  9 +++-
 .../lazy_loading/remote_files.py              | 17 +++----
 .../lazy_loading/remote_repos.py              | 15 +++---
 .../lazy_loading/s3_file_download_helper.py   | 13 +++--
 .../lazy_loading/test_lazy_loading_info.py    | 48 ++++++++++---------
 6 files changed, 56 insertions(+), 48 deletions(-)

diff --git a/custom_model_runner/datarobot_drum/lazy_loading/constants.py b/custom_model_runner/datarobot_drum/lazy_loading/constants.py
index 0366fc331..e33fd5946 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/constants.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/constants.py
@@ -13,6 +13,6 @@
 MLOPS_RUNTIME_PARAM_PREFIX = "MLOPS_RUNTIME_PARAM_"
 MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE = "MLOPS_LAZY_LOADING_DATA"
 MLOPS_REPOSITORY_SECRET_PREFIX = "MLOPS_REPOSITORY_SECRET_PREFIX_"
-AWS_DEFAULT_REGION = 'us-east-1'
+AWS_DEFAULT_REGION = "us-east-1"
 REMOTE_FILE_SUFFIX = ".remote"
 METADATA_FILE = "model-metadata.yaml"
diff --git a/custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py
index 4ac60b570..c954904d1 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/environment_config_helper.py
@@ -17,6 +17,7 @@
 
 logger = logging.getLogger(__name__)
 
+
 def handle_credentials_param(credential_id):
     """
     Take a credential_id and create corresponding credentials object from env variable
@@ -27,12 +28,16 @@ def handle_credentials_param(credential_id):
     credentials_env_variable = MLOPS_REPOSITORY_SECRET_PREFIX + credential_id.upper()
     param_json = os.environ.get(credentials_env_variable, None)
     if param_json is None:
-        raise EnvironmentError("expected environment variable '{}' to be set".format(credentials_env_variable))
+        raise EnvironmentError(
+            "expected environment variable '{}' to be set".format(credentials_env_variable)
+        )
     # logger.debug(f"param_json: {param_json}") TODO: mask credentials for logging
     json_content = json.loads(param_json)
     if param_json is None:
-        raise EnvironmentError("expected environment variable '{}' to be json".format(credentials_env_variable))
+        raise EnvironmentError(
+            "expected environment variable '{}' to be json".format(credentials_env_variable)
+        )
     logger.debug("Successfully loaded JSON content")
     return json_content["payload"]
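Note: as the hunk above shows, handle_credentials_param() expects the credential to be injected as a JSON document under the environment variable MLOPS_REPOSITORY_SECRET_PREFIX + credential_id.upper(), and it returns only the "payload" part. A minimal sketch of that calling convention, mirroring the unit test further below (the payload keys beyond credentialType are omitted here because they are not shown in the series; import paths follow the test's convention):

    import json
    import os

    from custom_model_runner.datarobot_drum.lazy_loading.constants import MLOPS_REPOSITORY_SECRET_PREFIX
    from custom_model_runner.datarobot_drum.lazy_loading.environment_config_helper import handle_credentials_param

    credential_id = "669d2623485e94b838e637bb"  # same dummy id the unit test uses
    # Real secrets also carry the S3 key fields inside "payload".
    secret = {"type": "credential", "payload": {"credentialType": "s3"}}
    os.environ[MLOPS_REPOSITORY_SECRET_PREFIX + credential_id.upper()] = json.dumps(secret)

    payload = handle_credentials_param(credential_id)  # -> the "payload" dict only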
diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py
index 3667a8fa9..71bc08e90 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_files.py
@@ -21,7 +21,10 @@
 
 from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile
 from custom_model_runner.datarobot_drum.lazy_loading.remote_repos import RemoteRepos
-from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import calculate_rate, get_disk_space
+from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import (
+    calculate_rate,
+    get_disk_space,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -46,9 +49,8 @@ def __init__(
     ):
         """ """
         self._local_dir = local_dir
-        self._remote_repos: RemoteRepos
         self._remote_files: List[RemoteFile] = []
+        self._remote_repos: RemoteRepos
-        #self._remote_repos: RemoteRepos = None
         self._validated = False
         self._total_size_mb = 0
         self._download_start_time = None
@@ -78,15 +80,14 @@ def download_rate_mb_seconds(self):
             return None
         return self._total_size_mb / self.download_time_seconds
 
-    def set_remote_repos(self, remote_repos):
-        self._remote_repos = remote_repos
-        return self
+    # def set_remote_repos(self, remote_repos):
+    #     self._remote_repos = remote_repos
+    #     return self
 
     def get_remote_files(self):
         return self._remote_files
 
     def from_env_config(self):
-
         if MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE not in os.environ:
             raise Exception("Cant find lazy loading environment variable")
@@ -231,4 +232,4 @@ def download(self, progress: ProgressPercentage):
                 print(remote_file)
 
         self._download_end_time = time.time()
-        return overall_download_ok, error_list
\ No newline at end of file
+        return overall_download_ok, error_list
diff --git a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
index c11fad1e6..1739ef40b 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/remote_repos.py
@@ -15,14 +15,15 @@
 
 from custom_model_runner.datarobot_drum.lazy_loading.progress_percentage import ProgressPercentage
 from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile
-from custom_model_runner.datarobot_drum.lazy_loading.runtime_params_helper import handle_credentials_param
+from custom_model_runner.datarobot_drum.lazy_loading.environment_config_helper import (
+    handle_credentials_param,
+)
 from custom_model_runner.datarobot_drum.lazy_loading.storage_utils import has_mandatory_keys
 
 logger = logging.getLogger(__name__)
 
 
 class FileRepo(ABC):
-
     def __init__(self, name):
         self._name = name
@@ -72,7 +73,7 @@ def get_file_size(self, file_path):
         :param file_path:
         :return: Size in bytes if file exists, None if not exists. Raise Exception otherwise
         """
-        #TODO: implement Head the object to get its metadata, including content length
+        # TODO: implement Head the object to get its metadata, including content length
         raise NotImplementedError
 
     def download_file(self, remote_file: RemoteFile, progress: ProgressPercentage):
@@ -99,12 +100,12 @@ def from_dict(self, repos_dict):
                 if is_valid:
                     repository_id = repo_dict["repository_id"]
                     repo_obj = S3FileRepo(
-                        repo_dict["repository_id"], repo_dict["bucket_name"], repo_dict["credential_id"]
+                        repo_dict["repository_id"],
+                        repo_dict["bucket_name"],
+                        repo_dict["credential_id"],
                     )
                 else:
-                    raise Exception(
-                        f"Repo has missing fields for S3 Repo: {missing_fields}"
-                    )
+                    raise Exception(f"Repo has missing fields for S3 Repo: {missing_fields}")
             else:
                 raise Exception(
diff --git a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py
index 0b2af3bf7..728e5ee04 100644
--- a/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py
+++ b/custom_model_runner/datarobot_drum/lazy_loading/s3_file_download_helper.py
@@ -21,7 +21,6 @@
 
 class S3FileDownloadHelper(StorageFileDownloadHelper):
     def __init__(self, storage_credentials):
-
         super().__init__(storage_credentials)
         self._storage_client = None  # TODO: implement dr-storage client
@@ -46,8 +45,8 @@ def download_file(self, result_list, file_info, output_dir, lock, buffer_size, v
 
         end_time = time.time()
         elapsed_time = end_time - start_time
-        #TODO: implement file file integrity check
-        #result_info["download_ok"] = verify_file_integrity(file_info, verify_checksum)
+        # TODO: implement file file integrity check
+        # result_info["download_ok"] = verify_file_integrity(file_info, verify_checksum)
         local_file_size = os.path.getsize(file_info["local_file"])
         logger.debug(f"Elapsed time: {elapsed_time}")
         logger.debug(f"File size: {local_file_size}")
@@ -55,10 +54,10 @@
         result_info["index"] = file_info["index"]
         result_info["elapsed_time"] = elapsed_time
         result_info["total_time_sec"] = elapsed_time
-        #TODO: Implement Downloading rate calc
+        # TODO: Implement Downloading rate calc
         download_rate = 100.0
-        #download_rate = calculate_rate(local_file_size, elapsed_time)
-        #result_info["rate_mb_sec"] = download_rate
+        # download_rate = calculate_rate(local_file_size, elapsed_time)
+        # result_info["rate_mb_sec"] = download_rate
         logger.debug(
             "Downloaded: {}. Bandwidth: {:.1f}".format(
                 file_info["object_key"],
@@ -78,5 +77,5 @@ def is_uri_directory(self, file_uri):
     def list_uris_in_directory(self, dir_uri):
         # TODO: implement Parse the S3 directory URI to extract bucket name and prefix (directory path)
         file_uris = []
-        #return file_uris
+        # return file_uris
         raise NotImplementedError
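Note: in the download_file() hunk above, the reported rate is still a hard-coded placeholder (download_rate = 100.0) and the calculate_rate(local_file_size, elapsed_time) call is left commented out. Whatever replaces the placeholder only needs to turn a byte count and a duration into MB/sec; a sketch of that arithmetic follows (the function name is assumed, storage_utils.py itself is not shown in this series):

    def calculate_rate_mb_sec(size_bytes, elapsed_seconds):
        # Convert bytes and wall-clock seconds into MB/sec, guarding against a zero duration.
        if elapsed_seconds <= 0:
            return 0.0
        return (size_bytes / (1024 * 1024)) / elapsed_seconds

    # e.g. calculate_rate_mb_sec(524_288_000, 25.0) -> 20.0 (500 MB in 25 s)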
diff --git a/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py
index 6d6f00dc3..c66c134c1 100644
--- a/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py
+++ b/tests/unit/datarobot_drum/drum/lazy_loading/test_lazy_loading_info.py
@@ -9,20 +9,22 @@
 from custom_model_runner.datarobot_drum.lazy_loading.constants import MLOPS_REPOSITORY_SECRET_PREFIX
 from custom_model_runner.datarobot_drum.lazy_loading.remote_files import RemoteFiles
 
+
 @pytest.fixture
 def code_root_dir():
     return "/tmp/code"
 
+
 # Fetch credentials form Env variables and create single runtime
 # env variable MLOPS_REPOSITORY_SECRET to emulate DRUM env
 def set_aws_credentials(
-        credential_id,
-        mode="regular",
-        cred_runtime_param_name=None,
-        aws_access_key_id=None,
-        aws_secret_access_key=None,
-        aws_session_token=None,
-        endpoint_url=None,
+    credential_id,
+    mode="regular",
+    cred_runtime_param_name=None,
+    aws_access_key_id=None,
+    aws_secret_access_key=None,
+    aws_session_token=None,
+    endpoint_url=None,
 ):
     credential_payload = {
         "credentialType": "s3",
@@ -39,42 +41,42 @@ def set_aws_credentials(
     mlops_credential_secret = {"type": "credential", "payload": credential_payload}
     os.environ[cred_runtime_param_name] = json.dumps(mlops_credential_secret)
 
+
 @pytest.fixture
 def get_credential():
     credential_id = "669d2623485e94b838e637bb"
     return credential_id
 
+
 @pytest.fixture
 def lazy_loading_config(get_credential):
-
     lazy_loading_config_dict = {
         "repositories": [
             {
                 "type": "s3",
                 "repository_id": "669d2623485e94b838e637bf",
                 "bucket_name": "llm-artifacts-dev",
-                "credential_id": get_credential
+                "credential_id": get_credential,
             },
         ],
         "files": [
             {
                 "remote_path": "llm-artifacts/artifact_1.bin",
                 "local_path": "/tmp/artifact_1.bin",
-                "repository_id": "669d2623485e94b838e637bf"
+                "repository_id": "669d2623485e94b838e637bf",
             },
             {
                 "remote_path": "llm-artifacts/artifact_2.bin",
                 "local_path": "/tmp/artifact_2.bin",
-                "repository_id": "669d2623485e94b838e637bf"
+                "repository_id": "669d2623485e94b838e637bf",
             },
-        ]
+        ],
     }
     return lazy_loading_config_dict
 
 
 @pytest.fixture
 def set_lazy_loading_env_config(lazy_loading_config, get_credential):
-
     # Fetch credentials form Env variables and create single runtime
     # env variable MLOPS_REPOSITORY_SECRET to emulate DRUM env
     # TODO: Credentials storing logic not finalized yet
     mlops_credential_env_variable = MLOPS_REPOSITORY_SECRET_PREFIX + get_credential.upper()
     set_aws_credentials(
         credential_id=get_credential,
         mode="runtime",
         cred_runtime_param_name=mlops_credential_env_variable,
-        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
-        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
+        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
+        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
         endpoint_url=(
-            os.environ['STORAGE_ENDPOINT_URL'] if 'STORAGE_ENDPOINT_URL' in os.environ else None
+            os.environ["STORAGE_ENDPOINT_URL"] if "STORAGE_ENDPOINT_URL" in os.environ else None
         ),
         aws_session_token=(
-            os.environ['AWS_SESSION_TOKEN'] if 'AWS_SESSION_TOKEN' in os.environ else None
+            os.environ["AWS_SESSION_TOKEN"] if "AWS_SESSION_TOKEN" in os.environ else None
         ),
     )
     os.environ[MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE] = json.dumps(lazy_loading_config)
 
 
 class TestLazyLoadingConfig(object):
-
-    def test_model_downloader(self, code_root_dir, lazy_loading_config, set_lazy_loading_env_config):
-
+    def test_model_downloader(
+        self, code_root_dir, lazy_loading_config, set_lazy_loading_env_config
+    ):
         remote_files = RemoteFiles(local_dir=code_root_dir).from_env_config()
         for remote_file in remote_files.get_remote_files():
             # TODO: add assert statements
-            assert remote_file.repository_id == lazy_loading_config["repositories"][0]["repository_id"]
-
-
+            assert (
+                remote_file.repository_id == lazy_loading_config["repositories"][0]["repository_id"]
+            )
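Note: taken together, the series wires lazy loading as follows: DRUM publishes the file/repository manifest in MLOPS_LAZY_LOADING_DATA and one MLOPS_REPOSITORY_SECRET_PREFIX_<CREDENTIAL_ID> secret per repository, and RemoteFiles reads both at startup. A rough driver for local experimentation, assuming those environment variables are already populated as in the unit test above (import paths follow the convention used in this series, and error handling is omitted):

    from custom_model_runner.datarobot_drum.lazy_loading.progress_percentage import ProgressPercentage
    from custom_model_runner.datarobot_drum.lazy_loading.remote_files import RemoteFiles

    # Build the download plan from MLOPS_LAZY_LOADING_DATA and kick off the transfers.
    remote_files = RemoteFiles(local_dir="/tmp/code").from_env_config()
    download_ok, errors = remote_files.download(progress=ProgressPercentage())
    if not download_ok:
        raise RuntimeError("lazy loading failed: {}".format(errors))

Keep in mind that several storage calls are still NotImplementedError stubs at this point in the series, so the download step only becomes fully functional once the dr-storage client lands.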