Commit 5fa9c6d

add glue connection

Andrea Callarelli committed Sep 20, 2024
1 parent 01377d9 commit 5fa9c6d
Showing 1 changed file with 94 additions and 24 deletions.

118 changes: 94 additions & 24 deletions src/damavand/cloud/aws/resources/glue_component.py
@@ -1,5 +1,7 @@
import json
import logging
from typing import Any, Optional
from enum import Enum
from functools import cache
from dataclasses import dataclass, field

@@ -8,6 +10,46 @@
from pulumi import ResourceOptions


class GlueWorkerType(Enum):
    G_1X = "G.1X"
    G_2X = "G.2X"
    G_4X = "G.4X"
    G_8X = "G.8X"
    G_025X = "G.025X"
    Z_2X = "Z.2X"


class GlueJobType(Enum):
    GLUE_ETL = "glueetl"
    GLUE_STREAMING = "gluestreaming"


class GlueExecutionClass(Enum):
    STANDARD = "STANDARD"
    FLEX = "FLEX"


class ConnectionType(Enum):
    KAFKA = "KAFKA"


@dataclass
class ConnectorConfig:
"""Configuration for the Connector.
:param vpc_id: id of the vpc
:param subnet_id: id of the subnet
:param security_groups: list of security group ids
:param connection_properties: a dictionary with connection properties specific to the connector.
For more info see https://www.pulumi.com/registry/packages/aws/api-docs/glue/connection/
"""
vpc_id: str
subnet_id: str
security_groups: list[str]
connector_type: ConnectionType = ConnectionType.KAFKA
    connection_properties: dict = field(default_factory=dict)
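
For illustration, a minimal sketch of a Kafka connector configuration. The VPC, subnet, security-group IDs, and broker address are hypothetical placeholders; valid connection property keys are listed in the Pulumi Glue connection docs referenced above.

example_connector_config = ConnectorConfig(
    vpc_id="vpc-0123abcd",            # hypothetical VPC id
    subnet_id="subnet-0123abcd",      # hypothetical subnet id
    security_groups=["sg-0123abcd"],  # hypothetical security group id
    connection_properties={
        # hypothetical Kafka broker address
        "KAFKA_BOOTSTRAP_SERVERS": "broker-1.example.internal:9092",
    },
)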


@dataclass
class GlueJobDefinition:
"""
@@ -34,22 +76,22 @@ class GlueJobDefinition:
* For the G.025X worker type, each worker maps to 0.25 DPU (2 vCPU, 4GB of memory, 64 GB disk), and provides 1 executor per worker. Recommended for low volume streaming jobs. Only available for Glue version 3.0.
    * For the Z.2X worker type, each worker maps to 2 M-DPU (8 vCPU, 64 GB of memory, 128 GB disk), and provides up to 8 Ray workers based on the autoscaler.
:param enable_glue_datacatalog: To use the Glue catalog as the metadata catalog
    :param enable_continuous_cloudwatch_log: Enables continuous CloudWatch logging.
    :param enable_continuous_log_filter: When set to true, it reduces the amount of logging by filtering out non-useful log messages.
For more information see https://repost.aws/knowledge-center/glue-reduce-cloudwatch-logs
:param enable_metrics: Enables observability metrics about the worker nodes.
    :param enable_observability_metrics: Enables extra Spark-related observability metrics, such as how long a task takes.
This parameter could increase cloud costs significantly.
:param script_args: The arguments that your own script consumes {"--arg1": "arg1 value"}
:param schedule: A time-based cron-like schedule for the job. For syntax rules see https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html
    :param job_type: Specify if the job is streaming or batch.
"""
# Parameters for Pulumi Glue Job
name: str
    description: Optional[str] = None
    script_location: Optional[str] = None
extra_libraries: list[str] = field(default_factory=list)
execution_class: str = "STANDARD"
execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD
    max_concurrent_runs: int = 1
glue_version: str = "4.0"
enable_auto_scaling: bool = True
@@ -58,15 +100,15 @@
number_of_workers: int = 2
    tags: Optional[dict] = None
timeout: int = 2880
worker_type: str = "G.1X"
worker_type: GlueWorkerType = GlueWorkerType.G_1X
enable_glue_datacatalog: bool = True
    enable_continuous_cloudwatch_log: bool = False
    enable_continuous_log_filter: bool = True
enable_metrics: bool = False
enable_observability_metrics: bool = False
script_args: dict = field(default_factory=dict)
    schedule: Optional[str] = None
job_type: str = "glueetl"
job_type: GlueJobType = GlueJobType.GLUE_ETL
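
As a sketch of how these fields combine, a hypothetical streaming job definition (the job name, script path, and script argument are made up for illustration):

example_streaming_job = GlueJobDefinition(
    name="orders-stream",                     # hypothetical job name
    description="Stream orders from Kafka into Iceberg",
    script_location="jobs/orders_stream.py",  # hypothetical path inside the code bucket
    job_type=GlueJobType.GLUE_STREAMING,
    worker_type=GlueWorkerType.G_1X,
    number_of_workers=2,
    script_args={"--topic": "orders"},        # hypothetical script argument
)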


@dataclass
@@ -80,13 +122,15 @@ class GlueComponentArgs:
    :param code_repository_bucket_name: name of an existing S3 bucket for the job code; if not provided, one will be created
    :param data_bucket_name: name of an existing S3 bucket to store data; if not provided, one will be created
    :param kafka_checkpoints_bucket_name: name of an existing S3 bucket to store checkpoints; if not provided, one will be created
:param connector_config: Connector configuration to run the Glue Jobs in a VPC.
"""
jobs: list[GlueJobDefinition]
    execution_role: Optional[str] = None
    database_name: Optional[str] = None
    code_repository_bucket_name: Optional[str] = None
    data_bucket_name: Optional[str] = None
    kafka_checkpoints_bucket_name: Optional[str] = None
connector_config: Optional[ConnectorConfig] = None
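
Tying the pieces together, a hedged sketch of instantiating the component inside a Pulumi program, reusing the hypothetical objects sketched above; bucket and role arguments left unset fall back to resources the component creates itself.

glue = GlueComponent(
    name="datalake",  # hypothetical component name
    args=GlueComponentArgs(
        jobs=[example_streaming_job],
        connector_config=example_connector_config,  # makes the jobs run inside the VPC
    ),
)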


class GlueComponent(PulumiComponentResource):
@@ -104,11 +148,12 @@ class GlueComponent(PulumiComponentResource):
- Full access to S3 bucket with data
- Full access to tables in Glue database
"""

def __init__(
        self,
        name: str,
        args: GlueComponentArgs,
        opts: Optional[ResourceOptions] = None,
) -> None:
super().__init__(
f"Damavand:Spark:{GlueComponent.__name__}",
@@ -123,6 +168,7 @@ def __init__(
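        # Accessing each cached property below instantiates the corresponding resource.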
self.iceberg_database
self.iceberg_bucket
self.kafka_checkpoint_bucket
self.connection
self.jobs

# Compute
@@ -137,32 +183,33 @@ def jobs(self) -> list[aws.glue.Job]:
glue_job = aws.glue.Job(
resource_name=f"{self._name}-{job.name}-job",
opts=ResourceOptions(parent=self),
            name=self._get_job_name(job),
glue_version=job.glue_version,
role_arn=self.role.arn,
command=aws.glue.JobCommandArgs(
                name=job.job_type.value,
python_version="3",
                script_location=self._get_source_path(job)
),
            default_arguments=self._get_default_arguments(job),
number_of_workers=job.number_of_workers,
            worker_type=job.worker_type.value,
execution_property=aws.glue.JobExecutionPropertyArgs(max_concurrent_runs=job.max_concurrent_runs),
connections=[self.connection.name] if self.connection else [],
)
jobs.append(glue_job)
            self._create_glue_trigger(job)
return jobs

    def _get_job_name(self, job: GlueJobDefinition):
return f"{self._name}-{job.name}-job"

    def _get_source_path(self, job: GlueJobDefinition) -> str:
return f"s3://{self.code_repository_bucket}/{job.script_location}" if job.script_location \
else f"s3://{self.code_repository_bucket}/{job.name}.py"

@staticmethod
    def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
"""The map of default arguments for this job.
These are arguments that your own script consumes, as well as arguments that AWS Glue itself consumes.
@@ -179,7 +226,7 @@ def __get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
"--enable-glue-datacatalog": "true" if job.enable_continuous_log_filter else "false",
"--datalake-formats": "iceberg",
"--enable-metrics": "true" if job.enable_metrics else "false",
"--enable-observability-metrics": "true" if job.enable_observability_metrics else "false",
"--enable-observability-metrics": "true" if job.enable_observability_metrics else "false",
**job.script_args,
}
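
    # For illustration: under this implementation, a job with default flags and
    # script_args={"--topic": "orders"} ("--topic" is a hypothetical argument)
    # would resolve to roughly:
    #   {
    #       "--enable-glue-datacatalog": "true",
    #       "--datalake-formats": "iceberg",
    #       "--enable-metrics": "false",
    #       "--enable-observability-metrics": "false",
    #       "--topic": "orders",
    #   }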

@@ -273,14 +320,14 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase:
)

# Orchestration
    def _create_glue_trigger(self, job: GlueJobDefinition) -> Optional[aws.glue.Trigger]:
"""Return a Glue Trigger object."""
return aws.glue.Trigger(
f"{job.name}-glue-trigger",
type='SCHEDULED',
schedule=job.schedule,
actions=[aws.glue.TriggerActionArgs(
job_name=self.__get_job_name(job),
job_name=self._get_job_name(job),
)],
start_on_creation=True
) if job.schedule else None
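
    # Sketch: schedules use the AWS Glue cron syntax, e.g. a hypothetical nightly
    # job could set schedule="cron(0 2 * * ? *)" to run at 02:00 UTC every day.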
@@ -295,10 +342,33 @@ def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]:
"""
if "gluestreaming" in [job.job_type for job in self.args.jobs]:
if self.args.kafka_checkpoints_bucket_name:
return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket", id=self.args.kafka_checkpoints_bucket_name)
return aws.s3.BucketV2.get(f"{self._name}-checkpoints-bucket",
id=self.args.kafka_checkpoints_bucket_name)

return aws.s3.BucketV2(
resource_name=f"{self._name}-checkpoints-bucket",
opts=ResourceOptions(parent=self),
bucket_prefix=f"{self._name}-checkpoints-bucket",
)

@property
@cache
def connection(self) -> Optional[aws.glue.Connection]:
"""Return a s3 bucket to store the checkpoints.
Creates the bucket if at least one job is of type 'gluestreaming'.
"""
if not self.args.connector_config:
logging.warning("No connector config provided. Glue jobs will run outside a VPC. This is not recommended.")
return None

return aws.glue.Connection(
resource_name=f"{self._name}-kafka-connection",
name=f"{self._name}-kafka-connection",
connection_type=self.args.connector_config.connector_type.value,
connection_properties=self.args.connector_config.connection_properties,
physical_connection_requirements={
"security_group_id_lists": self.args.connector_config.security_groups,
"subnet_id": self.args.connector_config.subnet_id
}
)
