diff --git a/devenv.nix b/devenv.nix
index e40e9ab..0c8f391 100644
--- a/devenv.nix
+++ b/devenv.nix
@@ -55,6 +55,10 @@
     };
   };
 
+  # Java is required for PySpark
+  languages.java.enable = true;
+  languages.java.jdk.package = pkgs.jdk8; # Java version running on AWS Glue
+
   enterShell = ''
     hello
     pdm install
diff --git a/examples/sparkle/Pulumi.yaml b/examples/sparkle/Pulumi.yaml
index 4f93a19..196c4a9 100644
--- a/examples/sparkle/Pulumi.yaml
+++ b/examples/sparkle/Pulumi.yaml
@@ -1,11 +1,5 @@
-name: object_storage
+name: simple-spark-application
 runtime:
   name: python
-  options:
-    toolchain: pip
-    virtualenv: venv
-description: A minimal Azure Native Python Pulumi program
-config:
-  pulumi:tags:
-    value:
-      pulumi:template: azure-python
+description: A minimal spark application that uses Sparkle
+region: eu-west-1
diff --git a/examples/sparkle/__main__.py b/examples/sparkle/__main__.py
index 2ff67c2..46b029e 100644
--- a/examples/sparkle/__main__.py
+++ b/examples/sparkle/__main__.py
@@ -1,4 +1,3 @@
-import os
 from damavand.cloud.provider import AwsProvider
 from damavand.factories import SparkControllerFactory
 
@@ -10,7 +9,7 @@ def main() -> None:
     spark_factory = SparkControllerFactory(
         provider=AwsProvider(
             app_name="my-app",
-            region="us-west-2",
+            region="eu-west-1",
         ),
         tags={"env": "dev"},
     )
@@ -22,10 +21,9 @@ def main() -> None:
             CustomerOrders(),
         ],
     )
+    # app_name = os.getenv("APP_NAME", "products-app")  # Get app name on runtime
 
-    app_name = os.getenv("APP_NAME", "default_app")  # Get app name on runtime
-
-    spark_controller.run_application(app_name)
+    # spark_controller.run_application(app_name)
 
     spark_controller.provision()
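Reviewer note, not part of the patch: with the runtime call commented out, __main__.py now only provisions infrastructure. Should the runtime entry point be needed again, a minimal sketch along the lines of the commented-out code could be appended to main(); APP_NAME is assumed to be injected by the runtime (for example through the Glue job arguments), and "products-app" is only an assumed default:

    import os

    # `spark_controller` as constructed in main() above.
    app_name = os.getenv("APP_NAME", "products-app")  # assumed runtime-provided id
    spark_controller.run_application(app_name)
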
diff --git a/examples/sparkle/applications/orders.py b/examples/sparkle/applications/orders.py
index d82514d..5c424df 100644
--- a/examples/sparkle/applications/orders.py
+++ b/examples/sparkle/applications/orders.py
@@ -1,4 +1,5 @@
-from sparkle.config import Config
+from sparkle.config import Config, IcebergConfig, KafkaReaderConfig
+from sparkle.config.kafka_config import KafkaConfig, Credentials
 from sparkle.writer.iceberg_writer import IcebergWriter
 from sparkle.application import Sparkle
 from sparkle.reader.kafka_reader import KafkaReader
@@ -15,6 +16,18 @@ def __init__(self):
                 version="0.0.1",
                 database_bucket="s3://test-bucket",
                 checkpoints_bucket="s3://test-checkpoints",
+                iceberg_output=IcebergConfig(
+                    database_name="all_products",
+                    database_path="",
+                    table_name="orders_v1",
+                ),
+                kafka_input=KafkaReaderConfig(
+                    KafkaConfig(
+                        bootstrap_servers="localhost:9119",
+                        credentials=Credentials("test", "test"),
+                    ),
+                    kafka_topic="src_orders_v1",
+                ),
             ),
             readers={"orders": KafkaReader},
             writers=[IcebergWriter],
diff --git a/examples/sparkle/applications/products.py b/examples/sparkle/applications/products.py
index 9362fd8..e34bdcc 100644
--- a/examples/sparkle/applications/products.py
+++ b/examples/sparkle/applications/products.py
@@ -1,7 +1,6 @@
 from sparkle.application import Sparkle
-from sparkle.config import Config
+from sparkle.config import Config, IcebergConfig, TableConfig
 from sparkle.writer.iceberg_writer import IcebergWriter
-from sparkle.writer.kafka_writer import KafkaStreamPublisher
 from sparkle.reader.table_reader import TableReader
 from pyspark.sql import DataFrame
 
@@ -16,11 +15,20 @@ def __init__(self):
                 version="0.0.1",
                 database_bucket="s3://test-bucket",
                 checkpoints_bucket="s3://test-checkpoints",
+                iceberg_output=IcebergConfig(
+                    database_name="all_products",
+                    database_path="",
+                    table_name="products_v1",
+                ),
+                hive_table_input=TableConfig(
+                    database="source_database",
+                    table="products_v1",
+                    bucket="",
+                ),
             ),
             readers={"products": TableReader},
             writers=[
                 IcebergWriter,
-                KafkaStreamPublisher,
             ],
         )
diff --git a/pdm.lock b/pdm.lock
index eac070a..7608580 100644
--- a/pdm.lock
+++ b/pdm.lock
@@ -5,7 +5,7 @@
 groups = ["default", "dev", "llm"]
 strategy = ["inherit_metadata"]
 lock_version = "4.5.0"
-content_hash = "sha256:8876bfee687350c2cca3535991e47503fc5a54baa28d6f0625216d56ef84630a"
+content_hash = "sha256:eded6b337337dee9c917e85dfb3d774f698d96ba4f8d7ad37fcdc1e20cba8a12"
 
 [[metadata.targets]]
 requires_python = ">=3.11.0"
@@ -139,7 +139,7 @@ name = "certifi"
 version = "2024.8.30"
 requires_python = ">=3.6"
 summary = "Python package for providing Mozilla's CA Bundle."
-groups = ["default", "dev", "llm"]
+groups = ["dev", "llm"]
 files = [
     {file = "certifi-2024.8.30-py3-none-any.whl", hash = "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8"},
     {file = "certifi-2024.8.30.tar.gz", hash = "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9"},
@@ -198,7 +198,7 @@ name = "charset-normalizer"
 version = "3.3.2"
 requires_python = ">=3.7.0"
 summary = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet."
-groups = ["default", "dev", "llm"]
+groups = ["dev", "llm"]
 files = [
     {file = "charset-normalizer-3.3.2.tar.gz", hash = "sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5"},
     {file = "charset_normalizer-3.3.2-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db"},
@@ -432,6 +432,25 @@ files = [
     {file = "cryptography-43.0.1.tar.gz", hash = "sha256:203e92a75716d8cfb491dc47c79e17d0d9207ccffcbcb35f598fbe463ae3444d"},
 ]
 
+[[package]]
+name = "damavand"
+version = "0.0.0"
+requires_python = ">=3.11.0"
+path = "./."
+summary = "Damavand is an opinionated cloud-agnostic pythonic implementation of ARC design pattern for developing cloud-native applications."
+groups = ["default"]
+dependencies = [
+    "boto3>=1.34.147",
+    "flask>=3.0.3",
+    "psutil>=6.0.0",
+    "pulumi-aws>=6.47.0",
+    "pulumi-azure-native>=2.51.0",
+    "pulumi-random>=4.16.3",
+    "pulumi>=3.127.0",
+    "rich>=13.7.1",
+    "sparkle @ git+https://github.com/DataChefHQ/sparkle.git@v0.6.1",
+]
+
 [[package]]
 name = "debugpy"
 version = "1.8.6"
@@ -568,7 +587,7 @@ name = "idna"
 version = "3.10"
 requires_python = ">=3.6"
 summary = "Internationalized Domain Names in Applications (IDNA)"
-groups = ["default", "dev", "llm"]
+groups = ["dev", "llm"]
 files = [
     {file = "idna-3.10-py3-none-any.whl", hash = "sha256:946d195a0d259cbba61165e88e65941f16e9b36ea6ddb97f00452bae8b1287d3"},
     {file = "idna-3.10.tar.gz", hash = "sha256:12f65c9b470abda6dc35cf8e63cc574b1c52b11df2c86030af0ac09b01b13ea9"},
@@ -1127,7 +1146,7 @@ files = [
 name = "py4j"
 version = "0.10.9.5"
 summary = "Enables Python programs to dynamically access arbitrary Java objects"
-groups = ["default", "dev"]
+groups = ["dev"]
 files = [
     {file = "py4j-0.10.9.5-py2.py3-none-any.whl", hash = "sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04"},
     {file = "py4j-0.10.9.5.tar.gz", hash = "sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6"},
@@ -1261,7 +1280,7 @@ name = "pyspark"
 version = "3.3.2"
 requires_python = ">=3.7"
 summary = "Apache Spark Python API"
-groups = ["default", "dev"]
+groups = ["dev"]
 dependencies = [
     "py4j==0.10.9.5",
 ]
@@ -1439,7 +1458,7 @@ name = "requests"
 version = "2.32.3"
 requires_python = ">=3.8"
 summary = "Python HTTP for Humans."
-groups = ["default", "dev", "llm"]
+groups = ["dev", "llm"]
 dependencies = [
     "certifi>=2017.4.17",
     "charset-normalizer<4,>=2",
@@ -1456,7 +1475,7 @@ name = "responses"
 version = "0.25.3"
 requires_python = ">=3.8"
 summary = "A utility library for mocking out the `requests` Python library."
-groups = ["default", "dev"]
+groups = ["dev"]
 dependencies = [
     "pyyaml",
     "requests<3.0,>=2.30.0",
@@ -1655,13 +1674,8 @@ requires_python = "<4.0,>=3.10.14"
 git = "https://github.com/DataChefHQ/sparkle.git"
 ref = "v0.6.1"
 revision = "7c59a62035142b51af8477f62829be46fa214b43"
-summary = "✨ A meta framework for Apache Spark, helping data engineers to focus on solving business problems with highest quality!"
+summary = ""
 groups = ["default"]
-dependencies = [
-    "pyspark==3.3.2",
-    "requests<3.0.0,>=2.32.3",
-    "responses<1.0.0,>=0.25.3",
-]
 
 [[package]]
 name = "tblib"
diff --git a/pyproject.toml b/pyproject.toml
index e7016b4..1d14a0b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
     "pulumi-azure-native>=2.51.0",
     "pulumi-random>=4.16.3",
     "sparkle @ git+https://github.com/DataChefHQ/sparkle.git@v0.6.1",
+    "damavand @ file:///${PROJECT_ROOT}/",
 ]
 requires-python = ">=3.11.0"
 readme = "README.md"
diff --git a/src/damavand/base/controllers/spark.py b/src/damavand/base/controllers/spark.py
index d591fe4..84c12ba 100644
--- a/src/damavand/base/controllers/spark.py
+++ b/src/damavand/base/controllers/spark.py
@@ -54,7 +54,6 @@ def application_with_id(self, app_id: str) -> Sparkle:
         Returns:
             Sparkle: The Spark application.
         """
-
         for app in self.applications:
             if app.config.app_id == app_id:
                 return app
diff --git a/src/damavand/cloud/aws/resources/glue_component.py b/src/damavand/cloud/aws/resources/glue_component.py
index 86775b2..fdf1847 100644
--- a/src/damavand/cloud/aws/resources/glue_component.py
+++ b/src/damavand/cloud/aws/resources/glue_component.py
@@ -1,5 +1,7 @@
 import json
+import logging
 from typing import Any, Optional
+from enum import Enum
 from functools import cache
 from dataclasses import dataclass, field
 
@@ -8,21 +10,140 @@
 from pulumi import ResourceOptions
 
 
+class GlueWorkerType(Enum):
+    """Enum representing Glue worker types."""
+
+    G_1X = "G.1X"
+    G_2X = "G.2X"
+    G_4X = "G.4X"
+    G_8X = "G.8X"
+    G_025X = "G.025X"
+    Z_2X = "Z.2X"
+
+
+class GlueJobType(Enum):
+    """Enum representing Glue job types."""
+
+    ETL = "glueetl"
+    STREAMING = "gluestreaming"
+
+
+class GlueExecutionClass(Enum):
+    """Enum representing Glue execution classes."""
+
+    STANDARD = "STANDARD"
+    FLEX = "FLEX"
+
+
+class ConnectionType(Enum):
+    """Enum representing connection types."""
+
+    KAFKA = "KAFKA"
+
+
+@dataclass
+class ConnectorConfig:
+    """Configuration for the Connector.
+
+    Attributes:
+        vpc_id (str): VPC ID.
+        subnet_id (str): Subnet ID.
+        security_groups (list[str]): List of security group IDs.
+        connector_type (ConnectionType): Connector type. Default is ConnectionType.KAFKA.
+        connection_properties (dict): Connection properties. Default is empty dict.
+    """
+
+    vpc_id: str
+    subnet_id: str
+    security_groups: list[str]
+    connector_type: ConnectionType = ConnectionType.KAFKA
+    connection_properties: dict = field(default_factory=dict)
+
+
 @dataclass
 class GlueJobDefinition:
+    """Parameters for the Glue job.
+
+    Attributes:
+        name (str): Job name.
+        description (str): Job description.
+        script_location (str): S3 path to the entrypoint script.
+        extra_libraries (list[str]): Paths to extra dependencies.
+        execution_class (GlueExecutionClass): Execution class. Default is STANDARD.
+        max_concurrent_runs (int): Max concurrent runs.
+        glue_version (str): Glue version.
+        enable_auto_scaling (bool): Enable auto-scaling.
+        max_capacity (int): Max capacity.
+        max_retries (int): Max retries.
+        number_of_workers (int): Number of workers.
+        timeout (int): Job timeout in minutes.
+        worker_type (GlueWorkerType): Worker type.
+        enable_glue_datacatalog (bool): Use Glue Data Catalog.
+        enable_continuous_cloudwatch_log (bool): Enable continuous CloudWatch log.
+        enable_continuous_log_filter (bool): Reduce logging amount.
+        enable_metrics (bool): Enable metrics.
+        enable_observability_metrics (bool): Enable extra Spark metrics.
+        script_args (dict): Script arguments.
+        schedule (Optional[str]): Cron-like schedule.
+        job_type (GlueJobType): Job type.
+    """
+
     name: str
-    description: str
-    options: dict[str, Any] = field(default_factory=dict)
+    description: str = ""
+    script_location: str = ""
+    extra_libraries: list[str] = field(default_factory=list)
+    execution_class: GlueExecutionClass = GlueExecutionClass.STANDARD
+    max_concurrent_runs: int = 1
+    glue_version: str = "4.0"
+    enable_auto_scaling: bool = True
+    max_capacity: int = 5
+    max_retries: int = 0
+    number_of_workers: int = 2
+    timeout: int = 2880
+    worker_type: GlueWorkerType = GlueWorkerType.G_1X
+    enable_glue_datacatalog: bool = True
+    enable_continuous_cloudwatch_log: bool = False
+    enable_continuous_log_filter: bool = True
+    enable_metrics: bool = False
+    enable_observability_metrics: bool = False
+    script_args: dict = field(default_factory=dict)
+    schedule: Optional[str] = None
+    job_type: GlueJobType = GlueJobType.ETL
 
 
 @dataclass
 class GlueComponentArgs:
+    """Glue job definitions and infrastructure dependencies.
+
+    Attributes:
+        jobs (list[GlueJobDefinition]): List of Glue jobs to deploy.
+        execution_role (Optional[str]): IAM role for Glue jobs.
+        connector_config (Optional[ConnectorConfig]): Connector configuration.
+    """
+
     jobs: list[GlueJobDefinition]
-    role: Optional[aws.iam.Role] = None
-    code_repository_bucket: Optional[aws.s3.BucketV2] = None
+    execution_role: Optional[str] = None
+    connector_config: Optional[ConnectorConfig] = None
 
 
 class GlueComponent(PulumiComponentResource):
+    """Deployment of PySpark applications on Glue.
+
+    Resources deployed:
+        - Code Repository: S3 bucket.
+        - Data Storage: S3 bucket.
+        - Kafka checkpoint storage: S3 bucket.
+        - Compute: Glue Jobs.
+        - Orchestration: Triggers for the Glue Jobs.
+        - Metadata Catalog: Glue Database.
+        - Permissions: IAM role for Glue Jobs.
+
+    Args:
+        name (str): The name of the resource.
+        args (GlueComponentArgs): The arguments for the component.
+        opts (Optional[ResourceOptions]): Resource options.
+    """
+
     def __init__(
         self,
         name: str,
@@ -40,12 +161,134 @@ def __init__(
         self.args = args
         self.code_repository_bucket
         self.iceberg_database
+        self.iceberg_bucket
         self.jobs
 
+        if any(job.job_type == GlueJobType.STREAMING for job in self.args.jobs):
+            self.kafka_checkpoint_bucket
+
+        if not self.args.connector_config:
+            logging.warning(
+                "No connector config provided. Glue jobs will run outside a VPC."
+            )
+        else:
+            self.connection
+
+    @property
+    @cache
+    def jobs(self) -> list[aws.glue.Job]:
+        """Creates and returns all the Glue jobs and adds triggers if scheduled.
+
+        Returns:
+            list[aws.glue.Job]: List of Glue jobs.
+        """
+        jobs = []
+        for job in self.args.jobs:
+            glue_job = aws.glue.Job(
+                resource_name=f"{self._name}-{job.name}-job",
+                opts=ResourceOptions(parent=self),
+                name=self._get_job_name(job),
+                glue_version=job.glue_version,
+                role_arn=self.role.arn,
+                command=aws.glue.JobCommandArgs(
+                    name=job.job_type.value,
+                    python_version="3",
+                    script_location=self._get_source_path(job),
+                ),
+                default_arguments=self._get_default_arguments(job),
+                number_of_workers=job.number_of_workers,
+                worker_type=job.worker_type.value,
+                execution_property=aws.glue.JobExecutionPropertyArgs(
+                    max_concurrent_runs=job.max_concurrent_runs
+                ),
+                connections=(
+                    [self.connection.name] if self.args.connector_config else []
+                ),
+            )
+            jobs.append(glue_job)
+            if job.schedule:
+                self._create_glue_trigger(job)
+        return jobs
+
+    def _get_job_name(self, job: GlueJobDefinition) -> str:
+        """Generates the job name.
+
+        Args:
+            job (GlueJobDefinition): The job definition.
+
+        Returns:
+            str: The job name.
+        """
+        return f"{self._name}-{job.name}-job"
+
+    def _get_source_path(self, job: GlueJobDefinition) -> str:
+        """Gets the source path for the job script.
+
+        Args:
+            job (GlueJobDefinition): The job definition.
+
+        Returns:
+            str: The S3 path to the job script.
+        """
+        return (
+            f"s3://{self.code_repository_bucket.bucket}/{job.script_location}"
+            if job.script_location
+            else f"s3://{self.code_repository_bucket.bucket}/{job.name}.py"
+        )
+
+    def _get_default_arguments(self, job: GlueJobDefinition) -> dict[str, str]:
+        """Returns the map of default arguments for this job.
+
+        Args:
+            job (GlueJobDefinition): The job definition.
+
+        Returns:
+            dict[str, str]: The default arguments for the job.
+        """
+        return {
+            "--additional-python-modules": ",".join(job.extra_libraries),
+            "--enable-auto-scaling": str(job.enable_auto_scaling).lower(),
+            "--enable-continuous-cloudwatch-log": str(
+                job.enable_continuous_cloudwatch_log
+            ).lower(),
+            "--enable-continuous-log-filter": str(
+                job.enable_continuous_log_filter
+            ).lower(),
+            "--enable-glue-datacatalog": str(job.enable_glue_datacatalog).lower(),
+            "--datalake-formats": "iceberg",
+            "--enable-metrics": str(job.enable_metrics).lower(),
+            "--enable-observability-metrics": str(
+                job.enable_observability_metrics
+            ).lower(),
+            **job.script_args,
+        }
+
+    @property
+    @cache
+    def role(self) -> aws.iam.Role:
+        """Returns an execution role for Glue jobs.
+
+        Returns:
+            aws.iam.Role: The IAM role for Glue jobs.
+        """
+        if self.args.execution_role:
+            return aws.iam.Role.get(f"{self._name}-role", id=self.args.execution_role)
+
+        return aws.iam.Role(
+            resource_name=f"{self._name}-role",
+            opts=ResourceOptions(parent=self),
+            name=f"{self._name}-role",
+            assume_role_policy=json.dumps(self.assume_policy),
+            managed_policy_arns=self.managed_policy_arns,
+        )
+
     @property
     def assume_policy(self) -> dict[str, Any]:
-        """Return the assume role policy for Glue jobs."""
+        """Returns the assume role policy for Glue jobs.
+
+        Returns:
+            dict[str, Any]: The assume role policy.
+        """
 
         return {
             "Version": "2012-10-17",
             "Statement": [
@@ -61,85 +304,112 @@ def assume_policy(self) -> dict[str, Any]:
 
     @property
     def managed_policy_arns(self) -> list[str]:
-        """Return a list of managed policy ARNs that defines the permissions for Glue jobs."""
+        """Returns the managed policy ARNs defining permissions for Glue jobs.
+
+        Returns:
+            list[str]: List of managed policy ARNs.
+        """
 
         return [
             aws.iam.ManagedPolicy.AWS_GLUE_SERVICE_ROLE,
             aws.iam.ManagedPolicy.AMAZON_S3_FULL_ACCESS,
-            aws.iam.ManagedPolicy.CLOUD_TRAIL_FULL_ACCESS,
         ]
 
-    @property
-    @cache
-    def role(self) -> aws.iam.Role:
-        """Return an execution role for Glue jobs."""
-
-        return self.args.role or aws.iam.Role(
-            resource_name=f"{self._name}-role",
-            opts=ResourceOptions(parent=self),
-            name=f"{self._name}-ExecutionRole",
-            assume_role_policy=json.dumps(self.assume_policy),
-            managed_policy_arns=self.managed_policy_arns,
-        )
-
     @property
     @cache
     def code_repository_bucket(self) -> aws.s3.BucketV2:
-        """Return an S3 bucket for Glue jobs to host source codes."""
+        """Returns an S3 bucket for Glue jobs to host source codes.
 
-        # NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique.
-        return self.args.code_repository_bucket or aws.s3.BucketV2(
+        Returns:
+            aws.s3.BucketV2: The S3 bucket for code repository.
+        """
+        return aws.s3.BucketV2(
             resource_name=f"{self._name}-code-bucket",
             opts=ResourceOptions(parent=self),
-            bucket_prefix=f"{self._name}-code-bucket",
+            bucket_prefix=f"{self._name[:20]}-code-bucket",
         )
 
     @property
     @cache
     def iceberg_bucket(self) -> aws.s3.BucketV2:
-        """Return an S3 bucket for Iceberg tables to store data processed by Glue jobs."""
+        """Returns an S3 bucket for Iceberg tables to store data.
 
-        # NOTE: using `bucket_prefix` to avoid name conflict as the bucket name must be globally unique.
+        Returns:
+            aws.s3.BucketV2: The S3 bucket for Iceberg data.
+        """
         return aws.s3.BucketV2(
-            resource_name=f"{self._name}-bucket",
+            resource_name=f"{self._name}-data-bucket",
             opts=ResourceOptions(parent=self),
-            bucket_prefix=f"{self._name}-bucket",
+            bucket_prefix=f"{self._name[:20]}-data-bucket",
         )
 
     @property
     @cache
     def iceberg_database(self) -> aws.glue.CatalogDatabase:
-        """Return a Glue database for Iceberg tables to store data processed by Glue jobs."""
+        """Returns a Glue database for Iceberg tables.
 
+        Returns:
+            aws.glue.CatalogDatabase: The Glue database.
+        """
         return aws.glue.CatalogDatabase(
             resource_name=f"{self._name}-database",
             opts=ResourceOptions(parent=self),
-            name=f"{self._name}-database",
+            name=f"{self._name[:20]}-database",
             location_uri=f"s3://{self.iceberg_bucket.bucket}/",
         )
 
+    def _create_glue_trigger(self, job: GlueJobDefinition) -> aws.glue.Trigger:
+        """Creates a Glue Trigger for the job if scheduled.
+
+        Args:
+            job (GlueJobDefinition): The job definition.
+
+        Returns:
+            aws.glue.Trigger: The Glue trigger.
+        """
+        return aws.glue.Trigger(
+            f"{job.name}-glue-trigger",
+            type="SCHEDULED",
+            schedule=job.schedule,
+            actions=[
+                aws.glue.TriggerActionArgs(
+                    job_name=self._get_job_name(job),
+                )
+            ],
+            start_on_creation=True,
+        )
+
     @property
     @cache
-    def jobs(self) -> list[aws.glue.Job]:
-        """Return all the Glue jobs for the application."""
+    def kafka_checkpoint_bucket(self) -> aws.s3.BucketV2:
+        """Returns an S3 bucket to store Kafka checkpoints.
 
-        return [
-            aws.glue.Job(
-                resource_name=f"{self._name}-{job.name}-job",
-                opts=ResourceOptions(parent=self),
-                name=f"{self._name}-{job.name}-job",
-                role_arn=self.role.arn,
-                glue_version="4.0",
-                command={
-                    "script_location": f"s3://{self.code_repository_bucket.bucket}/",
-                },
-                default_arguments={
-                    "--env": "dev",
-                    "--pipeline-name": job.name,
-                    "--options": " ".join(
-                        [f'{k}="{v}"' for k, v in job.options.items()]
-                    ),
-                },
-            )
-            for job in self.args.jobs
-        ]
+        Returns:
+            aws.s3.BucketV2: The S3 bucket for Kafka checkpoints.
+        """
+        return aws.s3.BucketV2(
+            resource_name=f"{self._name}-checkpoints-bucket",
+            opts=ResourceOptions(parent=self),
+            bucket_prefix=f"{self._name[:20]}-checkpoints-bucket",
+        )
+
+    @property
+    @cache
+    def connection(self) -> aws.glue.Connection:
+        """Returns a Glue Connection.
+
+        Returns:
+            aws.glue.Connection: The Glue Connection.
+        """
+        if not self.args.connector_config:
+            raise ValueError("No connector config provided.")
+
+        return aws.glue.Connection(
+            resource_name=f"{self._name}-kafka-connection",
+            name=f"{self._name}-kafka-connection",
+            connection_type=self.args.connector_config.connector_type.value,
+            connection_properties=self.args.connector_config.connection_properties,
+            physical_connection_requirements={
+                "security_group_id_lists": self.args.connector_config.security_groups,
+                "subnet_id": self.args.connector_config.subnet_id,
+            },
+        )
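Reviewer note, not part of the patch: a minimal usage sketch of the reworked GlueComponent API, assembled from the dataclasses above and mirroring the fixtures in the test changes below. It is meant to run inside a Pulumi program; the VPC/subnet/security-group IDs, bootstrap servers, cron expression and resource names are illustrative assumptions only:

    from damavand.cloud.aws.resources import GlueComponent, GlueComponentArgs
    from damavand.cloud.aws.resources.glue_component import (
        ConnectorConfig,
        GlueJobDefinition,
        GlueJobType,
    )

    glue = GlueComponent(
        name="orders-pipeline",  # assumed component name
        args=GlueComponentArgs(
            jobs=[
                GlueJobDefinition(
                    name="orders-batch",
                    description="nightly batch job",
                    schedule="cron(0 1 * * ? *)",  # optional; adds a SCHEDULED Glue trigger
                ),
                GlueJobDefinition(
                    name="orders-streaming",
                    description="Kafka streaming job",
                    job_type=GlueJobType.STREAMING,  # also provisions the checkpoints bucket
                ),
            ],
            connector_config=ConnectorConfig(
                vpc_id="vpc-12345678",
                subnet_id="subnet-12345678",
                security_groups=["sg-12345678"],
                connection_properties={"BootstrapServers": "localhost:9092"},
            ),
        ),
    )
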
diff --git a/src/damavand/factories.py b/src/damavand/factories.py
index 367e7e3..31d5a16 100644
--- a/src/damavand/factories.py
+++ b/src/damavand/factories.py
@@ -16,6 +16,7 @@ def _new_aws_controller(
     ) -> SparkController:
         return AwsSparkController(
             name=name,
+            applications=applications,
             region=region,
             tags=tags,
             **kwargs,
diff --git a/tests/clouds/aws/resources/test_glue_component.py b/tests/clouds/aws/resources/test_glue_component.py
index 1b8e07c..4257d30 100644
--- a/tests/clouds/aws/resources/test_glue_component.py
+++ b/tests/clouds/aws/resources/test_glue_component.py
@@ -6,8 +6,6 @@
 import pulumi_aws as aws
 from pulumi.runtime.mocks import MockResourceArgs, MockCallArgs
 
-from damavand.cloud.aws.resources.glue_component import GlueJobDefinition
-
 
 # NOTE: this has to be defined before importing infrastructure codes.
 # Check Pulumi's documentation for more details: https://www.pulumi.com/docs/using-pulumi/testing/unit/
@@ -25,6 +23,11 @@ def call(self, args: MockCallArgs) -> Tuple[dict, Optional[List[Tuple[str, str]]
 )
 
 from damavand.cloud.aws.resources import GlueComponent, GlueComponentArgs  # noqa: E402
+from damavand.cloud.aws.resources.glue_component import (  # noqa: E402
+    GlueJobDefinition,
+    ConnectorConfig,
+    GlueJobType,
+)
 
 
 @pytest.fixture
@@ -93,3 +96,77 @@ def should_name_have_prefix(names: list[str]):
     pulumi.Output.all(glue_component.jobs).apply(should_have_two)
     for job in glue_component.jobs:
         pulumi.Output.all(job.name).apply(should_name_have_prefix)
+
+
+@pulumi.runtime.test
+def test_code_repository_bucket(glue_component):
+    def should_have_one_bucket(buckets: list[aws.s3.BucketV2]):
+        assert len(buckets) == 1
+
+    pulumi.Output.all(glue_component.code_repository_bucket).apply(
+        should_have_one_bucket
+    )
+
+
+@pulumi.runtime.test
+def test_iceberg_database(glue_component):
+    def should_have_one_database(dbs: list[aws.glue.CatalogDatabase]):
+        assert len(dbs) == 1
+
+    pulumi.Output.all(glue_component.iceberg_database).apply(should_have_one_database)
+
+
+@pytest.fixture
+def glue_component_with_streaming_job():
+    return GlueComponent(
+        name="test-streaming",
+        args=GlueComponentArgs(
+            jobs=[
+                GlueJobDefinition(
+                    name="streaming-job",
+                    description="test streaming job",
+                    job_type=GlueJobType.STREAMING,
+                ),
+            ]
+        ),
+    )
+
+
+@pulumi.runtime.test
+def test_kafka_checkpoint_bucket_streaming(glue_component_with_streaming_job):
+    def should_create_bucket_for_streaming(buckets: list[aws.s3.BucketV2]):
+        assert len(buckets) == 1
+
+    pulumi.Output.all(glue_component_with_streaming_job.kafka_checkpoint_bucket).apply(
+        should_create_bucket_for_streaming
+    )
+
+
+@pytest.fixture
+def glue_component_with_connector():
+    return GlueComponent(
+        name="test-connector",
+        args=GlueComponentArgs(
+            jobs=[
+                GlueJobDefinition(
+                    name="job-with-connector",
+                    description="job with connector",
+                ),
+            ],
+            connector_config=ConnectorConfig(
+                vpc_id="vpc-12345678",
+                subnet_id="subnet-12345678",
+                security_groups=["sg-12345678"],
+                connection_properties={"BootstrapServers": "localhost:9092"},
+            ),
+        ),
+    )
+
+
+@pulumi.runtime.test
+def test_connection_creation(glue_component_with_connector):
+
+    def should_have_one(connections: list[aws.glue.Connection]):
+        assert len(connections) == 1
+
+    pulumi.Output.all(glue_component_with_connector.connection).apply(should_have_one)
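
Reviewer suggestion, not part of the patch: the schedule path (GlueJobDefinition.schedule feeding _create_glue_trigger) is not exercised by the new tests. A sketch in the same style as the fixtures above, reusing this module's existing imports and Pulumi mocks; the cron expression and names are illustrative assumptions:

    @pytest.fixture
    def glue_component_with_scheduled_job():
        return GlueComponent(
            name="test-scheduled",
            args=GlueComponentArgs(
                jobs=[
                    GlueJobDefinition(
                        name="scheduled-job",
                        description="test scheduled job",
                        schedule="cron(0 12 * * ? *)",
                    ),
                ]
            ),
        )


    @pulumi.runtime.test
    def test_scheduled_job_still_creates_job(glue_component_with_scheduled_job):
        def should_have_one(jobs: list[aws.glue.Job]):
            assert len(jobs) == 1

        # Accessing .jobs also runs _create_glue_trigger for the scheduled job.
        pulumi.Output.all(glue_component_with_scheduled_job.jobs).apply(should_have_one)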