Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automate configuration #189

Merged
merged 23 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 125 additions & 25 deletions cfa_azure/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,14 @@ def __init__(
AzureClient object
"""
self.config_path = config_path
self.credential_method = credential_method
self.debug = None
self.scaling = None
self.input_container_name = None
self.output_container_name = None
self.files = []
self.task_id_max = 0
self.jobs = set()
self.container_registry_server = None
self.registry_url = None
self.container_image_name = None
self.full_container_name = None
self.input_mount_dir = None
self.output_mount_dir = None
self.mounts = []
self.mount_container_clients = []
self.pool_parameters = None
self.timeout = None
self.save_logs_to_blob = None
self.logs_folder = "stdout_stderr"
Expand Down Expand Up @@ -161,6 +152,42 @@ def __init__(
logger.warning("Could not find resource group name in config.")

# get credentials
self._initialize_authentication(credential_method)
logger.debug(f"generated credentials from {credential_method}.")
self._initialize_registry()
# create blob service account
self.blob_service_client = helpers.get_blob_service_client(
self.config, self.cred
)
logger.debug("generated Blob Service Client.")
# create batch mgmt client
self.batch_mgmt_client = helpers.get_batch_mgmt_client(
self.config, self.secret_cred
)
logger.debug("generated Batch Management Client.")
# create batch service client
self.batch_client = helpers.get_batch_service_client(
self.config, self.batch_cred
)
# Create pool
self._initialize_pool()
# Set up containers
self._initialize_containers()
if self.pool_name and self.pool_parameters:
self.create_pool(self.pool_name)
logger.info("Client initialized! Happy coding!")

def _initialize_authentication(self, credential_method):
"""Called by init method to set up authentication
Args:
config (str): config dict
"""
if "credential_method" in self.config["Authentication"].keys():
self.credential_method = credential_method = self.config[
"Authentication"
]["credential_method"]
else:
self.credential_method = credential_method
if "identity" in self.credential_method.lower():
self.cred = ManagedIdentityCredential()
elif "sp" in self.credential_method.lower():
Expand Down Expand Up @@ -192,7 +219,6 @@ def __init__(
raise Exception(
"please choose a credential_method from 'identity', 'sp', 'ext_user', 'env' and try again."
)

if "sp_secrets" not in self.config["Authentication"].keys():
sp_secret = helpers.get_sp_secret(self.config, self.cred)
self.secret_cred = ClientSecretCredential(
Expand All @@ -207,26 +233,100 @@ def __init__(
resource="https://batch.core.windows.net/",
)

logger.debug(f"generated credentials from {credential_method}.")
def _initialize_registry(self):
"""Called by init to initialize the registry URL and details"""
self.container_registry_server = None
self.container_image_name = None
self.full_container_name = None
registry_name = None
if "registry_name" in self.config["Container"].keys():
registry_name = self.config["Container"]["registry_name"]
self.container_registry_server = f"{registry_name}.azurecr.io"
self.registry_url = f"https://{self.container_registry_server}"
else:
self.registry_url = None

# create blob service account
if "repository_name" in self.config["Container"].keys():
repository_name = self.config["Container"]["repository_name"]

self.blob_service_client = helpers.get_blob_service_client(
self.config, self.cred
)
logger.debug("generated Blob Service Client.")
if "tag_name" in self.config["Container"].keys():
tag_name = self.config["Container"]["tag_name"]
else:
tag_name = "latest"

# create batch mgmt client
self.batch_mgmt_client = helpers.get_batch_mgmt_client(
self.config, self.secret_cred
)
logger.debug("generated Batch Management Client.")
if registry_name and repository_name:
self.set_azure_container(
registry_name=registry_name,
repo_name=repository_name,
tag_name=tag_name,
)

# create batch service client
self.batch_client = helpers.get_batch_service_client(
self.config, self.batch_cred
def _initialize_pool(self):
"""Called by init to initialize the pool"""
self.pool_parameters = None
self.pool_name = (
self.config["Batch"]["pool_name"]
if "pool_name" in self.config["Batch"].keys()
else None
)
logger.info("Client initialized! Happy coding!")
self.scaling = (
self.config["Batch"]["scaling_mode"]
if "scaling_mode" in self.config["Batch"].keys()
else None
)
if self.pool_name:
if (
self.scaling == "autoscale"
and "autoscale_formula_path" in self.config["Batch"].keys()
):
autoscale_formula_path = self.config["Batch"][
"autoscale_formula_path"
]
print("Creating pool with autoscaling mode")
self.set_pool_info(
mode=self.scaling,
autoscale_formula_path=autoscale_formula_path,
)
elif self.scaling == "fixed":
dedicated_nodes = (
self.config["Batch"]["dedicated_nodes"]
if "dedicated_nodes" in self.config["Batch"].keys()
else 0
)
low_priority_nodes = (
self.config["Batch"]["low_priority_nodes"]
if "low_priority_nodes" in self.config["Batch"].keys()
else 1
)
self.set_pool_info(
mode=self.scaling,
dedicated_nodes=dedicated_nodes,
low_priority_nodes=low_priority_nodes,
)
else:
pass
else:
pass

def _initialize_containers(self):
"""Called by init to initialize input and output containers"""
self.input_container_name = (
self.config["Container"]["input_container_name"]
if "input_container_name" in self.config["Container"].keys()
else None
)
self.output_container_name = (
self.config["Container"]["output_container_name"]
if "output_container_name" in self.config["Container"].keys()
else None
)
if self.input_container_name and self.output_container_name:
self.update_containers(
pool_name=self.pool_name,
input_container_name=self.input_container_name,
output_container_name=self.output_container_name,
force_update=False,
)

def set_debugging(self, debug: bool) -> None:
"""required method that determines whether debugging is on or off. Debug = True for 'on', debug = False for 'off'.
Expand Down
7 changes: 2 additions & 5 deletions cfa_azure/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,8 +895,8 @@ def add_task_to_job(
else:
full_cmd = d_cmd_str

tasks = []
if input_files:
tasks = []
for i, input_file in enumerate(input_files):
config_stem = "_".join(input_file.split(".")[:-1]).split("/")[-1]
id = task_id_base + "-" + config_stem
Expand All @@ -917,6 +917,7 @@ def add_task_to_job(
)
batch_client.task.add(job_id=job_id, task=task)
print(f"Task '{id}' added to job '{job_id}'.")
return tasks
else:
command_line = full_cmd
logger.debug(f"Adding task {task_id}")
Expand Down Expand Up @@ -1742,10 +1743,6 @@ def upload_docker_image(
Returns:
str: full container name
"""
# Generate a unique tag if none provided
if tag is None:
tag = datetime.datetime.now().strftime("%Y%m%d%H%M%S")

full_container_name = f"{registry_name}.azurecr.io/{repo_name}:{tag}"

# check if docker is running
Expand Down
8 changes: 8 additions & 0 deletions tests/clients_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@

class TestClients(unittest.TestCase):
@patch("cfa_azure.clients.logger")
@patch(
"azure.identity.ClientSecretCredential.__init__",
MagicMock(return_value=None),
)
@patch(
"azure.common.credentials.ServicePrincipalCredentials.__init__",
MagicMock(return_value=None),
)
@patch(
"cfa_azure.helpers.read_config",
MagicMock(return_value=FAKE_CONFIG_MINIMAL),
Expand Down
Loading