
Commit

adding chunked size download and separate files for failed upload parts
Signed-off-by: Hemanth Sai Maheswarla <[email protected]>
Hemanth Sai Maheswarla authored and Hemanth Sai Maheswarla committed Mar 19, 2024
1 parent f1dc601 commit 2e393a5
Showing 3 changed files with 101 additions and 43 deletions.
@@ -2,12 +2,12 @@
 # script: test_Mbuckets_with_Nobjects.py
 config:
   user_count: 1
-  bucket_count: 1
+  bucket_count: 2
   objects_count: 1
-  split_size: 40
+  split_size: 100
   objects_size_range:
-    min: 70M
-    max: 70M
+    min: 200M
+    max: 200M
   local_file_delete: true
   test_ops:
     create_bucket: true
135 changes: 96 additions & 39 deletions rgw/v2/tests/s3_swift/reusable.py
@@ -33,6 +33,7 @@
rgw_service = RGWService()

log = logging.getLogger()
stop_threads = False


def create_bucket(bucket_name, rgw, user_info, location=None):
@@ -578,6 +579,32 @@ def upload_part(
parts_info["Parts"].append(part_info)


def parallel_operation_to_stress_rgw(
    rgw_client,
    bucket_name
):
    global stop_threads
    temp_object_path = "/tmp/temp_obj200MB"
    temp_obj_name = "temp_obj1"
    utils.exec_shell_cmd(f"fallocate -l 200MB {temp_object_path}")
    log.info(f"uploading object: {temp_obj_name}")
    rgw_client.upload_file(temp_object_path, bucket_name, temp_obj_name)

    for i in range(10000):
        log.info(f"downloading object: {temp_obj_name}")
        s3_object_download_path = f"{temp_object_path}.download"
        rgw_client.download_file(bucket_name, temp_obj_name, s3_object_download_path)

        # log.info("sleeping for 1 second")
        # time.sleep(1)

        log.info(f"deleting local downloaded file: {s3_object_download_path}")
        os.remove(s3_object_download_path)

        if stop_threads:
            break

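For context, here is a minimal usage sketch (hypothetical, not part of this commit) of how this helper could be driven: start it on a background thread and flip the module-level stop_threads flag once the main scenario finishes. The commented-out Thread block further down in this diff follows the same pattern; the function name run_with_background_stress and the import path are illustrative.

from threading import Thread

from rgw.v2.tests.s3_swift import reusable  # assumed import path for this module


def run_with_background_stress(rgw_client, bucket_name, main_test):
    # Hypothetical driver: run the stress loop in the background while the
    # main multipart scenario executes, then signal it to stop and wait.
    stress = Thread(
        target=reusable.parallel_operation_to_stress_rgw,
        args=(rgw_client, bucket_name),
    )
    stress.start()
    try:
        main_test()  # e.g. the failed-parts multipart upload scenario
    finally:
        reusable.stop_threads = True  # the download loop checks this flag and breaks
        stress.join()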

def test_multipart_upload_failed_parts(
rgw_client,
s3_object_name,
@@ -587,47 +614,69 @@ def test_multipart_upload_failed_parts(
append_data=False,
append_msg=None,
):
log.info("s3 object name: %s" % s3_object_name)
s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
log.info("s3 object path: %s" % s3_object_path)
s3_object_size = config.obj_size
split_size = config.split_size if hasattr(config, "split_size") else 5
log.info("split size: %s" % split_size)
if append_data is True:
data_info = manage_data.io_generator(
s3_object_path,
s3_object_size,
op="append",
**{"message": "\n%s" % append_msg},
)
else:
data_info = manage_data.io_generator(s3_object_path, s3_object_size)
if data_info is False:
TestExecError("data creation failed")
mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
log.info("mp part dir: %s" % mp_dir)
log.info("making multipart object part dir")
mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
if mkdir is False:
raise TestExecError("mkdir failed creating mp_dir_name")
utils.split_file(s3_object_path, split_size, mp_dir + "/")
parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
log.info("parts_list: %s" % parts_list)
log.info("uploading s3 object: %s" % s3_object_path)

# log.info("s3 object name: %s" % s3_object_name)
# s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
# log.info("s3 object path: %s" % s3_object_path)
# s3_object_size = config.obj_size
# split_size = config.split_size if hasattr(config, "split_size") else 5
# log.info("split size: %s" % split_size)
# if append_data is True:
# data_info = manage_data.io_generator(
# s3_object_path,
# s3_object_size,
# op="append",
# **{"message": "\n%s" % append_msg},
# )
# else:
# data_info = manage_data.io_generator(s3_object_path, s3_object_size)
# if data_info is False:
# TestExecError("data creation failed")
# mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
# log.info("mp part dir: %s" % mp_dir)
# log.info("making multipart object part dir")
# mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
# if mkdir is False:
# raise TestExecError("mkdir failed creating mp_dir_name")
# utils.split_file(s3_object_path, split_size, mp_dir + "/")
# parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
# log.info("parts_list: %s" % parts_list)
# log.info("uploading s3 object: %s" % s3_object_path)

# global stop_threads
# parallel_thread = Thread(
# target=parallel_operation_to_stress_rgw,
# args=(
# rgw_client,
# bucket_name
# ),
# )
# parallel_thread.start()

utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB")
utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB")
iteration_count = config.test_ops.get("iteration_count")
for i in range(1, iteration_count + 1):
log.info(
f"--------------------------- iteration {i} ---------------------------"
)

log.info("initiating multipart upload")
mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)

part_number = 1
parts_info = {"Parts": []}
parts_list = []

file1 = "/tmp/obj1"
utils.exec_shell_cmd(f"fallocate -l 8MB {file1}")
parts_list.append(file1)

file2 = "/tmp/obj2"
utils.exec_shell_cmd(f"fallocate -l 100MB {file2}")
parts_list.append(file2)

log.info("no of parts: %s" % len(parts_list))

log.info("initiating multipart upload")
mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)

for each_part in parts_list:
log.info("trying to upload part: %s" % each_part)
if config.test_ops.get("fail_part_upload") and part_number == 2:
@@ -639,7 +688,7 @@
bucket_name,
mpu,
part_number,
each_part,
"/tmp/obj20MB",
10,
parts_info,
),
@@ -652,8 +701,8 @@
bucket_name,
mpu,
part_number,
each_part,
11,
"/tmp/obj30MB",
20,
parts_info,
),
)
@@ -672,8 +721,8 @@
)

t1.start()
t3.start()
t2.start()
t3.start()

t1.join()
t2.join()
@@ -711,17 +760,25 @@ def test_multipart_upload_failed_parts(

log.info(f"downloading object: {s3_object_name}")
s3_object_download_path = f"/tmp/{s3_object_name}"
rgw_client.download_file(bucket_name, s3_object_name, s3_object_download_path)
rgw_client.download_file(
    bucket_name, s3_object_name, s3_object_download_path,
    Config=boto3.s3.transfer.TransferConfig(
        multipart_chunksize=1024 * 20, max_concurrency=10, use_threads=True
    ),
)
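The Config argument above is what makes the download chunked: boto3's TransferConfig switches large GETs to ranged, concurrent requests. As a standalone reference, a minimal sketch follows; the endpoint, credentials, bucket and object names are placeholders, not values from this test.

import boto3
from boto3.s3.transfer import TransferConfig

# Placeholder client; in the test suite the client object comes from the framework.
s3 = boto3.client("s3", endpoint_url="http://rgw-host:8080")

transfer_config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # objects above 8 MiB are fetched in ranges
    multipart_chunksize=8 * 1024 * 1024,  # size of each ranged GET
    max_concurrency=10,                   # number of parallel range requests
    use_threads=True,
)

s3.download_file("my-bucket", "my-object", "/tmp/my-object.download", Config=transfer_config)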

# log.info("sleeping for 1 second")
# time.sleep(1)

log.info(f"deleting local downloaded file: {s3_object_download_path}")
os.remove(s3_object_download_path)

log.info(f"deleting object: {s3_object_name}")
rgw_client.delete_object(Bucket=bucket_name, Key=s3_object_name)

if config.local_file_delete is True:
log.info("deleting local file part")
utils.exec_shell_cmd(f"rm -rf {mp_dir}")
# if config.local_file_delete is True:
# log.info("deleting local file part")
# utils.exec_shell_cmd(f"rm -rf {mp_dir}")
# stop_threads = True
# parallel_thread.join()


def enable_versioning(bucket, rgw_conn, user_info, write_bucket_io_info):
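For readers less familiar with the multipart API exercised above, a minimal sketch of the standard boto3 calls that the parts_info structure feeds into; the part size, part file and variable names are illustrative and error handling is omitted.

# Illustrative boto3 multipart flow: initiate, upload one part, then complete
# the upload with the accumulated {"Parts": [...]} list.
mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)

with open("/tmp/obj20MB", "rb") as part_fh:
    response = rgw_client.upload_part(
        Bucket=bucket_name,
        Key=s3_object_name,
        PartNumber=1,
        UploadId=mpu["UploadId"],
        Body=part_fh,
    )

parts_info = {"Parts": [{"PartNumber": 1, "ETag": response["ETag"]}]}

rgw_client.complete_multipart_upload(
    Bucket=bucket_name,
    Key=s3_object_name,
    UploadId=mpu["UploadId"],
    MultipartUpload=parts_info,
)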
1 change: 1 addition & 0 deletions rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py
@@ -307,6 +307,7 @@ def test_exec(config, ssh_con):
abort_multipart = config.abort_multipart
log.info(f"value of abort_multipart {abort_multipart}")
if config.test_ops.get("fail_part_upload", False):
time.sleep(30)
reusable.test_multipart_upload_failed_parts(
rgw_conn2,
s3_object_name,
