From 2e393a5e7dce5a09335a46e950bd9ddfbbcc8d44 Mon Sep 17 00:00:00 2001
From: Hemanth Sai Maheswarla
Date: Wed, 20 Mar 2024 00:32:37 +0530
Subject: [PATCH] adding chunked size download and separate files for failed
 upload parts

Signed-off-by: Hemanth Sai Maheswarla
---
 ...objects_multipart_failed_upload_parts.yaml |   8 +-
 rgw/v2/tests/s3_swift/reusable.py             | 135 +++++++++++++-----
 .../s3_swift/test_Mbuckets_with_Nobjects.py   |   1 +
 3 files changed, 101 insertions(+), 43 deletions(-)

diff --git a/rgw/v2/tests/s3_swift/configs/test_Mbuckets_with_Nobjects_multipart_failed_upload_parts.yaml b/rgw/v2/tests/s3_swift/configs/test_Mbuckets_with_Nobjects_multipart_failed_upload_parts.yaml
index 2b10969d9..5d87f8a22 100644
--- a/rgw/v2/tests/s3_swift/configs/test_Mbuckets_with_Nobjects_multipart_failed_upload_parts.yaml
+++ b/rgw/v2/tests/s3_swift/configs/test_Mbuckets_with_Nobjects_multipart_failed_upload_parts.yaml
@@ -2,12 +2,12 @@
 # script: test_Mbuckets_with_Nobjects.py
 config:
   user_count: 1
-  bucket_count: 1
+  bucket_count: 2
   objects_count: 1
-  split_size: 40
+  split_size: 100
   objects_size_range:
-    min: 70M
-    max: 70M
+    min: 200M
+    max: 200M
   local_file_delete: true
   test_ops:
     create_bucket: true
diff --git a/rgw/v2/tests/s3_swift/reusable.py b/rgw/v2/tests/s3_swift/reusable.py
index 6d83563cf..082546231 100644
--- a/rgw/v2/tests/s3_swift/reusable.py
+++ b/rgw/v2/tests/s3_swift/reusable.py
@@ -33,6 +33,7 @@
 rgw_service = RGWService()
 
 log = logging.getLogger()
+stop_threads = False
 
 
 def create_bucket(bucket_name, rgw, user_info, location=None):
@@ -578,6 +579,32 @@ def upload_part(
     parts_info["Parts"].append(part_info)
 
 
+def parallel_operation_to_stress_rgw(
+    rgw_client,
+    bucket_name
+):
+    global stop_threads
+    temp_object_path = "/tmp/temp_obj200MB"
+    temp_obj_name = "temp_obj1"
+    utils.exec_shell_cmd(f"fallocate -l 200MB {temp_object_path}")
+    log.info(f"uploading object: {temp_obj_name}")
+    rgw_client.upload_file(temp_object_path, bucket_name, temp_obj_name)
+
+    for i in range(10000):
+        log.info(f"downloading object: {temp_obj_name}")
+        s3_object_download_path = f"{temp_object_path}.download"
+        rgw_client.download_file(bucket_name, temp_obj_name, s3_object_download_path)
+
+        # log.info("sleeping for 1 second")
+        # time.sleep(1)
+
+        log.info(f"deleting local downloaded file: {s3_object_download_path}")
+        os.remove(s3_object_download_path)
+
+        if stop_threads:
+            break
+
+
 def test_multipart_upload_failed_parts(
     rgw_client,
     s3_object_name,
@@ -587,47 +614,69 @@ def test_multipart_upload_failed_parts(
     append_data=False,
     append_msg=None,
 ):
-    log.info("s3 object name: %s" % s3_object_name)
-    s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
-    log.info("s3 object path: %s" % s3_object_path)
-    s3_object_size = config.obj_size
-    split_size = config.split_size if hasattr(config, "split_size") else 5
-    log.info("split size: %s" % split_size)
-    if append_data is True:
-        data_info = manage_data.io_generator(
-            s3_object_path,
-            s3_object_size,
-            op="append",
-            **{"message": "\n%s" % append_msg},
-        )
-    else:
-        data_info = manage_data.io_generator(s3_object_path, s3_object_size)
-    if data_info is False:
-        TestExecError("data creation failed")
-    mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
-    log.info("mp part dir: %s" % mp_dir)
-    log.info("making multipart object part dir")
-    mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
-    if mkdir is False:
-        raise TestExecError("mkdir failed creating mp_dir_name")
-    utils.split_file(s3_object_path, split_size, mp_dir + "/")
-    parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
-    log.info("parts_list: %s" % parts_list)
-    log.info("uploading s3 object: %s" % s3_object_path)
-
+    # log.info("s3 object name: %s" % s3_object_name)
+    # s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
+    # log.info("s3 object path: %s" % s3_object_path)
+    # s3_object_size = config.obj_size
+    # split_size = config.split_size if hasattr(config, "split_size") else 5
+    # log.info("split size: %s" % split_size)
+    # if append_data is True:
+    #     data_info = manage_data.io_generator(
+    #         s3_object_path,
+    #         s3_object_size,
+    #         op="append",
+    #         **{"message": "\n%s" % append_msg},
+    #     )
+    # else:
+    #     data_info = manage_data.io_generator(s3_object_path, s3_object_size)
+    # if data_info is False:
+    #     TestExecError("data creation failed")
+    # mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
+    # log.info("mp part dir: %s" % mp_dir)
+    # log.info("making multipart object part dir")
+    # mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
+    # if mkdir is False:
+    #     raise TestExecError("mkdir failed creating mp_dir_name")
+    # utils.split_file(s3_object_path, split_size, mp_dir + "/")
+    # parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
+    # log.info("parts_list: %s" % parts_list)
+    # log.info("uploading s3 object: %s" % s3_object_path)
+
+    # global stop_threads
+    # parallel_thread = Thread(
+    #     target=parallel_operation_to_stress_rgw,
+    #     args=(
+    #         rgw_client,
+    #         bucket_name
+    #     ),
+    # )
+    # parallel_thread.start()
+
+    utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB")
+    utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB")
     iteration_count = config.test_ops.get("iteration_count")
     for i in range(1, iteration_count + 1):
         log.info(
             f"--------------------------- iteration {i} ---------------------------"
         )
-        log.info("initiating multipart upload")
-        mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)
-        part_number = 1
         parts_info = {"Parts": []}
+        parts_list = []
+
+        file1 = "/tmp/obj1"
+        utils.exec_shell_cmd(f"fallocate -l 8MB {file1}")
+        parts_list.append(file1)
+
+        file2 = "/tmp/obj2"
+        utils.exec_shell_cmd(f"fallocate -l 100MB {file2}")
+        parts_list.append(file2)
 
         log.info("no of parts: %s" % len(parts_list))
+        log.info("initiating multipart upload")
+        mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)
+        part_number = 1
+
         for each_part in parts_list:
             log.info("trying to upload part: %s" % each_part)
             if config.test_ops.get("fail_part_upload") and part_number == 2:
@@ -639,7 +688,7 @@ def test_multipart_upload_failed_parts(
                     bucket_name,
                     mpu,
                     part_number,
-                    each_part,
+                    "/tmp/obj20MB",
                     10,
                     parts_info,
                 ),
@@ -652,8 +701,8 @@ def test_multipart_upload_failed_parts(
                     bucket_name,
                     mpu,
                     part_number,
-                    each_part,
-                    11,
+                    "/tmp/obj30MB",
+                    20,
                     parts_info,
                 ),
             )
@@ -672,8 +721,8 @@ def test_multipart_upload_failed_parts(
         )
 
         t1.start()
-        t3.start()
         t2.start()
+        t3.start()
 
         t1.join()
         t2.join()
@@ -711,7 +760,13 @@ def test_multipart_upload_failed_parts(
         log.info(f"downlading object: {s3_object_name}")
         s3_object_download_path = f"/tmp/{s3_object_name}"
-        rgw_client.download_file(bucket_name, s3_object_name, s3_object_download_path)
+        rgw_client.download_file(
+            bucket_name, s3_object_name, s3_object_download_path,
+            Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024 * 20, max_concurrency=10, use_threads=True)
+        )
+
+        # log.info("sleeping for 1 second")
+        # time.sleep(1)
 
         log.info(f"deleting local downloaded file: {s3_object_download_path}")
         os.remove(s3_object_download_path)
@@ -719,9 +774,11 @@ def test_multipart_upload_failed_parts(
         log.info(f"deleting object: {s3_object_name}")
         rgw_client.delete_object(Bucket=bucket_name, Key=s3_object_name)
 
-    if config.local_file_delete is True:
-        log.info("deleting local file part")
-        utils.exec_shell_cmd(f"rm -rf {mp_dir}")
+    # if config.local_file_delete is True:
+    #     log.info("deleting local file part")
+    #     utils.exec_shell_cmd(f"rm -rf {mp_dir}")
+    # stop_threads = True
+    # parallel_thread.join()
 
 
 def enable_versioning(bucket, rgw_conn, user_info, write_bucket_io_info):
diff --git a/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py b/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py
index 8c4c1fd51..213e47ceb 100644
--- a/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py
+++ b/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py
@@ -307,6 +307,7 @@ def test_exec(config, ssh_con):
         abort_multipart = config.abort_multipart
         log.info(f"value of abort_multipart {abort_multipart}")
         if config.test_ops.get("fail_part_upload", False):
+            time.sleep(30)
             reusable.test_multipart_upload_failed_parts(
                 rgw_conn2,
                 s3_object_name,
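
A note on the chunked download the patch adds in reusable.py: boto3's download_file accepts a boto3.s3.transfer.TransferConfig, and once an object is larger than multipart_threshold (8 MB by default) the transfer manager fetches it as concurrent ranged GETs of multipart_chunksize bytes each. With multipart_chunksize=1024 * 20 (20 KB) and a ~108 MB object, every download turns into thousands of small ranged GETs against RGW, which is the stress this test wants. A minimal standalone sketch of the same call; the endpoint, bucket, and key below are placeholder assumptions, not values from the test suite:

# Sketch of the chunked-download pattern the patch applies; endpoint,
# bucket, key, and paths are illustrative placeholders.
import boto3
from boto3.s3.transfer import TransferConfig

client = boto3.client("s3", endpoint_url="http://rgw.example.com")  # assumed RGW endpoint

# 20 KB chunks force many small ranged GETs; ten worker threads issue
# them concurrently against the gateway.
transfer_config = TransferConfig(
    multipart_threshold=1024 * 20,  # chunk any object larger than 20 KB
    multipart_chunksize=1024 * 20,
    max_concurrency=10,
    use_threads=True,
)

client.download_file("my-bucket", "my-key", "/tmp/my-key.download", Config=transfer_config)

The patch itself leaves multipart_threshold at its 8 MB default, which is fine for a 200M test object; the explicit threshold above is only there so the sketch chunks small objects too.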
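The background helper parallel_operation_to_stress_rgw is shut down through the module-level stop_threads flag, which its (currently commented-out) caller flips before joining the thread. The same pattern in a self-contained form, here using threading.Event in place of the global boolean; stress_io is a stand-in for the real download/remove loop body, not a function from the test suite:

# Self-contained sketch of the stop-flag pattern behind
# parallel_operation_to_stress_rgw; threading.Event replaces the
# module-level stop_threads boolean.
import threading
import time

stop_event = threading.Event()

def stress_io():
    time.sleep(0.1)  # placeholder for the download_file() + os.remove() cycle

def stress_worker():
    # Mirrors the `if stop_threads: break` check at the end of each pass.
    while not stop_event.is_set():
        stress_io()

worker = threading.Thread(target=stress_worker, daemon=True)
worker.start()

time.sleep(5)     # the main multipart test would run here
stop_event.set()  # equivalent of `stop_threads = True`
worker.join()     # equivalent of `parallel_thread.join()`

An Event avoids the global statement and is safe to share across any number of worker threads, but the behavior is the same as the flag in the patch.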
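The failed-part retry in test_multipart_upload_failed_parts leans on a documented S3 multipart guarantee worth spelling out: uploading a part again with the same PartNumber overwrites the earlier attempt, and only the ETags named in CompleteMultipartUpload end up in the final object. That is why the test can substitute /tmp/obj20MB and /tmp/obj30MB for the parts that "failed". A bare boto3 sketch of that semantics; endpoint, bucket, and key are placeholder assumptions:

# Sketch of retrying a failed part under the same PartNumber. Every
# part except the last must be at least 5 MiB, which the 8 MiB filler
# below respects.
import boto3

client = boto3.client("s3", endpoint_url="http://rgw.example.com")  # assumed RGW endpoint
bucket, key = "my-bucket", "my-multipart-obj"

mpu = client.create_multipart_upload(Bucket=bucket, Key=key)
upload_id = mpu["UploadId"]
data = b"a" * (8 * 1024 * 1024)  # 8 MiB of filler per part

parts = []
for part_number in (1, 2):
    # If an upload_part call fails (timeout, wrong source file, ...),
    # re-issuing it with the same PartNumber replaces the earlier
    # attempt; only the ETags listed at completion time matter.
    resp = client.upload_part(
        Bucket=bucket, Key=key, UploadId=upload_id,
        PartNumber=part_number, Body=data,
    )
    parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})

client.complete_multipart_upload(
    Bucket=bucket, Key=key, UploadId=upload_id,
    MultipartUpload={"Parts": parts},
)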