From 42bc55f83a464dc4eae828403cccf04b9fc202cc Mon Sep 17 00:00:00 2001 From: Hemanth Sai Maheswarla Date: Thu, 21 Mar 2024 09:37:16 +0530 Subject: [PATCH] testing using asyncio instead of threads, but still not reproducible. curl command with incorrect part length is causing connection broken, may be reproducible with that, same error seen with go Signed-off-by: Hemanth Sai Maheswarla --- rgw/v2/tests/s3_swift/reusable.py | 303 +++++++++++++----- .../s3_swift/test_Mbuckets_with_Nobjects.py | 17 +- 2 files changed, 226 insertions(+), 94 deletions(-) diff --git a/rgw/v2/tests/s3_swift/reusable.py b/rgw/v2/tests/s3_swift/reusable.py index 082546231..ccee803b8 100644 --- a/rgw/v2/tests/s3_swift/reusable.py +++ b/rgw/v2/tests/s3_swift/reusable.py @@ -6,6 +6,7 @@ import sys import boto3 +import asyncio sys.path.append(os.path.abspath(os.path.join(__file__, "../../../.."))) import logging @@ -552,7 +553,7 @@ def upload_mutipart_object( log.info("multipart upload complete for key: %s" % s3_object_name) -def upload_part( +async def upload_part( rgw_client, s3_object_name, bucket_name, @@ -605,7 +606,183 @@ def parallel_operation_to_stress_rgw( break -def test_multipart_upload_failed_parts( +# def test_multipart_upload_failed_parts( +# rgw_client, +# s3_object_name, +# bucket_name, +# TEST_DATA_PATH, +# config, +# append_data=False, +# append_msg=None, +# ): +# # log.info("s3 object name: %s" % s3_object_name) +# # s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name) +# # log.info("s3 object path: %s" % s3_object_path) +# # s3_object_size = config.obj_size +# # split_size = config.split_size if hasattr(config, "split_size") else 5 +# # log.info("split size: %s" % split_size) +# # if append_data is True: +# # data_info = manage_data.io_generator( +# # s3_object_path, +# # s3_object_size, +# # op="append", +# # **{"message": "\n%s" % append_msg}, +# # ) +# # else: +# # data_info = manage_data.io_generator(s3_object_path, s3_object_size) +# # if data_info is False: +# # TestExecError("data creation failed") +# # mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts") +# # log.info("mp part dir: %s" % mp_dir) +# # log.info("making multipart object part dir") +# # mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir) +# # if mkdir is False: +# # raise TestExecError("mkdir failed creating mp_dir_name") +# # utils.split_file(s3_object_path, split_size, mp_dir + "/") +# # parts_list = sorted(glob.glob(mp_dir + "/" + "*")) +# # log.info("parts_list: %s" % parts_list) +# # log.info("uploading s3 object: %s" % s3_object_path) +# +# # global stop_threads +# # parallel_thread = Thread( +# # target=parallel_operation_to_stress_rgw, +# # args=( +# # rgw_client, +# # bucket_name +# # ), +# # ) +# # parallel_thread.start() +# +# utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB") +# utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB") +# iteration_count = config.test_ops.get("iteration_count") +# for i in range(1, iteration_count + 1): +# log.info( +# f"--------------------------- iteration {i} ---------------------------" +# ) +# +# part_number = 1 +# parts_info = {"Parts": []} +# parts_list = [] +# +# file1 = "/tmp/obj1" +# utils.exec_shell_cmd(f"fallocate -l 8MB {file1}") +# parts_list.append(file1) +# +# file2 = "/tmp/obj2" +# utils.exec_shell_cmd(f"fallocate -l 100MB {file2}") +# parts_list.append(file2) +# +# log.info("no of parts: %s" % len(parts_list)) +# +# log.info("initiating multipart upload") +# mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name) 
+# +# for each_part in parts_list: +# log.info("trying to upload part: %s" % each_part) +# if config.test_ops.get("fail_part_upload") and part_number == 2: +# t1 = Thread( +# target=upload_part, +# args=( +# rgw_client, +# s3_object_name, +# bucket_name, +# mpu, +# part_number, +# "/tmp/obj20MB", +# 10, +# parts_info, +# ), +# ) +# t2 = Thread( +# target=upload_part, +# args=( +# rgw_client, +# s3_object_name, +# bucket_name, +# mpu, +# part_number, +# "/tmp/obj30MB", +# 20, +# parts_info, +# ), +# ) +# t3 = Thread( +# target=upload_part, +# args=( +# rgw_client, +# s3_object_name, +# bucket_name, +# mpu, +# part_number, +# each_part, +# os.stat(each_part).st_size, +# parts_info, +# ), +# ) +# +# t1.start() +# t2.start() +# t3.start() +# +# t1.join() +# t2.join() +# t3.join() +# +# else: +# part_upload_response = rgw_client.upload_part( +# Bucket=bucket_name, +# Key=s3_object_name, +# PartNumber=part_number, +# UploadId=mpu["UploadId"], +# Body=open(each_part, mode="rb"), +# ) +# log.info(f"part uploaded response {part_upload_response}") +# part_info = { +# "PartNumber": part_number, +# "ETag": part_upload_response["ETag"], +# } +# parts_info["Parts"].append(part_info) +# if each_part != parts_list[-1]: +# # increase the part number only if the current part is not the last part +# part_number += 1 +# log.info("curr part_number: %s" % part_number) +# +# if len(parts_list) == part_number: +# log.info("all parts upload completed") +# response = rgw_client.complete_multipart_upload( +# Bucket=bucket_name, +# Key=s3_object_name, +# UploadId=mpu["UploadId"], +# MultipartUpload=parts_info, +# ) +# log.info(f"complete multipart upload: {response}") +# log.info("multipart upload complete for key: %s" % s3_object_name) +# +# log.info(f"downlading object: {s3_object_name}") +# s3_object_download_path = f"/tmp/{s3_object_name}" +# rgw_client.download_file( +# bucket_name, s3_object_name, s3_object_download_path, +# Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True) +# ) +# +# # log.info("sleeping for 1 second") +# # time.sleep(1) +# +# log.info(f"deleting local downloaded file: {s3_object_download_path}") +# os.remove(s3_object_download_path) +# +# log.info(f"deleting object: {s3_object_name}") +# rgw_client.delete_object(Bucket=bucket_name, Key=s3_object_name) +# +# # if config.local_file_delete is True: +# # log.info("deleting local file part") +# # utils.exec_shell_cmd(f"rm -rf {mp_dir}") +# # stop_threads = True +# # parallel_thread.join() + + +async def test_multipart_upload_failed_parts( rgw_client, s3_object_name, bucket_name, @@ -614,43 +791,6 @@ def test_multipart_upload_failed_parts( append_data=False, append_msg=None, ): - # log.info("s3 object name: %s" % s3_object_name) - # s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name) - # log.info("s3 object path: %s" % s3_object_path) - # s3_object_size = config.obj_size - # split_size = config.split_size if hasattr(config, "split_size") else 5 - # log.info("split size: %s" % split_size) - # if append_data is True: - # data_info = manage_data.io_generator( - # s3_object_path, - # s3_object_size, - # op="append", - # **{"message": "\n%s" % append_msg}, - # ) - # else: - # data_info = manage_data.io_generator(s3_object_path, s3_object_size) - # if data_info is False: - # TestExecError("data creation failed") - # mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts") - # log.info("mp part dir: %s" % mp_dir) - # log.info("making multipart object part dir") - # mkdir = 
utils.exec_shell_cmd("sudo mkdir %s" % mp_dir) - # if mkdir is False: - # raise TestExecError("mkdir failed creating mp_dir_name") - # utils.split_file(s3_object_path, split_size, mp_dir + "/") - # parts_list = sorted(glob.glob(mp_dir + "/" + "*")) - # log.info("parts_list: %s" % parts_list) - # log.info("uploading s3 object: %s" % s3_object_path) - - # global stop_threads - # parallel_thread = Thread( - # target=parallel_operation_to_stress_rgw, - # args=( - # rgw_client, - # bucket_name - # ), - # ) - # parallel_thread.start() utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB") utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB") @@ -680,53 +820,42 @@ def test_multipart_upload_failed_parts( for each_part in parts_list: log.info("trying to upload part: %s" % each_part) if config.test_ops.get("fail_part_upload") and part_number == 2: - t1 = Thread( - target=upload_part, - args=( - rgw_client, - s3_object_name, - bucket_name, - mpu, - part_number, - "/tmp/obj20MB", - 10, - parts_info, - ), - ) - t2 = Thread( - target=upload_part, - args=( - rgw_client, - s3_object_name, - bucket_name, - mpu, - part_number, - "/tmp/obj30MB", - 20, - parts_info, - ), - ) - t3 = Thread( - target=upload_part, - args=( - rgw_client, - s3_object_name, - bucket_name, - mpu, - part_number, - each_part, - os.stat(each_part).st_size, - parts_info, - ), - ) - t1.start() - t2.start() - t3.start() - - t1.join() - t2.join() - t3.join() + await asyncio.gather( + *[ + upload_part( + rgw_client, + s3_object_name, + bucket_name, + mpu, + part_number, + "/tmp/obj20MB", + 1000, + parts_info + ), + upload_part( + rgw_client, + s3_object_name, + bucket_name, + mpu, + part_number, + "/tmp/obj30MB", + 2000, + parts_info + ), + upload_part( + rgw_client, + s3_object_name, + bucket_name, + mpu, + part_number, + each_part, + os.stat(each_part).st_size, + parts_info + ), + + ] + ) else: part_upload_response = rgw_client.upload_part( @@ -761,8 +890,8 @@ def test_multipart_upload_failed_parts( log.info(f"downlading object: {s3_object_name}") s3_object_download_path = f"/tmp/{s3_object_name}" rgw_client.download_file( - bucket_name, s3_object_name, s3_object_download_path, - Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True) + bucket_name, s3_object_name, s3_object_download_path + # Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True) ) # log.info("sleeping for 1 second") diff --git a/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py b/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py index 213e47ceb..159f84757 100644 --- a/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py +++ b/rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py @@ -42,6 +42,7 @@ import logging import time import traceback +import asyncio import v2.lib.resource_op as s3lib import v2.utils.utils as utils @@ -307,13 +308,15 @@ def test_exec(config, ssh_con): abort_multipart = config.abort_multipart log.info(f"value of abort_multipart {abort_multipart}") if config.test_ops.get("fail_part_upload", False): - time.sleep(30) - reusable.test_multipart_upload_failed_parts( - rgw_conn2, - s3_object_name, - bucket.name, - TEST_DATA_PATH, - config, + # time.sleep(30) + asyncio.run( + reusable.test_multipart_upload_failed_parts( + rgw_conn2, + s3_object_name, + bucket.name, + TEST_DATA_PATH, + config, + ) ) else: reusable.upload_mutipart_object(
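
The async path above awaits upload_part directly, but boto3 client calls are blocking, so the three coroutines handed to asyncio.gather still execute one after another instead of racing the same part number. Below is a minimal, self-contained sketch (assuming Python 3.9+ for asyncio.to_thread) of getting real overlap by pushing each blocking call onto a worker thread; the helper name racing_part_uploads and the file paths are illustrative, not helpers from reusable.py.

    import asyncio
    import boto3

    async def racing_part_uploads(client, bucket, key, upload_id, part_number, paths):
        def _upload(path):
            # Blocking boto3 call; running it via asyncio.to_thread lets the
            # three uploads of the same PartNumber actually overlap.
            with open(path, "rb") as body:
                return client.upload_part(
                    Bucket=bucket,
                    Key=key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=body,
                )

        # All three uploads target the same PartNumber; the test is probing how
        # RGW behaves when they land concurrently.
        return await asyncio.gather(
            *[asyncio.to_thread(_upload, p) for p in paths],
            return_exceptions=True,
        )

Usage would mirror the existing loop: build the list of candidate part files and `await racing_part_uploads(rgw_client, bucket_name, s3_object_name, mpu["UploadId"], part_number, parts_list)` in place of the current gather over upload_part.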
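The commit message notes that a curl upload with an incorrect part length breaks the connection and might be the way to reproduce the failure. A rough sketch of driving the same situation from Python instead of curl is shown below: presign an UploadPart request, then replay it with http.client so the Content-Length header can be forced to disagree with the body. The endpoint URL, bucket/key names, body size, and the +1024 mismatch are assumptions for illustration only.

    import http.client
    import os
    import urllib.parse

    import boto3

    client = boto3.client("s3", endpoint_url="http://rgw.example.com:80")  # assumed RGW endpoint
    mpu = client.create_multipart_upload(Bucket="test-bucket", Key="broken-part")
    url = client.generate_presigned_url(
        "upload_part",
        Params={
            "Bucket": "test-bucket",
            "Key": "broken-part",
            "UploadId": mpu["UploadId"],
            "PartNumber": 1,
        },
    )

    body = os.urandom(4 * 1024 * 1024)
    parsed = urllib.parse.urlparse(url)
    conn = http.client.HTTPConnection(parsed.hostname, parsed.port or 80)
    conn.putrequest("PUT", f"{parsed.path}?{parsed.query}")
    # Deliberately advertise more bytes than will be sent, then close the socket:
    # the "incorrect part length" case described in the commit message.
    conn.putheader("Content-Length", str(len(body) + 1024))
    conn.endheaders()
    conn.send(body)
    conn.close()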