diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py
index e6c83a0..d409383 100644
--- a/src/damavand/cloud/aws/resources/glue_component.py
+++ b/src/damavand/cloud/aws/resources/glue_component.py
@@ -1,5 +1,7 @@
 import json
+import logging
 from typing import Any, Optional
+from enum import Enum
 from functools import cache
 from dataclasses import dataclass, field
 
@@ -8,6 +10,46 @@
 from pulumi import ResourceOptions
 
 
+class GlueWorkerType(Enum):
+    G_1X = "G.1X"
+    G_2X = "G.2X"
+    G_4X = "G.4X"
+    G_8X = "G.8X"
+    G_025X = "G.025X"
+    Z_2X = "Z.2X"
+
+
+class GlueJobType(Enum):
+    GLUE_ETL = "glueetl"
+    GLUE_STREAMING = "gluestreaming"
+
+
+class GlueExecutionClass(Enum):
+    STANDARD = "STANDARD"
+    FLEX = "FLEX"
+
+
+class ConnectionType(Enum):
+    KAFKA = "KAFKA"
+
+
+@dataclass
+class ConnectorConfig:
+    """Configuration for the Glue connection.
+
+    :param vpc_id: ID of the VPC.
+    :param subnet_id: ID of the subnet.
+    :param security_groups: list of security group IDs.
+    :param connection_properties: a dictionary of connection properties specific to the connector.
+        For more info see https://www.pulumi.com/registry/packages/aws/api-docs/glue/connection/
+    """
+    vpc_id: str
+    subnet_id: str
+    security_groups: list[str]
+    connector_type: ConnectionType = ConnectionType.KAFKA
+    connection_properties: dict = field(default_factory=dict)
+
+
 @dataclass
 class GlueJobDefinition:
     """
@@ -34,7 +76,7 @@ class GlueJobDefinition:
     * For the G.025X worker type, each worker maps to 0.25 DPU (2 vCPU, 4GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for low volume streaming jobs. Only available for Glue version 3.0.
     * For the Z.2X worker type, each worker maps to 2 M-DPU (8vCPU, 64 GB of memory, 128 GB disk), and provides up to 8 Ray workers based on the autoscaler.
     :param enable_glue_datacatalog: To use the Glue catalog as the metadata catalog
-    :param enable_continuous_cloudwatch_log:
+    :param enable_continuous_cloudwatch_log: Enables continuous CloudWatch logging for the job.
     :param enable_continuous_log_filter: When set to true it reduces the amount of logging.
         For more information see https://repost.aws/knowledge-center/glue-reduce-cloudwatch-logs
     :param enable_metrics: Enables observability metrics about the worker nodes.
@@ -42,14 +84,14 @@ class GlueJobDefinition:
         This parameter could increase cloud costs significantly.
     :param script_args: The arguments that your own script consumes {"--arg1": "arg1 value"}
    :param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html
-    :param job_type: Specify if the job is streaming or batch. Possible values: glueetl, gluestreaming
+    :param job_type: Specify whether the job is a batch or streaming job (see GlueJobType).
""" # Parameters for Pulumi Glue Job name: str description: str = None script_location: str = None extra_libraries: list[str] = field(default_factory=list) - execution_class: str = "STANDARD" + execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD max_concurrent_runs: int = 1, glue_version: str = "4.0" enable_auto_scaling: bool = True @@ -58,15 +100,15 @@ class GlueJobDefinition: number_of_workers: int = 2 tags: dict = None timeout: int = 2880 - worker_type: str = "G.1X" + worker_type: GlueWorkerType = GlueWorkerType.G_1X enable_glue_datacatalog: bool = True - enable_continuous_cloudwatch_log: bool = False - enable_continuous_log_filter: bool = True + enable_continuous_cloudwatch_log: bool = False + enable_continuous_log_filter: bool = True enable_metrics: bool = False enable_observability_metrics: bool = False script_args: dict = field(default_factory=dict) schedule: str = None - job_type: str = "glueetl" + job_type: GlueJobType = GlueJobType.GLUE_ETL @dataclass @@ -80,6 +122,7 @@ class GlueComponentArgs: :param code_repository_bucket_name: name of the s3 code repository database if it exists, if not one will be created :param data_bucket_name: name of the s3 bucket to store data, if it exists, if not one will be created :param kafka_checkpoints_bucket_name: name of the s3 bucket to store checkpoints if it exists, if not one will be created + :param connector_config: Connector configuration to run the Glue Jobs in a VPC. """ jobs: list[GlueJobDefinition] execution_role: str = None @@ -87,6 +130,7 @@ class GlueComponentArgs: code_repository_bucket_name: str = None data_bucket_name: str = None kafka_checkpoints_bucket_name: str = None + connector_config: Optional[ConnectorConfig] = None class GlueComponent(PulumiComponentResource): @@ -104,11 +148,12 @@ class GlueComponent(PulumiComponentResource): - Full access to S3 bucket with data - Full access to tables in Glue database """ + def __init__( - self, - name: str, - args: GlueComponentArgs, - opts: Optional[ResourceOptions] = None, + self, + name: str, + args: GlueComponentArgs, + opts: Optional[ResourceOptions] = None, ) -> None: super().__init__( f"Damavand:Spark:{GlueComponent.__name__}", @@ -123,6 +168,7 @@ def __init__( self.iceberg_database self.iceberg_bucket self.kafka_checkpoint_bucket + self.connection self.jobs # Compute @@ -137,32 +183,33 @@ def jobs(self) -> list[aws.glue.Job]: glue_job = aws.glue.Job( resource_name=f"{self._name}-{job.name}-job", opts=ResourceOptions(parent=self), - name=self.__get_job_name(job), + name=self._get_job_name(job), glue_version=job.glue_version, role_arn=self.role.arn, command=aws.glue.JobCommandArgs( - name=job.job_type, + name=job.job_type.value, python_version="3", - script_location=self.__get_source_path(job) + script_location=self._get_source_path(job) ), - default_arguments=self.__get_default_arguments(job), + default_arguments=self._get_default_arguments(job), number_of_workers=job.number_of_workers, - worker_type=job.worker_type, + worker_type=job.worker_type.value, execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs), + connections=[self.connection.name] if self.connection else [], ) jobs.append(glue_job) - self.__create_glue_trigger(job) + self._create_glue_trigger(job) return jobs - def __get_job_name(self, job: GlueJobDefinition): + def _get_job_name(self, job: GlueJobDefinition): return f"{self._name}-{job.name}-job" - def __get_source_path(self, job: GlueJobDefinition) -> str: + def _get_source_path(self, job: 
         return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \
             else f"s3://{self.code_repository_bucket}/{job.name}.py"
 
     @staticmethod
-    def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
+    def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
         """The map of default arguments for this job.
 
         These are arguments that your own script consumes, as well as arguments that AWS Glue itself consumes.
@@ -179,7 +226,7 @@ def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
             "--enable-glue-datacatalog": "true" if job.enable_continuous_log_filter else "false",
             "--datalake-formats": "iceberg",
             "--enable-metrics": "true" if job.enable_metrics else "false",
-            "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false",
+            "--enable-observability-metrics": "true" if job.enable_observability_metrics else "false",
             **job.script_args,
         }
 
@@ -273,14 +320,14 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase:
         )
 
     # Orchestration
-    def __create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]:
+    def _create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]:
         """Return a Glue Trigger object."""
         return aws.glue.Trigger(
             f"{job.name}-glue-trigger",
             type='SCHEDULED',
             schedule=job.schedule,
             actions=[aws.glue.TriggerActionArgs(
-                job_name=self.__get_job_name(job),
+                job_name=self._get_job_name(job),
             )],
             start_on_creation=True
         ) if job.schedule else None
@@ -295,10 +342,33 @@ def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]:
         """
-        if "gluestreaming" in [job.job_type for job in self.args.jobs]:
+        if GlueJobType.GLUE_STREAMING in [job.job_type for job in self.args.jobs]:
             if self.args.kafka_checkpoints_bucket_name:
-                return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", id=self.args.kafka_checkpoints_bucket_name)
+                return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket",
+                                           id=self.args.kafka_checkpoints_bucket_name)
 
             return aws.s3.BucketV2(
                 resource_name=f"{self._name}-checkpoints-bucket",
                 opts=ResourceOptions(parent=self),
                 bucket_prefix=f"{self._name}-checkpoints-bucket",
             )
+
+    @property
+    @cache
+    def connection(self) -> Optional[aws.glue.Connection]:
+        """Return the Glue Connection that lets the jobs run inside a VPC.
+
+        Created only when a connector_config is provided; otherwise jobs run outside a VPC.
+        """
+        if not self.args.connector_config:
+            logging.warning("No connector config provided. Glue jobs will run outside a VPC. This is not recommended.")
+            return None
+
+        return aws.glue.Connection(
+            resource_name=f"{self._name}-kafka-connection",
+            name=f"{self._name}-kafka-connection",
+            connection_type=self.args.connector_config.connector_type.value,
+            connection_properties=self.args.connector_config.connection_properties,
+            physical_connection_requirements={
+                "security_group_id_lists": self.args.connector_config.security_groups,
+                "subnet_id": self.args.connector_config.subnet_id,
+            },
+        )
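
Usage sketch for review (not part of the diff): how a caller might wire the new ConnectorConfig and enums together. The VPC, subnet, security-group, and Kafka bootstrap values are placeholders, and the snippet assumes it runs inside a Pulumi program.

    from damavand.cloud.aws.resources.glue_component import (
        ConnectorConfig,
        GlueComponent,
        GlueComponentArgs,
        GlueJobDefinition,
        GlueJobType,
        GlueWorkerType,
    )

    # Connection details for running the jobs inside a VPC (placeholder IDs and broker address).
    connector = ConnectorConfig(
        vpc_id="vpc-xxxxxxxx",
        subnet_id="subnet-xxxxxxxx",
        security_groups=["sg-xxxxxxxx"],
        connection_properties={"KAFKA_BOOTSTRAP_SERVERS": "broker-1.example.com:9092"},
    )

    # One streaming job; job_type and worker_type now take the new enums.
    component = GlueComponent(
        name="orders-pipeline",
        args=GlueComponentArgs(
            jobs=[
                GlueJobDefinition(
                    name="orders-stream",
                    job_type=GlueJobType.GLUE_STREAMING,
                    worker_type=GlueWorkerType.G_1X,
                ),
            ],
            connector_config=connector,
        ),
    )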