From 86640d166c8d5b3c840bf98e5c6db0d91392fde3 Mon Sep 17 00:00:00 2001
From: simonprydden <72669153+simonprydden@users.noreply.github.com>
Date: Thu, 26 Oct 2023 11:56:44 +0100
Subject: [PATCH] Add Http to s3 operator (#35176)

---------

Co-authored-by: SimonPrydden
---
 .../amazon/aws/transfers/http_to_s3.py        | 174 ++++++++++++++++++
 airflow/providers/amazon/provider.yaml        |   6 +-
 .../transfer/http_to_s3.rst                   |  53 ++++++
 .../amazon/aws/transfers/test_http_to_s3.py   |  76 ++++++++
 .../amazon/aws/example_http_to_s3.py          |  83 +++++++++
 5 files changed, 391 insertions(+), 1 deletion(-)
 create mode 100644 airflow/providers/amazon/aws/transfers/http_to_s3.py
 create mode 100644 docs/apache-airflow-providers-amazon/transfer/http_to_s3.rst
 create mode 100644 tests/providers/amazon/aws/transfers/test_http_to_s3.py
 create mode 100644 tests/system/providers/amazon/aws/example_http_to_s3.py

diff --git a/airflow/providers/amazon/aws/transfers/http_to_s3.py b/airflow/providers/amazon/aws/transfers/http_to_s3.py
new file mode 100644
index 0000000000000..ba6e63eadfe21
--- /dev/null
+++ b/airflow/providers/amazon/aws/transfers/http_to_s3.py
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an operator to move data from an HTTP endpoint to S3."""
+from __future__ import annotations
+
+from functools import cached_property
+from typing import TYPE_CHECKING, Any
+
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.http.hooks.http import HttpHook
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+    from requests.auth import AuthBase
+
+    from airflow.utils.context import Context
+
+
+class HttpToS3Operator(BaseOperator):
+    """
+    Calls an endpoint on an HTTP system to execute an action and store the result in S3.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:HttpToS3Operator`
+
+    :param http_conn_id: The :ref:`http connection<howto/connection:http>` to run
+        the operator against
+    :param endpoint: The relative part of the full url. (templated)
+    :param method: The HTTP method to use, default = "GET"
+    :param data: The data to pass. POST-data in POST/PUT and params
+        in the URL for a GET request. (templated)
+    :param headers: The HTTP headers to be added to the GET request
+    :param response_check: A check against the 'requests' response object.
+        The callable takes the response object as the first positional argument
+        and optionally any number of keyword arguments available in the context dictionary.
+        It should return True for 'pass' and False otherwise.
+    :param response_filter: A function allowing you to manipulate the response
+        text. e.g. response_filter=lambda response: json.loads(response.text).
+        The callable takes the response object as the first positional argument
+        and optionally any number of keyword arguments available in the context dictionary.
+    :param extra_options: Extra options for the 'requests' library, see the
+        'requests' documentation (options to modify timeout, ssl, etc.)
+    :param log_response: Log the response (default: False)
+    :param auth_type: The auth type for the service
+    :param tcp_keep_alive: Enable TCP Keep Alive for the connection.
+    :param tcp_keep_alive_idle: The TCP Keep Alive Idle parameter (corresponds to ``socket.TCP_KEEPIDLE``).
+    :param tcp_keep_alive_count: The TCP Keep Alive count parameter (corresponds to ``socket.TCP_KEEPCNT``)
+    :param tcp_keep_alive_interval: The TCP Keep Alive interval parameter (corresponds to
+        ``socket.TCP_KEEPINTVL``)
+    :param s3_bucket: Name of the S3 bucket where to save the object. (templated)
+        It should be omitted when ``s3_key`` is provided as a full s3:// url.
+    :param s3_key: The key of the object to be created. (templated)
+        It can be either a full s3:// style url or a relative path from the root level.
+        When it's specified as a full s3:// url, please omit ``s3_bucket``.
+    :param replace: If True, it will overwrite the key if it already exists
+    :param encrypt: If True, the file will be encrypted on the server-side
+        by S3 and will be stored in an encrypted form while at rest in S3.
+    :param acl_policy: String specifying the canned ACL policy for the file being
+        uploaded to the S3 bucket.
+    :param aws_conn_id: Connection id of the S3 connection to use
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+
+        You can provide the following values:
+
+        - False: do not validate SSL certificates. SSL will still be used,
+          but SSL certificates will not be verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    """
+
+    template_fields: Sequence[str] = ("endpoint", "data", "headers", "s3_bucket", "s3_key")
+    template_fields_renderers = {"headers": "json", "data": "py"}
+    template_ext: Sequence[str] = ()
+    ui_color = "#f4a460"
+
+    def __init__(
+        self,
+        *,
+        endpoint: str | None = None,
+        method: str = "GET",
+        data: Any = None,
+        headers: dict[str, str] | None = None,
+        extra_options: dict[str, Any] | None = None,
+        http_conn_id: str = "http_default",
+        log_response: bool = False,
+        auth_type: type[AuthBase] | None = None,
+        tcp_keep_alive: bool = True,
+        tcp_keep_alive_idle: int = 120,
+        tcp_keep_alive_count: int = 20,
+        tcp_keep_alive_interval: int = 30,
+        s3_bucket: str | None = None,
+        s3_key: str,
+        replace: bool = False,
+        encrypt: bool = False,
+        acl_policy: str | None = None,
+        aws_conn_id: str = "aws_default",
+        verify: str | bool | None = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.http_conn_id = http_conn_id
+        self.method = method
+        self.endpoint = endpoint
+        self.headers = headers or {}
+        self.data = data or {}
+        self.extra_options = extra_options or {}
+        self.log_response = log_response
+        self.auth_type = auth_type
+        self.tcp_keep_alive = tcp_keep_alive
+        self.tcp_keep_alive_idle = tcp_keep_alive_idle
+        self.tcp_keep_alive_count = tcp_keep_alive_count
+        self.tcp_keep_alive_interval = tcp_keep_alive_interval
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.replace = replace
+        self.encrypt = encrypt
+        self.acl_policy = acl_policy
+        self.aws_conn_id = aws_conn_id
+        self.verify = verify
+
+    @cached_property
+    def http_hook(self) -> HttpHook:
+        """Create and return an HttpHook."""
+        return HttpHook(
+            self.method,
+            http_conn_id=self.http_conn_id,
+            auth_type=self.auth_type,
+            tcp_keep_alive=self.tcp_keep_alive,
+            tcp_keep_alive_idle=self.tcp_keep_alive_idle,
+            tcp_keep_alive_count=self.tcp_keep_alive_count,
+            tcp_keep_alive_interval=self.tcp_keep_alive_interval,
+        )
+
+    @cached_property
+    def s3_hook(self) -> S3Hook:
+        """Create and return an S3Hook."""
+        return S3Hook(
+            aws_conn_id=self.aws_conn_id,
+            verify=self.verify,
+        )
+
+    def execute(self, context: Context):
+        self.log.info("Calling HTTP method")
+        response = self.http_hook.run(self.endpoint, self.data, self.headers, self.extra_options)
+
+        self.s3_hook.load_bytes(
+            response.content,
+            self.s3_key,
+            self.s3_bucket,
+            self.replace,
+            self.encrypt,
+            self.acl_policy,
+        )
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index 77c9482c7144b..3b13925d5bfd6 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -19,7 +19,7 @@
 package-name: apache-airflow-providers-amazon
 name: Amazon
 description: |
-  Amazon integration (including `Amazon Web Services (AWS) <https://aws.amazon.com/>`__).
+  Amazon integration (including `Amazon Web Services (AWS) <https://aws.amazon.com/>`__).
 
 suspended: false
 versions:
@@ -628,6 +628,10 @@ transfers:
     target-integration-name: Amazon DynamoDB
     how-to-guide: /docs/apache-airflow-providers-amazon/transfer/hive_to_dynamodb.rst
     python-module: airflow.providers.amazon.aws.transfers.hive_to_dynamodb
+  - source-integration-name: Hypertext Transfer Protocol (HTTP)
+    target-integration-name: Amazon Simple Storage Service (S3)
+    how-to-guide: /docs/apache-airflow-providers-amazon/transfer/http_to_s3.rst
+    python-module: airflow.providers.amazon.aws.transfers.http_to_s3
   - source-integration-name: Internet Message Access Protocol (IMAP)
     target-integration-name: Amazon Simple Storage Service (S3)
     how-to-guide: /docs/apache-airflow-providers-amazon/transfer/imap_attachment_to_s3.rst
diff --git a/docs/apache-airflow-providers-amazon/transfer/http_to_s3.rst b/docs/apache-airflow-providers-amazon/transfer/http_to_s3.rst
new file mode 100644
index 0000000000000..d40e66f861b2b
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/transfer/http_to_s3.rst
@@ -0,0 +1,53 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+=================
+HTTP to Amazon S3
+=================
+
+Use the ``HttpToS3Operator`` to transfer content from an HTTP endpoint to an Amazon Simple Storage Service (S3) file.
+
+Prerequisite Tasks
+------------------
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+Operators
+---------
+
+.. _howto/operator:HttpToS3Operator:
+
+HTTP to Amazon S3 transfer operator
+===================================
+
+This operator copies data from an HTTP endpoint to an Amazon S3 file.
+
+To get more information about this operator visit:
+:class:`~airflow.providers.amazon.aws.transfers.http_to_s3.HttpToS3Operator`
+
+Example usage:
+
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_http_to_s3.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_transfer_http_to_s3]
+    :end-before: [END howto_transfer_http_to_s3]
+
+Reference
+---------
+
+* `AWS boto3 library documentation for Amazon S3 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html>`__
diff --git a/tests/providers/amazon/aws/transfers/test_http_to_s3.py b/tests/providers/amazon/aws/transfers/test_http_to_s3.py
new file mode 100644
index 0000000000000..5710e3d751f03
--- /dev/null
+++ b/tests/providers/amazon/aws/transfers/test_http_to_s3.py
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import datetime
+from unittest import mock
+
+import boto3
+from moto import mock_s3
+
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
+
+EXAMPLE_URL = "http://www.example.com"
+
+
+@mock.patch.dict("os.environ", AIRFLOW_CONN_HTTP_EXAMPLE=EXAMPLE_URL)
+class TestHttpToS3Operator:
+    def setup_method(self):
+        args = {"owner": "airflow", "start_date": datetime.datetime(2017, 1, 1)}
+        self.dag = DAG("test_dag_id", default_args=args)
+        self.http_conn_id = "HTTP_EXAMPLE"
+        self.response = b"Example.com fake response"
+        self.endpoint = "/"
+        self.s3_key = "test/test1.csv"
+        self.s3_bucket = "dummy"
+
+    def test_init(self):
+        operator = HttpToS3Operator(
+            task_id="http_to_s3_operator",
+            http_conn_id=self.http_conn_id,
+            endpoint=self.endpoint,
+            s3_key=self.s3_key,
+            s3_bucket=self.s3_bucket,
+            dag=self.dag,
+        )
+        assert operator.endpoint == self.endpoint
+        assert operator.s3_key == self.s3_key
+        assert operator.s3_bucket == self.s3_bucket
+        assert operator.http_conn_id == self.http_conn_id
+
+    @mock_s3
+    def test_execute(self, requests_mock):
+        requests_mock.register_uri("GET", EXAMPLE_URL, content=self.response)
+        conn = boto3.client("s3")
+        conn.create_bucket(Bucket=self.s3_bucket)
+        operator = HttpToS3Operator(
+            task_id="s3_to_file_sensor",
+            http_conn_id=self.http_conn_id,
+            endpoint=self.endpoint,
+            s3_key=self.s3_key,
+            s3_bucket=self.s3_bucket,
+            dag=self.dag,
+        )
+        operator.execute(None)
+
+        objects_in_bucket = conn.list_objects(Bucket=self.s3_bucket, Prefix=self.s3_key)
+        # exactly one object should be found
+        assert len(objects_in_bucket["Contents"]) == 1
+        # the object found should be consistent with the s3_key specified earlier
+        assert objects_in_bucket["Contents"][0]["Key"] == self.s3_key
diff --git a/tests/system/providers/amazon/aws/example_http_to_s3.py b/tests/system/providers/amazon/aws/example_http_to_s3.py
new file mode 100644
index 0000000000000..b5d65abe69b1d
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_http_to_s3.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.models.baseoperator import chain
+from airflow.models.dag import DAG
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+DAG_ID = "example_http_to_s3"
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example"],
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context["ENV_ID"]
+
+    s3_bucket = f"{env_id}-http-to-s3-bucket"
+    s3_key = f"{env_id}-http-to-s3-key"
+
+    create_s3_bucket = S3CreateBucketOperator(task_id="create_s3_bucket", bucket_name=s3_bucket)
+
+    # [START howto_transfer_http_to_s3]
+    http_to_s3_task = HttpToS3Operator(
+        task_id="http_to_s3_task",
+        endpoint="/tmp/http_path",
+        s3_bucket=s3_bucket,
+        s3_key=s3_key,
+        replace=True,
+    )
+    # [END howto_transfer_http_to_s3]
+
+    delete_s3_bucket = S3DeleteBucketOperator(
+        task_id="delete_s3_bucket",
+        bucket_name=s3_bucket,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        create_s3_bucket,
+        # TEST BODY
+        http_to_s3_task,
+        # TEST TEARDOWN
+        delete_s3_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
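
Usage sketch (illustrative only, not taken from the patch): a minimal DAG wiring up the new operator. The connection ID "http_example", the endpoint "/api/report", and the bucket "example-bucket" are placeholder names; only parameters defined by the operator above are used.

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.transfers.http_to_s3 import HttpToS3Operator

with DAG(
    dag_id="http_to_s3_sketch",
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # Fetch the endpoint with a GET request and write the raw response body to S3.
    # Connection, endpoint, and bucket names are hypothetical placeholders.
    fetch_report = HttpToS3Operator(
        task_id="fetch_report",
        http_conn_id="http_example",        # base URL comes from the HTTP connection
        endpoint="/api/report",             # relative part of the URL (templated)
        method="GET",
        headers={"Accept": "application/json"},
        s3_bucket="example-bucket",
        s3_key="raw/report/{{ ds }}.json",  # s3_key is templated, so macros work here
        replace=True,                       # overwrite the object if it already exists
    )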