From 5ca1ffb3b72e94f175e402cc34819edea40a88cf Mon Sep 17 00:00:00 2001 From: Andrea Callarelli Date: Thu, 19 Sep 2024 11:58:04 +0200 Subject: [PATCH 01/13] feat: work on Glue component --- .../cloud/aws/resources/glue_component.py | 228 ++++++++++++++---- 1 file changed, 180 insertions(+), 48 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index 86775b2..dc73038 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -10,19 +10,94 @@ @dataclass class GlueJobDefinition: + """ + Parameters specific to the Glue job. + + :param name: The name you assign to this job. It must be unique in your account. + :param description: Description of the job. + :param script_location: the s3 path to the entrypoint script of your Glue application. + :param default_arguments: The map of default arguments for this job. You can specify arguments here that your own job-execution script consumes, as well as arguments that AWS Glue itself consumes. For information about how to specify and consume your own Job arguments, see the [Calling AWS Glue APIs in Python](http://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-calling.html) topic in the developer guide. For information about the key-value pairs that AWS Glue consumes to set up your job, see the [Special Parameters Used by AWS Glue](http://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-glue-arguments.html) topic in the developer guide. + :param extra_libraries: A list of paths to the extra dependencies. If you use packages not supported by Glue, compress them, upload them to s3 and pass here the path to the zip file. + :param execution_class: Indicates whether the job is run with a standard or flexible execution class. The standard execution class is ideal for time-sensitive workloads that require fast job startup and dedicated resources. Valid value: `FLEX`, `STANDARD`. + :param max_concurrent_runs: Max amount of instances of this Job that can run concurrently. + :param glue_version: The version of glue to use, for example "1.0". Ray jobs should set this to 4.0 or greater. For information about available versions, see the [AWS Glue Release Notes](https://docs.aws.amazon.com/glue/latest/dg/release-notes.html). + :param max_capacity: The maximum number of AWS Glue data processing units (DPUs) that can be allocated when this job runs. `Required` when `pythonshell` is set, accept either `0.0625` or `1.0`. Use `number_of_workers` and `worker_type` arguments instead with `glue_version` `2.0` and above. + :param max_retries: The maximum number of times to retry this job if it fails. + :param number_of_workers: The number of workers of a defined workerType that are allocated when a job runs. + :param tags: Key-value map of resource tags. If configured with a provider `default_tags` configuration block present, tags with matching keys will overwrite those defined at the provider-level. + :param timeout: The job timeout in minutes. The default is 2880 minutes (48 hours) for `glueetl` and `pythonshell` jobs, and null (unlimited) for `gluestreaming` jobs. + :param worker_type: The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, G.2X, or G.025X for Spark jobs. Accepts the value Z.2X for Ray jobs. + * For the Standard worker type, each worker provides 4 vCPU, 16 GB of memory and a 50GB disk, and 2 executors per worker. 
+ * For the G.1X worker type, each worker maps to 1 DPU (4 vCPU, 16 GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for memory-intensive jobs. + * For the G.2X worker type, each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker. Recommended for memory-intensive jobs. + * For the G.4X worker type, each worker maps to 4 DPU (16 vCPUs, 64 GB of memory) with 256GB disk (approximately 235GB free), and provides 1 executor per worker. Recommended for memory-intensive jobs. Only available for Glue version 3.0. Available AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). + * For the G.8X worker type, each worker maps to 8 DPU (32 vCPUs, 128 GB of memory) with 512GB disk (approximately 487GB free), and provides 1 executor per worker. Recommended for memory-intensive jobs. Only available for Glue version 3.0. Available AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). + * For the G.025X worker type, each worker maps to 0.25 DPU (2 vCPU, 4GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for low volume streaming jobs. Only available for Glue version 3.0. + * For the Z.2X worker type, each worker maps to 2 M-DPU (8vCPU, 64 GB of m emory, 128 GB disk), and provides up to 8 Ray workers based on the autoscaler. + :param enable_glue_datacatalog: To use the Glue catalog as the metadata catalog + :param enable_continuous_cloudwatch_log: + :param enable_continuous_log_filter: When set to true it reduces the amount of logging. + For more information see https://repost.aws/knowledge-center/glue-reduce-cloudwatch-logs + :param enable_metrics: Enables observability metrics about the worker nodes. + :param enable_observability_metrics: Enables extra Spark-related observability metrics such as how long a tasks takes. + This parameter could increase cloud costs significantly. + """ + # Parameters for Pulumi Glue Job name: str - description: str - options: dict[str, Any] = field(default_factory=dict) + description: str = None + script_location: str = None + default_arguments: dict = field(default_factory=dict) + extra_libraries: list[str] = field(default_factory=list) + execution_class: str = "STANDARD" + max_concurrent_runs: int = 1, + glue_version: str = "4.0" + enable_auto_scaling: bool = True + max_capacity: int = 5 + max_retries: int = 0 + number_of_workers: int = 2 + tags: dict = None + timeout: int = 2880 + worker_type: str = "G.1X" + enable_glue_datacatalog: bool = True + enable_continuous_cloudwatch_log: bool = False + enable_continuous_log_filter: bool = True + enable_metrics: bool = False + enable_observability_metrics: bool = False @dataclass class GlueComponentArgs: + """ + Glue job definitions and infrastructure dependencies such as IAM roles, external connections, code and data storage. 
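+
+    Example (an illustrative sketch; the job name, script path and bucket
+    names below are hypothetical placeholders, not defaults of this component):
+
+        GlueComponentArgs(
+            jobs=[
+                GlueJobDefinition(
+                    name="orders-etl",
+                    description="Nightly orders batch job",
+                    script_location="jobs/orders_etl.py",
+                )
+            ],
+            code_repository_bucket_name="my-glue-code-bucket",
+            data_bucket_name="my-glue-data-bucket",
+        )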
+ """ jobs: list[GlueJobDefinition] - role: Optional[aws.iam.Role] = None - code_repository_bucket: Optional[aws.s3.BucketV2] = None + connections: list[aws.glue.Connection] = None + execution_role_arn: str = None + cloudwatch_log_group_name: str = None + database_name: str = None + code_repository_bucket_name: str = None + data_bucket_name: str = None + # TODO: implement this + kafka_config: dict = None # this needs to be an object holding the Kafka configuration class GlueComponent(PulumiComponentResource): + """ + An opinionated deployment of a fully functional PySpark applications on Glue. + + Resources deployed: + - Code Repository: s3 bucket + - Data Storage: s3 bucket + - Kafka checkpoint storage: s3 bucket + - Compute: Glue Job + - Kafka connection + - Metadata Catalog: Glue Database + - Monitoring: Cloudwatch Log Group + - Permissions: IAM role for Glue + - Full access to S3 bucket with data + - Full access to tables in Glue database + - CloudWatch Access to create log streams + """ def __init__( self, name: str, @@ -42,10 +117,74 @@ def __init__( self.iceberg_database self.jobs + # Compute + @property + @cache + def jobs(self) -> list[aws.glue.Job]: + """ + Return all the Glue jobs for the application. + """ + return [ + aws.glue.Job( + resource_name=f"{self._name}-{job.name}-job", + opts=ResourceOptions(parent=self), + name=f"{self._name}-{job.name}-job", + glue_version=job.glue_version, + role_arn=self.role.arn, + command=aws.glue.JobCommandArgs( + name="glueetl", + python_version="3", + script_location=self.__get_source_path(job) + ), + default_arguments=self.__get_default_arguments(job), + number_of_workers=job.number_of_workers, + worker_type=job.worker_type, + execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs), + ) + for job in self.args.jobs + ] + + def __get_source_path(self, job: GlueJobDefinition) -> str: + return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \ + else f"s3://{self.code_repository_bucket}/{job.name}.py" + + @staticmethod + def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: + """ + TODO: + - Handling of logging via log4j config file. 
+ - At Brenntag we found out how expensive Glue logging can become + - An effective way to limit logging volume is by using a custom log4j configuration file + - File needs to be uploaded to s3 and passed via + - Passing extra arguments from application, such as specific Spark config parameters + """ + return { + "--additional-python-modules": ",".join(job.extra_libraries), + "--enable-auto-scaling": "true" if job.enable_auto_scaling else "false", + "--enable-continuous-cloudwatch-log": "true" if job.enable_continuous_cloudwatch_log else "false", + "--enable-continuous-log-filter": "true" if job.enable_continuous_log_filter else "false", + "--enable-glue-datacatalog": "true" if job.enable_continuous_log_filter else "false", + "--datalake-formats": "iceberg", + "--enable-metrics": "true" if job.enable_metrics else "false", + "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false", + } + + @property + @cache + def role(self) -> aws.iam.Role: + """Return an execution role for Glue jobs.""" + return self.args.execution_role_arn or aws.iam.Role( + resource_name=f"{self._name}-role", + opts=ResourceOptions(parent=self), + name=f"{self._name}-ExecutionRole", + assume_role_policy=json.dumps(self.assume_policy), + managed_policy_arns=self.managed_policy_arns, + ) + + # Permissions @property def assume_policy(self) -> dict[str, Any]: """Return the assume role policy for Glue jobs.""" - return { "Version": "2012-10-17", "Statement": [ @@ -62,54 +201,53 @@ def assume_policy(self) -> dict[str, Any]: @property def managed_policy_arns(self) -> list[str]: """Return a list of managed policy ARNs that defines the permissions for Glue jobs.""" - return [ aws.iam.ManagedPolicy.AWS_GLUE_SERVICE_ROLE, aws.iam.ManagedPolicy.AMAZON_S3_FULL_ACCESS, aws.iam.ManagedPolicy.CLOUD_TRAIL_FULL_ACCESS, ] - @property - @cache - def role(self) -> aws.iam.Role: - """Return an execution role for Glue jobs.""" - - return self.args.role or aws.iam.Role( - resource_name=f"{self._name}-role", - opts=ResourceOptions(parent=self), - name=f"{self._name}-ExecutionRole", - assume_role_policy=json.dumps(self.assume_policy), - managed_policy_arns=self.managed_policy_arns, - ) - + # Code Repository @property @cache def code_repository_bucket(self) -> aws.s3.BucketV2: - """Return an S3 bucket for Glue jobs to host source codes.""" + """Return an S3 bucket for Glue jobs to host source codes. - # NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. - return self.args.code_repository_bucket or aws.s3.BucketV2( + NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. + """ + if self.args.code_repository_bucket_name: + return aws.s3.BucketV2.get(f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name) + + return self.args.code_repository_bucket_name or aws.s3.BucketV2( resource_name=f"{self._name}-code-bucket", opts=ResourceOptions(parent=self), bucket_prefix=f"{self._name}-code-bucket", ) + # Data Storage @property @cache def iceberg_bucket(self) -> aws.s3.BucketV2: - """Return an S3 bucket for Iceberg tables to store data processed by Glue jobs.""" + """Return an S3 bucket for Iceberg tables to store data processed by Glue jobs. + + NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. 
+ """ + if self.args.data_bucket_name: + return aws.s3.BucketV.get(f"{self._name}-data-bucket", id=self.args.data_bucket_name) - # NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. return aws.s3.BucketV2( - resource_name=f"{self._name}-bucket", + resource_name=f"{self._name}-data-bucket", opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-bucket", + bucket_prefix=f"{self._name}-data-bucket", ) + # Metadata @property @cache def iceberg_database(self) -> aws.glue.CatalogDatabase: """Return a Glue database for Iceberg tables to store data processed by Glue jobs.""" + if self.args.database_name: + return aws.cloudwatch.CatalogDatabase.get(f"{self._name}-database", id=self.args.database_name) return aws.glue.CatalogDatabase( resource_name=f"{self._name}-database", @@ -118,28 +256,22 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) + # Kafka @property @cache - def jobs(self) -> list[aws.glue.Job]: - """Return all the Glue jobs for the application.""" + def glue_kafka_connection(self) -> aws.glue.Connection: + """Return a Kafka Connection object.""" + return NotImplementedError - return [ - aws.glue.Job( - resource_name=f"{self._name}-{job.name}-job", - opts=ResourceOptions(parent=self), - name=f"{self._name}-{job.name}-job", - role_arn=self.role.arn, - glue_version="4.0", - command={ - "script_location": f"s3://{self.code_repository_bucket.bucket}/", - }, - default_arguments={ - "--env": "dev", - "--pipeline-name": job.name, - "--options": " ".join( - [f'{k}="{v}"' for k, v in job.options.items()] - ), - }, - ) - for job in self.args.jobs - ] + @property + @cache + def kafka_checkpoint_bucket(self) -> aws.s3.Bucket: + """Return an s3 bucket to store the checkpoints.""" + return NotImplementedError + + # Orchestration + @property + @cache + def glue_trigger(self) -> aws.glue.Trigger: + """Return a Glue Trigger object.""" + return NotImplementedError From 5ebe20a9b96b289e03011dc8d332e2de75f54100 Mon Sep 17 00:00:00 2001 From: Andrea Callarelli Date: Fri, 20 Sep 2024 10:18:33 +0200 Subject: [PATCH 02/13] feat: add Job schedule and checkpoints bucket --- .../cloud/aws/resources/glue_component.py | 96 ++++++++++++------- 1 file changed, 62 insertions(+), 34 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index dc73038..bcc38a4 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -16,7 +16,6 @@ class GlueJobDefinition: :param name: The name you assign to this job. It must be unique in your account. :param description: Description of the job. :param script_location: the s3 path to the entrypoint script of your Glue application. - :param default_arguments: The map of default arguments for this job. You can specify arguments here that your own job-execution script consumes, as well as arguments that AWS Glue itself consumes. For information about how to specify and consume your own Job arguments, see the [Calling AWS Glue APIs in Python](http://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-calling.html) topic in the developer guide. For information about the key-value pairs that AWS Glue consumes to set up your job, see the [Special Parameters Used by AWS Glue](http://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-python-glue-arguments.html) topic in the developer guide. 
:param extra_libraries: A list of paths to the extra dependencies. If you use packages not supported by Glue, compress them, upload them to s3 and pass here the path to the zip file. :param execution_class: Indicates whether the job is run with a standard or flexible execution class. The standard execution class is ideal for time-sensitive workloads that require fast job startup and dedicated resources. Valid value: `FLEX`, `STANDARD`. :param max_concurrent_runs: Max amount of instances of this Job that can run concurrently. @@ -41,12 +40,14 @@ class GlueJobDefinition: :param enable_metrics: Enables observability metrics about the worker nodes. :param enable_observability_metrics: Enables extra Spark-related observability metrics such as how long a tasks takes. This parameter could increase cloud costs significantly. + :param script_args: The arguments that your own script consumes {"--arg1": "arg1 value"} + :param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html + :param job_type: Specify if the job is streaming or batch. Possible values: glueetl, gluestreaming """ # Parameters for Pulumi Glue Job name: str description: str = None script_location: str = None - default_arguments: dict = field(default_factory=dict) extra_libraries: list[str] = field(default_factory=list) execution_class: str = "STANDARD" max_concurrent_runs: int = 1, @@ -63,22 +64,29 @@ class GlueJobDefinition: enable_continuous_log_filter: bool = True enable_metrics: bool = False enable_observability_metrics: bool = False + script_args: dict = field(default_factory=dict) + schedule: str = None + job_type: str = "glueetl" @dataclass class GlueComponentArgs: """ Glue job definitions and infrastructure dependencies such as IAM roles, external connections, code and data storage. + + :param jobs: + :param execution_role_arn: str = None + :param database_name: str = None + :param code_repository_bucket_name: str = None + :param data_bucket_name: str = None + :param kafka_checkpoints_bucket: str = None """ jobs: list[GlueJobDefinition] - connections: list[aws.glue.Connection] = None execution_role_arn: str = None - cloudwatch_log_group_name: str = None database_name: str = None code_repository_bucket_name: str = None data_bucket_name: str = None - # TODO: implement this - kafka_config: dict = None # this needs to be an object holding the Kafka configuration + kafka_checkpoints_bucket_name: str = None class GlueComponent(PulumiComponentResource): @@ -124,15 +132,16 @@ def jobs(self) -> list[aws.glue.Job]: """ Return all the Glue jobs for the application. 
""" - return [ - aws.glue.Job( + jobs = [] + for job in self.args.jobs: + glue_job = aws.glue.Job( resource_name=f"{self._name}-{job.name}-job", opts=ResourceOptions(parent=self), - name=f"{self._name}-{job.name}-job", + name=self.__get_job_name(job), glue_version=job.glue_version, role_arn=self.role.arn, command=aws.glue.JobCommandArgs( - name="glueetl", + name=job.job_type, python_version="3", script_location=self.__get_source_path(job) ), @@ -141,8 +150,12 @@ def jobs(self) -> list[aws.glue.Job]: worker_type=job.worker_type, execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs), ) - for job in self.args.jobs - ] + jobs.append(glue_job) + self.__create_glue_trigger(job) + return jobs + + def __get_job_name(self, job: GlueJobDefinition): + return f"{self._name}-{job.name}-job" def __get_source_path(self, job: GlueJobDefinition) -> str: return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \ @@ -150,13 +163,13 @@ def __get_source_path(self, job: GlueJobDefinition) -> str: @staticmethod def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: - """ - TODO: - - Handling of logging via log4j config file. - - At Brenntag we found out how expensive Glue logging can become - - An effective way to limit logging volume is by using a custom log4j configuration file - - File needs to be uploaded to s3 and passed via - - Passing extra arguments from application, such as specific Spark config parameters + """The map of default arguments for this job. + These are arguments that your own script consumes, as well as arguments that AWS Glue itself consumes. + + TODO: Handling of logging via log4j config file. + - At Brenntag we found out how expensive Glue logging can become + - An effective way to limit logging volume is by using a custom log4j configuration file + - File needs to be uploaded to s3 and passed via """ return { "--additional-python-modules": ",".join(job.extra_libraries), @@ -167,13 +180,17 @@ def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: "--datalake-formats": "iceberg", "--enable-metrics": "true" if job.enable_metrics else "false", "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false", + **job.script_args, } @property @cache def role(self) -> aws.iam.Role: """Return an execution role for Glue jobs.""" - return self.args.execution_role_arn or aws.iam.Role( + if self.args.execution_role_arn: + return aws.iam.Role.get(f"{self._name}-role", id=self.args.execution_role_arn) + + return aws.iam.Role( resource_name=f"{self._name}-role", opts=ResourceOptions(parent=self), name=f"{self._name}-ExecutionRole", @@ -256,22 +273,33 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) + # Orchestration + def __create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]: + """Return a Glue Trigger object.""" + return aws.glue.Trigger( + f"{job.name}-glue-trigger", + type='SCHEDULED', + schedule=job.schedule, + actions=[aws.glue.TriggerActionArgs( + job_name=self.__get_job_name(job), + )], + start_on_creation=True + ) if job.schedule else None + # Kafka @property @cache - def glue_kafka_connection(self) -> aws.glue.Connection: - """Return a Kafka Connection object.""" - return NotImplementedError + def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]: + """Return a s3 bucket to store the checkpoints. 
- @property - @cache - def kafka_checkpoint_bucket(self) -> aws.s3.Bucket: - """Return an s3 bucket to store the checkpoints.""" - return NotImplementedError + Creates the bucket if at least one job is of type 'gluestreaming'. + """ + if "gluestreaming" in [job.job_type for job in self.args.jobs]: + if self.args.kafka_checkpoints_bucket_name: + return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", id=self.args.kafka_checkpoints_bucket_name) - # Orchestration - @property - @cache - def glue_trigger(self) -> aws.glue.Trigger: - """Return a Glue Trigger object.""" - return NotImplementedError + return aws.s3.BucketV2( + resource_name=f"{self._name}-checkpoints-bucket", + opts=ResourceOptions(parent=self), + bucket_prefix=f"{self._name}-checkpoints-bucket", + ) From 28859e3d3d7b0ffa9c536de7a9c10e341581be32 Mon Sep 17 00:00:00 2001 From: Andrea Callarelli Date: Fri, 20 Sep 2024 10:42:17 +0200 Subject: [PATCH 03/13] documentation --- .../cloud/aws/resources/glue_component.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index bcc38a4..1ccaffb 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -74,15 +74,15 @@ class GlueComponentArgs: """ Glue job definitions and infrastructure dependencies such as IAM roles, external connections, code and data storage. - :param jobs: - :param execution_role_arn: str = None - :param database_name: str = None - :param code_repository_bucket_name: str = None - :param data_bucket_name: str = None - :param kafka_checkpoints_bucket: str = None + :param jobs: the list of GlueJobDefinition to deploy + :param execution_role: the IAM role attached to the Glue jobs if it exists, if not one will be createdw + :param database_name: name of the Glue database if it exists, if not one will be created + :param code_repository_bucket_name: name of the s3 code repository database if it exists, if not one will be created + :param data_bucket_name: name of the s3 bucket to store data, if it exists, if not one will be created + :param kafka_checkpoints_bucket_name: name of the s3 bucket to store checkpoints if it exists, if not one will be created """ jobs: list[GlueJobDefinition] - execution_role_arn: str = None + execution_role: str = None database_name: str = None code_repository_bucket_name: str = None data_bucket_name: str = None @@ -104,7 +104,6 @@ class GlueComponent(PulumiComponentResource): - Permissions: IAM role for Glue - Full access to S3 bucket with data - Full access to tables in Glue database - - CloudWatch Access to create log streams """ def __init__( self, @@ -123,6 +122,8 @@ def __init__( self.args = args self.code_repository_bucket self.iceberg_database + self.iceberg_bucket + self.kafka_checkpoint_bucket self.jobs # Compute @@ -187,13 +188,13 @@ def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: @cache def role(self) -> aws.iam.Role: """Return an execution role for Glue jobs.""" - if self.args.execution_role_arn: - return aws.iam.Role.get(f"{self._name}-role", id=self.args.execution_role_arn) + if self.args.execution_role: + return aws.iam.Role.get(f"{self._name}-role", id=self.args.execution_role) return aws.iam.Role( resource_name=f"{self._name}-role", opts=ResourceOptions(parent=self), - name=f"{self._name}-ExecutionRole", + name=f"{self._name}-role", assume_role_policy=json.dumps(self.assume_policy), 
managed_policy_arns=self.managed_policy_arns, ) @@ -221,7 +222,6 @@ def managed_policy_arns(self) -> list[str]: return [ aws.iam.ManagedPolicy.AWS_GLUE_SERVICE_ROLE, aws.iam.ManagedPolicy.AMAZON_S3_FULL_ACCESS, - aws.iam.ManagedPolicy.CLOUD_TRAIL_FULL_ACCESS, ] # Code Repository From 056d1feb5d2573b87c8a94128eda3880fa18ec80 Mon Sep 17 00:00:00 2001 From: Andrea Callarelli Date: Fri, 20 Sep 2024 10:44:51 +0200 Subject: [PATCH 04/13] documentation --- src/damavand/cloud/aws/resources/glue_component.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index 1ccaffb..e6c83a0 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -97,11 +97,10 @@ class GlueComponent(PulumiComponentResource): - Code Repository: s3 bucket - Data Storage: s3 bucket - Kafka checkpoint storage: s3 bucket - - Compute: Glue Job - - Kafka connection + - Compute: Glue Jobs + - Orchestration: Triggers for the Glue Jobs - Metadata Catalog: Glue Database - - Monitoring: Cloudwatch Log Group - - Permissions: IAM role for Glue + - Permissions: IAM role for Glue Jobs - Full access to S3 bucket with data - Full access to tables in Glue database """ @@ -131,7 +130,7 @@ def __init__( @cache def jobs(self) -> list[aws.glue.Job]: """ - Return all the Glue jobs for the application. + Return all the Glue jobs for the application. Add a trigger for the job if a schedule is specified. """ jobs = [] for job in self.args.jobs: From a20022fdf0d0f06b6f6f1c00a62e059fd1c6b140 Mon Sep 17 00:00:00 2001 From: Andrea Callarelli Date: Fri, 20 Sep 2024 16:56:51 +0200 Subject: [PATCH 05/13] add glue connection --- .../cloud/aws/resources/glue_component.py | 118 ++++++++++++++---- 1 file changed, 94 insertions(+), 24 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index e6c83a0..d409383 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -1,5 +1,7 @@ import json +import logging from typing import Any, Optional +from enum import Enum from functools import cache from dataclasses import dataclass, field @@ -8,6 +10,46 @@ from pulumi import ResourceOptions +class GlueWorkerType(Enum): + G_1X: str = "G.1X" + G_2X: str = "G.2X" + G_4X: str = "G.4X" + G_8X: str = "G.8X" + G_025X: str = "G.025X" + Z_2X: str = "Z.2X" + + +class GlueJobType(Enum): + GLUE_ETL: str = "glueetl" + GLUE_STREAMING: str = "gluestreaming" + + +class GlueExecutionClass(Enum): + STANDARD: str = "STANDARD" + FLEX: str = "FLEX" + + +class ConnectionType(Enum): + KAFKA: str = "KAFKA" + + +@dataclass +class ConnectorConfig: + """Configuration for the Connector. + + :param vpc_id: id of the vpc + :param subnet_id: id of the subnet + :param security_groups: list of security group ids + :param connection_properties: a dictionary with connection properties specific to the connector. 
+ For more info see https://www.pulumi.com/registry/packages/aws/api-docs/glue/connection/ + """ + vpc_id: str + subnet_id: str + security_groups: list[str] + connector_type: ConnectionType = ConnectionType.KAFKA + connection_properties: dict = field(default_factory=list) + + @dataclass class GlueJobDefinition: """ @@ -34,7 +76,7 @@ class GlueJobDefinition: * For the G.025X worker type, each worker maps to 0.25 DPU (2 vCPU, 4GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for low volume streaming jobs. Only available for Glue version 3.0. * For the Z.2X worker type, each worker maps to 2 M-DPU (8vCPU, 64 GB of m emory, 128 GB disk), and provides up to 8 Ray workers based on the autoscaler. :param enable_glue_datacatalog: To use the Glue catalog as the metadata catalog - :param enable_continuous_cloudwatch_log: + :param enable_continuous_cloudwatch_log: To enable logging continuously. :param enable_continuous_log_filter: When set to true it reduces the amount of logging. For more information see https://repost.aws/knowledge-center/glue-reduce-cloudwatch-logs :param enable_metrics: Enables observability metrics about the worker nodes. @@ -42,14 +84,14 @@ class GlueJobDefinition: This parameter could increase cloud costs significantly. :param script_args: The arguments that your own script consumes {"--arg1": "arg1 value"} :param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html - :param job_type: Specify if the job is streaming or batch. Possible values: glueetl, gluestreaming + :param job_type: Specify if the job is streaming or batch. """ # Parameters for Pulumi Glue Job name: str description: str = None script_location: str = None extra_libraries: list[str] = field(default_factory=list) - execution_class: str = "STANDARD" + execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD max_concurrent_runs: int = 1, glue_version: str = "4.0" enable_auto_scaling: bool = True @@ -58,15 +100,15 @@ class GlueJobDefinition: number_of_workers: int = 2 tags: dict = None timeout: int = 2880 - worker_type: str = "G.1X" + worker_type: GlueWorkerType = GlueWorkerType.G_1X enable_glue_datacatalog: bool = True - enable_continuous_cloudwatch_log: bool = False - enable_continuous_log_filter: bool = True + enable_continuous_cloudwatch_log: bool = False + enable_continuous_log_filter: bool = True enable_metrics: bool = False enable_observability_metrics: bool = False script_args: dict = field(default_factory=dict) schedule: str = None - job_type: str = "glueetl" + job_type: GlueJobType = GlueJobType.GLUE_ETL @dataclass @@ -80,6 +122,7 @@ class GlueComponentArgs: :param code_repository_bucket_name: name of the s3 code repository database if it exists, if not one will be created :param data_bucket_name: name of the s3 bucket to store data, if it exists, if not one will be created :param kafka_checkpoints_bucket_name: name of the s3 bucket to store checkpoints if it exists, if not one will be created + :param connector_config: Connector configuration to run the Glue Jobs in a VPC. 
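+
+    Example (an illustrative sketch; the VPC, subnet and security group ids and
+    the bootstrap server address below are hypothetical placeholders):
+
+        GlueComponentArgs(
+            jobs=[
+                GlueJobDefinition(
+                    name="orders-stream",
+                    job_type=GlueJobType.GLUE_STREAMING,
+                )
+            ],
+            connector_config=ConnectorConfig(
+                vpc_id="vpc-0123456789abcdef0",
+                subnet_id="subnet-0123456789abcdef0",
+                security_groups=["sg-0123456789abcdef0"],
+                connection_properties={"KAFKA_BOOTSTRAP_SERVERS": "broker-1:9092"},
+            ),
+        )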
""" jobs: list[GlueJobDefinition] execution_role: str = None @@ -87,6 +130,7 @@ class GlueComponentArgs: code_repository_bucket_name: str = None data_bucket_name: str = None kafka_checkpoints_bucket_name: str = None + connector_config: Optional[ConnectorConfig] = None class GlueComponent(PulumiComponentResource): @@ -104,11 +148,12 @@ class GlueComponent(PulumiComponentResource): - Full access to S3 bucket with data - Full access to tables in Glue database """ + def __init__( - self, - name: str, - args: GlueComponentArgs, - opts: Optional[ResourceOptions] = None, + self, + name: str, + args: GlueComponentArgs, + opts: Optional[ResourceOptions] = None, ) -> None: super().__init__( f"Damavand:Spark:{GlueComponent.__name__}", @@ -123,6 +168,7 @@ def __init__( self.iceberg_database self.iceberg_bucket self.kafka_checkpoint_bucket + self.connection self.jobs # Compute @@ -137,32 +183,33 @@ def jobs(self) -> list[aws.glue.Job]: glue_job = aws.glue.Job( resource_name=f"{self._name}-{job.name}-job", opts=ResourceOptions(parent=self), - name=self.__get_job_name(job), + name=self._get_job_name(job), glue_version=job.glue_version, role_arn=self.role.arn, command=aws.glue.JobCommandArgs( - name=job.job_type, + name=job.job_type.value, python_version="3", - script_location=self.__get_source_path(job) + script_location=self._get_source_path(job) ), - default_arguments=self.__get_default_arguments(job), + default_arguments=self._get_default_arguments(job), number_of_workers=job.number_of_workers, - worker_type=job.worker_type, + worker_type=job.worker_type.value, execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs), + connections=[self.connection.name] if self.connection else [], ) jobs.append(glue_job) - self.__create_glue_trigger(job) + self._create_glue_trigger(job) return jobs - def __get_job_name(self, job: GlueJobDefinition): + def _get_job_name(self, job: GlueJobDefinition): return f"{self._name}-{job.name}-job" - def __get_source_path(self, job: GlueJobDefinition) -> str: + def _get_source_path(self, job: GlueJobDefinition) -> str: return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \ else f"s3://{self.code_repository_bucket}/{job.name}.py" @staticmethod - def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: + def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: """The map of default arguments for this job. These are arguments that your own script consumes, as well as arguments that AWS Glue itself consumes. 
@@ -179,7 +226,7 @@ def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: "--enable-glue-datacatalog": "true" if job.enable_continuous_log_filter else "false", "--datalake-formats": "iceberg", "--enable-metrics": "true" if job.enable_metrics else "false", - "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false", + "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false", **job.script_args, } @@ -273,14 +320,14 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: ) # Orchestration - def __create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]: + def _create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]: """Return a Glue Trigger object.""" return aws.glue.Trigger( f"{job.name}-glue-trigger", type='SCHEDULED', schedule=job.schedule, actions=[aws.glue.TriggerActionArgs( - job_name=self.__get_job_name(job), + job_name=self._get_job_name(job), )], start_on_creation=True ) if job.schedule else None @@ -295,10 +342,33 @@ def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]: """ if "gluestreaming" in [job.job_type for job in self.args.jobs]: if self.args.kafka_checkpoints_bucket_name: - return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", id=self.args.kafka_checkpoints_bucket_name) + return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", + id=self.args.kafka_checkpoints_bucket_name) return aws.s3.BucketV2( resource_name=f"{self._name}-checkpoints-bucket", opts=ResourceOptions(parent=self), bucket_prefix=f"{self._name}-checkpoints-bucket", ) + + @property + @cache + def connection(self) -> Optional[aws.glue.Connection]: + """Return a s3 bucket to store the checkpoints. + + Creates the bucket if at least one job is of type 'gluestreaming'. + """ + if not self.args.connector_config: + logging.warning("No connector config provided. Glue jobs will run outside a VPC. This is not recommended.") + return None + + return aws.glue.Connection( + resource_name=f"{self._name}-kafka-connection", + name=f"{self._name}-kafka-connection", + connection_type=self.args.connector_config.connector_type.value, + connection_properties=self.args.connector_config.connection_properties, + physical_connection_requirements={ + "security_group_id_lists": self.args.connector_config.security_groups, + "subnet_id": self.args.connector_config.subnet_id + } + ) From 850744530d94bdd4c02c390c5b6f96f144a221a6 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Fri, 4 Oct 2024 12:44:53 +0200 Subject: [PATCH 06/13] refactor: fix format --- .../cloud/aws/resources/glue_component.py | 104 ++++++++++++------ 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index d409383..0ffd806 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -43,6 +43,7 @@ class ConnectorConfig: :param connection_properties: a dictionary with connection properties specific to the connector. For more info see https://www.pulumi.com/registry/packages/aws/api-docs/glue/connection/ """ + vpc_id: str subnet_id: str security_groups: list[str] @@ -86,13 +87,14 @@ class GlueJobDefinition: :param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html :param job_type: Specify if the job is streaming or batch. 
""" + # Parameters for Pulumi Glue Job name: str description: str = None script_location: str = None extra_libraries: list[str] = field(default_factory=list) execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD - max_concurrent_runs: int = 1, + max_concurrent_runs: int = (1,) glue_version: str = "4.0" enable_auto_scaling: bool = True max_capacity: int = 5 @@ -124,6 +126,7 @@ class GlueComponentArgs: :param kafka_checkpoints_bucket_name: name of the s3 bucket to store checkpoints if it exists, if not one will be created :param connector_config: Connector configuration to run the Glue Jobs in a VPC. """ + jobs: list[GlueJobDefinition] execution_role: str = None database_name: str = None @@ -150,10 +153,10 @@ class GlueComponent(PulumiComponentResource): """ def __init__( - self, - name: str, - args: GlueComponentArgs, - opts: Optional[ResourceOptions] = None, + self, + name: str, + args: GlueComponentArgs, + opts: Optional[ResourceOptions] = None, ) -> None: super().__init__( f"Damavand:Spark:{GlueComponent.__name__}", @@ -189,12 +192,14 @@ def jobs(self) -> list[aws.glue.Job]: command=aws.glue.JobCommandArgs( name=job.job_type.value, python_version="3", - script_location=self._get_source_path(job) + script_location=self._get_source_path(job), ), default_arguments=self._get_default_arguments(job), number_of_workers=job.number_of_workers, worker_type=job.worker_type.value, - execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs), + execution_property=aws.glue.JobExecutionPropertyArgs( + max_concurrent_runs=job.max_concurrent_runs + ), connections=[self.connection.name] if self.connection else [], ) jobs.append(glue_job) @@ -205,8 +210,11 @@ def _get_job_name(self, job: GlueJobDefinition): return f"{self._name}-{job.name}-job" def _get_source_path(self, job: GlueJobDefinition) -> str: - return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \ + return ( + f"s3://{self.code_repository_bucket}/{job.script_location}" + if job.script_location else f"s3://{self.code_repository_bucket}/{job.name}.py" + ) @staticmethod def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: @@ -221,12 +229,20 @@ def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: return { "--additional-python-modules": ",".join(job.extra_libraries), "--enable-auto-scaling": "true" if job.enable_auto_scaling else "false", - "--enable-continuous-cloudwatch-log": "true" if job.enable_continuous_cloudwatch_log else "false", - "--enable-continuous-log-filter": "true" if job.enable_continuous_log_filter else "false", - "--enable-glue-datacatalog": "true" if job.enable_continuous_log_filter else "false", + "--enable-continuous-cloudwatch-log": ( + "true" if job.enable_continuous_cloudwatch_log else "false" + ), + "--enable-continuous-log-filter": ( + "true" if job.enable_continuous_log_filter else "false" + ), + "--enable-glue-datacatalog": ( + "true" if job.enable_continuous_log_filter else "false" + ), "--datalake-formats": "iceberg", "--enable-metrics": "true" if job.enable_metrics else "false", - "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false", + "--enable-observability-metrics": ( + "true" if job.enable_observability_metrics else "false" + ), **job.script_args, } @@ -279,7 +295,9 @@ def code_repository_bucket(self) -> aws.s3.BucketV2: NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. 
""" if self.args.code_repository_bucket_name: - return aws.s3.BucketV2.get(f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name) + return aws.s3.BucketV2.get( + f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name + ) return self.args.code_repository_bucket_name or aws.s3.BucketV2( resource_name=f"{self._name}-code-bucket", @@ -296,7 +314,9 @@ def iceberg_bucket(self) -> aws.s3.BucketV2: NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. """ if self.args.data_bucket_name: - return aws.s3.BucketV.get(f"{self._name}-data-bucket", id=self.args.data_bucket_name) + return aws.s3.BucketV.get( + f"{self._name}-data-bucket", id=self.args.data_bucket_name + ) return aws.s3.BucketV2( resource_name=f"{self._name}-data-bucket", @@ -310,7 +330,9 @@ def iceberg_bucket(self) -> aws.s3.BucketV2: def iceberg_database(self) -> aws.glue.CatalogDatabase: """Return a Glue database for Iceberg tables to store data processed by Glue jobs.""" if self.args.database_name: - return aws.cloudwatch.CatalogDatabase.get(f"{self._name}-database", id=self.args.database_name) + return aws.cloudwatch.CatalogDatabase.get( + f"{self._name}-database", id=self.args.database_name + ) return aws.glue.CatalogDatabase( resource_name=f"{self._name}-database", @@ -320,17 +342,25 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: ) # Orchestration - def _create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]: + def _create_glue_trigger( + self, job: GlueJobDefinition + ) -> Optional[aws.glue.Trigger]: """Return a Glue Trigger object.""" - return aws.glue.Trigger( - f"{job.name}-glue-trigger", - type='SCHEDULED', - schedule=job.schedule, - actions=[aws.glue.TriggerActionArgs( - job_name=self._get_job_name(job), - )], - start_on_creation=True - ) if job.schedule else None + return ( + aws.glue.Trigger( + f"{job.name}-glue-trigger", + type="SCHEDULED", + schedule=job.schedule, + actions=[ + aws.glue.TriggerActionArgs( + job_name=self._get_job_name(job), + ) + ], + start_on_creation=True, + ) + if job.schedule + else None + ) # Kafka @property @@ -342,14 +372,16 @@ def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]: """ if "gluestreaming" in [job.job_type for job in self.args.jobs]: if self.args.kafka_checkpoints_bucket_name: - return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", - id=self.args.kafka_checkpoints_bucket_name) + return aws.s3.BucketV2.get( + f"{self._name}-checkpoints-bucket", + id=self.args.kafka_checkpoints_bucket_name, + ) - return aws.s3.BucketV2( - resource_name=f"{self._name}-checkpoints-bucket", - opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-checkpoints-bucket", - ) + return aws.s3.BucketV2( + resource_name=f"{self._name}-checkpoints-bucket", + opts=ResourceOptions(parent=self), + bucket_prefix=f"{self._name}-checkpoints-bucket", + ) @property @cache @@ -359,7 +391,9 @@ def connection(self) -> Optional[aws.glue.Connection]: Creates the bucket if at least one job is of type 'gluestreaming'. """ if not self.args.connector_config: - logging.warning("No connector config provided. Glue jobs will run outside a VPC. This is not recommended.") + logging.warning( + "No connector config provided. Glue jobs will run outside a VPC. This is not recommended." 
+ ) return None return aws.glue.Connection( @@ -369,6 +403,6 @@ def connection(self) -> Optional[aws.glue.Connection]: connection_properties=self.args.connector_config.connection_properties, physical_connection_requirements={ "security_group_id_lists": self.args.connector_config.security_groups, - "subnet_id": self.args.connector_config.subnet_id - } + "subnet_id": self.args.connector_config.subnet_id, + }, ) From c4ca573a14ed912deda29502258f1a46873175ef Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Fri, 4 Oct 2024 16:08:28 +0200 Subject: [PATCH 07/13] feat: add Java for running spark related applications --- devenv.nix | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/devenv.nix b/devenv.nix index 3206765..8e01384 100644 --- a/devenv.nix +++ b/devenv.nix @@ -69,6 +69,10 @@ }; }; + # Java is required for PySpark + languages.java.enable = true; + languages.java.jdk.package = pkgs.jdk8; # Java version running on AWS Glue + enterShell = '' hello pdm install From 3aae7c0653a140724942597a58588c049990046c Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Tue, 8 Oct 2024 18:00:08 +0200 Subject: [PATCH 08/13] fix: application example --- examples/sparkle/Pulumi.yaml | 12 ++---- examples/sparkle/__main__.py | 8 ++-- examples/sparkle/applications/orders.py | 15 +++++++- examples/sparkle/applications/products.py | 14 +++++-- pdm.lock | 42 ++++++++++++++------- pyproject.toml | 1 + src/damavand/base/controllers/spark.py | 1 - src/damavand/cloud/aws/controllers/spark.py | 2 + src/damavand/factories.py | 1 + 9 files changed, 63 insertions(+), 33 deletions(-) diff --git a/examples/sparkle/Pulumi.yaml b/examples/sparkle/Pulumi.yaml index 4f93a19..196c4a9 100644 --- a/examples/sparkle/Pulumi.yaml +++ b/examples/sparkle/Pulumi.yaml @@ -1,11 +1,5 @@ -name: object_storage +name: simple-spark-application runtime: name: python - options: - toolchain: pip - virtualenv: venv -description: A minimal Azure Native Python Pulumi program -config: - pulumi:tags: - value: - pulumi:template: azure-python +description: A minimal spark application that uses Sparkle +region: eu-west-1 diff --git a/examples/sparkle/__main__.py b/examples/sparkle/__main__.py index 2ff67c2..46b029e 100644 --- a/examples/sparkle/__main__.py +++ b/examples/sparkle/__main__.py @@ -1,4 +1,3 @@ -import os from damavand.cloud.provider import AwsProvider from damavand.factories import SparkControllerFactory @@ -10,7 +9,7 @@ def main() -> None: spark_factory = SparkControllerFactory( provider=AwsProvider( app_name="my-app", - region="us-west-2", + region="eu-west-1", ), tags={"env": "dev"}, ) @@ -22,10 +21,9 @@ def main() -> None: CustomerOrders(), ], ) + # app_name = os.getenv("APP_NAME", "products-app") # Get app name on runtime - app_name = os.getenv("APP_NAME", "default_app") # Get app name on runtime - - spark_controller.run_application(app_name) + # spark_controller.run_application(app_name) spark_controller.provision() diff --git a/examples/sparkle/applications/orders.py b/examples/sparkle/applications/orders.py index d82514d..5c424df 100644 --- a/examples/sparkle/applications/orders.py +++ b/examples/sparkle/applications/orders.py @@ -1,4 +1,5 @@ -from sparkle.config import Config +from sparkle.config import Config, IcebergConfig, KafkaReaderConfig +from sparkle.config.kafka_config import KafkaConfig, Credentials from sparkle.writer.iceberg_writer import IcebergWriter from sparkle.application import Sparkle from sparkle.reader.kafka_reader import KafkaReader @@ -15,6 +16,18 @@ def __init__(self): version="0.0.1", 
database_bucket="s3://test-bucket", checkpoints_bucket="s3://test-checkpoints", + iceberg_output=IcebergConfig( + database_name="all_products", + database_path="", + table_name="orders_v1", + ), + kafka_input=KafkaReaderConfig( + KafkaConfig( + bootstrap_servers="localhost:9119", + credentials=Credentials("test", "test"), + ), + kafka_topic="src_orders_v1", + ), ), readers={"orders": KafkaReader}, writers=[IcebergWriter], diff --git a/examples/sparkle/applications/products.py b/examples/sparkle/applications/products.py index 9362fd8..e34bdcc 100644 --- a/examples/sparkle/applications/products.py +++ b/examples/sparkle/applications/products.py @@ -1,7 +1,6 @@ from sparkle.application import Sparkle -from sparkle.config import Config +from sparkle.config import Config, IcebergConfig, TableConfig from sparkle.writer.iceberg_writer import IcebergWriter -from sparkle.writer.kafka_writer import KafkaStreamPublisher from sparkle.reader.table_reader import TableReader from pyspark.sql import DataFrame @@ -16,11 +15,20 @@ def __init__(self): version="0.0.1", database_bucket="s3://test-bucket", checkpoints_bucket="s3://test-checkpoints", + iceberg_output=IcebergConfig( + database_name="all_products", + database_path="", + table_name="products_v1", + ), + hive_table_input=TableConfig( + database="source_database", + table="products_v1", + bucket="", + ), ), readers={"products": TableReader}, writers=[ IcebergWriter, - KafkaStreamPublisher, ], ) diff --git a/pdm.lock b/pdm.lock index eac070a..7608580 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev", "llm"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:8876bfee687350c2cca3535991e47503fc5a54baa28d6f0625216d56ef84630a" +content_hash = "sha256:eded6b337337dee9c917e85dfb3d774f698d96ba4f8d7ad37fcdc1e20cba8a12" [[metadata.targets]] requires_python = ">=3.11.0" @@ -139,7 +139,7 @@ name = "certifi" version = "2024.8.30" requires_python = ">=3.6" summary = "Python package for providing Mozilla's CA Bundle." -groups = ["default", "dev", "llm"] +groups = ["dev", "llm"] files = [ {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"}, {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"}, @@ -198,7 +198,7 @@ name = "charset-normalizer" version = "3.3.2" requires_python = ">=3.7.0" summary = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." -groups = ["default", "dev", "llm"] +groups = ["dev", "llm"] files = [ {file = "charset-normalizer-3.3.2.tar.gz", hash = "sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5"}, {file = "charset_normalizer-3.3.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db"}, @@ -432,6 +432,25 @@ files = [ {file = "cryptography-43.0.1.tar.gz", hash = "sha256:203e92a75716d8cfb491dc47c79e17d0d9207ccffcbcb35f598fbe463ae3444d"}, ] +[[package]] +name = "damavand" +version = "0.0.0" +requires_python = ">=3.11.0" +path = "./." +summary = "Damavand is an opinionated cloud-agnostic pythonic implementation of ARC design pattern for developing cloud-native applications." 
+groups = ["default"] +dependencies = [ + "boto3>=1.34.147", + "flask>=3.0.3", + "psutil>=6.0.0", + "pulumi-aws>=6.47.0", + "pulumi-azure-native>=2.51.0", + "pulumi-random>=4.16.3", + "pulumi>=3.127.0", + "rich>=13.7.1", + "sparkle @ git+https://github.com/DataChefHQ/sparkle.git@v0.6.1", +] + [[package]] name = "debugpy" version = "1.8.6" @@ -568,7 +587,7 @@ name = "idna" version = "3.10" requires_python = ">=3.6" summary = "Internationalized Domain Names in Applications (IDNA)" -groups = ["default", "dev", "llm"] +groups = ["dev", "llm"] files = [ {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"}, {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"}, @@ -1127,7 +1146,7 @@ files = [ name = "py4j" version = "0.10.9.5" summary = "Enables Python programs to dynamically access arbitrary Java objects" -groups = ["default", "dev"] +groups = ["dev"] files = [ {file = "py4j-0.10.9.5-py2.py3-none-any.whl", hash = "sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04"}, {file = "py4j-0.10.9.5.tar.gz", hash = "sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6"}, @@ -1261,7 +1280,7 @@ name = "pyspark" version = "3.3.2" requires_python = ">=3.7" summary = "Apache Spark Python API" -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "py4j==0.10.9.5", ] @@ -1439,7 +1458,7 @@ name = "requests" version = "2.32.3" requires_python = ">=3.8" summary = "Python HTTP for Humans." -groups = ["default", "dev", "llm"] +groups = ["dev", "llm"] dependencies = [ "certifi>=2017.4.17", "charset-normalizer<4,>=2", @@ -1456,7 +1475,7 @@ name = "responses" version = "0.25.3" requires_python = ">=3.8" summary = "A utility library for mocking out the `requests` Python library." -groups = ["default", "dev"] +groups = ["dev"] dependencies = [ "pyyaml", "requests<3.0,>=2.30.0", @@ -1655,13 +1674,8 @@ requires_python = "<4.0,>=3.10.14" git = "https://github.com/DataChefHQ/sparkle.git" ref = "v0.6.1" revision = "7c59a62035142b51af8477f62829be46fa214b43" -summary = "✨ A meta framework for Apache Spark, helping data engineers to focus on solving business problems with highest quality!" +summary = "" groups = ["default"] -dependencies = [ - "pyspark==3.3.2", - "requests<3.0.0,>=2.32.3", - "responses<1.0.0,>=0.25.3", -] [[package]] name = "tblib" diff --git a/pyproject.toml b/pyproject.toml index e7016b4..1d14a0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "pulumi-azure-native>=2.51.0", "pulumi-random>=4.16.3", "sparkle @ git+https://github.com/DataChefHQ/sparkle.git@v0.6.1", + "damavand @ file:///${PROJECT_ROOT}/", ] requires-python = ">=3.11.0" readme = "README.md" diff --git a/src/damavand/base/controllers/spark.py b/src/damavand/base/controllers/spark.py index d591fe4..84c12ba 100644 --- a/src/damavand/base/controllers/spark.py +++ b/src/damavand/base/controllers/spark.py @@ -54,7 +54,6 @@ def application_with_id(self, app_id: str) -> Sparkle: Returns: Sparkle: The Spark application. 
""" - for app in self.applications: if app.config.app_id == app_id: return app diff --git a/src/damavand/cloud/aws/controllers/spark.py b/src/damavand/cloud/aws/controllers/spark.py index 1f4713d..fea18be 100644 --- a/src/damavand/cloud/aws/controllers/spark.py +++ b/src/damavand/cloud/aws/controllers/spark.py @@ -24,11 +24,13 @@ def __init__( **kwargs, ) -> None: super().__init__(name, applications, tags, **kwargs) + print("\n\n\nShit") self._glue_client = boto3.client("glue", region_name=region) @buildtime @cache def resource(self) -> PulumiResource: + print("\n\n\nI'm called") if not self.applications: raise BuildtimeException("No applications found to create Glue jobs.") diff --git a/src/damavand/factories.py b/src/damavand/factories.py index 0367a2a..79b0ed7 100644 --- a/src/damavand/factories.py +++ b/src/damavand/factories.py @@ -16,6 +16,7 @@ def _new_aws_controller( ) -> SparkController: return AwsSparkController( name=name, + applications=applications, region=region, tags=tags, **kwargs, From 164b37f0f9c3936a347e1c4d6d07e8aeba545a5a Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Wed, 9 Oct 2024 11:06:34 +0200 Subject: [PATCH 09/13] fix: Pulumi args --- src/damavand/cloud/aws/resources/glue_component.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index 0ffd806..029ea2a 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -90,17 +90,17 @@ class GlueJobDefinition: # Parameters for Pulumi Glue Job name: str - description: str = None - script_location: str = None + description: str = "" + script_location: str = "" extra_libraries: list[str] = field(default_factory=list) execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD - max_concurrent_runs: int = (1,) + max_concurrent_runs: int = 1 glue_version: str = "4.0" enable_auto_scaling: bool = True max_capacity: int = 5 max_retries: int = 0 number_of_workers: int = 2 - tags: dict = None + tags: dict | None = None timeout: int = 2880 worker_type: GlueWorkerType = GlueWorkerType.G_1X enable_glue_datacatalog: bool = True @@ -109,7 +109,7 @@ class GlueJobDefinition: enable_metrics: bool = False enable_observability_metrics: bool = False script_args: dict = field(default_factory=dict) - schedule: str = None + schedule: str | None = None job_type: GlueJobType = GlueJobType.GLUE_ETL @@ -203,7 +203,8 @@ def jobs(self) -> list[aws.glue.Job]: connections=[self.connection.name] if self.connection else [], ) jobs.append(glue_job) - self._create_glue_trigger(job) + if job.schedule: + self._create_glue_trigger(job) return jobs def _get_job_name(self, job: GlueJobDefinition): From 93219a184be64733beae939bad8c99e263395f6d Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Thu, 10 Oct 2024 10:06:27 +0200 Subject: [PATCH 10/13] refactor: Fix comments and types --- .../cloud/aws/resources/glue_component.py | 284 ++++++++++-------- 1 file changed, 163 insertions(+), 121 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index 029ea2a..d839877 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -11,84 +11,84 @@ class GlueWorkerType(Enum): - G_1X: str = "G.1X" - G_2X: str = "G.2X" - G_4X: str = "G.4X" - G_8X: str = "G.8X" - G_025X: str = "G.025X" - Z_2X: str = "Z.2X" + """Enum 
representing Glue worker types.""" + + G_1X = "G.1X" + G_2X = "G.2X" + G_4X = "G.4X" + G_8X = "G.8X" + G_025X = "G.025X" + Z_2X = "Z.2X" class GlueJobType(Enum): - GLUE_ETL: str = "glueetl" - GLUE_STREAMING: str = "gluestreaming" + """Enum representing Glue job types.""" + + GLUE_ETL = "glueetl" + GLUE_STREAMING = "gluestreaming" class GlueExecutionClass(Enum): - STANDARD: str = "STANDARD" - FLEX: str = "FLEX" + """Enum representing Glue execution classes.""" + + STANDARD = "STANDARD" + FLEX = "FLEX" class ConnectionType(Enum): - KAFKA: str = "KAFKA" + """Enum representing connection types.""" + + KAFKA = "KAFKA" @dataclass class ConnectorConfig: """Configuration for the Connector. - :param vpc_id: id of the vpc - :param subnet_id: id of the subnet - :param security_groups: list of security group ids - :param connection_properties: a dictionary with connection properties specific to the connector. - For more info see https://www.pulumi.com/registry/packages/aws/api-docs/glue/connection/ + Attributes: + vpc_id (str): VPC ID. + subnet_id (str): Subnet ID. + security_groups (list[str]): List of security group IDs. + connector_type (ConnectionType): Connector type. Default is ConnectionType.KAFKA. + connection_properties (dict): Connection properties. Default is empty dict. """ vpc_id: str subnet_id: str security_groups: list[str] connector_type: ConnectionType = ConnectionType.KAFKA - connection_properties: dict = field(default_factory=list) + connection_properties: dict = field(default_factory=dict) @dataclass class GlueJobDefinition: - """ - Parameters specific to the Glue job. - - :param name: The name you assign to this job. It must be unique in your account. - :param description: Description of the job. - :param script_location: the s3 path to the entrypoint script of your Glue application. - :param extra_libraries: A list of paths to the extra dependencies. If you use packages not supported by Glue, compress them, upload them to s3 and pass here the path to the zip file. - :param execution_class: Indicates whether the job is run with a standard or flexible execution class. The standard execution class is ideal for time-sensitive workloads that require fast job startup and dedicated resources. Valid value: `FLEX`, `STANDARD`. - :param max_concurrent_runs: Max amount of instances of this Job that can run concurrently. - :param glue_version: The version of glue to use, for example "1.0". Ray jobs should set this to 4.0 or greater. For information about available versions, see the [AWS Glue Release Notes](https://docs.aws.amazon.com/glue/latest/dg/release-notes.html). - :param max_capacity: The maximum number of AWS Glue data processing units (DPUs) that can be allocated when this job runs. `Required` when `pythonshell` is set, accept either `0.0625` or `1.0`. Use `number_of_workers` and `worker_type` arguments instead with `glue_version` `2.0` and above. - :param max_retries: The maximum number of times to retry this job if it fails. - :param number_of_workers: The number of workers of a defined workerType that are allocated when a job runs. - :param tags: Key-value map of resource tags. If configured with a provider `default_tags` configuration block present, tags with matching keys will overwrite those defined at the provider-level. - :param timeout: The job timeout in minutes. The default is 2880 minutes (48 hours) for `glueetl` and `pythonshell` jobs, and null (unlimited) for `gluestreaming` jobs. - :param worker_type: The type of predefined worker that is allocated when a job runs. 
Accepts a value of Standard, G.1X, G.2X, or G.025X for Spark jobs. Accepts the value Z.2X for Ray jobs. - * For the Standard worker type, each worker provides 4 vCPU, 16 GB of memory and a 50GB disk, and 2 executors per worker. - * For the G.1X worker type, each worker maps to 1 DPU (4 vCPU, 16 GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for memory-intensive jobs. - * For the G.2X worker type, each worker maps to 2 DPU (8 vCPU, 32 GB of memory, 128 GB disk), and provides 1 executor per worker. Recommended for memory-intensive jobs. - * For the G.4X worker type, each worker maps to 4 DPU (16 vCPUs, 64 GB of memory) with 256GB disk (approximately 235GB free), and provides 1 executor per worker. Recommended for memory-intensive jobs. Only available for Glue version 3.0. Available AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). - * For the G.8X worker type, each worker maps to 8 DPU (32 vCPUs, 128 GB of memory) with 512GB disk (approximately 487GB free), and provides 1 executor per worker. Recommended for memory-intensive jobs. Only available for Glue version 3.0. Available AWS Regions: US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Canada (Central), Europe (Frankfurt), Europe (Ireland), and Europe (Stockholm). - * For the G.025X worker type, each worker maps to 0.25 DPU (2 vCPU, 4GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for low volume streaming jobs. Only available for Glue version 3.0. - * For the Z.2X worker type, each worker maps to 2 M-DPU (8vCPU, 64 GB of m emory, 128 GB disk), and provides up to 8 Ray workers based on the autoscaler. - :param enable_glue_datacatalog: To use the Glue catalog as the metadata catalog - :param enable_continuous_cloudwatch_log: To enable logging continuously. - :param enable_continuous_log_filter: When set to true it reduces the amount of logging. - For more information see https://repost.aws/knowledge-center/glue-reduce-cloudwatch-logs - :param enable_metrics: Enables observability metrics about the worker nodes. - :param enable_observability_metrics: Enables extra Spark-related observability metrics such as how long a tasks takes. - This parameter could increase cloud costs significantly. - :param script_args: The arguments that your own script consumes {"--arg1": "arg1 value"} - :param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html - :param job_type: Specify if the job is streaming or batch. + """Parameters for the Glue job. + + Attributes: + name (str): Job name. + description (str): Job description. + script_location (str): S3 path to the entrypoint script. + extra_libraries (list[str]): Paths to extra dependencies. + execution_class (GlueExecutionClass): Execution class. Default is STANDARD. + max_concurrent_runs (int): Max concurrent runs. + glue_version (str): Glue version. + enable_auto_scaling (bool): Enable auto-scaling. + max_capacity (int): Max capacity. + max_retries (int): Max retries. + number_of_workers (int): Number of workers. + tags (Optional[dict]): Resource tags. + timeout (int): Job timeout in minutes. + worker_type (GlueWorkerType): Worker type. + enable_glue_datacatalog (bool): Use Glue Data Catalog. 
+ enable_continuous_cloudwatch_log (bool): Enable continuous CloudWatch log. + enable_continuous_log_filter (bool): Reduce logging amount. + enable_metrics (bool): Enable metrics. + enable_observability_metrics (bool): Enable extra Spark metrics. + script_args (dict): Script arguments. + schedule (Optional[str]): Cron-like schedule. + job_type (GlueJobType): Job type. """ - # Parameters for Pulumi Glue Job name: str description: str = "" script_location: str = "" @@ -100,7 +100,7 @@ class GlueJobDefinition: max_capacity: int = 5 max_retries: int = 0 number_of_workers: int = 2 - tags: dict | None = None + tags: Optional[dict] = None timeout: int = 2880 worker_type: GlueWorkerType = GlueWorkerType.G_1X enable_glue_datacatalog: bool = True @@ -109,47 +109,49 @@ class GlueJobDefinition: enable_metrics: bool = False enable_observability_metrics: bool = False script_args: dict = field(default_factory=dict) - schedule: str | None = None + schedule: Optional[str] = None job_type: GlueJobType = GlueJobType.GLUE_ETL @dataclass class GlueComponentArgs: - """ - Glue job definitions and infrastructure dependencies such as IAM roles, external connections, code and data storage. - - :param jobs: the list of GlueJobDefinition to deploy - :param execution_role: the IAM role attached to the Glue jobs if it exists, if not one will be createdw - :param database_name: name of the Glue database if it exists, if not one will be created - :param code_repository_bucket_name: name of the s3 code repository database if it exists, if not one will be created - :param data_bucket_name: name of the s3 bucket to store data, if it exists, if not one will be created - :param kafka_checkpoints_bucket_name: name of the s3 bucket to store checkpoints if it exists, if not one will be created - :param connector_config: Connector configuration to run the Glue Jobs in a VPC. + """Glue job definitions and infrastructure dependencies. + + Attributes: + jobs (list[GlueJobDefinition]): List of Glue jobs to deploy. + execution_role (Optional[str]): IAM role for Glue jobs. + database_name (Optional[str]): Glue database name. + code_repository_bucket_name (Optional[str]): S3 code repository bucket name. + data_bucket_name (Optional[str]): S3 data bucket name. + kafka_checkpoints_bucket_name (Optional[str]): S3 checkpoints bucket name. + connector_config (Optional[ConnectorConfig]): Connector configuration. """ jobs: list[GlueJobDefinition] - execution_role: str = None - database_name: str = None - code_repository_bucket_name: str = None - data_bucket_name: str = None - kafka_checkpoints_bucket_name: str = None + execution_role: Optional[str] = None + database_name: Optional[str] = None + code_repository_bucket_name: Optional[str] = None + data_bucket_name: Optional[str] = None + kafka_checkpoints_bucket_name: Optional[str] = None connector_config: Optional[ConnectorConfig] = None class GlueComponent(PulumiComponentResource): - """ - An opinionated deployment of a fully functional PySpark applications on Glue. + """Deployment of PySpark applications on Glue. Resources deployed: - - Code Repository: s3 bucket - - Data Storage: s3 bucket - - Kafka checkpoint storage: s3 bucket - - Compute: Glue Jobs - - Orchestration: Triggers for the Glue Jobs - - Metadata Catalog: Glue Database - - Permissions: IAM role for Glue Jobs - - Full access to S3 bucket with data - - Full access to tables in Glue database + - Code Repository: S3 bucket. + - Data Storage: S3 bucket. + - Kafka checkpoint storage: S3 bucket. + - Compute: Glue Jobs. 
+ - Orchestration: Triggers for the Glue Jobs. + - Metadata Catalog: Glue Database. + - Permissions: IAM role for Glue Jobs. + + Args: + name (str): The name of the resource. + args (GlueComponentArgs): The arguments for the component. + opts (Optional[ResourceOptions]): Resource options. """ def __init__( @@ -174,12 +176,13 @@ def __init__( self.connection self.jobs - # Compute @property @cache def jobs(self) -> list[aws.glue.Job]: - """ - Return all the Glue jobs for the application. Add a trigger for the job if a schedule is specified. + """Creates and returns all the Glue jobs and adds triggers if scheduled. + + Returns: + list[aws.glue.Job]: List of Glue jobs. """ jobs = [] for job in self.args.jobs: @@ -207,25 +210,41 @@ def jobs(self) -> list[aws.glue.Job]: self._create_glue_trigger(job) return jobs - def _get_job_name(self, job: GlueJobDefinition): + def _get_job_name(self, job: GlueJobDefinition) -> str: + """Generates the job name. + + Args: + job (GlueJobDefinition): The job definition. + + Returns: + str: The job name. + """ return f"{self._name}-{job.name}-job" def _get_source_path(self, job: GlueJobDefinition) -> str: + """Gets the source path for the job script. + + Args: + job (GlueJobDefinition): The job definition. + + Returns: + str: The S3 path to the job script. + """ return ( - f"s3://{self.code_repository_bucket}/{job.script_location}" + f"s3://{self.code_repository_bucket.bucket}/{job.script_location}" if job.script_location - else f"s3://{self.code_repository_bucket}/{job.name}.py" + else f"s3://{self.code_repository_bucket.bucket}/{job.name}.py" ) @staticmethod def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: - """The map of default arguments for this job. - These are arguments that your own script consumes, as well as arguments that AWS Glue itself consumes. + """Returns the map of default arguments for this job. - TODO: Handling of logging via log4j config file. - - At Brenntag we found out how expensive Glue logging can become - - An effective way to limit logging volume is by using a custom log4j configuration file - - File needs to be uploaded to s3 and passed via + Args: + job (GlueJobDefinition): The job definition. + + Returns: + dict[str, str]: The default arguments for the job. """ return { "--additional-python-modules": ",".join(job.extra_libraries), @@ -237,7 +256,7 @@ def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: "true" if job.enable_continuous_log_filter else "false" ), "--enable-glue-datacatalog": ( - "true" if job.enable_continuous_log_filter else "false" + "true" if job.enable_glue_datacatalog else "false" ), "--datalake-formats": "iceberg", "--enable-metrics": "true" if job.enable_metrics else "false", @@ -250,7 +269,11 @@ def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: @property @cache def role(self) -> aws.iam.Role: - """Return an execution role for Glue jobs.""" + """Returns an execution role for Glue jobs. + + Returns: + aws.iam.Role: The IAM role for Glue jobs. + """ if self.args.execution_role: return aws.iam.Role.get(f"{self._name}-role", id=self.args.execution_role) @@ -262,10 +285,13 @@ def role(self) -> aws.iam.Role: managed_policy_arns=self.managed_policy_arns, ) - # Permissions @property def assume_policy(self) -> dict[str, Any]: - """Return the assume role policy for Glue jobs.""" + """Returns the assume role policy for Glue jobs. + + Returns: + dict[str, Any]: The assume role policy. 
+ """ return { "Version": "2012-10-17", "Statement": [ @@ -281,41 +307,45 @@ def assume_policy(self) -> dict[str, Any]: @property def managed_policy_arns(self) -> list[str]: - """Return a list of managed policy ARNs that defines the permissions for Glue jobs.""" + """Returns the managed policy ARNs defining permissions for Glue jobs. + + Returns: + list[str]: List of managed policy ARNs. + """ return [ aws.iam.ManagedPolicy.AWS_GLUE_SERVICE_ROLE, aws.iam.ManagedPolicy.AMAZON_S3_FULL_ACCESS, ] - # Code Repository @property @cache def code_repository_bucket(self) -> aws.s3.BucketV2: - """Return an S3 bucket for Glue jobs to host source codes. + """Returns an S3 bucket for Glue jobs to host source codes. - NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. + Returns: + aws.s3.BucketV2: The S3 bucket for code repository. """ if self.args.code_repository_bucket_name: return aws.s3.BucketV2.get( f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name ) - return self.args.code_repository_bucket_name or aws.s3.BucketV2( + return aws.s3.BucketV2( resource_name=f"{self._name}-code-bucket", opts=ResourceOptions(parent=self), bucket_prefix=f"{self._name}-code-bucket", ) - # Data Storage @property @cache def iceberg_bucket(self) -> aws.s3.BucketV2: - """Return an S3 bucket for Iceberg tables to store data processed by Glue jobs. + """Returns an S3 bucket for Iceberg tables to store data. - NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique. + Returns: + aws.s3.BucketV2: The S3 bucket for Iceberg data. """ if self.args.data_bucket_name: - return aws.s3.BucketV.get( + return aws.s3.BucketV2.get( f"{self._name}-data-bucket", id=self.args.data_bucket_name ) @@ -325,13 +355,16 @@ def iceberg_bucket(self) -> aws.s3.BucketV2: bucket_prefix=f"{self._name}-data-bucket", ) - # Metadata @property @cache def iceberg_database(self) -> aws.glue.CatalogDatabase: - """Return a Glue database for Iceberg tables to store data processed by Glue jobs.""" + """Returns a Glue database for Iceberg tables. + + Returns: + aws.glue.CatalogDatabase: The Glue database. + """ if self.args.database_name: - return aws.cloudwatch.CatalogDatabase.get( + return aws.glue.CatalogDatabase.get( f"{self._name}-database", id=self.args.database_name ) @@ -342,11 +375,17 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) - # Orchestration def _create_glue_trigger( self, job: GlueJobDefinition ) -> Optional[aws.glue.Trigger]: - """Return a Glue Trigger object.""" + """Creates a Glue Trigger for the job if scheduled. + + Args: + job (GlueJobDefinition): The job definition. + + Returns: + Optional[aws.glue.Trigger]: The Glue trigger if scheduled, else None. + """ return ( aws.glue.Trigger( f"{job.name}-glue-trigger", @@ -363,33 +402,36 @@ def _create_glue_trigger( else None ) - # Kafka @property @cache def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]: - """Return a s3 bucket to store the checkpoints. + """Returns an S3 bucket to store Kafka checkpoints. + + Creates the bucket if at least one job is of type 'GLUE_STREAMING'. - Creates the bucket if at least one job is of type 'gluestreaming'. + Returns: + Optional[aws.s3.BucketV2]: The S3 bucket for Kafka checkpoints. 
""" - if "gluestreaming" in [job.job_type for job in self.args.jobs]: + if any(job.job_type == GlueJobType.GLUE_STREAMING for job in self.args.jobs): if self.args.kafka_checkpoints_bucket_name: return aws.s3.BucketV2.get( f"{self._name}-checkpoints-bucket", id=self.args.kafka_checkpoints_bucket_name, ) - - return aws.s3.BucketV2( - resource_name=f"{self._name}-checkpoints-bucket", - opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-checkpoints-bucket", - ) + return aws.s3.BucketV2( + resource_name=f"{self._name}-checkpoints-bucket", + opts=ResourceOptions(parent=self), + bucket_prefix=f"{self._name}-checkpoints-bucket", + ) + return None @property @cache def connection(self) -> Optional[aws.glue.Connection]: - """Return a s3 bucket to store the checkpoints. + """Returns a Glue Connection. - Creates the bucket if at least one job is of type 'gluestreaming'. + Returns: + Optional[aws.glue.Connection]: The Glue Connection. """ if not self.args.connector_config: logging.warning( From e5730f8ab2cb10b109871fa7cb47c5f7c7fbe0f0 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Thu, 10 Oct 2024 16:34:10 +0200 Subject: [PATCH 11/13] tests: add tests for Glue component --- src/damavand/cloud/aws/controllers/spark.py | 2 - .../aws/resources/test_glue_component.py | 93 ++++++++++++++++++- 2 files changed, 91 insertions(+), 4 deletions(-) diff --git a/src/damavand/cloud/aws/controllers/spark.py b/src/damavand/cloud/aws/controllers/spark.py index fea18be..1f4713d 100644 --- a/src/damavand/cloud/aws/controllers/spark.py +++ b/src/damavand/cloud/aws/controllers/spark.py @@ -24,13 +24,11 @@ def __init__( **kwargs, ) -> None: super().__init__(name, applications, tags, **kwargs) - print("\n\n\nShit") self._glue_client = boto3.client("glue", region_name=region) @buildtime @cache def resource(self) -> PulumiResource: - print("\n\n\nI'm called") if not self.applications: raise BuildtimeException("No applications found to create Glue jobs.") diff --git a/tests/clouds/aws/resources/test_glue_component.py b/tests/clouds/aws/resources/test_glue_component.py index 1b8e07c..dc98a4d 100644 --- a/tests/clouds/aws/resources/test_glue_component.py +++ b/tests/clouds/aws/resources/test_glue_component.py @@ -6,8 +6,6 @@ import pulumi_aws as aws from pulumi.runtime.mocks import MockResourceArgs, MockCallArgs -from damavand.cloud.aws.resources.glue_component import GlueJobDefinition - # NOTE: this has to be defined before importing infrastructure codes. 
# Check Pulumi's documentation for more details: https://www.pulumi.com/docs/using-pulumi/testing/unit/ @@ -25,6 +23,11 @@ def call(self, args: MockCallArgs) -> Tuple[dict, Optional[List[Tuple[str, str]] ) from damavand.cloud.aws.resources import GlueComponent, GlueComponentArgs # noqa: E402 +from damavand.cloud.aws.resources.glue_component import ( # noqa: E402 + GlueJobDefinition, + ConnectorConfig, + GlueJobType, +) @pytest.fixture @@ -93,3 +96,89 @@ def should_name_have_prefix(names: list[str]): pulumi.Output.all(glue_component.jobs).apply(should_have_two) for job in glue_component.jobs: pulumi.Output.all(job.name).apply(should_name_have_prefix) + + +@pulumi.runtime.test +def test_code_repository_bucket(glue_component): + def should_have_one_bucket(buckets: list[aws.s3.BucketV2]): + assert len(buckets) == 1 + + pulumi.Output.all(glue_component.code_repository_bucket).apply( + should_have_one_bucket + ) + + +@pulumi.runtime.test +def test_iceberg_database(glue_component): + def should_have_one_database(dbs: list[aws.glue.CatalogDatabase]): + assert len(dbs) == 1 + + pulumi.Output.all(glue_component.iceberg_database).apply(should_have_one_database) + + +@pulumi.runtime.test +def test_kafka_checkpoint_bucket(glue_component): + def should_not_create_bucket_if_no_streaming( + buckets: list[Optional[aws.s3.BucketV2]], + ): + assert buckets[0] is None + + pulumi.Output.all(glue_component.kafka_checkpoint_bucket).apply( + should_not_create_bucket_if_no_streaming + ) + + +@pytest.fixture +def glue_component_with_streaming_job(): + return GlueComponent( + name="test-streaming", + args=GlueComponentArgs( + jobs=[ + GlueJobDefinition( + name="streaming-job", + description="test streaming job", + job_type=GlueJobType.GLUE_STREAMING, + ), + ] + ), + ) + + +@pulumi.runtime.test +def test_kafka_checkpoint_bucket_streaming(glue_component_with_streaming_job): + def should_create_bucket_for_streaming(buckets: list[aws.s3.BucketV2]): + assert len(buckets) == 1 + + pulumi.Output.all(glue_component_with_streaming_job.kafka_checkpoint_bucket).apply( + should_create_bucket_for_streaming + ) + + +@pytest.fixture +def glue_component_with_connector(): + return GlueComponent( + name="test-connector", + args=GlueComponentArgs( + jobs=[ + GlueJobDefinition( + name="job-with-connector", + description="job with connector", + ), + ], + connector_config=ConnectorConfig( + vpc_id="vpc-12345678", + subnet_id="subnet-12345678", + security_groups=["sg-12345678"], + connection_properties={"BootstrapServers": "localhost:9092"}, + ), + ), + ) + + +@pulumi.runtime.test +def test_connection_creation(glue_component_with_connector): + + def should_have_one(connections: list[aws.glue.Connection]): + assert len(connections) == 1 + + pulumi.Output.all(glue_component_with_connector.connection).apply(should_have_one) From 2da74935bb7b23db3dc8a35ba69d00d8b79a8755 Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Mon, 14 Oct 2024 12:58:34 +0200 Subject: [PATCH 12/13] refactor: apply PR reviews --- .../cloud/aws/resources/glue_component.py | 105 +++++++++--------- .../aws/resources/test_glue_component.py | 14 +-- 2 files changed, 56 insertions(+), 63 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index d839877..890aaab 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -24,8 +24,8 @@ class GlueWorkerType(Enum): class GlueJobType(Enum): """Enum representing Glue job types.""" - 
GLUE_ETL = "glueetl" - GLUE_STREAMING = "gluestreaming" + ETL = "glueetl" + STREAMING = "gluestreaming" class GlueExecutionClass(Enum): @@ -110,7 +110,7 @@ class GlueJobDefinition: enable_observability_metrics: bool = False script_args: dict = field(default_factory=dict) schedule: Optional[str] = None - job_type: GlueJobType = GlueJobType.GLUE_ETL + job_type: GlueJobType = GlueJobType.ETL @dataclass @@ -172,10 +172,18 @@ def __init__( self.code_repository_bucket self.iceberg_database self.iceberg_bucket - self.kafka_checkpoint_bucket - self.connection self.jobs + if any(job.job_type == GlueJobType.STREAMING for job in self.args.jobs): + self.kafka_checkpoint_bucket + + if not self.args.connector_config: + logging.warning( + "No connector config provided. Glue jobs will run outside a VPC." + ) + else: + self.connection + @property @cache def jobs(self) -> list[aws.glue.Job]: @@ -203,7 +211,9 @@ def jobs(self) -> list[aws.glue.Job]: execution_property=aws.glue.JobExecutionPropertyArgs( max_concurrent_runs=job.max_concurrent_runs ), - connections=[self.connection.name] if self.connection else [], + connections=( + [self.connection.name] if self.args.connector_config else [] + ), ) jobs.append(glue_job) if job.schedule: @@ -236,8 +246,7 @@ def _get_source_path(self, job: GlueJobDefinition) -> str: else f"s3://{self.code_repository_bucket.bucket}/{job.name}.py" ) - @staticmethod - def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]: + def _get_default_arguments(self, job: GlueJobDefinition) -> dict[str, str]: """Returns the map of default arguments for this job. Args: @@ -326,8 +335,10 @@ def code_repository_bucket(self) -> aws.s3.BucketV2: aws.s3.BucketV2: The S3 bucket for code repository. """ if self.args.code_repository_bucket_name: - return aws.s3.BucketV2.get( - f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name + return aws.s3.BucketV2( + resource_name=f"{self._name}-code-bucket", + opts=ResourceOptions(parent=self), + bucket=self.args.code_repository_bucket_name, ) return aws.s3.BucketV2( @@ -345,8 +356,10 @@ def iceberg_bucket(self) -> aws.s3.BucketV2: aws.s3.BucketV2: The S3 bucket for Iceberg data. """ if self.args.data_bucket_name: - return aws.s3.BucketV2.get( - f"{self._name}-data-bucket", id=self.args.data_bucket_name + return aws.s3.BucketV2( + resource_name=f"{self._name}-data-bucket", + opts=ResourceOptions(parent=self), + bucket=self.args.data_bucket_name, ) return aws.s3.BucketV2( @@ -364,8 +377,11 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: aws.glue.CatalogDatabase: The Glue database. """ if self.args.database_name: - return aws.glue.CatalogDatabase.get( - f"{self._name}-database", id=self.args.database_name + return aws.glue.CatalogDatabase( + resource_name=f"{self._name}-database", + opts=ResourceOptions(parent=self), + name=self.args.database_name, + location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) return aws.glue.CatalogDatabase( @@ -375,69 +391,58 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) - def _create_glue_trigger( - self, job: GlueJobDefinition - ) -> Optional[aws.glue.Trigger]: + def _create_glue_trigger(self, job: GlueJobDefinition) -> aws.glue.Trigger: """Creates a Glue Trigger for the job if scheduled. Args: job (GlueJobDefinition): The job definition. Returns: - Optional[aws.glue.Trigger]: The Glue trigger if scheduled, else None. + aws.glue.Trigger: The Glue trigger. 
""" - return ( - aws.glue.Trigger( - f"{job.name}-glue-trigger", - type="SCHEDULED", - schedule=job.schedule, - actions=[ - aws.glue.TriggerActionArgs( - job_name=self._get_job_name(job), - ) - ], - start_on_creation=True, - ) - if job.schedule - else None + return aws.glue.Trigger( + f"{job.name}-glue-trigger", + type="SCHEDULED", + schedule=job.schedule, + actions=[ + aws.glue.TriggerActionArgs( + job_name=self._get_job_name(job), + ) + ], + start_on_creation=True, ) @property @cache - def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]: + def kafka_checkpoint_bucket(self) -> aws.s3.BucketV2: """Returns an S3 bucket to store Kafka checkpoints. - Creates the bucket if at least one job is of type 'GLUE_STREAMING'. - Returns: - Optional[aws.s3.BucketV2]: The S3 bucket for Kafka checkpoints. + aws.s3.BucketV2: The S3 bucket for Kafka checkpoints. """ - if any(job.job_type == GlueJobType.GLUE_STREAMING for job in self.args.jobs): - if self.args.kafka_checkpoints_bucket_name: - return aws.s3.BucketV2.get( - f"{self._name}-checkpoints-bucket", - id=self.args.kafka_checkpoints_bucket_name, - ) + + if self.args.kafka_checkpoints_bucket_name: return aws.s3.BucketV2( resource_name=f"{self._name}-checkpoints-bucket", opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-checkpoints-bucket", + bucket=self.args.kafka_checkpoints_bucket_name, ) - return None + return aws.s3.BucketV2( + resource_name=f"{self._name}-checkpoints-bucket", + opts=ResourceOptions(parent=self), + bucket_prefix=f"{self._name}-checkpoints-bucket", + ) @property @cache - def connection(self) -> Optional[aws.glue.Connection]: + def connection(self) -> aws.glue.Connection: """Returns a Glue Connection. Returns: - Optional[aws.glue.Connection]: The Glue Connection. + aws.glue.Connection: The Glue Connection. """ if not self.args.connector_config: - logging.warning( - "No connector config provided. Glue jobs will run outside a VPC. This is not recommended." 
- ) - return None + raise ValueError("No connector config provided.") return aws.glue.Connection( resource_name=f"{self._name}-kafka-connection", diff --git a/tests/clouds/aws/resources/test_glue_component.py b/tests/clouds/aws/resources/test_glue_component.py index dc98a4d..4257d30 100644 --- a/tests/clouds/aws/resources/test_glue_component.py +++ b/tests/clouds/aws/resources/test_glue_component.py @@ -116,18 +116,6 @@ def should_have_one_database(dbs: list[aws.glue.CatalogDatabase]): pulumi.Output.all(glue_component.iceberg_database).apply(should_have_one_database) -@pulumi.runtime.test -def test_kafka_checkpoint_bucket(glue_component): - def should_not_create_bucket_if_no_streaming( - buckets: list[Optional[aws.s3.BucketV2]], - ): - assert buckets[0] is None - - pulumi.Output.all(glue_component.kafka_checkpoint_bucket).apply( - should_not_create_bucket_if_no_streaming - ) - - @pytest.fixture def glue_component_with_streaming_job(): return GlueComponent( @@ -137,7 +125,7 @@ def glue_component_with_streaming_job(): GlueJobDefinition( name="streaming-job", description="test streaming job", - job_type=GlueJobType.GLUE_STREAMING, + job_type=GlueJobType.STREAMING, ), ] ), From ae85a3ed9e3f6df689fbc420c4f12917361403ec Mon Sep 17 00:00:00 2001 From: Farbod Ahmadian Date: Mon, 14 Oct 2024 16:19:03 +0200 Subject: [PATCH 13/13] refactor: force creating required buckets --- .../cloud/aws/resources/glue_component.py | 73 ++++--------------- 1 file changed, 16 insertions(+), 57 deletions(-) diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py index 890aaab..fdf1847 100644 --- a/src/damavand/cloud/aws/resources/glue_component.py +++ b/src/damavand/cloud/aws/resources/glue_component.py @@ -76,7 +76,6 @@ class GlueJobDefinition: max_capacity (int): Max capacity. max_retries (int): Max retries. number_of_workers (int): Number of workers. - tags (Optional[dict]): Resource tags. timeout (int): Job timeout in minutes. worker_type (GlueWorkerType): Worker type. enable_glue_datacatalog (bool): Use Glue Data Catalog. @@ -100,7 +99,6 @@ class GlueJobDefinition: max_capacity: int = 5 max_retries: int = 0 number_of_workers: int = 2 - tags: Optional[dict] = None timeout: int = 2880 worker_type: GlueWorkerType = GlueWorkerType.G_1X enable_glue_datacatalog: bool = True @@ -120,19 +118,11 @@ class GlueComponentArgs: Attributes: jobs (list[GlueJobDefinition]): List of Glue jobs to deploy. execution_role (Optional[str]): IAM role for Glue jobs. - database_name (Optional[str]): Glue database name. - code_repository_bucket_name (Optional[str]): S3 code repository bucket name. - data_bucket_name (Optional[str]): S3 data bucket name. - kafka_checkpoints_bucket_name (Optional[str]): S3 checkpoints bucket name. connector_config (Optional[ConnectorConfig]): Connector configuration. 
""" jobs: list[GlueJobDefinition] execution_role: Optional[str] = None - database_name: Optional[str] = None - code_repository_bucket_name: Optional[str] = None - data_bucket_name: Optional[str] = None - kafka_checkpoints_bucket_name: Optional[str] = None connector_config: Optional[ConnectorConfig] = None @@ -257,21 +247,19 @@ def _get_default_arguments(self, job: GlueJobDefinition) -> dict[str, str]: """ return { "--additional-python-modules": ",".join(job.extra_libraries), - "--enable-auto-scaling": "true" if job.enable_auto_scaling else "false", - "--enable-continuous-cloudwatch-log": ( - "true" if job.enable_continuous_cloudwatch_log else "false" - ), - "--enable-continuous-log-filter": ( - "true" if job.enable_continuous_log_filter else "false" - ), - "--enable-glue-datacatalog": ( - "true" if job.enable_glue_datacatalog else "false" - ), + "--enable-auto-scaling": str(job.enable_auto_scaling).lower(), + "--enable-continuous-cloudwatch-log": str( + job.enable_continuous_cloudwatch_log + ).lower(), + "--enable-continuous-log-filter": str( + job.enable_continuous_log_filter + ).lower(), + "--enable-glue-datacatalog": str(job.enable_glue_datacatalog).lower(), "--datalake-formats": "iceberg", - "--enable-metrics": "true" if job.enable_metrics else "false", - "--enable-observability-metrics": ( - "true" if job.enable_observability_metrics else "false" - ), + "--enable-metrics": str(job.enable_metrics).lower(), + "--enable-observability-metrics": str( + job.enable_observability_metrics + ).lower(), **job.script_args, } @@ -334,17 +322,10 @@ def code_repository_bucket(self) -> aws.s3.BucketV2: Returns: aws.s3.BucketV2: The S3 bucket for code repository. """ - if self.args.code_repository_bucket_name: - return aws.s3.BucketV2( - resource_name=f"{self._name}-code-bucket", - opts=ResourceOptions(parent=self), - bucket=self.args.code_repository_bucket_name, - ) - return aws.s3.BucketV2( resource_name=f"{self._name}-code-bucket", opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-code-bucket", + bucket_prefix=f"{self._name[:20]}-code-bucket", ) @property @@ -355,17 +336,10 @@ def iceberg_bucket(self) -> aws.s3.BucketV2: Returns: aws.s3.BucketV2: The S3 bucket for Iceberg data. """ - if self.args.data_bucket_name: - return aws.s3.BucketV2( - resource_name=f"{self._name}-data-bucket", - opts=ResourceOptions(parent=self), - bucket=self.args.data_bucket_name, - ) - return aws.s3.BucketV2( resource_name=f"{self._name}-data-bucket", opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-data-bucket", + bucket_prefix=f"{self._name[:20]}-data-bucket", ) @property @@ -376,18 +350,10 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase: Returns: aws.glue.CatalogDatabase: The Glue database. """ - if self.args.database_name: - return aws.glue.CatalogDatabase( - resource_name=f"{self._name}-database", - opts=ResourceOptions(parent=self), - name=self.args.database_name, - location_uri=f"s3://{self.iceberg_bucket.bucket}/", - ) - return aws.glue.CatalogDatabase( resource_name=f"{self._name}-database", opts=ResourceOptions(parent=self), - name=f"{self._name}-database", + name=f"{self._name[:20]}-database", location_uri=f"s3://{self.iceberg_bucket.bucket}/", ) @@ -420,17 +386,10 @@ def kafka_checkpoint_bucket(self) -> aws.s3.BucketV2: Returns: aws.s3.BucketV2: The S3 bucket for Kafka checkpoints. 
""" - - if self.args.kafka_checkpoints_bucket_name: - return aws.s3.BucketV2( - resource_name=f"{self._name}-checkpoints-bucket", - opts=ResourceOptions(parent=self), - bucket=self.args.kafka_checkpoints_bucket_name, - ) return aws.s3.BucketV2( resource_name=f"{self._name}-checkpoints-bucket", opts=ResourceOptions(parent=self), - bucket_prefix=f"{self._name}-checkpoints-bucket", + bucket_prefix=f"{self._name[:20]}-checkpoints-bucket", ) @property