[CDE-560] Add enable_metrics config option for glue executor (#390)
* Add enable_metrics config option

* Oops fix docstring format

* Change default to False since True costs money

* Ensure spark history dir is correctly formatted
rbetz authored Dec 2, 2024
1 parent 6c28e92 commit 147c19d
Showing 2 changed files with 28 additions and 13 deletions.
30 changes: 17 additions & 13 deletions docs/source/config.md
@@ -101,9 +101,9 @@ redun --setup profile=dev run workflow.py main --x 1 --y 2
#### `boto_config`

A JSON object used to construct a `boto3.session.Config` object, which is provided when creating
Boto session clients.

For example, we might want to change the retry strategy and raise the number of retries
across boto. This can be helpful when spawning a large number of AWS Batch jobs.

```ini
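# A sketch: raise boto's retry limits (the section name is assumed for
# illustration; place boto_config under the relevant config section).
[executors.batch]
boto_config = {"retries": {"max_attempts": 10, "mode": "standard"}}
```
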
@@ -291,7 +291,7 @@ A string that specifies the executor module (e.g. `local`, `aws_batch`, etc) to

#### Local executor

The [Local executor](executors.md#local-executor) (`type = local`) executes tasks on the local machine using either multiple threads or processes.

##### `mode`

@@ -307,12 +307,12 @@ A string (default: `fork`) that specifies the [start method](https://docs.python
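
A minimal sketch of a local executor section, assuming `mode` accepts `thread` (the default) or `process`:

```ini
[executors.local]
type = local
# Run tasks in separate processes rather than threads.
mode = process
# Start method for worker processes (fork, spawn, or forkserver).
start_method = spawn
```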

#### Alias executor

The [Alias executor](executors.md#alias-executor) (`type = alias`) is a "symlink" to
another underlying executor.

##### `target`

A string containing the name of another executor to defer to. It is lazily resolved
at execution time.
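
For example, a minimal sketch (executor names assumed for illustration):

```ini
[executors.my_alias]
type = alias
# Defer to another configured executor named "batch",
# resolved lazily at execution time.
target = batch
```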


@@ -469,7 +469,7 @@ A bool (default: True) that specifies whether redun should add default tags to a

##### `num_nodes`

If not `None`, use a multi-node job and set the number of workers.
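
For example, a sketch of a multi-node configuration (section name and worker count assumed):

```ini
[executors.batch]
type = aws_batch
# Launch multi-node parallel jobs with 4 workers.
num_nodes = 4
```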

##### `job_def_extra`

@@ -537,6 +537,10 @@ Spark History Server to visualize and monitor job progress. Defaults to `f"{s3_s

Float specifying how frequently to query AWS for the status of running Glue jobs. Defaults to 10.0 seconds.

##### `enable_metrics`

A bool (default: `False`) that enables observability and profiling of running jobs via CloudWatch and the AWS Glue console. It is off by default because these metrics incur additional cost.
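
A minimal sketch of enabling this for a Glue executor (executor name assumed):

```ini
[executors.glue]
type = aws_glue
# Off by default, since these metrics incur extra CloudWatch cost.
enable_metrics = True
```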

##### `job_retry_interval`

Float specifying how frequently to retry submitting Glue jobs that were not able to be started due to resource
@@ -562,7 +566,7 @@ A string (default: None) that specifies a pattern for which files should be excl

### Federated tasks

The following config keys are required. All other data is passed to the `subrun` that
implements the bulk of the federated task behavior, hence they are either consumed by `subrun` directly or
are set as task options.

@@ -585,7 +589,7 @@ The name of the executor that has the execution context for this federated task.
#### `config_dir`

The path to the configuration to use for the remainder of the execution. Either an absolute
path or relative to the executor entrypoint.
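
As an illustrative sketch only (the section header and values here are assumptions, not a confirmed schema):

```ini
[federated_task.my_task]
# Executor holding the execution context for this federated task.
executor = batch
# Config for the remainder of the execution; absolute, or relative
# to the executor entrypoint.
config_dir = /etc/redun/.redun
```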

#### `task_signature`

@@ -685,15 +689,15 @@ A float (default: 3.0) that specifies the maximum time, in seconds, jobs will wa

##### `share_id`

Queues with Scheduling Policies require all jobs be assigned a Fair Share
Scheduling `shareIdentifier`. Can be further overridden on a per-task basis using task options.

##### `scheduling_priority_override`

Queues with Scheduling Policies require that all job definitions specify a `schedulingPriority`.
Alternatively, if the batch job definition does *not* configure a `schedulingPriority`, you
must provide a `schedulingPriorityOverride` by setting this variable.
Can be further overridden on a per-task basis using task options.
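
For example, a sketch for a queue governed by a scheduling policy (values assumed):

```ini
[executors.batch]
type = aws_batch
# Fair Share identifier required by scheduling-policy queues.
share_id = default
# Needed only when the job definition sets no schedulingPriority.
scheduling_priority_override = 10
```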

## Configuration variables

11 changes: 11 additions & 0 deletions redun/executors/aws_glue.py
@@ -125,6 +125,7 @@ def get_or_create_glue_job_definition(
temp_dir: str,
extra_py_files: str,
spark_history_dir: str,
enable_metrics: bool = False,
additional_python_modules: List[str] = DEFAULT_ADDITIONAL_PYTHON_MODULES,
aws_region: str = aws_utils.DEFAULT_AWS_REGION,
) -> str:
@@ -151,6 +152,9 @@ def get_or_create_glue_job_definition(
spark_history_dir : str
S3 path where Spark event logs are stored.
enable_metrics : bool
Whether to enable observability and profiling via CloudWatch.
additional_python_modules : List[str]
Python modules to be installed with pip before job start.
@@ -164,6 +168,9 @@
"""
client = aws_utils.get_aws_client("glue", aws_region=aws_region)

# Ensure spark_history_dir ends with a trailing slash.
spark_history_dir = spark_history_dir.rstrip("/") + "/"

# Define job definition.
glue_job_def = dict(
Description=f"Redun oneshot glue job {REDUN_REQUIRED_VERSION}",
@@ -186,6 +193,8 @@
"--enable-spark-ui": "true",
"--enable-job-insights": "true",
"--spark-event-logs-path": spark_history_dir,
"--enable-metrics": str(enable_metrics).lower(),
"--enable-observability-metrics": str(enable_metrics).lower(),
},
MaxRetries=0,
NumberOfWorkers=2, # Jobs will override this, so set to minimum value.
@@ -247,6 +256,7 @@ def __init__(self, name: str, scheduler: Optional[Scheduler] = None, config=None
self.spark_history_dir = config.get(
"spark_history_dir", get_spark_history_dir(self.s3_scratch_prefix)
)
self.enable_metrics = config.get("enable_metrics", False)

# Default task options
self.default_task_options = {
@@ -315,6 +325,7 @@ def get_or_create_job_definition(self) -> None:
extra_py_files=self.redun_zip_location,
aws_region=self.aws_region,
glue_version=self.glue_version,
enable_metrics=self.enable_metrics,
)

def _start(self) -> None:
