diff --git a/docs/source/config.md b/docs/source/config.md
index a48c315..e6ac0ec 100644
--- a/docs/source/config.md
+++ b/docs/source/config.md
@@ -101,9 +101,9 @@ redun --setup profile=dev run workflow.py main --x 1 --y 2
 #### `boto_config`
 
 A JSON object used to construct a `boto3.session.Config` object, which is provided when creating
-Boto session clients. 
+Boto session clients.
 
-For example, we might want to change the retry strategy and raise the number of retries 
+For example, we might want to change the retry strategy and raise the number of retries
 across boto. This can be helpful when spawning a large number of AWS Batch jobs.
 
 ```ini
@@ -291,7 +291,7 @@ A string that specifies the executor module (e.g. `local`, `aws_batch`, etc) to
 
 #### Local executor
 
-The [Local executor](executors.md#local-executor) (`type = local`) executes tasks on the local machine using either multiple threads or processes. 
+The [Local executor](executors.md#local-executor) (`type = local`) executes tasks on the local machine using either multiple threads or processes.
 
 ##### `mode`
 
@@ -307,12 +307,12 @@ A string (default: `fork`) that specifies the [start method](https://docs.python
 
 #### Alias executor
 
-The [Alias executor](executors.md#alias-executor) (`type = alias`) is a "symlink" to 
+The [Alias executor](executors.md#alias-executor) (`type = alias`) is a "symlink" to
 another underlying executor.
 
 ##### `target`
 
-A string containing the name of another executor to defer to. It is lazily resolved 
+A string containing the name of another executor to defer to. It is lazily resolved
 at execution time.
 
@@ -469,7 +469,7 @@ A bool (default: True) that specifies whether redun should add default tags to a
 
 ##### `num_nodes`
 
-If not none, use a multi-node job and set the number of workers. 
+If not None, use a multi-node job and set the number of workers.
 
 ##### `job_def_extra`
 
@@ -537,6 +537,10 @@ Spark History Server to visualize and monitor job progress. Defaults to `f"{s3_s
 ##### `job_status_interval`
 
 Float specifying how frequently to query AWS for the status of running Glue jobs. Defaults to 10.0 seconds.
 
+##### `enable_metrics`
+
+A bool (default: False) that enables observability metrics and profiling of running jobs via CloudWatch and the AWS Glue console.
+
 ##### `job_retry_interval`
 
 Float specifying how frequently to retry submitting Glue jobs that were not able to be started due to resource
@@ -562,7 +566,7 @@ A string (default: None) that specifies a pattern for which files should be excl
 
 ### Federated tasks
 
-The following config keys are required. All other data is passed to the `subrun` that 
+The following config keys are required. All other data is passed to the `subrun` that
 implements the bulk of the federated task behavior, hence they are either consumed by
 `subrun` directly or are set as task options.
 
@@ -585,7 +589,7 @@ The name of the executor that has the execution context for this federated task.
 
 #### `config_dir`
 
 The path to the configuration to use for the remainder of the execution. Either an absolute
-path or relative to the executor entrypoint. 
+path or relative to the executor entrypoint.
 
 #### `task_signature`
 
@@ -685,15 +689,24 @@ A float (default: 3.0) that specifies the maximum time, in seconds, jobs will wa
 
 ##### `share_id`
 
-Queues with Scheduling Policies require all jobs be assigned a Fair Share 
+Queues with Scheduling Policies require all jobs be assigned a Fair Share
 Scheduling `shareIdentifier`. Can be further overridden on a per-task basis using task options.
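+
+For example, we might route all jobs submitted through a batch executor to a
+single share (the executor and share names below are illustrative):
+
+```ini
+[executors.batch]
+type = aws_batch
+share_id = high_priority
+```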
 
 ##### `scheduling_priority_override`
 
-Queues with Scheduling Policies require that all job definitions specify a `schedulingPriority` 
-Alternatively, if the batch job definition does *not* configure a `schedulingPriority`, you 
-must provide a `schedulingPriorityOverride` by setting this variable. 
-Can be further overridden on a per-task basis using task options. 
+Queues with Scheduling Policies require that all job definitions specify a `schedulingPriority`.
+Alternatively, if the batch job definition does *not* configure a `schedulingPriority`, you
+must provide a `schedulingPriorityOverride` by setting this variable.
+Can be further overridden on a per-task basis using task options.
 
 ## Configuration variables
 
diff --git a/redun/executors/aws_glue.py b/redun/executors/aws_glue.py
index ca9bbbe..b187b6d 100644
--- a/redun/executors/aws_glue.py
+++ b/redun/executors/aws_glue.py
@@ -125,6 +125,7 @@ def get_or_create_glue_job_definition(
     temp_dir: str,
     extra_py_files: str,
     spark_history_dir: str,
+    enable_metrics: bool = False,
     additional_python_modules: List[str] = DEFAULT_ADDITIONAL_PYTHON_MODULES,
     aws_region: str = aws_utils.DEFAULT_AWS_REGION,
 ) -> str:
@@ -151,6 +152,9 @@
     spark_history_dir : str
         S3 path where Spark event logs are stored.
 
+    enable_metrics : bool
+        Whether to enable observability metrics and profiling via CloudWatch.
+
     additional_python_modules : List[str]
         Python modules to be installed with pip before job start.
 
@@ -164,6 +168,9 @@
     """
     client = aws_utils.get_aws_client("glue", aws_region=aws_region)
 
+    # Ensure spark_history_dir ends with a trailing slash.
+    spark_history_dir = spark_history_dir.rstrip("/") + "/"
+
     # Define job definition.
     glue_job_def = dict(
         Description=f"Redun oneshot glue job {REDUN_REQUIRED_VERSION}",
@@ -186,6 +193,17 @@
             "--enable-spark-ui": "true",
             "--enable-job-insights": "true",
             "--spark-event-logs-path": spark_history_dir,
+            # AWS Glue enables profiling metrics based on the presence of the
+            # --enable-metrics key (its value is ignored), so pass the metrics
+            # flags only when they are requested.
+            **(
+                {
+                    "--enable-metrics": "true",
+                    "--enable-observability-metrics": "true",
+                }
+                if enable_metrics
+                else {}
+            ),
         },
         MaxRetries=0,
         NumberOfWorkers=2,  # Jobs will override this, so set to minimum value.
@@ -247,6 +265,7 @@ def __init__(self, name: str, scheduler: Optional[Scheduler] = None, config=None
         self.spark_history_dir = config.get(
             "spark_history_dir", get_spark_history_dir(self.s3_scratch_prefix)
         )
+        self.enable_metrics = config.getboolean("enable_metrics", fallback=False)
 
         # Default task options
         self.default_task_options = {
@@ -315,6 +334,7 @@ def get_or_create_job_definition(self) -> None:
             extra_py_files=self.redun_zip_location,
             aws_region=self.aws_region,
             glue_version=self.glue_version,
+            enable_metrics=self.enable_metrics,
         )
 
     def _start(self) -> None:
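With this change, Glue job metrics can be switched on from the executor section of the redun configuration. A minimal sketch of the new option in use (the section name `glue` is arbitrary; `aws_glue` is the Glue executor type):

```ini
# .redun/redun.ini
[executors.glue]
type = aws_glue
enable_metrics = True
```

Since the value is read with `getboolean`, any of `true`, `yes`, or `1` (case-insensitive) also enables it.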