diff --git a/src/integrations/prefect-aws/prefect_aws/s3.py b/src/integrations/prefect-aws/prefect_aws/s3.py index 39a3e9b7ea9c..f98bca275bda 100644 --- a/src/integrations/prefect-aws/prefect_aws/s3.py +++ b/src/integrations/prefect-aws/prefect_aws/s3.py @@ -5,18 +5,18 @@ import os import uuid from pathlib import Path -from typing import Any, BinaryIO, Dict, List, Optional, Union, get_args +from typing import Any, BinaryIO, Dict, List, Optional, Tuple, Union, get_args -import boto3 from botocore.paginate import PageIterator from botocore.response import StreamingBody from pydantic import Field, field_validator from prefect import task +from prefect._internal.compatibility.async_dispatch import async_dispatch from prefect.blocks.abstract import CredentialsBlock, ObjectStorageBlock from prefect.filesystems import WritableDeploymentStorage, WritableFileSystem from prefect.logging import get_run_logger -from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible +from prefect.utilities.asyncutils import run_sync_in_worker_thread from prefect.utilities.filesystem import filter_files from prefect.utilities.pydantic import lookup_type from prefect_aws import AwsCredentials, MinIOCredentials @@ -24,7 +24,7 @@ @task -async def s3_download( +async def adownload_from_bucket( bucket: str, key: str, aws_credentials: AwsCredentials, @@ -33,6 +33,8 @@ async def s3_download( """ Downloads an object with a given key from a given S3 bucket. + Added in prefect-aws==0.5.3. + Args: bucket: Name of bucket to download object from. Required if a default value was not supplied when creating the task. @@ -51,22 +53,21 @@ async def s3_download( ```python from prefect import flow from prefect_aws import AwsCredentials - from prefect_aws.s3 import s3_download - + from prefect_aws.s3 import adownload_from_bucket @flow - async def example_s3_download_flow(): + async def example_download_from_bucket_flow(): aws_credentials = AwsCredentials( aws_access_key_id="acccess_key_id", aws_secret_access_key="secret_access_key" ) - data = await s3_download( + data = await adownload_from_bucket( bucket="bucket", key="key", aws_credentials=aws_credentials, ) - example_s3_download_flow() + await example_download_from_bucket_flow() ``` """ logger = get_run_logger() @@ -86,7 +87,71 @@ async def example_s3_download_flow(): @task -async def s3_upload( +@async_dispatch(adownload_from_bucket) +def download_from_bucket( + bucket: str, + key: str, + aws_credentials: AwsCredentials, + aws_client_parameters: AwsClientParameters = AwsClientParameters(), +) -> bytes: + """ + Downloads an object with a given key from a given S3 bucket. + + Args: + bucket: Name of bucket to download object from. Required if a default value was + not supplied when creating the task. + key: Key of object to download. Required if a default value was not supplied + when creating the task. + aws_credentials: Credentials to use for authentication with AWS. + aws_client_parameters: Custom parameter for the boto3 client initialization. + + + Returns: + A `bytes` representation of the downloaded object. 
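Since both the async and sync variants return raw `bytes`, downstream code is expected to decode or deserialize the payload itself. A minimal illustrative helper, not part of this change (the function name and the CSV assumption are hypothetical):

```python
import csv
import io


def parse_csv_bytes(payload: bytes) -> list[dict]:
    """Decode downloaded S3 bytes and parse them as CSV rows."""
    return list(csv.DictReader(io.StringIO(payload.decode("utf-8"))))
```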
+
+    Example:
+        Download a file from an S3 bucket:
+
+        ```python
+        from prefect import flow
+        from prefect_aws import AwsCredentials
+        from prefect_aws.s3 import download_from_bucket
+
+
+        @flow
+        def example_download_from_bucket_flow():
+            aws_credentials = AwsCredentials(
+                aws_access_key_id="access_key_id",
+                aws_secret_access_key="secret_access_key"
+            )
+            data = download_from_bucket(
+                bucket="bucket",
+                key="key",
+                aws_credentials=aws_credentials,
+            )
+
+        example_download_from_bucket_flow()
+        ```
+    """
+    logger = get_run_logger()
+    logger.info("Downloading object from bucket %s with key %s", bucket, key)
+
+    s3_client = aws_credentials.get_boto3_session().client(
+        "s3", **aws_client_parameters.get_params_override()
+    )
+    stream = io.BytesIO()
+    s3_client.download_fileobj(Bucket=bucket, Key=key, Fileobj=stream)
+    stream.seek(0)
+    output = stream.read()
+
+    return output
+
+
+s3_download = download_from_bucket  # backward compatibility
+
+
+@task
+async def aupload_to_bucket(
     data: bytes,
     bucket: str,
     aws_credentials: AwsCredentials,
@@ -94,7 +159,9 @@ async def s3_upload(
     key: Optional[str] = None,
 ) -> str:
     """
-    Uploads data to an S3 bucket.
+    Asynchronously uploads data to an S3 bucket.
+
+    Added in prefect-aws==0.5.3.
 
     Args:
         data: Bytes representation of data to upload to S3.
@@ -113,7 +180,7 @@
         ```python
         from prefect import flow
         from prefect_aws import AwsCredentials
-        from prefect_aws.s3 import s3_upload
+        from prefect_aws.s3 import aupload_to_bucket
 
 
         @flow
@@ -123,14 +190,14 @@ async def example_s3_upload_flow():
                 aws_secret_access_key="secret_access_key"
             )
             with open("data.csv", "rb") as file:
-                key = await s3_upload(
+                key = await aupload_to_bucket(
                     bucket="bucket",
                     key="data.csv",
                     data=file.read(),
                     aws_credentials=aws_credentials,
                 )
 
-        example_s3_upload_flow()
+        await example_s3_upload_flow()
     ```
     """
     logger = get_run_logger()
@@ -151,7 +218,73 @@ async def example_s3_upload_flow():
 
 
 @task
+@async_dispatch(aupload_to_bucket)
+def upload_to_bucket(
+    data: bytes,
+    bucket: str,
+    aws_credentials: AwsCredentials,
+    aws_client_parameters: AwsClientParameters = AwsClientParameters(),
+    key: Optional[str] = None,
+) -> str:
+    """
+    Uploads data to an S3 bucket.
+
+    Args:
+        data: Bytes representation of data to upload to S3.
+        bucket: Name of bucket to upload data to. Required if a default value was not
+            supplied when creating the task.
+        aws_credentials: Credentials to use for authentication with AWS.
+        aws_client_parameters: Custom parameter for the boto3 client initialization.
+        key: Key of the object to upload. Defaults to a UUID string.
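With `async_dispatch`, each operation now has two entry points: an `a`-prefixed coroutine and a synchronous function that acts as the dispatch target. A sketch of the two calling patterns, assuming `async_dispatch` runs the async implementation when the caller is in an async context and the sync body otherwise (credential and bucket values are placeholders):

```python
import asyncio

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import adownload_from_bucket, download_from_bucket

creds = AwsCredentials(
    aws_access_key_id="access_key_id",
    aws_secret_access_key="secret_access_key",
)


@flow
def sync_caller() -> bytes:
    # Synchronous context: the dispatched task runs its sync body directly.
    return download_from_bucket(bucket="bucket", key="key", aws_credentials=creds)


@flow
async def async_caller() -> bytes:
    # Async context: call the async variant explicitly and await it.
    return await adownload_from_bucket(
        bucket="bucket", key="key", aws_credentials=creds
    )


if __name__ == "__main__":
    sync_caller()
    asyncio.run(async_caller())
```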
+
+    Returns:
+        The key of the uploaded object.
+
+    Example:
+        Read and upload a file to an S3 bucket:
+
+        ```python
+        from prefect import flow
+        from prefect_aws import AwsCredentials
+        from prefect_aws.s3 import upload_to_bucket
+
+
+        @flow
+        def example_s3_upload_flow():
+            aws_credentials = AwsCredentials(
+                aws_access_key_id="access_key_id",
+                aws_secret_access_key="secret_access_key"
+            )
+            with open("data.csv", "rb") as file:
+                key = upload_to_bucket(
+                    bucket="bucket",
+                    key="data.csv",
+                    data=file.read(),
+                    aws_credentials=aws_credentials,
+                )
+
+        example_s3_upload_flow()
+        ```
+    """
+    logger = get_run_logger()
+
+    key = key or str(uuid.uuid4())
+
+    logger.info("Uploading object to bucket %s with key %s", bucket, key)
+
+    s3_client = aws_credentials.get_boto3_session().client(
+        "s3", **aws_client_parameters.get_params_override()
+    )
+    stream = io.BytesIO(data)
+    s3_client.upload_fileobj(stream, Bucket=bucket, Key=key)
+    return key
+
+
+s3_upload = upload_to_bucket  # backward compatibility
+
+
+@task
+async def acopy_objects(
     source_path: str,
     target_path: str,
     source_bucket_name: str,
@@ -159,13 +292,15 @@ async def s3_copy(
     target_bucket_name: Optional[str] = None,
     **copy_kwargs,
 ) -> str:
-    """Uses S3's internal
+    """Asynchronously uses S3's internal
     [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html)
     to copy objects within or between buckets. To copy objects between buckets, the
     credentials must have permission to read the source object and write to the target
     object. If the credentials do not have those permissions, try using
     `S3Bucket.stream_from`.
 
+    Added in prefect-aws==0.5.3.
+
     Args:
         source_path: The path to the object to copy. Can be a string or `Path`.
         target_path: The path to copy the object to. Can be a string or `Path`.
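Because `key` defaults to a freshly generated UUID, callers that omit it must capture the return value in order to address the object later. A short sketch (bucket name and credentials are placeholders):

```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import upload_to_bucket


@flow
def upload_unnamed_object() -> str:
    aws_credentials = AwsCredentials(
        aws_access_key_id="access_key_id",
        aws_secret_access_key="secret_access_key",
    )
    # key is omitted, so the task generates a uuid4 key and returns it.
    return upload_to_bucket(
        data=b"hello", bucket="bucket", aws_credentials=aws_credentials
    )
```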
@@ -186,20 +321,20 @@ async def s3_copy( ```python from prefect import flow from prefect_aws import AwsCredentials - from prefect_aws.s3 import s3_copy + from prefect_aws.s3 import acopy_objects aws_credentials = AwsCredentials.load("my-creds") @flow async def example_copy_flow(): - await s3_copy( + await acopy_objects( source_path="my_folder/notes.txt", target_path="my_folder/notes_copy.txt", source_bucket_name="my-bucket", aws_credentials=aws_credentials, ) - example_copy_flow() + await example_copy_flow() ``` Copy notes.txt from s3://my-bucket/my_folder/notes.txt to @@ -208,13 +343,115 @@ async def example_copy_flow(): ```python from prefect import flow from prefect_aws import AwsCredentials - from prefect_aws.s3 import s3_copy + from prefect_aws.s3 import acopy_objects aws_credentials = AwsCredentials.load("shared-creds") @flow async def example_copy_flow(): - await s3_copy( + await acopy_objects( + source_path="my_folder/notes.txt", + target_path="notes_copy.txt", + source_bucket_name="my-bucket", + aws_credentials=aws_credentials, + target_bucket_name="other-bucket", + ) + + await example_copy_flow() + ``` + + """ + logger = get_run_logger() + + s3_client = aws_credentials.get_s3_client() + + target_bucket_name = target_bucket_name or source_bucket_name + + logger.info( + "Copying object from bucket %s with key %s to bucket %s with key %s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + await run_sync_in_worker_thread( + s3_client.copy_object, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, + **copy_kwargs, + ) + + return target_path + + +@task +@async_dispatch(acopy_objects) +def copy_objects( + source_path: str, + target_path: str, + source_bucket_name: str, + aws_credentials: AwsCredentials, + target_bucket_name: Optional[str] = None, + **copy_kwargs, +) -> str: + """Uses S3's internal + [CopyObject](https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html) + to copy objects within or between buckets. To copy objects between buckets, the + credentials must have permission to read the source object and write to the target + object. If the credentials do not have those permissions, try using + `S3Bucket.stream_from`. + + Args: + source_path: The path to the object to copy. Can be a string or `Path`. + target_path: The path to copy the object to. Can be a string or `Path`. + source_bucket_name: The bucket to copy the object from. + aws_credentials: Credentials to use for authentication with AWS. + target_bucket_name: The bucket to copy the object to. If not provided, defaults + to `source_bucket`. + **copy_kwargs: Additional keyword arguments to pass to `S3Client.copy_object`. + + Returns: + The path that the object was copied to. Excludes the bucket name. + + Examples: + + Copy notes.txt from s3://my-bucket/my_folder/notes.txt to + s3://my-bucket/my_folder/notes_copy.txt. + + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import copy_objects + + aws_credentials = AwsCredentials.load("my-creds") + + @flow + def example_copy_flow(): + copy_objects( + source_path="my_folder/notes.txt", + target_path="my_folder/notes_copy.txt", + source_bucket_name="my-bucket", + aws_credentials=aws_credentials, + ) + + example_copy_flow() + ``` + + Copy notes.txt from s3://my-bucket/my_folder/notes.txt to + s3://other-bucket/notes_copy.txt. 
+ + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import copy_objects + + aws_credentials = AwsCredentials.load("shared-creds") + + @flow + def example_copy_flow(): + copy_objects( source_path="my_folder/notes.txt", target_path="notes_copy.txt", source_bucket_name="my-bucket", @@ -250,8 +487,11 @@ async def example_copy_flow(): return target_path +s3_copy = copy_objects # backward compatibility + + @task -async def s3_move( +async def amove_objects( source_path: str, target_path: str, source_bucket_name: str, @@ -259,11 +499,13 @@ async def s3_move( target_bucket_name: Optional[str] = None, ) -> str: """ - Move an object from one S3 location to another. To move objects between buckets, - the credentials must have permission to read and delete the source object and write - to the target object. If the credentials do not have those permissions, this method - will raise an error. If the credentials have permission to read the source object - but not delete it, the object will be copied but not deleted. + Asynchronously moves an object from one S3 location to another. To move objects + between buckets, the credentials must have permission to read and delete the source + object and write to the target object. If the credentials do not have those + permissions, this method will raise an error. If the credentials have permission to + read the source object but not delete it, the object will be copied but not deleted. + + Added in prefect-aws==0.5.3. Args: source_path: The path of the object to move @@ -292,33 +534,94 @@ async def s3_move( ) # Copy the object to the new location - s3_client.copy_object( + await run_sync_in_worker_thread( + s3_client.copy_object, Bucket=target_bucket_name, CopySource={"Bucket": source_bucket_name, "Key": source_path}, Key=target_path, ) # Delete the original object - s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) + await run_sync_in_worker_thread( + s3_client.delete_object, Bucket=source_bucket_name, Key=source_path + ) return target_path -def _list_objects_sync(page_iterator: PageIterator): +@task +@async_dispatch(amove_objects) +def move_objects( + source_path: str, + target_path: str, + source_bucket_name: str, + aws_credentials: AwsCredentials, + target_bucket_name: Optional[str] = None, +) -> str: """ - Synchronous method to collect S3 objects into a list + Move an object from one S3 location to another. To move objects between buckets, + the credentials must have permission to read and delete the source object and write + to the target object. If the credentials do not have those permissions, this method + will raise an error. If the credentials have permission to read the source object + but not delete it, the object will be copied but not deleted. Args: - page_iterator: AWS Paginator for S3 objects + source_path: The path of the object to move + target_path: The path to move the object to + source_bucket_name: The name of the bucket containing the source object + aws_credentials: Credentials to use for authentication with AWS. + target_bucket_name: The bucket to copy the object to. If not provided, defaults + to `source_bucket`. Returns: - List[Dict]: List of object information + The path that the object was moved to. Excludes the bucket name. 
""" - return [content for page in page_iterator for content in page.get("Contents", [])] + logger = get_run_logger() + s3_client = aws_credentials.get_s3_client() -@task -async def s3_list_objects( + # If target bucket is not provided, assume it's the same as the source bucket + target_bucket_name = target_bucket_name or source_bucket_name + + logger.info( + "Moving object from s3://%s/%s s3://%s/%s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + # Copy the object to the new location + s3_client.copy_object( + Bucket=target_bucket_name, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Key=target_path, + ) + + # Delete the original object + s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) + + return target_path + + +s3_move = move_objects # backward compatibility + + +def _list_objects_sync(page_iterator: PageIterator): + """ + Synchronous method to collect S3 objects into a list + + Args: + page_iterator: AWS Paginator for S3 objects + + Returns: + List[Dict]: List of object information + """ + return [content for page in page_iterator for content in page.get("Contents", [])] + + +@task +async def alist_objects( bucket: str, aws_credentials: AwsCredentials, aws_client_parameters: AwsClientParameters = AwsClientParameters(), @@ -329,7 +632,9 @@ async def s3_list_objects( jmespath_query: Optional[str] = None, ) -> List[Dict[str, Any]]: """ - Lists details of objects in a given S3 bucket. + Asynchronously lists details of objects in a given S3 bucket. + + Added in prefect-aws==0.5.3. Args: bucket: Name of bucket to list items from. Required if a default value was not @@ -354,7 +659,7 @@ async def s3_list_objects( ```python from prefect import flow from prefect_aws import AwsCredentials - from prefect_aws.s3 import s3_list_objects + from prefect_aws.s3 import alist_objects @flow @@ -363,12 +668,12 @@ async def example_s3_list_objects_flow(): aws_access_key_id="acccess_key_id", aws_secret_access_key="secret_access_key" ) - objects = await s3_list_objects( + objects = await alist_objects( bucket="data_bucket", aws_credentials=aws_credentials ) - example_s3_list_objects_flow() + await example_s3_list_objects_flow() ``` """ # noqa E501 logger = get_run_logger() @@ -390,6 +695,83 @@ async def example_s3_list_objects_flow(): return await run_sync_in_worker_thread(_list_objects_sync, page_iterator) +@task +@async_dispatch(alist_objects) +def list_objects( + bucket: str, + aws_credentials: AwsCredentials, + aws_client_parameters: AwsClientParameters = AwsClientParameters(), + prefix: str = "", + delimiter: str = "", + page_size: Optional[int] = None, + max_items: Optional[int] = None, + jmespath_query: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + Lists details of objects in a given S3 bucket. + + Args: + bucket: Name of bucket to list items from. Required if a default value was not + supplied when creating the task. + aws_credentials: Credentials to use for authentication with AWS. + aws_client_parameters: Custom parameter for the boto3 client initialization.. + prefix: Used to filter objects with keys starting with the specified prefix. + delimiter: Character used to group keys of listed objects. + page_size: Number of objects to return in each request to the AWS API. + max_items: Maximum number of objects that to be returned by task. 
+ jmespath_query: Query used to filter objects based on object attributes refer to + the [boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath) + for more information on how to construct queries. + + Returns: + A list of dictionaries containing information about the objects retrieved. Refer + to the boto3 docs for an example response. + + Example: + List all objects in a bucket: + + ```python + from prefect import flow + from prefect_aws import AwsCredentials + from prefect_aws.s3 import list_objects + + + @flow + def example_s3_list_objects_flow(): + aws_credentials = AwsCredentials( + aws_access_key_id="acccess_key_id", + aws_secret_access_key="secret_access_key" + ) + objects = list_objects( + bucket="data_bucket", + aws_credentials=aws_credentials + ) + + example_s3_list_objects_flow() + ``` + """ # noqa E501 + logger = get_run_logger() + logger.info("Listing objects in bucket %s with prefix %s", bucket, prefix) + + s3_client = aws_credentials.get_boto3_session().client( + "s3", **aws_client_parameters.get_params_override() + ) + paginator = s3_client.get_paginator("list_objects_v2") + page_iterator = paginator.paginate( + Bucket=bucket, + Prefix=prefix, + Delimiter=delimiter, + PaginationConfig={"PageSize": page_size, "MaxItems": max_items}, + ) + if jmespath_query: + page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}") + + return _list_objects_sync(page_iterator) # type: ignore + + +s3_list_objects = list_objects # backward compatibility + + class S3Bucket(WritableFileSystem, WritableDeploymentStorage, ObjectStorageBlock): """ Block used to store data using AWS S3 or S3-compatible object storage like MinIO. @@ -492,14 +874,14 @@ def _resolve_path(self, path: str) -> str: return path - def _get_s3_client(self) -> boto3.client: + def _get_s3_client(self): """ Authenticate MinIO credentials or AWS credentials and return an S3 client. This is a helper function called by read_path() or write_path(). """ return self.credentials.get_client("s3") - def _get_bucket_resource(self) -> boto3.resource: + def _get_bucket_resource(self): """ Retrieves boto3 resource object for the configured bucket """ @@ -511,8 +893,44 @@ def _get_bucket_resource(self) -> boto3.resource: ) return bucket - @sync_compatible - async def get_directory( + async def aget_directory( + self, from_path: Optional[str] = None, local_path: Optional[str] = None + ) -> None: + """ + Asynchronously copies a folder from the configured S3 bucket to a local directory. + + Defaults to copying the entire contents of the block's basepath to the current + working directory. + + Args: + from_path: Path in S3 bucket to download from. Defaults to the block's + configured basepath. + local_path: Local path to download S3 contents to. Defaults to the current + working directory. 
+ """ + bucket_folder = self.bucket_folder + if from_path is None: + from_path = str(bucket_folder) if bucket_folder else "" + + if local_path is None: + local_path = str(Path(".").absolute()) + else: + local_path = str(Path(local_path).expanduser()) + + bucket = self._get_bucket_resource() + for obj in bucket.objects.filter(Prefix=from_path): + if obj.key[-1] == "/": + # object is a folder and will be created if it contains any objects + continue + target = os.path.join( + local_path, + os.path.relpath(obj.key, from_path), + ) + os.makedirs(os.path.dirname(target), exist_ok=True) + await run_sync_in_worker_thread(bucket.download_file, obj.key, target) + + @async_dispatch(aget_directory) + def get_directory( self, from_path: Optional[str] = None, local_path: Optional[str] = None ) -> None: """ @@ -548,15 +966,14 @@ async def get_directory( os.makedirs(os.path.dirname(target), exist_ok=True) bucket.download_file(obj.key, target) - @sync_compatible - async def put_directory( + async def aput_directory( self, local_path: Optional[str] = None, to_path: Optional[str] = None, ignore_file: Optional[str] = None, ) -> int: """ - Uploads a directory from a given local path to the configured S3 bucket in a + Asynchronously uploads a directory from a given local path to the configured S3 bucket in a given folder. Defaults to uploading the entire contents the current working directory to the @@ -596,15 +1013,115 @@ async def put_directory( with open(local_file_path, "rb") as local_file: local_file_content = local_file.read() - await self.write_path( + await self.awrite_path( remote_file_path.as_posix(), content=local_file_content ) uploaded_file_count += 1 return uploaded_file_count - @sync_compatible - async def read_path(self, path: str) -> bytes: + @async_dispatch(aput_directory) + def put_directory( + self, + local_path: Optional[str] = None, + to_path: Optional[str] = None, + ignore_file: Optional[str] = None, + ) -> int: + """ + Uploads a directory from a given local path to the configured S3 bucket in a + given folder. + + Defaults to uploading the entire contents the current working directory to the + block's basepath. + + Args: + local_path: Path to local directory to upload from. + to_path: Path in S3 bucket to upload to. Defaults to block's configured + basepath. + ignore_file: Path to file containing gitignore style expressions for + filepaths to ignore. + + """ + to_path = "" if to_path is None else to_path + + if local_path is None: + local_path = "." + + included_files = None + if ignore_file: + with open(ignore_file, "r") as f: + ignore_patterns = f.readlines() + + included_files = filter_files(local_path, ignore_patterns) + + uploaded_file_count = 0 + for local_file_path in Path(local_path).expanduser().rglob("*"): + if ( + included_files is not None + and str(local_file_path.relative_to(local_path)) not in included_files + ): + continue + elif not local_file_path.is_dir(): + remote_file_path = Path(to_path) / local_file_path.relative_to( + local_path + ) + with open(local_file_path, "rb") as local_file: + local_file_content = local_file.read() + + self.write_path( + remote_file_path.as_posix(), content=local_file_content, _sync=True + ) + uploaded_file_count += 1 + + return uploaded_file_count + + def _read_sync(self, key: str) -> bytes: + """ + Called by read_path(). Creates an S3 client and retrieves the + contents from a specified path. 
+ """ + + s3_client = self._get_s3_client() + + with io.BytesIO() as stream: + s3_client.download_fileobj(Bucket=self.bucket_name, Key=key, Fileobj=stream) + stream.seek(0) + output = stream.read() + return output + + async def aread_path(self, path: str) -> bytes: + """ + Asynchronously reads the contents of a specified path from the S3 bucket. + Provide the entire path to the key in S3. + + Args: + path: Entire path to (and including) the key. + + Example: + Read "subfolder/file1" contents from an S3 bucket named "bucket": + ```python + from prefect_aws import AwsCredentials + from prefect_aws.s3 import S3Bucket + + aws_creds = AwsCredentials( + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY + ) + + s3_bucket_block = S3Bucket( + bucket_name="bucket", + credentials=aws_creds, + bucket_folder="subfolder" + ) + + key_contents = await s3_bucket_block.aread_path(path="subfolder/file1") + ``` + """ + path = self._resolve_path(path) + return await run_sync_in_worker_thread(self._read_sync, path) + + @async_dispatch(aread_path) + def read_path(self, path: str) -> bytes: """ Read specified path from S3 and return contents. Provide the entire path to the key in S3. @@ -634,26 +1151,22 @@ async def read_path(self, path: str) -> bytes: """ path = self._resolve_path(path) - return await run_sync_in_worker_thread(self._read_sync, path) + return self._read_sync(path) - def _read_sync(self, key: str) -> bytes: + def _write_sync(self, key: str, data: bytes) -> None: """ - Called by read_path(). Creates an S3 client and retrieves the - contents from a specified path. + Called by write_path(). Creates an S3 client and uploads a file + object. """ s3_client = self._get_s3_client() - with io.BytesIO() as stream: - s3_client.download_fileobj(Bucket=self.bucket_name, Key=key, Fileobj=stream) - stream.seek(0) - output = stream.read() - return output + with io.BytesIO(data) as stream: + s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key) - @sync_compatible - async def write_path(self, path: str, content: bytes) -> str: + async def awrite_path(self, path: str, content: bytes) -> str: """ - Writes to an S3 bucket. + Asynchronously writes to an S3 bucket. Args: @@ -679,7 +1192,7 @@ async def write_path(self, path: str, content: bytes) -> str: bucket_folder="dogs/smalldogs", endpoint_url="http://localhost:9000", ) - s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data) + s3_havanese_path = await s3_bucket_block.awrite_path(path="havanese", content=data) ``` """ @@ -689,16 +1202,44 @@ async def write_path(self, path: str, content: bytes) -> str: return path - def _write_sync(self, key: str, data: bytes) -> None: + @async_dispatch(awrite_path) + def write_path(self, path: str, content: bytes) -> str: """ - Called by write_path(). Creates an S3 client and uploads a file - object. + Writes to an S3 bucket. + + Args: + + path: The key name. Each object in your bucket has a unique + key (or key name). + content: What you are uploading to S3. 
+
+        Example:
+
+            Write data to the path `dogs/small_dogs/havanese` in an S3 Bucket:
+            ```python
+            from prefect_aws import MinIOCredentials
+            from prefect_aws.s3 import S3Bucket
+
+            minio_creds = MinIOCredentials(
+                minio_root_user="minioadmin",
+                minio_root_password="minioadmin",
+            )
+
+            s3_bucket_block = S3Bucket(
+                bucket_name="bucket",
+                minio_credentials=minio_creds,
+                bucket_folder="dogs/smalldogs",
+                endpoint_url="http://localhost:9000",
+            )
+            s3_havanese_path = s3_bucket_block.write_path(path="havanese", content=data)
+            ```
         """
-        s3_client = self._get_s3_client()
+        path = self._resolve_path(path)
 
-        with io.BytesIO(data) as stream:
-            s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket_name, Key=key)
+        self._write_sync(path, content)
+
+        return path
 
     # NEW BLOCK INTERFACE METHODS BELOW
     @staticmethod
@@ -738,8 +1279,29 @@ def _join_bucket_folder(self, bucket_path: str = "") -> str:
             "" if not bucket_path.endswith("/") else "/"
         )
 
-    @sync_compatible
-    async def list_objects(
+    def _list_objects_setup(
+        self,
+        folder: str = "",
+        delimiter: str = "",
+        page_size: Optional[int] = None,
+        max_items: Optional[int] = None,
+        jmespath_query: Optional[str] = None,
+    ) -> Tuple[PageIterator, str]:
+        bucket_path = self._join_bucket_folder(folder)
+        client = self.credentials.get_s3_client()
+        paginator = client.get_paginator("list_objects_v2")
+        page_iterator = paginator.paginate(
+            Bucket=self.bucket_name,
+            Prefix=bucket_path,
+            Delimiter=delimiter,
+            PaginationConfig={"PageSize": page_size, "MaxItems": max_items},
+        )
+        if jmespath_query:
+            page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}")
+
+        return page_iterator, bucket_path
+
+    async def alist_objects(
         self,
         folder: str = "",
         delimiter: str = "",
@@ -748,6 +1310,8 @@ async def list_objects(
         jmespath_query: Optional[str] = None,
     ) -> List[Dict[str, Any]]:
         """
+        Asynchronously lists objects in the S3 bucket.
+
         Args:
             folder: Folder to list objects from.
             delimiter: Character used to group keys of listed objects.
@@ -766,20 +1330,12 @@ async def list_objects(
             from prefect_aws.s3 import S3Bucket
 
             s3_bucket = S3Bucket.load("my-bucket")
-            s3_bucket.list_objects("base_folder")
+            await s3_bucket.alist_objects("base_folder")
             ```
         """  # noqa: E501
-        bucket_path = self._join_bucket_folder(folder)
-        client = self.credentials.get_s3_client()
-        paginator = client.get_paginator("list_objects_v2")
-        page_iterator = paginator.paginate(
-            Bucket=self.bucket_name,
-            Prefix=bucket_path,
-            Delimiter=delimiter,
-            PaginationConfig={"PageSize": page_size, "MaxItems": max_items},
+        page_iterator, bucket_path = self._list_objects_setup(
+            folder, delimiter, page_size, max_items, jmespath_query
         )
-        if jmespath_query:
-            page_iterator = page_iterator.search(f"{jmespath_query} | {{Contents: @}}")
 
         self.logger.info(f"Listing objects in bucket {bucket_path}.")
         objects = await run_sync_in_worker_thread(
@@ -787,15 +1343,52 @@
         )
         return objects
 
-    @sync_compatible
-    async def download_object_to_path(
+    @async_dispatch(alist_objects)
+    def list_objects(
+        self,
+        folder: str = "",
+        delimiter: str = "",
+        page_size: Optional[int] = None,
+        max_items: Optional[int] = None,
+        jmespath_query: Optional[str] = None,
+    ) -> List[Dict[str, Any]]:
+        """
+        Lists objects in the S3 bucket.
+
+        Args:
+            folder: Folder to list objects from.
+            delimiter: Character used to group keys of listed objects.
+            page_size: Number of objects to return in each request to the AWS API.
+            max_items: Maximum number of objects to be returned by the task.
+ jmespath_query: Query used to filter objects based on object attributes refer to + the [boto3 docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html#filtering-results-with-jmespath) + for more information on how to construct queries. + + Returns: + List of objects and their metadata in the bucket. + + Examples: + List objects under the `base_folder`. + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.list_objects("base_folder") + ``` + """ # noqa: E501 + page_iterator, bucket_path = self._list_objects_setup( + folder, delimiter, page_size, max_items, jmespath_query + ) + + self.logger.info(f"Listing objects in bucket {bucket_path}.") + return self._list_objects_sync(page_iterator) + + async def adownload_object_to_path( self, from_path: str, to_path: Optional[Union[str, Path]], **download_kwargs: Dict[str, Any], ) -> Path: """ - Downloads an object from the S3 bucket to a path. + Asynchronously downloads an object from the S3 bucket to a path. Args: from_path: The path to the object to download; this gets prefixed @@ -814,7 +1407,7 @@ async def download_object_to_path( from prefect_aws.s3 import S3Bucket s3_bucket = S3Bucket.load("my-bucket") - s3_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt") + await s3_bucket.adownload_object_to_path("my_folder/notes.txt", "notes.txt") ``` """ if to_path is None: @@ -843,8 +1436,124 @@ async def download_object_to_path( ) return Path(to_path) - @sync_compatible - async def download_object_to_file_object( + @async_dispatch(adownload_object_to_path) + def download_object_to_path( + self, + from_path: str, + to_path: Optional[Union[str, Path]], + **download_kwargs: Dict[str, Any], + ) -> Path: + """ + Downloads an object from the S3 bucket to a path. + + Args: + from_path: The path to the object to download; this gets prefixed + with the bucket_folder. + to_path: The path to download the object to. If not provided, the + object's name will be used. + **download_kwargs: Additional keyword arguments to pass to + `Client.download_file`. + + Returns: + The absolute path that the object was downloaded to. + + Examples: + Download my_folder/notes.txt object to notes.txt. + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.download_object_to_path("my_folder/notes.txt", "notes.txt") + ``` + """ + if to_path is None: + to_path = Path(from_path).name + + # making path absolute, but converting back to str here + # since !r looks nicer that way and filename arg expects str + to_path = str(Path(to_path).absolute()) + bucket_path = self._join_bucket_folder(from_path) + client = self.credentials.get_s3_client() + + self.logger.debug( + f"Preparing to download object from bucket {self.bucket_name!r} " + f"path {bucket_path!r} to {to_path!r}." + ) + client.download_file( + Bucket=self.bucket_name, + Key=bucket_path, + Filename=to_path, + **download_kwargs, + ) + self.logger.info( + f"Downloaded object from bucket {self.bucket_name!r} path {bucket_path!r} " + f"to {to_path!r}." + ) + return Path(to_path) + + async def adownload_object_to_file_object( + self, + from_path: str, + to_file_object: BinaryIO, + **download_kwargs: Dict[str, Any], + ) -> BinaryIO: + """ + Asynchronously downloads an object from the object storage service to a file-like object, + which can be a BytesIO object or a BufferedWriter. + + Args: + from_path: The path to the object to download from; this gets prefixed + with the bucket_folder. 
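Both `download_object_to_path` variants resolve `to_path` to an absolute local path and return it as a `Path`, so callers can chain on the result. A sketch (block name and keys are placeholders):

```python
from pathlib import Path

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")  # assumes a saved block named "my-bucket"
local_file: Path = s3_bucket.download_object_to_path(
    "my_folder/notes.txt", "downloads/notes.txt"
)
assert local_file.is_absolute()
```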
+ to_file_object: The file-like object to download the object to. + **download_kwargs: Additional keyword arguments to pass to + `Client.download_fileobj`. + + Returns: + The file-like object that the object was downloaded to. + + Examples: + Download my_folder/notes.txt object to a BytesIO object. + ```python + from io import BytesIO + + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + with BytesIO() as buf: + await s3_bucket.adownload_object_to_file_object("my_folder/notes.txt", buf) + ``` + + Download my_folder/notes.txt object to a BufferedWriter. + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + with open("notes.txt", "wb") as f: + await s3_bucket.adownload_object_to_file_object("my_folder/notes.txt", f) + ``` + """ + client = self.credentials.get_s3_client() + bucket_path = self._join_bucket_folder(from_path) + + self.logger.debug( + f"Preparing to download object from bucket {self.bucket_name!r} " + f"path {bucket_path!r} to file object." + ) + await run_sync_in_worker_thread( + client.download_fileobj, + Bucket=self.bucket_name, + Key=bucket_path, + Fileobj=to_file_object, + **download_kwargs, + ) + self.logger.info( + f"Downloaded object from bucket {self.bucket_name!r} path {bucket_path!r} " + "to file object." + ) + return to_file_object + + @async_dispatch(adownload_object_to_file_object) + def download_object_to_file_object( self, from_path: str, to_file_object: BinaryIO, @@ -892,8 +1601,7 @@ async def download_object_to_file_object( f"Preparing to download object from bucket {self.bucket_name!r} " f"path {bucket_path!r} to file object." ) - await run_sync_in_worker_thread( - client.download_fileobj, + client.download_fileobj( Bucket=self.bucket_name, Key=bucket_path, Fileobj=to_file_object, @@ -905,15 +1613,14 @@ async def download_object_to_file_object( ) return to_file_object - @sync_compatible - async def download_folder_to_path( + async def adownload_folder_to_path( self, from_folder: str, to_folder: Optional[Union[str, Path]] = None, **download_kwargs: Dict[str, Any], ) -> Path: """ - Downloads objects *within* a folder (excluding the folder itself) + Asynchronously downloads objects *within* a folder (excluding the folder itself) from the S3 bucket to a folder. Args: @@ -931,7 +1638,7 @@ async def download_folder_to_path( from prefect_aws.s3 import S3Bucket s3_bucket = S3Bucket.load("my-bucket") - s3_bucket.download_folder_to_path("my_folder", "my_folder") + await s3_bucket.adownload_folder_to_path("my_folder", "my_folder") ``` """ if to_folder is None: @@ -974,8 +1681,140 @@ async def download_folder_to_path( return Path(to_folder) - @sync_compatible - async def stream_from( + @async_dispatch(adownload_folder_to_path) + def download_folder_to_path( + self, + from_folder: str, + to_folder: Optional[Union[str, Path]] = None, + **download_kwargs: Dict[str, Any], + ) -> Path: + """ + Downloads objects *within* a folder (excluding the folder itself) + from the S3 bucket to a folder. + Changed in version 0.6.0. + + Args: + from_folder: The path to the folder to download from. + to_folder: The path to download the folder to. + **download_kwargs: Additional keyword arguments to pass to + `Client.download_file`. + + Returns: + The absolute path that the folder was downloaded to. + + Examples: + Download my_folder to a local folder named my_folder. 
+ ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.download_folder_to_path("my_folder", "my_folder") + ``` + """ + if to_folder is None: + to_folder = "" + to_folder = Path(to_folder).absolute() + + client = self.credentials.get_s3_client() + objects = self.list_objects(folder=from_folder) + + # do not call self._join_bucket_folder for filter + # because it's built-in to that method already! + # however, we still need to do it because we're using relative_to + bucket_folder = self._join_bucket_folder(from_folder) + + assert isinstance(objects, list), "list of objects expected" + for object in objects: + bucket_path = Path(object["Key"]).relative_to(bucket_folder) + # this skips the actual directory itself, e.g. + # `my_folder/` will be skipped + # `my_folder/notes.txt` will be downloaded + if bucket_path.is_dir(): + continue + to_path = to_folder / bucket_path + to_path.parent.mkdir(parents=True, exist_ok=True) + to_path = str(to_path) # must be string + self.logger.info( + f"Downloading object from bucket {self.bucket_name!r} path " + f"{bucket_path.as_posix()!r} to {to_path!r}." + ) + client.download_file( + Bucket=self.bucket_name, + Key=object["Key"], + Filename=to_path, + **download_kwargs, + ) + + return Path(to_folder) + + async def astream_from( + self, + bucket: "S3Bucket", + from_path: str, + to_path: Optional[str] = None, + **upload_kwargs: Dict[str, Any], + ) -> str: + """Asynchronously streams an object from another bucket to this bucket. Requires the + object to be downloaded and uploaded in chunks. If `self`'s credentials + allow for writes to the other bucket, try using `S3Bucket.copy_object`. + Added in version 0.5.3. + + Args: + bucket: The bucket to stream from. + from_path: The path of the object to stream. + to_path: The path to stream the object to. Defaults to the object's name. + **upload_kwargs: Additional keyword arguments to pass to + `Client.upload_fileobj`. + + Returns: + The path that the object was uploaded to. + + Examples: + Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt. + + ```python + from prefect_aws.s3 import S3Bucket + + your_s3_bucket = S3Bucket.load("your-bucket") + my_s3_bucket = S3Bucket.load("my-bucket") + + await my_s3_bucket.astream_from( + your_s3_bucket, + "notes.txt", + to_path="landed/notes.txt" + ) + ``` + + """ + if to_path is None: + to_path = Path(from_path).name + + # Get the source object's StreamingBody + _from_path: str = bucket._join_bucket_folder(from_path) + from_client = bucket.credentials.get_s3_client() + obj = await run_sync_in_worker_thread( + from_client.get_object, Bucket=bucket.bucket_name, Key=_from_path + ) + body: StreamingBody = obj["Body"] + + # Upload the StreamingBody to this bucket + bucket_path = str(self._join_bucket_folder(to_path)) + to_client = self.credentials.get_s3_client() + await run_sync_in_worker_thread( + to_client.upload_fileobj, + Fileobj=body, + Bucket=self.bucket_name, + Key=bucket_path, + **upload_kwargs, + ) + self.logger.info( + f"Streamed s3://{bucket.bucket_name}/{_from_path} to the bucket " + f"{self.bucket_name!r} path {bucket_path!r}." + ) + return bucket_path + + @async_dispatch(astream_from) + def stream_from( self, bucket: "S3Bucket", from_path: str, @@ -997,51 +1836,95 @@ async def stream_from( The path that the object was uploaded to. Examples: - Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt. - + Stream notes.txt from your-bucket/notes.txt to my-bucket/landed/notes.txt. 
+ + ```python + from prefect_aws.s3 import S3Bucket + + your_s3_bucket = S3Bucket.load("your-bucket") + my_s3_bucket = S3Bucket.load("my-bucket") + + my_s3_bucket.stream_from( + your_s3_bucket, + "notes.txt", + to_path="landed/notes.txt" + ) + ``` + + """ + if to_path is None: + to_path = Path(from_path).name + + # Get the source object's StreamingBody + _from_path: str = bucket._join_bucket_folder(from_path) + from_client = bucket.credentials.get_s3_client() + obj = from_client.get_object(Bucket=bucket.bucket_name, Key=_from_path) + body: StreamingBody = obj["Body"] + + # Upload the StreamingBody to this bucket + bucket_path = str(self._join_bucket_folder(to_path)) + to_client = self.credentials.get_s3_client() + to_client.upload_fileobj( + Fileobj=body, + Bucket=self.bucket_name, + Key=bucket_path, + **upload_kwargs, + ) + self.logger.info( + f"Streamed s3://{bucket.bucket_name}/{_from_path} to the bucket " + f"{self.bucket_name!r} path {bucket_path!r}." + ) + return bucket_path + + async def aupload_from_path( + self, + from_path: Union[str, Path], + to_path: Optional[str] = None, + **upload_kwargs: Dict[str, Any], + ) -> str: + """ + Asynchronously uploads an object from a path to the S3 bucket. + Added in version 0.5.3. + + Args: + from_path: The path to the file to upload from. + to_path: The path to upload the file to. + **upload_kwargs: Additional keyword arguments to pass to `Client.upload`. + + Returns: + The path that the object was uploaded to. + + Examples: + Upload notes.txt to my_folder/notes.txt. ```python from prefect_aws.s3 import S3Bucket - your_s3_bucket = S3Bucket.load("your-bucket") - my_s3_bucket = S3Bucket.load("my-bucket") - - my_s3_bucket.stream_from( - your_s3_bucket, - "notes.txt", - to_path="landed/notes.txt" - ) + s3_bucket = S3Bucket.load("my-bucket") + await s3_bucket.aupload_from_path("notes.txt", "my_folder/notes.txt") ``` - """ + from_path = str(Path(from_path).absolute()) if to_path is None: to_path = Path(from_path).name - # Get the source object's StreamingBody - from_path: str = bucket._join_bucket_folder(from_path) - from_client = bucket.credentials.get_s3_client() - obj = await run_sync_in_worker_thread( - from_client.get_object, Bucket=bucket.bucket_name, Key=from_path - ) - body: StreamingBody = obj["Body"] - - # Upload the StreamingBody to this bucket bucket_path = str(self._join_bucket_folder(to_path)) - to_client = self.credentials.get_s3_client() + client = self.credentials.get_s3_client() + await run_sync_in_worker_thread( - to_client.upload_fileobj, - Fileobj=body, + client.upload_file, + Filename=from_path, Bucket=self.bucket_name, Key=bucket_path, **upload_kwargs, ) self.logger.info( - f"Streamed s3://{bucket.bucket_name}/{from_path} to the bucket " + f"Uploaded from {from_path!r} to the bucket " f"{self.bucket_name!r} path {bucket_path!r}." 
) return bucket_path - @sync_compatible - async def upload_from_path( + @async_dispatch(aupload_from_path) + def upload_from_path( self, from_path: Union[str, Path], to_path: Optional[str] = None, @@ -1074,8 +1957,7 @@ async def upload_from_path( bucket_path = str(self._join_bucket_folder(to_path)) client = self.credentials.get_s3_client() - await run_sync_in_worker_thread( - client.upload_file, + client.upload_file( Filename=from_path, Bucket=self.bucket_name, Key=bucket_path, @@ -1087,12 +1969,11 @@ async def upload_from_path( ) return bucket_path - @sync_compatible - async def upload_from_file_object( + async def aupload_from_file_object( self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any] ) -> str: """ - Uploads an object to the S3 bucket from a file-like object, + Asynchronously uploads an object to the S3 bucket from a file-like object, which can be a BytesIO object or a BufferedReader. Args: @@ -1113,7 +1994,7 @@ async def upload_from_file_object( s3_bucket = S3Bucket.load("my-bucket") with open("notes.txt", "rb") as f: - s3_bucket.upload_from_file_object(f, "my_folder/notes.txt") + await s3_bucket.aupload_from_file_object(f, "my_folder/notes.txt") ``` Upload BufferedReader object to my_folder/notes.txt. @@ -1142,16 +2023,69 @@ async def upload_from_file_object( ) return bucket_path - @sync_compatible - async def upload_from_folder( + @async_dispatch(aupload_from_file_object) + def upload_from_file_object( + self, from_file_object: BinaryIO, to_path: str, **upload_kwargs: Dict[str, Any] + ) -> str: + """ + Uploads an object to the S3 bucket from a file-like object, + which can be a BytesIO object or a BufferedReader. + + Args: + from_file_object: The file-like object to upload from. + to_path: The path to upload the object to. + **upload_kwargs: Additional keyword arguments to pass to + `Client.upload_fileobj`. + + Returns: + The path that the object was uploaded to. + + Examples: + Upload BytesIO object to my_folder/notes.txt. + ```python + from io import BytesIO + + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + with open("notes.txt", "rb") as f: + s3_bucket.upload_from_file_object(f, "my_folder/notes.txt") + ``` + + Upload BufferedReader object to my_folder/notes.txt. + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + with open("notes.txt", "rb") as f: + s3_bucket.upload_from_file_object( + f, "my_folder/notes.txt" + ) + ``` + """ + bucket_path = str(self._join_bucket_folder(to_path)) + client = self.credentials.get_s3_client() + client.upload_fileobj( + Fileobj=from_file_object, + Bucket=self.bucket_name, + Key=bucket_path, + **upload_kwargs, + ) + self.logger.info( + "Uploaded from file object to the bucket " + f"{self.bucket_name!r} path {bucket_path!r}." + ) + return bucket_path + + async def aupload_from_folder( self, from_folder: Union[str, Path], to_folder: Optional[str] = None, **upload_kwargs: Dict[str, Any], - ) -> str: + ) -> Union[str, None]: """ - Uploads files *within* a folder (excluding the folder itself) - to the object storage service folder. + Asynchronously uploads files *within* a folder (excluding the folder itself) + to the object storage service folder. Added in version prefect-aws==0.5.3. Args: from_folder: The path to the folder to upload from. 
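`upload_from_file_object` accepts any binary file-like object, which makes in-memory uploads possible without staging a temporary file. A sketch (block name and payload are illustrative):

```python
from io import BytesIO

from prefect_aws.s3 import S3Bucket

s3_bucket = S3Bucket.load("my-bucket")  # assumes a saved block named "my-bucket"
payload = BytesIO(b'{"status": "ok"}')
s3_bucket.upload_from_file_object(payload, "results/status.json")
```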
@@ -1168,7 +2102,7 @@ async def upload_from_folder( from prefect_aws.s3 import S3Bucket s3_bucket = S3Bucket.load("my-bucket") - s3_bucket.upload_from_folder("my_folder", "new_folder") + await s3_bucket.aupload_from_folder("my_folder", "new_folder") ``` """ from_folder = Path(from_folder) @@ -1213,8 +2147,73 @@ async def upload_from_folder( return to_folder - @sync_compatible - async def copy_object( + @async_dispatch(aupload_from_folder) + def upload_from_folder( + self, + from_folder: Union[str, Path], + to_folder: Optional[str] = None, + **upload_kwargs: Dict[str, Any], + ) -> Union[str, None]: + """ + Uploads files *within* a folder (excluding the folder itself) + to the object storage service folder. + + Args: + from_folder: The path to the folder to upload from. + to_folder: The path to upload the folder to. + **upload_kwargs: Additional keyword arguments to pass to + `Client.upload_fileobj`. + + Returns: + The path that the folder was uploaded to. + + Examples: + Upload contents from my_folder to new_folder. + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + s3_bucket.upload_from_folder("my_folder", "new_folder") + ``` + """ + from_folder = Path(from_folder) + bucket_folder = self._join_bucket_folder(to_folder or "") + + num_uploaded = 0 + client = self.credentials.get_s3_client() + + for from_path in from_folder.rglob("**/*"): + # this skips the actual directory itself, e.g. + # `my_folder/` will be skipped + # `my_folder/notes.txt` will be uploaded + if from_path.is_dir(): + continue + bucket_path = ( + Path(bucket_folder) / from_path.relative_to(from_folder) + ).as_posix() + self.logger.info( + f"Uploading from {str(from_path)!r} to the bucket " + f"{self.bucket_name!r} path {bucket_path!r}." + ) + client.upload_file( + Filename=str(from_path), + Bucket=self.bucket_name, + Key=bucket_path, + **upload_kwargs, + ) + num_uploaded += 1 + + if num_uploaded == 0: + self.logger.warning(f"No files were uploaded from {str(from_folder)!r}.") + else: + self.logger.info( + f"Uploaded {num_uploaded} files from {str(from_folder)!r} to " + f"the bucket {self.bucket_name!r} path {bucket_path!r}" + ) + + return to_folder + + def copy_object( self, from_path: Union[str, Path], to_path: Union[str, Path], @@ -1301,8 +2300,108 @@ async def copy_object( return target_path - @sync_compatible - async def move_object( + def _move_object_setup( + self, + from_path: Union[str, Path], + to_path: Union[str, Path], + to_bucket: Optional[Union["S3Bucket", str]] = None, + ) -> Tuple[str, str, str, str]: + source_bucket_name = self.bucket_name + source_path = self._resolve_path(Path(from_path).as_posix()) + + # Default to moving within the same bucket + to_bucket = to_bucket or self + + target_bucket_name: str + target_path: str + if isinstance(to_bucket, S3Bucket): + target_bucket_name = to_bucket.bucket_name + target_path = to_bucket._resolve_path(Path(to_path).as_posix()) + elif isinstance(to_bucket, str): + target_bucket_name = to_bucket + target_path = Path(to_path).as_posix() + else: + raise TypeError( + f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}" + ) + + self.logger.info( + "Moving object from s3://%s/%s to s3://%s/%s", + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) + + return source_bucket_name, source_path, target_bucket_name, target_path + + async def amove_object( + self, + from_path: Union[str, Path], + to_path: Union[str, Path], + to_bucket: Optional[Union["S3Bucket", str]] = None, + ) -> str: + 
"""Asynchronously uses S3's internal CopyObject and DeleteObject to move objects + within or between buckets. To move objects between buckets, `self`'s credentials + must have permission to read and delete the source object and write to the target + object. If the credentials do not have those permissions, this method will raise + an error. If the credentials have permission to read the source object but not + delete it, the object will be copied but not deleted. + + Args: + from_path: The path of the object to move. + to_path: The path to move the object to. + to_bucket: The bucket to move to. Defaults to the current bucket. + + Returns: + The path that the object was moved to. Excludes the bucket name. + + Examples: + + Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt. + + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + await s3_bucket.amove_object("my_folder/notes.txt", "my_folder/notes_copy.txt") + ``` + + Move notes.txt from my_folder/notes.txt to my_folder/notes_copy.txt in + another bucket. + + ```python + from prefect_aws.s3 import S3Bucket + + s3_bucket = S3Bucket.load("my-bucket") + await s3_bucket.amove_object( + "my_folder/notes.txt", + "my_folder/notes_copy.txt", + to_bucket="other-bucket" + ) + ``` + """ + s3_client = self.credentials.get_s3_client() + + ( + source_bucket_name, + source_path, + target_bucket_name, + target_path, + ) = self._move_object_setup(from_path, to_path, to_bucket) + + # If invalid, should error and prevent next operation + await run_sync_in_worker_thread( + s3_client.copy, + CopySource={"Bucket": source_bucket_name, "Key": source_path}, + Bucket=target_bucket_name, + Key=target_path, + ) + s3_client.delete_object(Bucket=source_bucket_name, Key=source_path) + return target_path + + @async_dispatch(amove_object) + def move_object( self, from_path: Union[str, Path], to_path: Union[str, Path], @@ -1311,9 +2410,9 @@ async def move_object( """Uses S3's internal CopyObject and DeleteObject to move objects within or between buckets. To move objects between buckets, `self`'s credentials must have permission to read and delete the source object and write to the target - object. If the credentials do not have those permissions, this method will - raise an error. If the credentials have permission to read the source object - but not delete it, the object will be copied but not deleted. + object. If the credentials do not have those permissions, this method will raise + an error. If the credentials have permission to read the source object but not + delete it, the object will be copied but not deleted. Args: from_path: The path of the object to move. 
@@ -1350,34 +2449,13 @@ async def move_object( """ s3_client = self.credentials.get_s3_client() - source_bucket_name = self.bucket_name - source_path = self._resolve_path(Path(from_path).as_posix()) - - # Default to moving within the same bucket - to_bucket = to_bucket or self - - target_bucket_name: str - target_path: str - if isinstance(to_bucket, S3Bucket): - target_bucket_name = to_bucket.bucket_name - target_path = to_bucket._resolve_path(Path(to_path).as_posix()) - elif isinstance(to_bucket, str): - target_bucket_name = to_bucket - target_path = Path(to_path).as_posix() - else: - raise TypeError( - f"to_bucket must be a string or S3Bucket, not {type(to_bucket)}" - ) - - self.logger.info( - "Moving object from s3://%s/%s to s3://%s/%s", + ( source_bucket_name, source_path, target_bucket_name, target_path, - ) + ) = self._move_object_setup(from_path, to_path, to_bucket) - # If invalid, should error and prevent next operation s3_client.copy( CopySource={"Bucket": source_bucket_name, "Key": source_path}, Bucket=target_bucket_name, diff --git a/src/integrations/prefect-aws/tests/test_s3.py b/src/integrations/prefect-aws/tests/test_s3.py index 9a5e389c2035..69121d21a807 100644 --- a/src/integrations/prefect-aws/tests/test_s3.py +++ b/src/integrations/prefect-aws/tests/test_s3.py @@ -10,6 +10,10 @@ from prefect_aws.client_parameters import AwsClientParameters from prefect_aws.s3 import ( S3Bucket, + acopy_objects, + adownload_from_bucket, + alist_objects, + amove_objects, s3_copy, s3_download, s3_list_objects, @@ -1137,3 +1141,111 @@ def test_round_trip_default_credentials(self): assert hasattr( loaded.credentials, "aws_access_key_id" ), "`credentials` were not properly initialized" + + @pytest.mark.parametrize( + "client_parameters", + [ + pytest.param( + "aws_client_parameters_custom_endpoint", + marks=pytest.mark.is_public(False), + ), + pytest.param( + "aws_client_parameters_custom_endpoint", + marks=pytest.mark.is_public(True), + ), + pytest.param( + "aws_client_parameters_empty", + marks=pytest.mark.is_public(False), + ), + pytest.param( + "aws_client_parameters_empty", + marks=pytest.mark.is_public(True), + ), + pytest.param( + "aws_client_parameters_public_bucket", + marks=[ + pytest.mark.is_public(False), + pytest.mark.xfail(reason="Bucket is not a public one"), + ], + ), + pytest.param( + "aws_client_parameters_public_bucket", + marks=pytest.mark.is_public(True), + ), + ], + indirect=True, + ) + async def test_async_download_from_bucket( + self, object, client_parameters, aws_credentials + ): + @flow + async def test_flow(): + return await adownload_from_bucket( + bucket="bucket", + key="object", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + result = await test_flow() + assert result == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) + async def test_async_list_objects( + self, object, object_in_folder, client_parameters, aws_credentials + ): + @flow + async def test_flow(): + return await alist_objects( + bucket="bucket", + aws_credentials=aws_credentials, + aws_client_parameters=client_parameters, + ) + + objects = await test_flow() + assert len(objects) == 2 + assert [object["Key"] for object in objects] == ["folder/object", "object"] + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) + async def test_async_copy_objects(self, object, bucket, bucket_2, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + 
return stream.read() + + @flow + async def test_flow(): + await acopy_objects( + source_path="object", + target_path="subfolder/new_object", + source_bucket_name="bucket", + aws_credentials=aws_credentials, + target_bucket_name="bucket_2", + ) + + await test_flow() + assert read(bucket_2, "subfolder/new_object") == b"TEST" + + @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True) + async def test_async_move_objects(self, object, bucket, bucket_2, aws_credentials): + def read(bucket, key): + stream = io.BytesIO() + bucket.download_fileobj(key, stream) + stream.seek(0) + return stream.read() + + @flow + async def test_flow(): + await amove_objects( + source_path="object", + target_path="moved_object", + source_bucket_name="bucket", + target_bucket_name="bucket_2", + aws_credentials=aws_credentials, + ) + + await test_flow() + assert read(bucket_2, "moved_object") == b"TEST" + with pytest.raises(ClientError): + read(bucket, "object")
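The new tests cover the async entry points; the sync dispatch path could be exercised symmetrically. A sketch of such a test, assuming `list_objects` is added to the imports above and the module's existing fixtures are reused:

```python
    @pytest.mark.parametrize("client_parameters", aws_clients, indirect=True)
    def test_sync_dispatch_list_objects(
        self, object, object_in_folder, client_parameters, aws_credentials
    ):
        @flow
        def test_flow():
            return list_objects(
                bucket="bucket",
                aws_credentials=aws_credentials,
                aws_client_parameters=client_parameters,
            )

        objects = test_flow()
        assert [obj["Key"] for obj in objects] == ["folder/object", "object"]
```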