Skip to content

Commit

Permalink
refactor: apply PR reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
farbodahm committed Oct 14, 2024
1 parent e5730f8 commit 2da7493
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 63 deletions.
105 changes: 55 additions & 50 deletions src/damavand/cloud/aws/resources/glue_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class GlueWorkerType(Enum):
class GlueJobType(Enum):
"""Enum representing Glue job types."""

GLUE_ETL = "glueetl"
GLUE_STREAMING = "gluestreaming"
ETL = "glueetl"
STREAMING = "gluestreaming"


class GlueExecutionClass(Enum):
Expand Down Expand Up @@ -110,7 +110,7 @@ class GlueJobDefinition:
enable_observability_metrics: bool = False
script_args: dict = field(default_factory=dict)
schedule: Optional[str] = None
job_type: GlueJobType = GlueJobType.GLUE_ETL
job_type: GlueJobType = GlueJobType.ETL


@dataclass
Expand Down Expand Up @@ -172,10 +172,18 @@ def __init__(
self.code_repository_bucket
self.iceberg_database
self.iceberg_bucket
self.kafka_checkpoint_bucket
self.connection
self.jobs

if any(job.job_type == GlueJobType.STREAMING for job in self.args.jobs):
self.kafka_checkpoint_bucket

if not self.args.connector_config:
logging.warning(
"No connector config provided. Glue jobs will run outside a VPC."
)
else:
self.connection

@property
@cache
def jobs(self) -> list[aws.glue.Job]:
Expand Down Expand Up @@ -203,7 +211,9 @@ def jobs(self) -> list[aws.glue.Job]:
execution_property=aws.glue.JobExecutionPropertyArgs(
max_concurrent_runs=job.max_concurrent_runs
),
connections=[self.connection.name] if self.connection else [],
connections=(
[self.connection.name] if self.args.connector_config else []
),
)
jobs.append(glue_job)
if job.schedule:
Expand Down Expand Up @@ -236,8 +246,7 @@ def _get_source_path(self, job: GlueJobDefinition) -> str:
else f"s3://{self.code_repository_bucket.bucket}/{job.name}.py"
)

@staticmethod
def _get_default_arguments(job: GlueJobDefinition) -> dict[str, str]:
def _get_default_arguments(self, job: GlueJobDefinition) -> dict[str, str]:
"""Returns the map of default arguments for this job.
Args:
Expand Down Expand Up @@ -326,8 +335,10 @@ def code_repository_bucket(self) -> aws.s3.BucketV2:
aws.s3.BucketV2: The S3 bucket for code repository.
"""
if self.args.code_repository_bucket_name:
return aws.s3.BucketV2.get(
f"{self._name}-code-bucket", id=self.args.code_repository_bucket_name
return aws.s3.BucketV2(
resource_name=f"{self._name}-code-bucket",
opts=ResourceOptions(parent=self),
bucket=self.args.code_repository_bucket_name,
)

return aws.s3.BucketV2(
Expand All @@ -345,8 +356,10 @@ def iceberg_bucket(self) -> aws.s3.BucketV2:
aws.s3.BucketV2: The S3 bucket for Iceberg data.
"""
if self.args.data_bucket_name:
return aws.s3.BucketV2.get(
f"{self._name}-data-bucket", id=self.args.data_bucket_name
return aws.s3.BucketV2(
resource_name=f"{self._name}-data-bucket",
opts=ResourceOptions(parent=self),
bucket=self.args.data_bucket_name,
)

return aws.s3.BucketV2(
Expand All @@ -364,8 +377,11 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase:
aws.glue.CatalogDatabase: The Glue database.
"""
if self.args.database_name:
return aws.glue.CatalogDatabase.get(
f"{self._name}-database", id=self.args.database_name
return aws.glue.CatalogDatabase(
resource_name=f"{self._name}-database",
opts=ResourceOptions(parent=self),
name=self.args.database_name,
location_uri=f"s3://{self.iceberg_bucket.bucket}/",
)

return aws.glue.CatalogDatabase(
Expand All @@ -375,69 +391,58 @@ def iceberg_database(self) -> aws.glue.CatalogDatabase:
location_uri=f"s3://{self.iceberg_bucket.bucket}/",
)

def _create_glue_trigger(
self, job: GlueJobDefinition
) -> Optional[aws.glue.Trigger]:
def _create_glue_trigger(self, job: GlueJobDefinition) -> aws.glue.Trigger:
"""Creates a Glue Trigger for the job if scheduled.
Args:
job (GlueJobDefinition): The job definition.
Returns:
Optional[aws.glue.Trigger]: The Glue trigger if scheduled, else None.
aws.glue.Trigger: The Glue trigger.
"""
return (
aws.glue.Trigger(
f"{job.name}-glue-trigger",
type="SCHEDULED",
schedule=job.schedule,
actions=[
aws.glue.TriggerActionArgs(
job_name=self._get_job_name(job),
)
],
start_on_creation=True,
)
if job.schedule
else None
return aws.glue.Trigger(
f"{job.name}-glue-trigger",
type="SCHEDULED",
schedule=job.schedule,
actions=[
aws.glue.TriggerActionArgs(
job_name=self._get_job_name(job),
)
],
start_on_creation=True,
)

@property
@cache
def kafka_checkpoint_bucket(self) -> Optional[aws.s3.BucketV2]:
def kafka_checkpoint_bucket(self) -> aws.s3.BucketV2:
"""Returns an S3 bucket to store Kafka checkpoints.
Creates the bucket if at least one job is of type 'GLUE_STREAMING'.
Returns:
Optional[aws.s3.BucketV2]: The S3 bucket for Kafka checkpoints.
aws.s3.BucketV2: The S3 bucket for Kafka checkpoints.
"""
if any(job.job_type == GlueJobType.GLUE_STREAMING for job in self.args.jobs):
if self.args.kafka_checkpoints_bucket_name:
return aws.s3.BucketV2.get(
f"{self._name}-checkpoints-bucket",
id=self.args.kafka_checkpoints_bucket_name,
)

if self.args.kafka_checkpoints_bucket_name:
return aws.s3.BucketV2(
resource_name=f"{self._name}-checkpoints-bucket",
opts=ResourceOptions(parent=self),
bucket_prefix=f"{self._name}-checkpoints-bucket",
bucket=self.args.kafka_checkpoints_bucket_name,
)
return None
return aws.s3.BucketV2(
resource_name=f"{self._name}-checkpoints-bucket",
opts=ResourceOptions(parent=self),
bucket_prefix=f"{self._name}-checkpoints-bucket",
)

@property
@cache
def connection(self) -> Optional[aws.glue.Connection]:
def connection(self) -> aws.glue.Connection:
"""Returns a Glue Connection.
Returns:
Optional[aws.glue.Connection]: The Glue Connection.
aws.glue.Connection: The Glue Connection.
"""
if not self.args.connector_config:
logging.warning(
"No connector config provided. Glue jobs will run outside a VPC. This is not recommended."
)
return None
raise ValueError("No connector config provided.")

return aws.glue.Connection(
resource_name=f"{self._name}-kafka-connection",
Expand Down
14 changes: 1 addition & 13 deletions tests/clouds/aws/resources/test_glue_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,6 @@ def should_have_one_database(dbs: list[aws.glue.CatalogDatabase]):
pulumi.Output.all(glue_component.iceberg_database).apply(should_have_one_database)


@pulumi.runtime.test
def test_kafka_checkpoint_bucket(glue_component):
def should_not_create_bucket_if_no_streaming(
buckets: list[Optional[aws.s3.BucketV2]],
):
assert buckets[0] is None

pulumi.Output.all(glue_component.kafka_checkpoint_bucket).apply(
should_not_create_bucket_if_no_streaming
)


@pytest.fixture
def glue_component_with_streaming_job():
return GlueComponent(
Expand All @@ -137,7 +125,7 @@ def glue_component_with_streaming_job():
GlueJobDefinition(
name="streaming-job",
description="test streaming job",
job_type=GlueJobType.GLUE_STREAMING,
job_type=GlueJobType.STREAMING,
),
]
),
Expand Down

0 comments on commit 2da7493

Please sign in to comment.