From e461bdff23711bda2e5a831fb772aebd6d1877d9 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Tue, 3 Dec 2024 21:37:53 +0000 Subject: [PATCH 01/23] new automation mod --- cfa_azure/automation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index ed69361..af7ec57 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -263,4 +263,4 @@ def run_tasks(task_config: str, auth_config: str | None = None): if "monitor_job" in task_toml["job"].keys(): if task_toml["job"]["monitor_job"] is True: client.monitor_job(job_id) - return None + return None \ No newline at end of file From bb158ce59c19deffa20a653ac49e73a3779c84b0 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Thu, 12 Dec 2024 18:56:11 +0000 Subject: [PATCH 02/23] fix add_task return as string not list --- cfa_azure/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index 1d15984..8c719cb 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -25,7 +25,7 @@ JobAction, JobConstraints, OnAllTasksComplete, - OnTaskFailure, + OnTaskFailure ) from azure.containerregistry import ContainerRegistryClient from azure.core.exceptions import HttpResponseError From 4691198e4adf93c74ab067ca942876fa90a078b9 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Thu, 12 Dec 2024 22:48:30 +0000 Subject: [PATCH 03/23] updated automation and new readme --- README.md | 2 +- cfa_azure/automation.py | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index 3fe3845..ed07952 100644 --- a/README.md +++ b/README.md @@ -543,4 +543,4 @@ collaboration and collaborative potential. All government records will be published through the [CDC web site](http://www.cdc.gov). ## Additional Standard Notices -Please refer to [CDC's Template Repository](https://github.com/CDCgov/template) for more information about [contributing to this repository](https://github.com/CDCgov/template/blob/main/CONTRIBUTING.md), [public domain notices and disclaimers](https://github.com/CDCgov/template/blob/main/DISCLAIMER.md), and [code of conduct](https://github.com/CDCgov/template/blob/main/code-of-conduct.md). +Please refer to [CDC's Template Repository](https://github.com/CDCgov/template) for more information about [contributing to this repository](https://github.com/CDCgov/template/blob/main/CONTRIBUTING.md), [public domain notices and disclaimers](https://github.com/CDCgov/template/blob/main/DISCLAIMER.md), and [code of conduct](https://github.com/CDCgov/template/blob/main/code-of-conduct.md). 
\ No newline at end of file diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index af7ec57..b435e85 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -1,8 +1,6 @@ import itertools - import pandas as pd import toml - from cfa_azure import helpers from cfa_azure.clients import AzureClient From 4054a230ff6e16d51a2085cd4f31deaa11a0d5d9 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Mon, 16 Dec 2024 22:26:59 +0000 Subject: [PATCH 04/23] new automation functions and documentation --- automation_README.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/automation_README.md b/automation_README.md index 7361d53..8bcd91b 100644 --- a/automation_README.md +++ b/automation_README.md @@ -1,7 +1,11 @@ # Automation Module ## Overview +<<<<<<< HEAD The `automation` module as part of `cfa_azure` is designed to perform certain actions in Azure based on a configuration file. This allows users to interact with Azure via this `cfa_azure` package even with little python experience. It also allows users to take their config file and make minor tweaks in order to upload new files, run different tasks, etc. It provides a more flexible framework than changing user-provided parameters spread throughout different functions in a python script. +======= +The `automation` module as part of `cfa_azure.automation` is designed to perform certain actions in Azure based on a configuration file. This allows users to interact with Azure via this cfa_azure package even with little python experience. It also allows users to take their config file and make minor tweaks as new files need to be uploaded, slightly different tasks are needed to be run, etc. It provides a more flexible framework than changing user-provided parameters spread throughout different functions in a python script. +>>>>>>> 3a7ad48 (new automation functions and documentation) Currently, the `automation` module is comprised of two functions to automate certain tasks: 1. `run_experiment`: useful when needing to create tasks based on a base command and a permutation of variables @@ -29,16 +33,26 @@ There are two slightly different configuration files depending on which function The experiment config will have the following additional section and keys: - [experiment] - base_cmd: the full docker command for executing the tasks, including some indication of how the variables should be included. This usually entails a flag or argument. See the example for more details. +<<<<<<< HEAD - the variable names along with their list of possible values. See the example for more details. Note that the order matters. For example, if you have variables `input` and `num` listed in that order in the config file, the values for `input` will be put into the {0} spot in the base_cmd and the values for `num` will be put into the {1} spot. Each variable should be on its own line in the form `var_name=[val1, val2, ...]`. +======= + - the variable names along with their list of possible values. See the example for more details. Note that the order matters. For example, if you have variables `input` and `num` listed in that order in the config file, the values for `input` will be put into the {0} spot in the base_cmd and the values for `num` will be put into the {1} spot. +>>>>>>> 3a7ad48 (new automation functions and documentation) The task config will have the following additional section(s) and keys: - [[task]] - cmd: the full docker command to use for the task - name: the name for the task. 
Required if using dependent tasks.
  - depends_on: a list of task names the task depends on to run. Optional.
+<<<<<<< HEAD
  - run_dependent_tasks_on_fail: true or false, whether you want dependent tasks to run even if the parent task(s) fails. Optional.

Notice above that [[task]] is in double brackets. This is necessary because there can be repeated sections starting with [[task]] as the header, followed by the task-specific information; one [[task]] section for each task to be submitted. See the task_config.toml example for more information.
+=======
+  - run_dependent_tasks_on_fail: true or false, whether you want dependent tasks to run even if the parent task(s) fails. Optional.
+
+Notice above that [[task]] is in double brackets. This is necessary because there can be repeated sections starting with [[task]] as the header, followed by the task-specific information. See the task_config.toml example for more information.
+>>>>>>> 3a7ad48 (new automation functions and documentation)

See the example [experiment config](examples/automation/exp_config.toml) and [task config](examples/automation/task_config.toml) for more help.

@@ -65,21 +79,33 @@ python3 /input/data/vars.py --var1 2 --var2 11 --var3 '99'
...
```

+<<<<<<< HEAD
You can use the `run_experiment` function in two lines of code, as shown below.
```
from cfa_azure.automation import run_experiment
run_experiment(exp_config = "path/to/exp_config.toml",
    auth_config = "path/to/auth_config.toml")
+=======
+You can use the `run_experiment` function in two lines of code, as shown below.
+```
+from cfa_azure.automation import run_experiment
+run_experiment(exp_config = "path/to/exp_config.toml",
+    auth_config = "path/to/auth_config.toml")
+>>>>>>> 3a7ad48 (new automation functions and documentation)
```

## run_tasks()
+<<<<<<< HEAD
The `run_tasks()` function is designed to take an arbitrary number of tasks from a configuration file to submit them as part of a job. Any folders or files included in the [upload] section of the config will be uploaded before kicking off the tasks.
+=======
+The `run_tasks()` function is designed to take an arbitrary number of tasks from a configuration file to submit them as part of a job. Any folders or files included in the [upload] section of the config will be uploaded before kicking off the tasks.
+>>>>>>> 3a7ad48 (new automation functions and documentation)

Suppose we want to kick off two tasks we'll call "do_first" and "second_depends_on_first", where the R script is stored in Blob storage at the relative mount path "input/scripts/execute.R", the script can take different flags as input, the second task depends on the first, and we will run the second task even if the first fails. We would set up the task_config.toml to have the following info in the [[task]] sections:
```
[[task]]
cmd = 'Rscript /input/scripts/execute.R --data /example/data.csv'
+<<<<<<< HEAD
name = "do_first"

[[task]]
cmd = 'Rscript /input/scripts/execute.R --model /example/model.pkl'
name = "second_depends_on_first"
depends_on = ["do_first"]
run_dependent_tasks_on_fail = true
+=======
+name="do_first"
+
+[[task]]
+cmd = 'Rscript /input/scripts/execute.R --model /example/model.pkl'
+name="second_depends_on_first"
+depends_on= ["do_first"]
+run_dependent_tasks_on_fail=true
+>>>>>>> 3a7ad48 (new automation functions and documentation)
```
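The [[task]] keys above (cmd, name, depends_on, run_dependent_tasks_on_fail) sit alongside the config's other sections. As a rough sketch only, with the [upload] and [job] key names inferred from how this module reads its config elsewhere in this series rather than spelled out here, a fuller task_config.toml might look like:
```
# hypothetical sketch: the [upload]/[job] key names are assumptions
[upload]
folders = ["/path/to/scripts"]   # uploaded to Blob before the tasks start

[job]
monitor_job = true   # run_tasks checks job.monitor_job before monitoring

[[task]]
cmd = 'Rscript /input/scripts/execute.R --data /example/data.csv'
name = "do_first"
```

You can then run the tasks in two lines of code, as shown below.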
```
from cfa_azure.automation import run_tasks
+<<<<<<< HEAD
run_tasks(task_config = "path/to/task_config.toml",
    auth_config = "path/to/auth_config.toml")
```
+=======
+run_tasks(task_config = "path/to/task_config.toml",
+    auth_config = "path/to/auth_config.toml")
+```
+>>>>>>> 3a7ad48 (new automation functions and documentation)

From fcdb74ead42e18b0b1497d79aaa83b644292ef50 Mon Sep 17 00:00:00 2001
From: Ryan Raasch
Date: Mon, 16 Dec 2024 22:31:36 +0000
Subject: [PATCH 05/23] new automation functions and documentation

---
 automation_README.md | 42 +-----------------------------------------
 1 file changed, 1 insertion(+), 41 deletions(-)

diff --git a/automation_README.md b/automation_README.md
index 8bcd91b..68f4b52 100644
--- a/automation_README.md
+++ b/automation_README.md
@@ -1,11 +1,7 @@
# Automation Module
## Overview
-<<<<<<< HEAD
The `automation` module as part of `cfa_azure` is designed to perform certain actions in Azure based on a configuration file. This allows users to interact with Azure via this `cfa_azure` package even with little python experience. It also allows users to take their config file and make minor tweaks in order to upload new files, run different tasks, etc. It provides a more flexible framework than changing user-provided parameters spread throughout different functions in a python script.
-=======
-The `automation` module as part of `cfa_azure.automation` is designed to perform certain actions in Azure based on a configuration file. This allows users to interact with Azure via this cfa_azure package even with little python experience. It also allows users to take their config file and make minor tweaks as new files need to be uploaded, slightly different tasks are needed to be run, etc. It provides a more flexible framework than changing user-provided parameters spread throughout different functions in a python script.
->>>>>>> 3a7ad48 (new automation functions and documentation)

Currently, the `automation` module is comprised of two functions to automate certain tasks:
1. `run_experiment`: useful when needing to create tasks based on a base command and a permutation of variables
@@ -33,26 +29,16 @@ There are two slightly different configuration files depending on which function
The experiment config will have the following additional section and keys:
- [experiment]
  - base_cmd: the full docker command for executing the tasks, including some indication of how the variables should be included. This usually entails a flag or argument. See the example for more details.
-<<<<<<< HEAD
  - the variable names along with their list of possible values. See the example for more details. Note that the order matters. For example, if you have variables `input` and `num` listed in that order in the config file, the values for `input` will be put into the {0} spot in the base_cmd and the values for `num` will be put into the {1} spot. Each variable should be on its own line in the form `var_name=[val1, val2, ...]`.
-=======
-  - the variable names along with their list of possible values. See the example for more details. Note that the order matters. For example, if you have variables `input` and `num` listed in that order in the config file, the values for `input` will be put into the {0} spot in the base_cmd and the values for `num` will be put into the {1} spot.
>>>>>>> 3a7ad48 (new automation functions and documentation)

The task config will have the following additional section(s) and keys:
- [[task]]
  - cmd: the full docker command to use for the task
  - name: the name for the task. Required if using dependent tasks.
  - depends_on: a list of task names the task depends on to run. Optional.
-<<<<<<< HEAD
  - run_dependent_tasks_on_fail: true or false, whether you want dependent tasks to run even if the parent task(s) fails. Optional.

Notice above that [[task]] is in double brackets. This is necessary because there can be repeated sections starting with [[task]] as the header, followed by the task-specific information; one [[task]] section for each task to be submitted. See the task_config.toml example for more information.
-=======
-  - run_dependent_tasks_on_fail: true or false, whether you want dependent tasks to run even if the parent task(s) fails. Optional.
-
-Notice above that [[task]] is in double brackets. This is necessary because there can be repeated sections starting with [[task]] as the header, followed by the task-specific information. See the task_config.toml example for more information.
->>>>>>> 3a7ad48 (new automation functions and documentation)

See the example [experiment config](examples/automation/exp_config.toml) and [task config](examples/automation/task_config.toml) for more help.

@@ -65,21 +79,33 @@ python3 /input/data/vars.py --var1 2 --var2 11 --var3 '99'
...
```

-<<<<<<< HEAD
You can use the `run_experiment` function in two lines of code, as shown below.
```
from cfa_azure.automation import run_experiment
run_experiment(exp_config = "path/to/exp_config.toml",
    auth_config = "path/to/auth_config.toml")
-=======
-You can use the `run_experiment` function in two lines of code, as shown below.
-```
-from cfa_azure.automation import run_experiment
-run_experiment(exp_config = "path/to/exp_config.toml",
-    auth_config = "path/to/auth_config.toml")
->>>>>>> 3a7ad48 (new automation functions and documentation)
```

## run_tasks()
-<<<<<<< HEAD
The `run_tasks()` function is designed to take an arbitrary number of tasks from a configuration file to submit them as part of a job. Any folders or files included in the [upload] section of the config will be uploaded before kicking off the tasks.
-=======
-The `run_tasks()` function is designed to take an arbitrary number of tasks from a configuration file to submit them as part of a job. Any folders or files included in the [upload] section of the config will be uploaded before kicking off the tasks.
->>>>>>> 3a7ad48 (new automation functions and documentation)

Suppose we want to kick off two tasks we'll call "do_first" and "second_depends_on_first", where the R script is stored in Blob storage at the relative mount path "input/scripts/execute.R", the script can take different flags as input, the second task depends on the first, and we will run the second task even if the first fails.
We would setup the task_config.toml to have the following info in the [[task]] sections: ``` [[task]] cmd = 'Rscript /input/scripts/execute.R --data /example/data.csv" -<<<<<<< HEAD name = "do_first" [[task]] @@ -113,25 +87,11 @@ cmd = 'Rscript /input/scripts/execute.R --model /example/model.pkl' name = "second_depends_on_first" depends_on = ["do_first"] run_dependent_tasks_on_fail = true -======= -name="do_first" - -[[task]] -cmd = 'Rscript /input/scripts/execute.R --model /example/model.pkl' -name="second_depends_on_first" -depends_on= ["do_first"] -run_dependent_tasks_on_fail=true ->>>>>>> 3a7ad48 (new automation functions and documentation) ``` You can then run the tasks in two lines of code, as shown below. ``` from cfa_azure.automation import run_tasks -<<<<<<< HEAD run_experiment(task_config = "path/to/task_config.toml", auth_config = "path/to/auth_config.toml") -``` -======= -run_experiment(task_config = "path/to/task_config.toml", auth_config = "path/to/auth_config.toml") -``` ->>>>>>> 3a7ad48 (new automation functions and documentation) +``` \ No newline at end of file From 534f6d25ea0f618933c60bfa1f18313d99e7fb55 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Tue, 17 Dec 2024 05:29:48 +0000 Subject: [PATCH 06/23] Added configuration for container and pool --- cfa_azure/automation.py | 55 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index b435e85..bc1d489 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -38,7 +38,34 @@ def run_experiment(exp_config: str, auth_config: str | None = None): print("could not create AzureClient object.") return None + # Get cotntainer details from exp_toml + if "registry_name" in exp_toml["setup"].keys(): + registry_name = exp_toml["setup"]["registry_name"] + else: + print( + f"registry_name must be provided in setup" + ) + return None + + if "repository_name" in exp_toml["setup"].keys(): + repository_name = exp_toml["setup"]["repository_name"] + else: + print( + f"repository_name must be provided in setup" + ) + return None + + if "tag_name" in exp_toml["setup"].keys(): + tag_name = exp_toml["setup"]["tag_name"] + else: + tag_name = "latest" + + client.set_azure_container( + registry_name=registry_name, repo_name=repository_name, tag_name=tag_name + ) + # check pool included in exp_toml and exists in azure + pool_name = None if "pool_name" in exp_toml["setup"].keys(): if not helpers.check_pool_exists( resource_group_name=client.resource_group_name, @@ -46,11 +73,33 @@ def run_experiment(exp_config: str, auth_config: str | None = None): pool_name=exp_toml["setup"]["pool_name"], batch_mgmt_client=client.batch_mgmt_client, ): + pool_name = exp_toml["setup"]["pool_name"] print( - f"pool name {exp_toml['setup']['pool_name']} does not exist in the Azure environment." + f"pool name {pool_name} does not exist in the Azure environment." ) - return None - pool_name = exp_toml["setup"]["pool_name"] + if "scaling_mode" in exp_toml["setup"].keys(): + scaling_mode = exp_toml["setup"]["scaling_mode"] + else: + scaling_mode = "fixed" + if scaling_mode == "autoscale": + if "autoscale_formula_path" in exp_toml["setup"].keys(): + autoscale_formula_path = exp_toml["setup"]["autoscale_formula_path"] + print("Creating pool with autoscaling mode") + client.set_pool_info(mode=scaling_mode, autoscale_formula_path=autoscale_formula_path) + else: + print( + f"Autoscaling formula path must be provided if scaling_mode is set to autoscaling." 
+ ) + return None + else: + client.set_pool_info(mode=scaling_mode) + client.create_pool(pool_name) + print( + f"pool name {pool_name} has been created." + ) + else: + pool_name=exp_toml["setup"]["pool_name"] + client.pool_name = pool_name else: print("could not find 'pool_name' key in 'setup' section of exp toml.") print("please specify a pool name to use.") From 5894978d7f4edb743d23866a4cb3204af3381929 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Thu, 19 Dec 2024 05:11:35 +0000 Subject: [PATCH 07/23] Added config options for client during initialization --- cfa_azure/automation.py | 57 ++----------------- cfa_azure/clients.py | 119 +++++++++++++++++++++++++++++++--------- 2 files changed, 97 insertions(+), 79 deletions(-) diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index bc1d489..af7ec57 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -1,6 +1,8 @@ import itertools + import pandas as pd import toml + from cfa_azure import helpers from cfa_azure.clients import AzureClient @@ -38,34 +40,7 @@ def run_experiment(exp_config: str, auth_config: str | None = None): print("could not create AzureClient object.") return None - # Get cotntainer details from exp_toml - if "registry_name" in exp_toml["setup"].keys(): - registry_name = exp_toml["setup"]["registry_name"] - else: - print( - f"registry_name must be provided in setup" - ) - return None - - if "repository_name" in exp_toml["setup"].keys(): - repository_name = exp_toml["setup"]["repository_name"] - else: - print( - f"repository_name must be provided in setup" - ) - return None - - if "tag_name" in exp_toml["setup"].keys(): - tag_name = exp_toml["setup"]["tag_name"] - else: - tag_name = "latest" - - client.set_azure_container( - registry_name=registry_name, repo_name=repository_name, tag_name=tag_name - ) - # check pool included in exp_toml and exists in azure - pool_name = None if "pool_name" in exp_toml["setup"].keys(): if not helpers.check_pool_exists( resource_group_name=client.resource_group_name, @@ -73,33 +48,11 @@ def run_experiment(exp_config: str, auth_config: str | None = None): pool_name=exp_toml["setup"]["pool_name"], batch_mgmt_client=client.batch_mgmt_client, ): - pool_name = exp_toml["setup"]["pool_name"] - print( - f"pool name {pool_name} does not exist in the Azure environment." - ) - if "scaling_mode" in exp_toml["setup"].keys(): - scaling_mode = exp_toml["setup"]["scaling_mode"] - else: - scaling_mode = "fixed" - if scaling_mode == "autoscale": - if "autoscale_formula_path" in exp_toml["setup"].keys(): - autoscale_formula_path = exp_toml["setup"]["autoscale_formula_path"] - print("Creating pool with autoscaling mode") - client.set_pool_info(mode=scaling_mode, autoscale_formula_path=autoscale_formula_path) - else: - print( - f"Autoscaling formula path must be provided if scaling_mode is set to autoscaling." - ) - return None - else: - client.set_pool_info(mode=scaling_mode) - client.create_pool(pool_name) print( - f"pool name {pool_name} has been created." + f"pool name {exp_toml['setup']['pool_name']} does not exist in the Azure environment." 
) - else: - pool_name=exp_toml["setup"]["pool_name"] - client.pool_name = pool_name + return None + pool_name = exp_toml["setup"]["pool_name"] else: print("could not find 'pool_name' key in 'setup' section of exp toml.") print("please specify a pool name to use.") diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index 0acb8a5..32929db 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -40,23 +40,14 @@ def __init__( AzureClient object """ self.config_path = config_path - self.credential_method = credential_method self.debug = None - self.scaling = None - self.input_container_name = None - self.output_container_name = None self.files = [] self.task_id_max = 0 self.jobs = set() - self.container_registry_server = None - self.registry_url = None - self.container_image_name = None - self.full_container_name = None self.input_mount_dir = None self.output_mount_dir = None self.mounts = [] self.mount_container_clients = [] - self.pool_parameters = None self.timeout = None self.save_logs_to_blob = None self.logs_folder = "stdout_stderr" @@ -161,6 +152,39 @@ def __init__( logger.warning("Could not find resource group name in config.") # get credentials + self._initialize_authentication(credential_method) + logger.debug(f"generated credentials from {credential_method}.") + # initialize registry + self._initialize_registry() + # create blob service account + self.blob_service_client = helpers.get_blob_service_client( + self.config, self.cred + ) + logger.debug("generated Blob Service Client.") + # create batch mgmt client + self.batch_mgmt_client = helpers.get_batch_mgmt_client( + self.config, self.secret_cred + ) + logger.debug("generated Batch Management Client.") + # create batch service client + self.batch_client = helpers.get_batch_service_client( + self.config, self.batch_cred + ) + # Create pool + self._initialize_pool() + # Set up containers + self._initialize_containers() + logger.info("Client initialized! Happy coding!") + + def _initialize_authentication(self, credential_method): + """Called by init method to set up authentication + Args: + config (str): config dict + """ + if "credential_method" in self.config["Authentication"].keys(): + self.credential_method = credential_method = self.config["Authentication"]["credential_method"] + else: + self.credential_method = credential_method if "identity" in self.credential_method.lower(): self.cred = ManagedIdentityCredential() elif "sp" in self.credential_method.lower(): @@ -192,7 +216,6 @@ def __init__( raise Exception( "please choose a credential_method from 'identity', 'sp', 'ext_user', 'env' and try again." 
) - if "sp_secrets" not in self.config["Authentication"].keys(): sp_secret = helpers.get_sp_secret(self.config, self.cred) self.secret_cred = ClientSecretCredential( @@ -207,26 +230,68 @@ def __init__( resource="https://batch.core.windows.net/", ) - logger.debug(f"generated credentials from {credential_method}.") - - # create blob service account + def _initialize_registry(self): + """Called by init to initialize the registry URL and details""" + self.container_registry_server = None + self.container_image_name = None + self.full_container_name = None + if "registry_name" in self.config["Container"].keys(): + registry_name = self.config["Container"]["registry_name"] + self.container_registry_server = f"{registry_name}.azurecr.io" + self.registry_url = f"https://{self.container_registry_server}" + else: + self.registry_url = None - self.blob_service_client = helpers.get_blob_service_client( - self.config, self.cred - ) - logger.debug("generated Blob Service Client.") + if "repository_name" in self.config["Container"].keys(): + repository_name = self.config["Container"]["repository_name"] + + if "tag_name" in self.config["Container"].keys(): + tag_name = self.config["Container"]["tag_name"] + else: + tag_name = "latest" - # create batch mgmt client - self.batch_mgmt_client = helpers.get_batch_mgmt_client( - self.config, self.secret_cred - ) - logger.debug("generated Batch Management Client.") + if registry_name and repository_name: + self.set_azure_container( + registry_name=registry_name, repo_name=repository_name, tag_name=tag_name + ) - # create batch service client - self.batch_client = helpers.get_batch_service_client( - self.config, self.batch_cred - ) - logger.info("Client initialized! Happy coding!") + def _initialize_pool(self): + """Called by init to initialize the pool""" + self.pool_parameters = None + self.pool_name = self.config["Batch"]["pool_name"] if "pool_name" in self.config["Batch"].keys() else None + self.scaling = self.config["Batch"]["scaling_mode"] if "scaling_mode" in self.config["Batch"].keys() else None + if self.pool_name: + if self.scaling == "autoscale" and "autoscale_formula_path" in self.config["Batch"].keys(): + autoscale_formula_path = self.config["Batch"]["autoscale_formula_path"] + print("Creating pool with autoscaling mode") + self.set_pool_info(mode=self.scaling, autoscale_formula_path=autoscale_formula_path) + elif self.scaling == "fixed": + dedicated_nodes = self.config["Batch"]["dedicated_nodes"] if "dedicated_nodes" in self.config["Batch"].keys() else 0 + low_priority_nodes = self.config["Batch"]["low_priority_nodes"] if "low_priority_nodes" in self.config["Batch"].keys() else 1 + node_deallocation_option = self.config["Batch"]["node_deallocation_option"] if "node_deallocation_option" in self.config["Batch"].keys() else None + self.set_pool_info( + mode=self.scaling, + dedicated_nodes=dedicated_nodes, + low_priority_nodes=low_priority_nodes, + node_deallocation_option=node_deallocation_option + ) + else: + pass + self.create_pool(self.pool_name) + else: + pass + + def _initialize_containers(self): + """Called by init to initialize input and output containers""" + self.input_container_name = self.config["Container"]["input_container_name"] if "input_container_name" in self.config["Container"].keys() else None + self.output_container_name = self.config["Container"]["output_container_name"] if "output_container_name" in self.config["Container"].keys() else None + if self.input_container_name and self.output_container_name: + self.update_containers( + 
pool_name=self.pool_name, + input_container_name=self.input_container_name, + output_container_name=self.output_container_name, + force_update=False + ) def set_debugging(self, debug: bool) -> None: """required method that determines whether debugging is on or off. Debug = True for 'on', debug = False for 'off'. From b5685f4ab68f5ab5a3418c79e20600c323172478 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Tue, 3 Dec 2024 14:11:34 +0000 Subject: [PATCH 08/23] Updated unit tests --- cfa_azure/batch.py | 232 +++++++++++++++++++++++++++++++++++++++++ cfa_azure/clients.py | 4 +- tests/batch_tests.py | 187 +++++++++++++++++++++++++++++++++ tests/clients_tests.py | 39 ++----- tests/fake_client.py | 22 ++-- tests/helpers_tests.py | 19 ++-- 6 files changed, 447 insertions(+), 56 deletions(-) create mode 100644 cfa_azure/batch.py create mode 100644 tests/batch_tests.py diff --git a/cfa_azure/batch.py b/cfa_azure/batch.py new file mode 100644 index 0000000..7a11807 --- /dev/null +++ b/cfa_azure/batch.py @@ -0,0 +1,232 @@ +import datetime +import subprocess as sp + +import toml + +from cfa_azure import helpers +from azure.identity import DefaultAzureCredential + +def create_pool( + pool_id: str, + input_container_name: str, + output_container_name: str, + config_path: str, + autoscale_formula_path: str, +): + """Creates pool(s) in Azure Batch if not exists along with input + and output containers based on config. + + Args: + pool_id (str): Name of the pool to use. + input_container_name (str): Name to be used for input Blob container. + output_container_name (str): Name to be used for output Blob container. + config_path (str): Path to config file. + autoscale_formula_path (str): Path to autoscale formula file. + + Returns: + json: JSON containing pool_ID and creation_time. + """ + + print("Starting the pool creation process...") + + # Load config + print("Loading configuration...") + config = helpers.read_config(config_path) + + # Get credentials + print("Retrieving service principal credentials...") + sp_credential=DefaultAzureCredential() + sp_secret = helpers.get_sp_secret(config=config, credential=sp_credential) + + # Create blob service account + print("Setting up Blob service client...") + blob_service_client = helpers.get_blob_service_client( + sp_credential, config + ) + + print("Setting up Azure Batch management client...") + batch_mgmt_client = helpers.get_batch_mgmt_client(sp_credential, config) + + print("Preparing batch pool configuration...") + batch_json = helpers.get_batch_pool_json( + input_container_name, + output_container_name, + config, + autoscale_formula_path, + ) + + account_name = config["Batch"]["batch_account_name"] + resource_group_name = config["Authentication"]["resource_group"] + + ####updates + # take in pool-id + # check if pool-id already exists in environment + exists = 0 + try: + pool_info = batch_mgmt_client.pool.get( + resource_group_name, account_name, pool_name=pool_id + ) + exists = 1 + except Exception as e: + print(e) + # check if user wants to proceed + + if exists == 1: + print(f"{pool_id} already exists.") + print(f"Created: {pool_info.creation_time}") + print(f"Last modified: {pool_info.last_modified}") + print(f"VM size: {pool_info.vm_size}") + + # Check if user wants to proceed if the pool already exists + if exists == 1: + cont = input("Do you still want to use this pool? [Y/n]: ") + if cont.lower() != "y": + print( + "No pool created since it already exists. Exiting the process." 
+ ) + return None + + print("Creating input and output containers...") + + start_time = datetime.datetime.now() + + helpers.create_blob_containers( + blob_service_client, input_container_name, output_container_name + ) + + print(f"Creating the pool '{pool_id}'...") + pool_id = helpers.create_batch_pool(batch_mgmt_client, batch_json) + print(f"Pool '{pool_id}' created successfully.") + + end_time = datetime.datetime.now() + creation_time = round((end_time - start_time).total_seconds(), 2) + print(f"Pool creation process completed in {creation_time} seconds.") + + return { + "pool_id": pool_id, + "creation_time": creation_time, + } + + +def upload_files_to_container( + folder_names: list[str], + input_container_name: str, + blob_service_client: object, + verbose: bool = False, + force_upload: bool = False, +): + """Uploads the files in specified folders to a Blob container. + Args: + blob_service_client (object): Blob service client class. + folder_names (list[str]): A list of folder names from which all files will be uploaded. + input_container_name (str): Name of input container to upload files to. + + Returns: + list: List of input file names that were uploaded. + """ + print(f"Starting to upload files to container: {input_container_name}") + input_files = [] # Empty list of input files + for _folder in folder_names: + # Add uploaded file names to input files list + uploaded_files = helpers.upload_files_in_folder( + _folder, + input_container_name, + blob_service_client, + verbose, + force_upload, + ) + input_files += uploaded_files + print(f"Uploaded {len(uploaded_files)} files from {_folder}.") + print(f"Finished uploading files to container: {input_container_name}") + return input_files + + +def run_job( + job_id: str, + task_id_base: str, + docker_cmd: str, + input_container_name: str, + output_container_name: str, + input_files: list[str] | None = None, + timeout: int = 90, + config_path: str = "./configuration.toml", + debug: bool = True, +): + print(f"Starting job: {job_id}") + # Load config + config = toml.load(config_path) + + # Get credentials + sp_credential = DefaultAzureCredential() + sp_secret = helpers.get_sp_secret(config=config, credential=sp_credential) + + # Check input_files + print("Checking input files against container contents...") + if input_files: + missing_files = [] + container_files = helpers.list_files_in_container( + input_container_name, sp_credential, config + ) + # Check files exist in the container + for f in input_files: + if f not in container_files: + missing_files.append(f) # Gather list of missing files + if missing_files: + print("The following input files are missing from the container:") + for m in missing_files: + print(f" {m}") + print("Not all input files exist in container. 
Closing job.") + return None + else: + input_files = helpers.list_files_in_container( + input_container_name, sp_credential, config + ) + print(f"All files in container '{input_container_name}' will be used.") + + # Get the batch service client + batch_client = helpers.get_batch_service_client(sp_secret, config) + + # Add the job to the pool + print(f"Adding job '{job_id}' to the pool...") + helpers.add_job(job_id, batch_client, config) + + # Add the tasks to the job + print(f"Adding tasks to job '{job_id}'...") + helpers.add_task_to_job( + job_id, task_id_base, docker_cmd, input_files, batch_client, config + ) + + # Monitor tasks + print(f"Monitoring tasks for job '{job_id}'...") + monitor = helpers.monitor_tasks(batch_client, job_id, timeout) + print(monitor) + + if debug: + print("Job complete. Time to debug. Job not deleted.") + else: + print("Cleaning up - deleting job.") + batch_client.job.delete(job_id) + + +def package_and_upload_dockerfile(config: dict): + """Packages and uploads Dockerfile to Azure Container Registry. + + Args: + config (dict): Config dictionary with container_account_name and container_name. + """ + print("Packaging and uploading Dockerfile to Azure Container Registry...") + container_account_name = config["Container"]["container_account_name"] + name_and_tag = config["Container"]["container_name"] + # Execute the shell script to package and upload the container + result = sp.call( + [ + "bash", + "cfa_azure/package_and_upload_container.sh", + container_account_name, + name_and_tag, + ] + ) + if result == 0: + print("Dockerfile packaged and uploaded successfully.") + else: + print("Failed to package and upload Dockerfile.") diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index 32929db..901b751 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -664,7 +664,7 @@ def update_containers( container_registry_server=self.container_registry_server, config=self.config, mount_config=mount_config, - credential=self.secret_cred, + credential=self.secret_cred ) self.create_pool(pool_name) return pool_name @@ -777,7 +777,7 @@ def update_container_set( container_registry_server=self.container_registry_server, config=self.config, mount_config=mount_config, - credential=self.secret_cred, + credential=self.secret_cred ) self.create_pool(pool_name) return pool_name diff --git a/tests/batch_tests.py b/tests/batch_tests.py new file mode 100644 index 0000000..0f4621c --- /dev/null +++ b/tests/batch_tests.py @@ -0,0 +1,187 @@ +# ruff: noqa: F403, F405 + +import unittest +from unittest.mock import MagicMock, patch + +from callee import Contains + +import cfa_azure +import cfa_azure.batch +from tests.fake_client import * + + +class TestBatch(unittest.TestCase): + @patch("builtins.print") + @patch( + "cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG) + ) + @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.get_blob_service_client", + MagicMock(return_value=True), + ) + @patch( + "cfa_azure.helpers.get_batch_mgmt_client", MagicMock(return_value=True) + ) + @patch( + "cfa_azure.helpers.get_batch_pool_json", MagicMock(return_value=True) + ) + @patch( + "cfa_azure.helpers.create_blob_containers", + MagicMock(return_value=True), + ) + @patch( + "cfa_azure.helpers.create_batch_pool", + MagicMock(return_value=FAKE_BATCH_POOL), + ) + def test_create_pool(self, mock_print): + input_container_name = FAKE_INPUT_CONTAINER + output_container_name = FAKE_OUTPUT_CONTAINER + config_path = "some_path" + 
autoscale_formula_path = "test_formula" + status = cfa_azure.batch.create_pool( + FAKE_BATCH_POOL, + input_container_name, + output_container_name, + config_path, + autoscale_formula_path, + ) + mock_print.assert_called_with( + Contains("Pool creation process completed") + ) + self.assertTrue(status) + + @patch("builtins.print") + @patch( + "cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG) + ) + @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.get_blob_service_client", + MagicMock(return_value=True), + ) + @patch( + "cfa_azure.helpers.get_batch_mgmt_client", + MagicMock(return_value=FakeClient()), + ) + @patch("builtins.input", MagicMock(return_value="n")) + @patch( + "cfa_azure.helpers.get_batch_pool_json", MagicMock(return_value=True) + ) + @patch( + "cfa_azure.helpers.create_blob_containers", + MagicMock(return_value=True), + ) + @patch( + "cfa_azure.helpers.create_batch_pool", + MagicMock(return_value=FAKE_BATCH_POOL), + ) + def test_create_pool_if_already_exists(self, mock_print): + input_container_name = FAKE_INPUT_CONTAINER + output_container_name = FAKE_OUTPUT_CONTAINER + config_path = "some_path" + autoscale_formula_path = "test_formula" + cfa_azure.batch.create_pool( + FAKE_BATCH_POOL, + input_container_name, + output_container_name, + config_path, + autoscale_formula_path, + ) + mock_print.assert_called_with( + "No pool created since it already exists. Exiting the process." + ) + + @patch("builtins.print") + @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) + @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.get_batch_service_client", + MagicMock(return_value=FakeClient()), + ) + @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.list_files_in_container", + MagicMock(return_value=FAKE_FOLDER_CONTENTS), + ) + def test_run_job(self, mock_print): + cfa_azure.batch.run_job( + "test_job_id", + "test_task_id", + "docker run something", + FAKE_INPUT_CONTAINER, + FAKE_OUTPUT_CONTAINER, + ) + mock_print.assert_called_with( + "Job complete. Time to debug. Job not deleted." + ) + + @patch("builtins.print") + @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) + @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.get_batch_service_client", + MagicMock(return_value=FakeClient()), + ) + @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.list_files_in_container", + MagicMock(return_value=FAKE_FOLDER_CONTENTS), + ) + def test_run_job_missing_files(self, mock_print): + cfa_azure.batch.run_job( + "test_job_id", + "test_task_id", + "docker run something", + FAKE_INPUT_CONTAINER, + FAKE_OUTPUT_CONTAINER, + input_files=['test_file.csv'] + ) + mock_print.assert_called_with( + "Not all input files exist in container. Closing job." 
+ ) + + @patch("builtins.print") + @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) + @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.get_batch_service_client", + MagicMock(return_value=FakeClient()), + ) + @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) + @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) + @patch( + "cfa_azure.helpers.list_files_in_container", + MagicMock(return_value=FAKE_FOLDER_CONTENTS), + ) + def test_run_job_no_debugging(self, mock_print): + cfa_azure.batch.run_job( + "test_job_id", + "test_task_id", + "docker run something", + FAKE_INPUT_CONTAINER, + FAKE_OUTPUT_CONTAINER, + debug=False, + ) + mock_print.assert_called_with("Cleaning up - deleting job.") + + @patch("builtins.print") + @patch("subprocess.call", MagicMock(return_value=0)) + def test_package_and_upload_dockerfile(self, mock_print): + cfa_azure.batch.package_and_upload_dockerfile(FAKE_CONFIG) + mock_print.assert_called_with( + "Dockerfile packaged and uploaded successfully." + ) + + @patch("builtins.print") + @patch("subprocess.call", MagicMock(return_value=-1)) + def test_package_and_upload_dockerfile_failure(self, mock_print): + cfa_azure.batch.package_and_upload_dockerfile(FAKE_CONFIG) + mock_print.assert_called_with( + "Failed to package and upload Dockerfile." + ) diff --git a/tests/clients_tests.py b/tests/clients_tests.py index 37a439c..21fc784 100644 --- a/tests/clients_tests.py +++ b/tests/clients_tests.py @@ -10,10 +10,9 @@ class TestClients(unittest.TestCase): @patch("cfa_azure.clients.logger") - @patch( - "cfa_azure.helpers.read_config", - MagicMock(return_value=FAKE_CONFIG_MINIMAL), - ) + @patch("azure.identity.ClientSecretCredential.__init__", MagicMock(return_value=None)) + @patch("azure.common.credentials.ServicePrincipalCredentials.__init__", MagicMock(return_value=None)) + @patch("cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG_MINIMAL)) @patch("cfa_azure.helpers.check_config_req", MagicMock(return_value=True)) @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) @patch( @@ -495,20 +494,10 @@ def test_update_container_set_forced(self): ) self.assertEqual(pool_name, FAKE_BATCH_POOL) - @patch( - "cfa_azure.helpers.check_pool_exists", MagicMock(return_value=False) - ) - @patch( - "cfa_azure.helpers.get_batch_service_client", - MagicMock(return_value=FakeClient()), - ) - @patch( - "cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient()) - ) - @patch( - "cfa_azure.helpers.create_batch_pool", - MagicMock(return_value=FAKE_BATCH_POOL), - ) + @patch("cfa_azure.helpers.check_pool_exists", MagicMock(return_value=False)) + @patch("cfa_azure.helpers.get_batch_service_client", MagicMock(return_value=FakeClient())) + @patch("cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient())) + @patch("cfa_azure.helpers.create_batch_pool", MagicMock(return_value=FAKE_BATCH_POOL)) def test_update_containers_new_pool(self): containers = [ {"name": FAKE_INPUT_CONTAINER, "relative_mount_dir": "input"}, @@ -524,17 +513,9 @@ def test_update_containers_new_pool(self): @patch("cfa_azure.clients.logger") @patch("cfa_azure.helpers.check_pool_exists", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_batch_service_client", - MagicMock(return_value=FakeClient()), - ) - @patch( - "cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient()) - ) - @patch( - 
"cfa_azure.helpers.create_batch_pool", - MagicMock(return_value=FAKE_BATCH_POOL), - ) + @patch("cfa_azure.helpers.get_batch_service_client", MagicMock(return_value=FakeClient())) + @patch("cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient())) + @patch("cfa_azure.helpers.create_batch_pool", MagicMock(return_value=FAKE_BATCH_POOL)) def test_update_containers(self, mock_logger): pool_name = self.azure_client.update_containers( input_container_name=FAKE_INPUT_CONTAINER, diff --git a/tests/fake_client.py b/tests/fake_client.py index 230d122..251565b 100644 --- a/tests/fake_client.py +++ b/tests/fake_client.py @@ -252,19 +252,17 @@ def vm_size(self): @property def scale_settings(self): - return dict2obj( - { - "fixed_scale": { - "targetDedicatedNodes": 10, - "targetLowPriorityNodes": 5, - "resizeTimeout": 10, - }, - "auto_scale": { - "evaluationInterval": 10, - "formula": FAKE_AUTOSCALE_FORMULA, - }, + return dict2obj({ + "fixed_scale": { + "targetDedicatedNodes": 10, + "targetLowPriorityNodes": 5, + "resizeTimeout": 10, + }, + "auto_scale": { + "evaluationInterval": 10, + "formula": FAKE_AUTOSCALE_FORMULA } - ) + }) def get(self): return True diff --git a/tests/helpers_tests.py b/tests/helpers_tests.py index 24a8088..e4061c7 100644 --- a/tests/helpers_tests.py +++ b/tests/helpers_tests.py @@ -633,7 +633,7 @@ def test_check_azure_container_exists(self): registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", tag_name="latest", - credential=FAKE_CREDENTIAL, + credential=FAKE_CREDENTIAL ) self.assertTrue(response) @@ -649,7 +649,7 @@ def test_check_azure_container_exists_missing_tag(self): registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", tag_name="bad_tag_1", - credential=FAKE_CREDENTIAL, + credential=FAKE_CREDENTIAL ) self.assertIsNone(response) @@ -727,7 +727,7 @@ def test_add_job(self, mock_logger): job_id, FAKE_BATCH_POOL, batch_client=batch_client, - end_job_on_task_failure=False, + end_job_on_task_failure=False ) mock_logger.info.assert_called_with( f"Job '{job_id}' created successfully." 
@@ -738,10 +738,7 @@ def test_add_job_task_failure(self, mock_logger): batch_client = FakeClient() job_id = "my_job_id" cfa_azure.helpers.add_job( - job_id, - FAKE_BATCH_POOL, - batch_client=batch_client, - end_job_on_task_failure=False, + job_id, FAKE_BATCH_POOL, batch_client=batch_client, end_job_on_task_failure=False ) mock_logger.debug.assert_called_with("Attempting to add job.") @@ -782,9 +779,7 @@ def test_list_blobs_flat(self): ) def test_get_sp_secret(self, mock_secret): mock_secret.return_value = FakeClient.FakeSecretClient.FakeSecret() - secret = cfa_azure.helpers.get_sp_secret( - config=FAKE_CONFIG, credential=FAKE_CREDENTIAL - ) + secret = cfa_azure.helpers.get_sp_secret(config=FAKE_CONFIG, credential=FAKE_CREDENTIAL) self.assertEqual(secret, FAKE_SECRET) @patch( @@ -793,9 +788,7 @@ def test_get_sp_secret(self, mock_secret): ) def test_get_sp_secret_bad_key(self): with self.assertRaises(Exception): - cfa_azure.helpers.get_sp_secret( - config=FAKE_CONFIG, credential=FAKE_CREDENTIAL - ) + cfa_azure.helpers.get_sp_secret(config=FAKE_CONFIG, credential=FAKE_CREDENTIAL) def test_get_blob_config(self): blob_config = cfa_azure.helpers.get_blob_config( From 3941ee3c3cfc727133b7aa1759bf0c0b56034df6 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Wed, 4 Dec 2024 19:16:29 +0000 Subject: [PATCH 09/23] Updated unit tests for added coverage --- cfa_azure/helpers.py | 6 +----- tests/fake_client.py | 4 ++-- tests/helpers_tests.py | 16 ++++++++-------- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index 8c719cb..64deb94 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -1741,11 +1741,7 @@ def upload_docker_image( Returns: str: full container name - """ - # Generate a unique tag if none provided - if tag is None: - tag = datetime.datetime.now().strftime("%Y%m%d%H%M%S") - + """ full_container_name = f"{registry_name}.azurecr.io/{repo_name}:{tag}" # check if docker is running diff --git a/tests/fake_client.py b/tests/fake_client.py index 251565b..3050646 100644 --- a/tests/fake_client.py +++ b/tests/fake_client.py @@ -293,8 +293,8 @@ def compute_node(self) -> FakeComputeNodeList: @property def images(self): - return {FAKE_CONTAINER_IMAGE: FakeClient.FakeTag("fake_tag_1")} - + return { FAKE_CONTAINER_IMAGE: FakeClient.FakeTag("fake_tag_1") } + def get_container_client(self, container): return self.FakeContainerClient() diff --git a/tests/helpers_tests.py b/tests/helpers_tests.py index e4061c7..44be681 100644 --- a/tests/helpers_tests.py +++ b/tests/helpers_tests.py @@ -846,11 +846,11 @@ def test_list_all_nodes_by_pool(self): @patch("subprocess.run", MagicMock(return_value=True)) def test_upload_docker_image(self): full_container_name = cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - tag="latest", - use_device_code=False, + tag="latest", + use_device_code=False ) self.assertIsNotNone(full_container_name) @@ -860,11 +860,11 @@ def test_upload_docker_image(self): def test_upload_docker_image_exception(self): with self.assertRaises(DockerException) as docexc: cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - tag="latest", - use_device_code=False, + tag="latest", + use_device_code=False ) self.assertEqual( "Make sure Docker is running.", @@ -876,9 +876,9 @@ def 
test_upload_docker_image_exception(self): @patch("subprocess.run", MagicMock(return_value=True)) def test_upload_docker_image_notag(self): full_container_name = cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - use_device_code=False, + use_device_code=False ) self.assertIsNotNone(full_container_name) From 53e5490d47155631abe515fa608142ab3570f708 Mon Sep 17 00:00:00 2001 From: antoniohndz1 <70829137+antoniohndz1@users.noreply.github.com> Date: Tue, 3 Dec 2024 14:31:32 -0600 Subject: [PATCH 10/23] 74 update readme and other documentation (#178) * Update README.md Bulk of updates * Update README.md Added example workflow for helpers section and added some syntax fixes * Update README.md Added remaining functions from helpers.py * Update README.md Deleting duplicate line * bump version * bump version --------- Co-authored-by: Ryan Raasch <150935395+ryanraaschCDC@users.noreply.github.com> --- README.md | 62 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index ed07952..891e4bb 100644 --- a/README.md +++ b/README.md @@ -43,14 +43,14 @@ export LOG_OUTPUT="stdout" **Using Various Credential Methods** -When instantiating a AzureClient object, there is an option to set which `credential_method` to use. Previously, only a service principal could be used. Now, there are three an options to choose `identity`, `sp`, or `env`. -- `identity`: Uses the managed identity associated with the VM where the code is running. +When instantiating a AzureClient object, there is an option to set which `credential_method` to use. Previously, only a service principal could be used. Now, there are three an options to choose `identity`, `sp`, or `env`. +- `identity`: Uses the managed identity associated with the VM where the code is running. - `sp`: Uses a service principal for the credential. The following values must be set in the configuration file: tenant_id, sp_application_id, and the corresponding secret fetched from Azure Key Vault. - `env`: Uses environment variables to create the credential. When choosing `env`, the following environment variables will need to be set: `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`. You can also use `use_env_vars=True` to allow the configuration to be loaded directly from environment variables, which may be helpful in containerized environments. -By default, the managed identity option will be used. In whichever credential method is used, a secret is pulled from the key vault using the credential to create a secret client credential for interaction with various Azure services. +By default, the managed identity option will be used. In whichever credential method is used, a secret is pulled from the key vault using the credential to create a secret client credential for interaction with various Azure services. 
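For the `sp` method, those values live in the same configuration toml that the client reads its [Authentication] section from. A hypothetical sketch of that section; only tenant_id and sp_application_id are named in this README, so the vault_url key name is an assumption:
```
# hypothetical sketch; vault_url is an assumed key name
[Authentication]
tenant_id = "your-tenant-id"
sp_application_id = "your-application-id"
vault_url = "https://your-vault.vault.azure.net"
```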
**Example:** ``` @@ -66,7 +66,7 @@ client = AzureClient(config_path="./configuration.toml", credential_method="sp") import os os.environ["AZURE_TENANT_ID"] = "your-tenant-id" os.environ["AZURE_CLIENT_ID"] = "your-client-id" -os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret" #pragma: allowlist secret +os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret" client = AzureClient(credential_method="env", use_env_vars=True) ``` @@ -101,7 +101,7 @@ client.set_pool_info("autoscale", ``` ### Functions -- `create_pool`: creates a new Azure batch pool using default autoscale mode +- `create_pool`: creates a new Azure batch pool using default autoscale mode **Example:** ``` client = AzureClient("./configuration.toml") @@ -118,7 +118,7 @@ client.upload_files_to_container( force_upload=True ) ``` -- `update_scale_settings`: modifies the scaling mode (fixed or autoscale) for an existing pool +- `update_scale_settings`: modifies the scaling mode (fixed or autoscale) for an existing pool **Example:** ``` # Specify new autoscale formula that will be evaluated every 30 minutes @@ -142,7 +142,7 @@ client.upload_files_to_container( # Switch to fixed scaling mode with 15 spot EC2 nodes and forced termination of current jobs client.update_scale_settings(low_priority_nodes=15, node_deallocation_option='Terminate') ``` -- update_containers: modifies the containers mounted on an existing Azure batch pool. It essentially recreates the pool with new mounts. +- update_containers: modifies the containers mounted on an existing Azure batch pool. It essentially recreates the pool with new mounts. **Example:** ``` # First create a pool @@ -248,83 +248,83 @@ Please view [this documentation](automation_README.md) on getting started with t The `helpers` module provides a collection of functions that helps manage Azure resources and perform key tasks, such as interacting with Blob storage, Azure Batch, configuration management, and data transformations. Below is an expanded overview of each function. 
**Functions:** -- `read_config`: reads in a configuration toml file and returns it as a Python dictionary +- `read_config`: reads in a configuration toml file and returns it as a Python dictionary ``` read_config("/path/to/config.toml") ``` -- `create_container`: creates an Azure Blob container if it doesn't already exist +- `create_container`: creates an Azure Blob container if it doesn't already exist ``` -create_container("my-container", blob_service_client) +create_container("my-container", blob_service_client) ``` - `get_autoscale_formula`: finds and reads `autoscale_formula.txt` from working directory or subdirectory ``` -get_autoscale_formula(filepath="/path/to/formula.txt") +get_autoscale_formula(filepath="/path/to/formula.txt") ``` - `get_sp_secret`: retrieves the user's service principal secret from the key vault based on the provided config file ``` -get_sp_secret(config, DefaultAzureCredential()) +get_sp_secret(config, DefaultAzureCredential()) ``` -- `get_sp_credential`: retrieves the service principal credential +- `get_sp_credential`: retrieves the service principal credential ``` get_sp_credential(config) ``` -- `get_blob_service_client`: creates a Blob Service Client for interacting with Azure Blob +- `get_blob_service_client`: creates a Blob Service Client for interacting with Azure Blob ``` blob_service_client = get_blob_service_client(config, DefaultAzureCredential()) ``` -- `get_batch_mgmt_client`: creates a Batch Management Client for interacting with Azure Batch, such as pools and jobs +- `get_batch_mgmt_client`: creates a Batch Management Client for interacting with Azure Batch, such as pools and jobs ``` -batch_mgmt_client = get_batch_mgmt_client(config, DefaultAzureCredential()) +batch_mgmt_client = get_batch_mgmt_client(config, DefaultAzureCredential()) ``` -- `create_blob_containers`: uses create_container() to create input and output containers in Azure Blob +- `create_blob_containers`: uses create_container() to create input and output containers in Azure Blob ``` create_blob_containers(blob_service_client, "input-container", "output-container") ``` -- `get_batch_pool_json`: creates a dict based on config for configuring an Azure Batch pool +- `get_batch_pool_json`: creates a dict based on config for configuring an Azure Batch pool ``` pool_config = get_batch_pool_json("input-container", "output-container", config) ``` -- `create_batch_pool`: creates a Azure Batch Pool based on info using the provided configuration details +- `create_batch_pool`: creates a Azure Batch Pool based on info using the provided configuration details ``` create_batch_pool(batch_mgmt_client, pool_config) ``` -- `list_containers`: lists the containers in Azure Blob Storage Account +- `list_containers`: lists the containers in Azure Blob Storage Account ``` list_containers(blob_service_client) ``` -- `upload_files_in_folder`: uploads all files in specified folder to the specified container +- `upload_files_in_folder`: uploads all files in specified folder to the specified container ``` upload_files_in_folder("/path/to/folder", "container-name", blob_service_client) ``` -- `get_batch_service_client`: creates a Batch Service Client object for interacting with Batch jobs +- `get_batch_service_client`: creates a Batch Service Client object for interacting with Batch jobs ``` batch_client = get_batch_service_client(config, DefaultAzureCredential()) ``` -- `add_job`: creates a new job to the specified Azure Batch pool. By default, a job remains active after completion of enclosed tasks. 
You can optionally specify the *mark_complete_after_tasks_run* argument to *True* if you want job to auto-complete after completion of enclosed tasks. +- `add_job`: creates a new job to the specified Azure Batch pool ``` add_job("job-id", "pool-id", True, batch_client) ``` -- `add_task_to_job`: adds a task to the specified job based on user-input Docker command +- `add_task_to_job`: adds a task to the specified job based on user-input Docker command ``` add_task_to_job("job-id", "task-id", "docker-command", batch_client) ``` -- `monitor_tasks`: monitors the tasks running in a job +- `monitor_tasks`: monitors the tasks running in a job ``` monitor_tasks("example-job-id", batch_client) ``` -- `list_files_in_container`: lists out all files stored in the specified Azure container +- `list_files_in_container`: lists out all files stored in the specified Azure container ``` list_files_in_container(container_client) ``` -- `df_to_yaml`: converts a pandas dataframe to yaml file, which is helpful for configuration and metadata storage +- `df_to_yaml`: converts a pandas dataframe to yaml file, which is helpful for configuration and metadata storage ``` df_to_yaml(dataframe, "output.yaml") ``` -- `yaml_to_df`: converts a yaml file to pandas dataframe +- `yaml_to_df`: converts a yaml file to pandas dataframe ``` yaml_to_df("input.yaml") ``` -- `edit_yaml_r0`: takes in a YAML file and produces replicate YAML files with the `r0` changed based on the specified range (i.e. start, stop, and step) +- `edit_yaml_r0`: takes in a YAML file and produces replicate YAML files with the `r0` changed based on the specified range (i.e. start, stop, and step) ``` edit_yaml_r0("input.yaml", start=1, stop=5, step=1) ``` @@ -372,6 +372,10 @@ delete_blob_folder("folder_path", "container_name", blob_service_client) ``` format_extensions([".txt", "jpg"]) ``` +- `mark_job_completed_after_tasks_run`: sets a job to be marked as complete once all tasks are finished +``` +mark_job_completed_after_tasks_run("job_id", "pool_id", batch_client) +``` - `check_autoscale_parameters`: checks which arguments are incompatible with the provided scaling mode ``` check_autoscale_parameters("autoscale", dedicated_nodes=5) From 793468d6840d193339bd73749a5ed93d0cb95ab3 Mon Sep 17 00:00:00 2001 From: Ryan Raasch <150935395+ryanraaschCDC@users.noreply.github.com> Date: Wed, 4 Dec 2024 09:00:56 -0800 Subject: [PATCH 11/23] fix download_directory function (#180) * ariks fix * new version * precommit fixes --- README.md | 58 ++++++++++++++++++++++---------------------- cfa_azure/helpers.py | 1 + 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 891e4bb..c9b891f 100644 --- a/README.md +++ b/README.md @@ -43,14 +43,14 @@ export LOG_OUTPUT="stdout" **Using Various Credential Methods** -When instantiating a AzureClient object, there is an option to set which `credential_method` to use. Previously, only a service principal could be used. Now, there are three an options to choose `identity`, `sp`, or `env`. -- `identity`: Uses the managed identity associated with the VM where the code is running. +When instantiating a AzureClient object, there is an option to set which `credential_method` to use. Previously, only a service principal could be used. Now, there are three an options to choose `identity`, `sp`, or `env`. +- `identity`: Uses the managed identity associated with the VM where the code is running. - `sp`: Uses a service principal for the credential. 
The following values must be set in the configuration file: tenant_id, sp_application_id, and the corresponding secret fetched from Azure Key Vault. - `env`: Uses environment variables to create the credential. When choosing `env`, the following environment variables will need to be set: `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, and `AZURE_CLIENT_SECRET`. You can also use `use_env_vars=True` to allow the configuration to be loaded directly from environment variables, which may be helpful in containerized environments. -By default, the managed identity option will be used. In whichever credential method is used, a secret is pulled from the key vault using the credential to create a secret client credential for interaction with various Azure services. +By default, the managed identity option will be used. In whichever credential method is used, a secret is pulled from the key vault using the credential to create a secret client credential for interaction with various Azure services. **Example:** ``` @@ -66,7 +66,7 @@ client = AzureClient(config_path="./configuration.toml", credential_method="sp") import os os.environ["AZURE_TENANT_ID"] = "your-tenant-id" os.environ["AZURE_CLIENT_ID"] = "your-client-id" -os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret" +os.environ["AZURE_CLIENT_SECRET"] = "your-client-secret" #pragma: allowlist secret client = AzureClient(credential_method="env", use_env_vars=True) ``` @@ -101,7 +101,7 @@ client.set_pool_info("autoscale", ``` ### Functions -- `create_pool`: creates a new Azure batch pool using default autoscale mode +- `create_pool`: creates a new Azure batch pool using default autoscale mode **Example:** ``` client = AzureClient("./configuration.toml") @@ -118,7 +118,7 @@ client.upload_files_to_container( force_upload=True ) ``` -- `update_scale_settings`: modifies the scaling mode (fixed or autoscale) for an existing pool +- `update_scale_settings`: modifies the scaling mode (fixed or autoscale) for an existing pool **Example:** ``` # Specify new autoscale formula that will be evaluated every 30 minutes @@ -142,7 +142,7 @@ client.upload_files_to_container( # Switch to fixed scaling mode with 15 spot EC2 nodes and forced termination of current jobs client.update_scale_settings(low_priority_nodes=15, node_deallocation_option='Terminate') ``` -- update_containers: modifies the containers mounted on an existing Azure batch pool. It essentially recreates the pool with new mounts. +- update_containers: modifies the containers mounted on an existing Azure batch pool. It essentially recreates the pool with new mounts. **Example:** ``` # First create a pool @@ -248,83 +248,83 @@ Please view [this documentation](automation_README.md) on getting started with t The `helpers` module provides a collection of functions that helps manage Azure resources and perform key tasks, such as interacting with Blob storage, Azure Batch, configuration management, and data transformations. Below is an expanded overview of each function. 
**Functions:** -- `read_config`: reads in a configuration toml file and returns it as a Python dictionary +- `read_config`: reads in a configuration toml file and returns it as a Python dictionary ``` read_config("/path/to/config.toml") ``` -- `create_container`: creates an Azure Blob container if it doesn't already exist +- `create_container`: creates an Azure Blob container if it doesn't already exist ``` -create_container("my-container", blob_service_client) +create_container("my-container", blob_service_client) ``` - `get_autoscale_formula`: finds and reads `autoscale_formula.txt` from working directory or subdirectory ``` -get_autoscale_formula(filepath="/path/to/formula.txt") +get_autoscale_formula(filepath="/path/to/formula.txt") ``` - `get_sp_secret`: retrieves the user's service principal secret from the key vault based on the provided config file ``` -get_sp_secret(config, DefaultAzureCredential()) +get_sp_secret(config, DefaultAzureCredential()) ``` -- `get_sp_credential`: retrieves the service principal credential +- `get_sp_credential`: retrieves the service principal credential ``` get_sp_credential(config) ``` -- `get_blob_service_client`: creates a Blob Service Client for interacting with Azure Blob +- `get_blob_service_client`: creates a Blob Service Client for interacting with Azure Blob ``` blob_service_client = get_blob_service_client(config, DefaultAzureCredential()) ``` -- `get_batch_mgmt_client`: creates a Batch Management Client for interacting with Azure Batch, such as pools and jobs +- `get_batch_mgmt_client`: creates a Batch Management Client for interacting with Azure Batch, such as pools and jobs ``` -batch_mgmt_client = get_batch_mgmt_client(config, DefaultAzureCredential()) +batch_mgmt_client = get_batch_mgmt_client(config, DefaultAzureCredential()) ``` -- `create_blob_containers`: uses create_container() to create input and output containers in Azure Blob +- `create_blob_containers`: uses create_container() to create input and output containers in Azure Blob ``` create_blob_containers(blob_service_client, "input-container", "output-container") ``` -- `get_batch_pool_json`: creates a dict based on config for configuring an Azure Batch pool +- `get_batch_pool_json`: creates a dict based on config for configuring an Azure Batch pool ``` pool_config = get_batch_pool_json("input-container", "output-container", config) ``` -- `create_batch_pool`: creates a Azure Batch Pool based on info using the provided configuration details +- `create_batch_pool`: creates a Azure Batch Pool based on info using the provided configuration details ``` create_batch_pool(batch_mgmt_client, pool_config) ``` -- `list_containers`: lists the containers in Azure Blob Storage Account +- `list_containers`: lists the containers in Azure Blob Storage Account ``` list_containers(blob_service_client) ``` -- `upload_files_in_folder`: uploads all files in specified folder to the specified container +- `upload_files_in_folder`: uploads all files in specified folder to the specified container ``` upload_files_in_folder("/path/to/folder", "container-name", blob_service_client) ``` -- `get_batch_service_client`: creates a Batch Service Client object for interacting with Batch jobs +- `get_batch_service_client`: creates a Batch Service Client object for interacting with Batch jobs ``` batch_client = get_batch_service_client(config, DefaultAzureCredential()) ``` -- `add_job`: creates a new job to the specified Azure Batch pool +- `add_job`: creates a new job to the specified Azure Batch pool ``` 
 add_job("job-id", "pool-id", True, batch_client)
 ```
-- `add_task_to_job`: adds a task to the specified job based on user-input Docker command
+- `add_task_to_job`: adds a task to the specified job based on user-input Docker command
 ```
 add_task_to_job("job-id", "task-id", "docker-command", batch_client)
 ```
-- `monitor_tasks`: monitors the tasks running in a job
+- `monitor_tasks`: monitors the tasks running in a job
 ```
 monitor_tasks("example-job-id", batch_client)
 ```
-- `list_files_in_container`: lists out all files stored in the specified Azure container
+- `list_files_in_container`: lists out all files stored in the specified Azure container
 ```
 list_files_in_container(container_client)
 ```
-- `df_to_yaml`: converts a pandas dataframe to yaml file, which is helpful for configuration and metadata storage
+- `df_to_yaml`: converts a pandas dataframe to yaml file, which is helpful for configuration and metadata storage
 ```
 df_to_yaml(dataframe, "output.yaml")
 ```
-- `yaml_to_df`: converts a yaml file to pandas dataframe
+- `yaml_to_df`: converts a yaml file to pandas dataframe
 ```
 yaml_to_df("input.yaml")
 ```
-- `edit_yaml_r0`: takes in a YAML file and produces replicate YAML files with the `r0` changed based on the specified range (i.e. start, stop, and step)
+- `edit_yaml_r0`: takes in a YAML file and produces replicate YAML files with the `r0` changed based on the specified range (i.e. start, stop, and step)
 ```
 edit_yaml_r0("input.yaml", start=1, stop=5, step=1)
 ```
diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py
index 64deb94..d35cfca 100644
--- a/cfa_azure/helpers.py
+++ b/cfa_azure/helpers.py
@@ -1742,6 +1742,7 @@ def upload_docker_image(
     Returns:
         str: full container name
     """
+
     full_container_name = f"{registry_name}.azurecr.io/{repo_name}:{tag}"
 
     # check if docker is running

From 4cab92ff8611f5e7c3eca08af833a0b23485175d Mon Sep 17 00:00:00 2001
From: Fawad Rafi
Date: Wed, 11 Dec 2024 14:53:47 +0000
Subject: [PATCH 12/23] Auto complete the job if marked complete and all tasks
 are done

---
 cfa_azure/helpers.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py
index d35cfca..af26d89 100644
--- a/cfa_azure/helpers.py
+++ b/cfa_azure/helpers.py
@@ -24,8 +24,8 @@
     ExitOptions,
     JobAction,
     JobConstraints,
-    OnAllTasksComplete,
-    OnTaskFailure
+    OnTaskFailure,
+    OnAllTasksComplete
 )
 from azure.containerregistry import ContainerRegistryClient
 from azure.core.exceptions import HttpResponseError
@@ -757,7 +757,7 @@ def add_job(
         uses_task_dependencies=True,
         on_all_tasks_complete=on_all_tasks_complete,
         on_task_failure=OnTaskFailure.perform_exit_options_job_action,
-        constraints=job_constraints,
+        constraints=job_constraints
     )
     logger.debug("Attempting to add job.")
     try:
@@ -934,7 +934,8 @@ def add_task_to_job(
     )
     batch_client.task.add(job_id=job_id, task=task)
     logger.debug(f"Task '{task_id}' added to job '{job_id}'.")
-    return task_id
+    tasks.append(task_id)
+    return tasks
 
 
 def monitor_tasks(

From 027fc66c99a799d14fe2b755255c213fcd3dbf21 Mon Sep 17 00:00:00 2001
From: Fawad Rafi
Date: Thu, 12 Dec 2024 20:03:29 +0000
Subject: [PATCH 13/23] Updated documentation and removed batch module

---
 tests/batch_tests.py | 187 ------------------------------------------
 1 file changed, 187 deletions(-)
 delete mode 100644 tests/batch_tests.py

diff --git a/tests/batch_tests.py b/tests/batch_tests.py
deleted file mode 100644
index 0f4621c..0000000
--- a/tests/batch_tests.py
+++ /dev/null
@@ -1,187 +0,0 @@
-# ruff: noqa: F403, F405
-
-import unittest -from unittest.mock import MagicMock, patch - -from callee import Contains - -import cfa_azure -import cfa_azure.batch -from tests.fake_client import * - - -class TestBatch(unittest.TestCase): - @patch("builtins.print") - @patch( - "cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG) - ) - @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_blob_service_client", - MagicMock(return_value=True), - ) - @patch( - "cfa_azure.helpers.get_batch_mgmt_client", MagicMock(return_value=True) - ) - @patch( - "cfa_azure.helpers.get_batch_pool_json", MagicMock(return_value=True) - ) - @patch( - "cfa_azure.helpers.create_blob_containers", - MagicMock(return_value=True), - ) - @patch( - "cfa_azure.helpers.create_batch_pool", - MagicMock(return_value=FAKE_BATCH_POOL), - ) - def test_create_pool(self, mock_print): - input_container_name = FAKE_INPUT_CONTAINER - output_container_name = FAKE_OUTPUT_CONTAINER - config_path = "some_path" - autoscale_formula_path = "test_formula" - status = cfa_azure.batch.create_pool( - FAKE_BATCH_POOL, - input_container_name, - output_container_name, - config_path, - autoscale_formula_path, - ) - mock_print.assert_called_with( - Contains("Pool creation process completed") - ) - self.assertTrue(status) - - @patch("builtins.print") - @patch( - "cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG) - ) - @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_blob_service_client", - MagicMock(return_value=True), - ) - @patch( - "cfa_azure.helpers.get_batch_mgmt_client", - MagicMock(return_value=FakeClient()), - ) - @patch("builtins.input", MagicMock(return_value="n")) - @patch( - "cfa_azure.helpers.get_batch_pool_json", MagicMock(return_value=True) - ) - @patch( - "cfa_azure.helpers.create_blob_containers", - MagicMock(return_value=True), - ) - @patch( - "cfa_azure.helpers.create_batch_pool", - MagicMock(return_value=FAKE_BATCH_POOL), - ) - def test_create_pool_if_already_exists(self, mock_print): - input_container_name = FAKE_INPUT_CONTAINER - output_container_name = FAKE_OUTPUT_CONTAINER - config_path = "some_path" - autoscale_formula_path = "test_formula" - cfa_azure.batch.create_pool( - FAKE_BATCH_POOL, - input_container_name, - output_container_name, - config_path, - autoscale_formula_path, - ) - mock_print.assert_called_with( - "No pool created since it already exists. Exiting the process." - ) - - @patch("builtins.print") - @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) - @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_batch_service_client", - MagicMock(return_value=FakeClient()), - ) - @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.list_files_in_container", - MagicMock(return_value=FAKE_FOLDER_CONTENTS), - ) - def test_run_job(self, mock_print): - cfa_azure.batch.run_job( - "test_job_id", - "test_task_id", - "docker run something", - FAKE_INPUT_CONTAINER, - FAKE_OUTPUT_CONTAINER, - ) - mock_print.assert_called_with( - "Job complete. Time to debug. Job not deleted." 
- ) - - @patch("builtins.print") - @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) - @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_batch_service_client", - MagicMock(return_value=FakeClient()), - ) - @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.list_files_in_container", - MagicMock(return_value=FAKE_FOLDER_CONTENTS), - ) - def test_run_job_missing_files(self, mock_print): - cfa_azure.batch.run_job( - "test_job_id", - "test_task_id", - "docker run something", - FAKE_INPUT_CONTAINER, - FAKE_OUTPUT_CONTAINER, - input_files=['test_file.csv'] - ) - mock_print.assert_called_with( - "Not all input files exist in container. Closing job." - ) - - @patch("builtins.print") - @patch("toml.load", MagicMock(return_value=FAKE_CONFIG)) - @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.get_batch_service_client", - MagicMock(return_value=FakeClient()), - ) - @patch("cfa_azure.helpers.add_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.add_task_to_job", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.monitor_tasks", MagicMock(return_value=True)) - @patch( - "cfa_azure.helpers.list_files_in_container", - MagicMock(return_value=FAKE_FOLDER_CONTENTS), - ) - def test_run_job_no_debugging(self, mock_print): - cfa_azure.batch.run_job( - "test_job_id", - "test_task_id", - "docker run something", - FAKE_INPUT_CONTAINER, - FAKE_OUTPUT_CONTAINER, - debug=False, - ) - mock_print.assert_called_with("Cleaning up - deleting job.") - - @patch("builtins.print") - @patch("subprocess.call", MagicMock(return_value=0)) - def test_package_and_upload_dockerfile(self, mock_print): - cfa_azure.batch.package_and_upload_dockerfile(FAKE_CONFIG) - mock_print.assert_called_with( - "Dockerfile packaged and uploaded successfully." - ) - - @patch("builtins.print") - @patch("subprocess.call", MagicMock(return_value=-1)) - def test_package_and_upload_dockerfile_failure(self, mock_print): - cfa_azure.batch.package_and_upload_dockerfile(FAKE_CONFIG) - mock_print.assert_called_with( - "Failed to package and upload Dockerfile." - ) From 9d7677c8d95a876224f5d40c3c640a94fcd40421 Mon Sep 17 00:00:00 2001 From: "Katie Gostic (she/her)" Date: Fri, 13 Dec 2024 14:15:40 -0800 Subject: [PATCH 14/23] Revert "Remove pre commit" (#187) * Revert "Remove pre commit" This reverts commit 60e1c870af996703523d65732925716600b4653e. 
* Styler changes * renamed main to master --- cfa_azure/clients.py | 4 ++-- cfa_azure/helpers.py | 6 +++--- tests/clients_tests.py | 47 +++++++++++++++++++++++++++++++++--------- tests/fake_client.py | 26 ++++++++++++----------- tests/helpers_tests.py | 33 +++++++++++++++++------------ 5 files changed, 76 insertions(+), 40 deletions(-) diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index 901b751..32929db 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -664,7 +664,7 @@ def update_containers( container_registry_server=self.container_registry_server, config=self.config, mount_config=mount_config, - credential=self.secret_cred + credential=self.secret_cred, ) self.create_pool(pool_name) return pool_name @@ -777,7 +777,7 @@ def update_container_set( container_registry_server=self.container_registry_server, config=self.config, mount_config=mount_config, - credential=self.secret_cred + credential=self.secret_cred, ) self.create_pool(pool_name) return pool_name diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index af26d89..0c96948 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -24,8 +24,8 @@ ExitOptions, JobAction, JobConstraints, + OnAllTasksComplete, OnTaskFailure, - OnAllTasksComplete ) from azure.containerregistry import ContainerRegistryClient from azure.core.exceptions import HttpResponseError @@ -757,7 +757,7 @@ def add_job( uses_task_dependencies=True, on_all_tasks_complete=on_all_tasks_complete, on_task_failure=OnTaskFailure.perform_exit_options_job_action, - constraints=job_constraints + constraints=job_constraints, ) logger.debug("Attempting to add job.") try: @@ -1742,7 +1742,7 @@ def upload_docker_image( Returns: str: full container name - """ + """ full_container_name = f"{registry_name}.azurecr.io/{repo_name}:{tag}" diff --git a/tests/clients_tests.py b/tests/clients_tests.py index 21fc784..1414ab4 100644 --- a/tests/clients_tests.py +++ b/tests/clients_tests.py @@ -10,9 +10,18 @@ class TestClients(unittest.TestCase): @patch("cfa_azure.clients.logger") - @patch("azure.identity.ClientSecretCredential.__init__", MagicMock(return_value=None)) - @patch("azure.common.credentials.ServicePrincipalCredentials.__init__", MagicMock(return_value=None)) - @patch("cfa_azure.helpers.read_config", MagicMock(return_value=FAKE_CONFIG_MINIMAL)) + @patch( + "azure.identity.ClientSecretCredential.__init__", + MagicMock(return_value=None), + ) + @patch( + "azure.common.credentials.ServicePrincipalCredentials.__init__", + MagicMock(return_value=None), + ) + @patch( + "cfa_azure.helpers.read_config", + MagicMock(return_value=FAKE_CONFIG_MINIMAL), + ) @patch("cfa_azure.helpers.check_config_req", MagicMock(return_value=True)) @patch("cfa_azure.helpers.get_sp_secret", MagicMock(return_value=True)) @patch( @@ -494,10 +503,20 @@ def test_update_container_set_forced(self): ) self.assertEqual(pool_name, FAKE_BATCH_POOL) - @patch("cfa_azure.helpers.check_pool_exists", MagicMock(return_value=False)) - @patch("cfa_azure.helpers.get_batch_service_client", MagicMock(return_value=FakeClient())) - @patch("cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient())) - @patch("cfa_azure.helpers.create_batch_pool", MagicMock(return_value=FAKE_BATCH_POOL)) + @patch( + "cfa_azure.helpers.check_pool_exists", MagicMock(return_value=False) + ) + @patch( + "cfa_azure.helpers.get_batch_service_client", + MagicMock(return_value=FakeClient()), + ) + @patch( + "cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient()) + ) + @patch( + 
"cfa_azure.helpers.create_batch_pool", + MagicMock(return_value=FAKE_BATCH_POOL), + ) def test_update_containers_new_pool(self): containers = [ {"name": FAKE_INPUT_CONTAINER, "relative_mount_dir": "input"}, @@ -513,9 +532,17 @@ def test_update_containers_new_pool(self): @patch("cfa_azure.clients.logger") @patch("cfa_azure.helpers.check_pool_exists", MagicMock(return_value=True)) - @patch("cfa_azure.helpers.get_batch_service_client", MagicMock(return_value=FakeClient())) - @patch("cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient())) - @patch("cfa_azure.helpers.create_batch_pool", MagicMock(return_value=FAKE_BATCH_POOL)) + @patch( + "cfa_azure.helpers.get_batch_service_client", + MagicMock(return_value=FakeClient()), + ) + @patch( + "cfa_azure.helpers.delete_pool", MagicMock(return_value=FakeClient()) + ) + @patch( + "cfa_azure.helpers.create_batch_pool", + MagicMock(return_value=FAKE_BATCH_POOL), + ) def test_update_containers(self, mock_logger): pool_name = self.azure_client.update_containers( input_container_name=FAKE_INPUT_CONTAINER, diff --git a/tests/fake_client.py b/tests/fake_client.py index 3050646..230d122 100644 --- a/tests/fake_client.py +++ b/tests/fake_client.py @@ -252,17 +252,19 @@ def vm_size(self): @property def scale_settings(self): - return dict2obj({ - "fixed_scale": { - "targetDedicatedNodes": 10, - "targetLowPriorityNodes": 5, - "resizeTimeout": 10, - }, - "auto_scale": { - "evaluationInterval": 10, - "formula": FAKE_AUTOSCALE_FORMULA + return dict2obj( + { + "fixed_scale": { + "targetDedicatedNodes": 10, + "targetLowPriorityNodes": 5, + "resizeTimeout": 10, + }, + "auto_scale": { + "evaluationInterval": 10, + "formula": FAKE_AUTOSCALE_FORMULA, + }, } - }) + ) def get(self): return True @@ -293,8 +295,8 @@ def compute_node(self) -> FakeComputeNodeList: @property def images(self): - return { FAKE_CONTAINER_IMAGE: FakeClient.FakeTag("fake_tag_1") } - + return {FAKE_CONTAINER_IMAGE: FakeClient.FakeTag("fake_tag_1")} + def get_container_client(self, container): return self.FakeContainerClient() diff --git a/tests/helpers_tests.py b/tests/helpers_tests.py index 44be681..f235f78 100644 --- a/tests/helpers_tests.py +++ b/tests/helpers_tests.py @@ -633,7 +633,7 @@ def test_check_azure_container_exists(self): registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", tag_name="latest", - credential=FAKE_CREDENTIAL + credential=FAKE_CREDENTIAL, ) self.assertTrue(response) @@ -649,7 +649,7 @@ def test_check_azure_container_exists_missing_tag(self): registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", tag_name="bad_tag_1", - credential=FAKE_CREDENTIAL + credential=FAKE_CREDENTIAL, ) self.assertIsNone(response) @@ -738,7 +738,10 @@ def test_add_job_task_failure(self, mock_logger): batch_client = FakeClient() job_id = "my_job_id" cfa_azure.helpers.add_job( - job_id, FAKE_BATCH_POOL, batch_client=batch_client, end_job_on_task_failure=False + job_id, + FAKE_BATCH_POOL, + batch_client=batch_client, + end_job_on_task_failure=False, ) mock_logger.debug.assert_called_with("Attempting to add job.") @@ -779,7 +782,9 @@ def test_list_blobs_flat(self): ) def test_get_sp_secret(self, mock_secret): mock_secret.return_value = FakeClient.FakeSecretClient.FakeSecret() - secret = cfa_azure.helpers.get_sp_secret(config=FAKE_CONFIG, credential=FAKE_CREDENTIAL) + secret = cfa_azure.helpers.get_sp_secret( + config=FAKE_CONFIG, credential=FAKE_CREDENTIAL + ) self.assertEqual(secret, FAKE_SECRET) @patch( @@ -788,7 +793,9 @@ def test_get_sp_secret(self, mock_secret): ) def 
test_get_sp_secret_bad_key(self): with self.assertRaises(Exception): - cfa_azure.helpers.get_sp_secret(config=FAKE_CONFIG, credential=FAKE_CREDENTIAL) + cfa_azure.helpers.get_sp_secret( + config=FAKE_CONFIG, credential=FAKE_CREDENTIAL + ) def test_get_blob_config(self): blob_config = cfa_azure.helpers.get_blob_config( @@ -846,11 +853,11 @@ def test_list_all_nodes_by_pool(self): @patch("subprocess.run", MagicMock(return_value=True)) def test_upload_docker_image(self): full_container_name = cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - tag="latest", - use_device_code=False + tag="latest", + use_device_code=False, ) self.assertIsNotNone(full_container_name) @@ -860,11 +867,11 @@ def test_upload_docker_image(self): def test_upload_docker_image_exception(self): with self.assertRaises(DockerException) as docexc: cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - tag="latest", - use_device_code=False + tag="latest", + use_device_code=False, ) self.assertEqual( "Make sure Docker is running.", @@ -876,9 +883,9 @@ def test_upload_docker_image_exception(self): @patch("subprocess.run", MagicMock(return_value=True)) def test_upload_docker_image_notag(self): full_container_name = cfa_azure.helpers.upload_docker_image( - image_name=FAKE_CONTAINER_IMAGE, + image_name=FAKE_CONTAINER_IMAGE, registry_name=FAKE_CONTAINER_REGISTRY, repo_name="Fake Repo", - use_device_code=False + use_device_code=False, ) self.assertIsNotNone(full_container_name) From 540ed88a492bec69e908a38069e6d54d3ef79133 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Thu, 12 Dec 2024 18:56:11 +0000 Subject: [PATCH 15/23] fix add_task return as string not list --- cfa_azure/automation.py | 2 +- cfa_azure/helpers.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index af7ec57..ed69361 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -263,4 +263,4 @@ def run_tasks(task_config: str, auth_config: str | None = None): if "monitor_job" in task_toml["job"].keys(): if task_toml["job"]["monitor_job"] is True: client.monitor_job(job_id) - return None \ No newline at end of file + return None diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index 0c96948..a2ff9d8 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -25,7 +25,7 @@ JobAction, JobConstraints, OnAllTasksComplete, - OnTaskFailure, + OnTaskFailure ) from azure.containerregistry import ContainerRegistryClient from azure.core.exceptions import HttpResponseError @@ -937,7 +937,6 @@ def add_task_to_job( tasks.append(task_id) return tasks - def monitor_tasks( job_id: str, timeout: int, @@ -1743,7 +1742,6 @@ def upload_docker_image( Returns: str: full container name """ - full_container_name = f"{registry_name}.azurecr.io/{repo_name}:{tag}" # check if docker is running From 99518d6f70bbc1a70ff4ec4c215d394d00673c70 Mon Sep 17 00:00:00 2001 From: Ryan Raasch Date: Thu, 12 Dec 2024 22:48:30 +0000 Subject: [PATCH 16/23] updated automation and new readme --- cfa_azure/automation.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index ed69361..2fb07f4 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -1,8 +1,6 @@ import itertools - import pandas as pd import toml - from 
cfa_azure import helpers
 from cfa_azure.clients import AzureClient
 

From 8515a8245fd7f1e317ed344977118e1636754207 Mon Sep 17 00:00:00 2001
From: Fawad Rafi
Date: Thu, 19 Dec 2024 06:05:44 +0000
Subject: [PATCH 17/23] Bug fix

---
 cfa_azure/clients.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py
index 32929db..ad05f4d 100644
--- a/cfa_azure/clients.py
+++ b/cfa_azure/clients.py
@@ -235,6 +235,7 @@ def _initialize_registry(self):
         self.container_registry_server = None
         self.container_image_name = None
         self.full_container_name = None
+        registry_name = None
         if "registry_name" in self.config["Container"].keys():
             registry_name = self.config["Container"]["registry_name"]
             self.container_registry_server = f"{registry_name}.azurecr.io"

From 6184ac74354c897bb98cca6f6b55777e6945d3f0 Mon Sep 17 00:00:00 2001
From: Fawad Rafi
Date: Mon, 23 Dec 2024 17:25:26 +0000
Subject: [PATCH 18/23] Fixed rebase issues

---
 cfa_azure/batch.py   | 232 ------------------------------------------
 cfa_azure/helpers.py |   6 +-
 2 files changed, 3 insertions(+), 235 deletions(-)
 delete mode 100644 cfa_azure/batch.py

diff --git a/cfa_azure/batch.py b/cfa_azure/batch.py
deleted file mode 100644
index 7a11807..0000000
--- a/cfa_azure/batch.py
+++ /dev/null
@@ -1,232 +0,0 @@
-import datetime
-import subprocess as sp
-
-import toml
-
-from cfa_azure import helpers
-from azure.identity import DefaultAzureCredential
-
-def create_pool(
-    pool_id: str,
-    input_container_name: str,
-    output_container_name: str,
-    config_path: str,
-    autoscale_formula_path: str,
-):
-    """Creates pool(s) in Azure Batch if not exists along with input
-    and output containers based on config.
-
-    Args:
-        pool_id (str): Name of the pool to use.
-        input_container_name (str): Name to be used for input Blob container.
-        output_container_name (str): Name to be used for output Blob container.
-        config_path (str): Path to config file.
-        autoscale_formula_path (str): Path to autoscale formula file.
-
-    Returns:
-        json: JSON containing pool_ID and creation_time.
- """ - - print("Starting the pool creation process...") - - # Load config - print("Loading configuration...") - config = helpers.read_config(config_path) - - # Get credentials - print("Retrieving service principal credentials...") - sp_credential=DefaultAzureCredential() - sp_secret = helpers.get_sp_secret(config=config, credential=sp_credential) - - # Create blob service account - print("Setting up Blob service client...") - blob_service_client = helpers.get_blob_service_client( - sp_credential, config - ) - - print("Setting up Azure Batch management client...") - batch_mgmt_client = helpers.get_batch_mgmt_client(sp_credential, config) - - print("Preparing batch pool configuration...") - batch_json = helpers.get_batch_pool_json( - input_container_name, - output_container_name, - config, - autoscale_formula_path, - ) - - account_name = config["Batch"]["batch_account_name"] - resource_group_name = config["Authentication"]["resource_group"] - - ####updates - # take in pool-id - # check if pool-id already exists in environment - exists = 0 - try: - pool_info = batch_mgmt_client.pool.get( - resource_group_name, account_name, pool_name=pool_id - ) - exists = 1 - except Exception as e: - print(e) - # check if user wants to proceed - - if exists == 1: - print(f"{pool_id} already exists.") - print(f"Created: {pool_info.creation_time}") - print(f"Last modified: {pool_info.last_modified}") - print(f"VM size: {pool_info.vm_size}") - - # Check if user wants to proceed if the pool already exists - if exists == 1: - cont = input("Do you still want to use this pool? [Y/n]: ") - if cont.lower() != "y": - print( - "No pool created since it already exists. Exiting the process." - ) - return None - - print("Creating input and output containers...") - - start_time = datetime.datetime.now() - - helpers.create_blob_containers( - blob_service_client, input_container_name, output_container_name - ) - - print(f"Creating the pool '{pool_id}'...") - pool_id = helpers.create_batch_pool(batch_mgmt_client, batch_json) - print(f"Pool '{pool_id}' created successfully.") - - end_time = datetime.datetime.now() - creation_time = round((end_time - start_time).total_seconds(), 2) - print(f"Pool creation process completed in {creation_time} seconds.") - - return { - "pool_id": pool_id, - "creation_time": creation_time, - } - - -def upload_files_to_container( - folder_names: list[str], - input_container_name: str, - blob_service_client: object, - verbose: bool = False, - force_upload: bool = False, -): - """Uploads the files in specified folders to a Blob container. - Args: - blob_service_client (object): Blob service client class. - folder_names (list[str]): A list of folder names from which all files will be uploaded. - input_container_name (str): Name of input container to upload files to. - - Returns: - list: List of input file names that were uploaded. 
- """ - print(f"Starting to upload files to container: {input_container_name}") - input_files = [] # Empty list of input files - for _folder in folder_names: - # Add uploaded file names to input files list - uploaded_files = helpers.upload_files_in_folder( - _folder, - input_container_name, - blob_service_client, - verbose, - force_upload, - ) - input_files += uploaded_files - print(f"Uploaded {len(uploaded_files)} files from {_folder}.") - print(f"Finished uploading files to container: {input_container_name}") - return input_files - - -def run_job( - job_id: str, - task_id_base: str, - docker_cmd: str, - input_container_name: str, - output_container_name: str, - input_files: list[str] | None = None, - timeout: int = 90, - config_path: str = "./configuration.toml", - debug: bool = True, -): - print(f"Starting job: {job_id}") - # Load config - config = toml.load(config_path) - - # Get credentials - sp_credential = DefaultAzureCredential() - sp_secret = helpers.get_sp_secret(config=config, credential=sp_credential) - - # Check input_files - print("Checking input files against container contents...") - if input_files: - missing_files = [] - container_files = helpers.list_files_in_container( - input_container_name, sp_credential, config - ) - # Check files exist in the container - for f in input_files: - if f not in container_files: - missing_files.append(f) # Gather list of missing files - if missing_files: - print("The following input files are missing from the container:") - for m in missing_files: - print(f" {m}") - print("Not all input files exist in container. Closing job.") - return None - else: - input_files = helpers.list_files_in_container( - input_container_name, sp_credential, config - ) - print(f"All files in container '{input_container_name}' will be used.") - - # Get the batch service client - batch_client = helpers.get_batch_service_client(sp_secret, config) - - # Add the job to the pool - print(f"Adding job '{job_id}' to the pool...") - helpers.add_job(job_id, batch_client, config) - - # Add the tasks to the job - print(f"Adding tasks to job '{job_id}'...") - helpers.add_task_to_job( - job_id, task_id_base, docker_cmd, input_files, batch_client, config - ) - - # Monitor tasks - print(f"Monitoring tasks for job '{job_id}'...") - monitor = helpers.monitor_tasks(batch_client, job_id, timeout) - print(monitor) - - if debug: - print("Job complete. Time to debug. Job not deleted.") - else: - print("Cleaning up - deleting job.") - batch_client.job.delete(job_id) - - -def package_and_upload_dockerfile(config: dict): - """Packages and uploads Dockerfile to Azure Container Registry. - - Args: - config (dict): Config dictionary with container_account_name and container_name. 
- """ - print("Packaging and uploading Dockerfile to Azure Container Registry...") - container_account_name = config["Container"]["container_account_name"] - name_and_tag = config["Container"]["container_name"] - # Execute the shell script to package and upload the container - result = sp.call( - [ - "bash", - "cfa_azure/package_and_upload_container.sh", - container_account_name, - name_and_tag, - ] - ) - if result == 0: - print("Dockerfile packaged and uploaded successfully.") - else: - print("Failed to package and upload Dockerfile.") diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index a2ff9d8..637db2d 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -895,8 +895,8 @@ def add_task_to_job( else: full_cmd = d_cmd_str - tasks = [] if input_files: + tasks = [] for i, input_file in enumerate(input_files): config_stem = "_".join(input_file.split(".")[:-1]).split("/")[-1] id = task_id_base + "-" + config_stem @@ -917,6 +917,7 @@ def add_task_to_job( ) batch_client.task.add(job_id=job_id, task=task) print(f"Task '{id}' added to job '{job_id}'.") + return tasks else: command_line = full_cmd logger.debug(f"Adding task {task_id}") @@ -934,8 +935,7 @@ def add_task_to_job( ) batch_client.task.add(job_id=job_id, task=task) logger.debug(f"Task '{task_id}' added to job '{job_id}'.") - tasks.append(task_id) - return tasks + return task_id def monitor_tasks( job_id: str, From 3191a5cb54212130da198f8505481c379847f43d Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Mon, 23 Dec 2024 17:33:45 +0000 Subject: [PATCH 19/23] Workflow issues --- tests/helpers_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/helpers_tests.py b/tests/helpers_tests.py index f235f78..ef74e86 100644 --- a/tests/helpers_tests.py +++ b/tests/helpers_tests.py @@ -741,7 +741,7 @@ def test_add_job_task_failure(self, mock_logger): job_id, FAKE_BATCH_POOL, batch_client=batch_client, - end_job_on_task_failure=False, + end_job_on_task_failure=False ) mock_logger.debug.assert_called_with("Attempting to add job.") From f287a19a2079c6bf8c36409ad0c622341abaf240 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Mon, 23 Dec 2024 17:44:46 +0000 Subject: [PATCH 20/23] Removed documentation sections that were rebased incorrectly --- README.md | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c9b891f..f5df1cd 100644 --- a/README.md +++ b/README.md @@ -300,7 +300,8 @@ upload_files_in_folder("/path/to/folder", "container-name", blob_service_client) ``` batch_client = get_batch_service_client(config, DefaultAzureCredential()) ``` -- `add_job`: creates a new job to the specified Azure Batch pool +- `add_job`: creates a new job to the specified Azure Batch pool. By default, a job remains active after completion of enclosed tasks. You can optionally specify the *mark_complete_after_tasks_run* argument to *True* if you want job to auto-complete after completion of enclosed tasks. 
+ ``` add_job("job-id", "pool-id", True, batch_client) ``` @@ -372,10 +373,6 @@ delete_blob_folder("folder_path", "container_name", blob_service_client) ``` format_extensions([".txt", "jpg"]) ``` -- `mark_job_completed_after_tasks_run`: sets a job to be marked as complete once all tasks are finished -``` -mark_job_completed_after_tasks_run("job_id", "pool_id", batch_client) -``` - `check_autoscale_parameters`: checks which arguments are incompatible with the provided scaling mode ``` check_autoscale_parameters("autoscale", dedicated_nodes=5) From 39762b8c2783c718dbec64ec44e72b4c4c23889b Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Mon, 23 Dec 2024 18:57:19 +0000 Subject: [PATCH 21/23] Updated files per precommit --- README.md | 3 +- automation_README.md | 2 +- cfa_azure/automation.py | 2 ++ cfa_azure/clients.py | 78 +++++++++++++++++++++++++++++++---------- cfa_azure/helpers.py | 3 +- tests/helpers_tests.py | 4 +-- 6 files changed, 67 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index f5df1cd..3fe3845 100644 --- a/README.md +++ b/README.md @@ -301,7 +301,6 @@ upload_files_in_folder("/path/to/folder", "container-name", blob_service_client) batch_client = get_batch_service_client(config, DefaultAzureCredential()) ``` - `add_job`: creates a new job to the specified Azure Batch pool. By default, a job remains active after completion of enclosed tasks. You can optionally specify the *mark_complete_after_tasks_run* argument to *True* if you want job to auto-complete after completion of enclosed tasks. - ``` add_job("job-id", "pool-id", True, batch_client) ``` @@ -544,4 +543,4 @@ collaboration and collaborative potential. All government records will be published through the [CDC web site](http://www.cdc.gov). ## Additional Standard Notices -Please refer to [CDC's Template Repository](https://github.com/CDCgov/template) for more information about [contributing to this repository](https://github.com/CDCgov/template/blob/main/CONTRIBUTING.md), [public domain notices and disclaimers](https://github.com/CDCgov/template/blob/main/DISCLAIMER.md), and [code of conduct](https://github.com/CDCgov/template/blob/main/code-of-conduct.md). \ No newline at end of file +Please refer to [CDC's Template Repository](https://github.com/CDCgov/template) for more information about [contributing to this repository](https://github.com/CDCgov/template/blob/main/CONTRIBUTING.md), [public domain notices and disclaimers](https://github.com/CDCgov/template/blob/main/DISCLAIMER.md), and [code of conduct](https://github.com/CDCgov/template/blob/main/code-of-conduct.md). diff --git a/automation_README.md b/automation_README.md index 68f4b52..7361d53 100644 --- a/automation_README.md +++ b/automation_README.md @@ -94,4 +94,4 @@ You can then run the tasks in two lines of code, as shown below. 
from cfa_azure.automation import run_tasks run_experiment(task_config = "path/to/task_config.toml", auth_config = "path/to/auth_config.toml") -``` \ No newline at end of file +``` diff --git a/cfa_azure/automation.py b/cfa_azure/automation.py index 2fb07f4..ed69361 100644 --- a/cfa_azure/automation.py +++ b/cfa_azure/automation.py @@ -1,6 +1,8 @@ import itertools + import pandas as pd import toml + from cfa_azure import helpers from cfa_azure.clients import AzureClient diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index ad05f4d..5a9c8c0 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -154,7 +154,6 @@ def __init__( # get credentials self._initialize_authentication(credential_method) logger.debug(f"generated credentials from {credential_method}.") - # initialize registry self._initialize_registry() # create blob service account self.blob_service_client = helpers.get_blob_service_client( @@ -182,7 +181,9 @@ def _initialize_authentication(self, credential_method): config (str): config dict """ if "credential_method" in self.config["Authentication"].keys(): - self.credential_method = credential_method = self.config["Authentication"]["credential_method"] + self.credential_method = credential_method = self.config[ + "Authentication" + ]["credential_method"] else: self.credential_method = credential_method if "identity" in self.credential_method.lower(): @@ -240,12 +241,12 @@ def _initialize_registry(self): registry_name = self.config["Container"]["registry_name"] self.container_registry_server = f"{registry_name}.azurecr.io" self.registry_url = f"https://{self.container_registry_server}" - else: + else: self.registry_url = None if "repository_name" in self.config["Container"].keys(): repository_name = self.config["Container"]["repository_name"] - + if "tag_name" in self.config["Container"].keys(): tag_name = self.config["Container"]["tag_name"] else: @@ -253,28 +254,59 @@ def _initialize_registry(self): if registry_name and repository_name: self.set_azure_container( - registry_name=registry_name, repo_name=repository_name, tag_name=tag_name + registry_name=registry_name, + repo_name=repository_name, + tag_name=tag_name, ) def _initialize_pool(self): """Called by init to initialize the pool""" self.pool_parameters = None - self.pool_name = self.config["Batch"]["pool_name"] if "pool_name" in self.config["Batch"].keys() else None - self.scaling = self.config["Batch"]["scaling_mode"] if "scaling_mode" in self.config["Batch"].keys() else None + self.pool_name = ( + self.config["Batch"]["pool_name"] + if "pool_name" in self.config["Batch"].keys() + else None + ) + self.scaling = ( + self.config["Batch"]["scaling_mode"] + if "scaling_mode" in self.config["Batch"].keys() + else None + ) if self.pool_name: - if self.scaling == "autoscale" and "autoscale_formula_path" in self.config["Batch"].keys(): - autoscale_formula_path = self.config["Batch"]["autoscale_formula_path"] + if ( + self.scaling == "autoscale" + and "autoscale_formula_path" in self.config["Batch"].keys() + ): + autoscale_formula_path = self.config["Batch"][ + "autoscale_formula_path" + ] print("Creating pool with autoscaling mode") - self.set_pool_info(mode=self.scaling, autoscale_formula_path=autoscale_formula_path) + self.set_pool_info( + mode=self.scaling, + autoscale_formula_path=autoscale_formula_path, + ) elif self.scaling == "fixed": - dedicated_nodes = self.config["Batch"]["dedicated_nodes"] if "dedicated_nodes" in self.config["Batch"].keys() else 0 - low_priority_nodes = 
self.config["Batch"]["low_priority_nodes"] if "low_priority_nodes" in self.config["Batch"].keys() else 1 - node_deallocation_option = self.config["Batch"]["node_deallocation_option"] if "node_deallocation_option" in self.config["Batch"].keys() else None + dedicated_nodes = ( + self.config["Batch"]["dedicated_nodes"] + if "dedicated_nodes" in self.config["Batch"].keys() + else 0 + ) + low_priority_nodes = ( + self.config["Batch"]["low_priority_nodes"] + if "low_priority_nodes" in self.config["Batch"].keys() + else 1 + ) + node_deallocation_option = ( + self.config["Batch"]["node_deallocation_option"] + if "node_deallocation_option" + in self.config["Batch"].keys() + else None + ) self.set_pool_info( - mode=self.scaling, - dedicated_nodes=dedicated_nodes, + mode=self.scaling, + dedicated_nodes=dedicated_nodes, low_priority_nodes=low_priority_nodes, - node_deallocation_option=node_deallocation_option + node_deallocation_option=node_deallocation_option, ) else: pass @@ -284,14 +316,22 @@ def _initialize_pool(self): def _initialize_containers(self): """Called by init to initialize input and output containers""" - self.input_container_name = self.config["Container"]["input_container_name"] if "input_container_name" in self.config["Container"].keys() else None - self.output_container_name = self.config["Container"]["output_container_name"] if "output_container_name" in self.config["Container"].keys() else None + self.input_container_name = ( + self.config["Container"]["input_container_name"] + if "input_container_name" in self.config["Container"].keys() + else None + ) + self.output_container_name = ( + self.config["Container"]["output_container_name"] + if "output_container_name" in self.config["Container"].keys() + else None + ) if self.input_container_name and self.output_container_name: self.update_containers( pool_name=self.pool_name, input_container_name=self.input_container_name, output_container_name=self.output_container_name, - force_update=False + force_update=False, ) def set_debugging(self, debug: bool) -> None: diff --git a/cfa_azure/helpers.py b/cfa_azure/helpers.py index 637db2d..cb901f6 100644 --- a/cfa_azure/helpers.py +++ b/cfa_azure/helpers.py @@ -25,7 +25,7 @@ JobAction, JobConstraints, OnAllTasksComplete, - OnTaskFailure + OnTaskFailure, ) from azure.containerregistry import ContainerRegistryClient from azure.core.exceptions import HttpResponseError @@ -937,6 +937,7 @@ def add_task_to_job( logger.debug(f"Task '{task_id}' added to job '{job_id}'.") return task_id + def monitor_tasks( job_id: str, timeout: int, diff --git a/tests/helpers_tests.py b/tests/helpers_tests.py index ef74e86..24a8088 100644 --- a/tests/helpers_tests.py +++ b/tests/helpers_tests.py @@ -727,7 +727,7 @@ def test_add_job(self, mock_logger): job_id, FAKE_BATCH_POOL, batch_client=batch_client, - end_job_on_task_failure=False + end_job_on_task_failure=False, ) mock_logger.info.assert_called_with( f"Job '{job_id}' created successfully." 
@@ -741,7 +741,7 @@ def test_add_job_task_failure(self, mock_logger): job_id, FAKE_BATCH_POOL, batch_client=batch_client, - end_job_on_task_failure=False + end_job_on_task_failure=False, ) mock_logger.debug.assert_called_with("Attempting to add job.") From ee724022be81cfa1a344945d09db6d52a1c4c2c0 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Tue, 24 Dec 2024 06:30:20 +0000 Subject: [PATCH 22/23] Automation fix --- cfa_azure/clients.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index 5a9c8c0..8abde4e 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -173,6 +173,8 @@ def __init__( self._initialize_pool() # Set up containers self._initialize_containers() + if self.pool_name and self.pool_parameters: + self.create_pool(self.pool_name) logger.info("Client initialized! Happy coding!") def _initialize_authentication(self, credential_method): @@ -296,21 +298,13 @@ def _initialize_pool(self): if "low_priority_nodes" in self.config["Batch"].keys() else 1 ) - node_deallocation_option = ( - self.config["Batch"]["node_deallocation_option"] - if "node_deallocation_option" - in self.config["Batch"].keys() - else None - ) self.set_pool_info( mode=self.scaling, dedicated_nodes=dedicated_nodes, - low_priority_nodes=low_priority_nodes, - node_deallocation_option=node_deallocation_option, + low_priority_nodes=low_priority_nodes ) else: pass - self.create_pool(self.pool_name) else: pass From dbf8149943b87091681132fc8c4ea6a36c5117b7 Mon Sep 17 00:00:00 2001 From: Fawad Rafi Date: Tue, 24 Dec 2024 06:33:15 +0000 Subject: [PATCH 23/23] Automation fix with pre commit hook --- cfa_azure/clients.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cfa_azure/clients.py b/cfa_azure/clients.py index 8abde4e..2844e44 100644 --- a/cfa_azure/clients.py +++ b/cfa_azure/clients.py @@ -301,7 +301,7 @@ def _initialize_pool(self): self.set_pool_info( mode=self.scaling, dedicated_nodes=dedicated_nodes, - low_priority_nodes=low_priority_nodes + low_priority_nodes=low_priority_nodes, ) else: pass
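Taken together, patches 15 through 23 settle two behaviors that are easy to lose in the diff noise: `helpers.add_task_to_job` returns the list of created task ids only when `input_files` is supplied (and a single task id string otherwise), and `AzureClient.__init__` now creates the pool itself once both a pool name and pool parameters have been resolved from the config. The sketch below mirrors the README examples shown in the patches above; it is illustrative only — the ids, config path, and Docker command are placeholders, and exact signatures should be checked against `cfa_azure/helpers.py`.

```python
# Hedged sketch of the post-patch helpers API; ids, paths, and the Docker
# command are illustrative placeholders taken from the README examples.
from azure.identity import DefaultAzureCredential

from cfa_azure import helpers

config = helpers.read_config("/path/to/config.toml")
batch_client = helpers.get_batch_service_client(config, DefaultAzureCredential())

# A job stays active after its tasks finish unless auto-completion is requested
# (third positional argument in the README example).
helpers.add_job("job-id", "pool-id", True, batch_client)

# Without input_files, add_task_to_job returns the single task id (a str);
# when input_files is passed, it instead returns the list of created task ids,
# one per input file (per the patch 18 branch on `input_files`).
task_id = helpers.add_task_to_job("job-id", "task-id", "docker-command", batch_client)
```

Similarly, the configuration keys that `_initialize_authentication`, `_initialize_registry`, `_initialize_pool`, and `_initialize_containers` look up after patch 23 can be summarized in one hypothetical `configuration.toml`. All values below are placeholders; omitted keys fall back to the defaults visible in `clients.py` (for example, `tag_name` defaults to `"latest"`, fixed-mode node counts default to 0 dedicated and 1 low-priority).

```toml
# Hypothetical configuration.toml sketch; only keys that the _initialize_*
# helpers in clients.py actually look up are shown, with placeholder values.

[Authentication]
credential_method = "identity"   # when present, overrides the constructor argument

[Batch]
pool_name = "example-pool"
scaling_mode = "fixed"           # "autoscale" instead requires autoscale_formula_path
dedicated_nodes = 0              # default 0 when omitted
low_priority_nodes = 1           # default 1 when omitted

[Container]
registry_name = "exampleregistry"    # yields exampleregistry.azurecr.io
repository_name = "example-repo"
tag_name = "latest"                  # default "latest" when omitted
input_container_name = "input-test"
output_container_name = "output-test"
```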