Testing using asyncio instead of threads, but the failure is still not reproducible. A curl command with an incorrect part length causes a broken connection, so the issue may be reproducible that way; the same error is seen with Go.

Signed-off-by: Hemanth Sai Maheswarla <[email protected]>
Hemanth Sai Maheswarla authored and Hemanth Sai Maheswarla committed Mar 21, 2024
1 parent 2e393a5 commit 42bc55f
Showing 2 changed files with 226 additions and 94 deletions.
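The commit message points at a reproduction path: an UploadPart request whose declared length does not match the bytes actually sent. The sketch below is not part of this commit; it mimics that experiment in Python rather than curl, the endpoint, credentials, bucket, key, and upload id are placeholders, and presigning is only used to keep request signing out of the way.

import http.client
from urllib.parse import urlparse

import boto3

# Hypothetical reproduction sketch (not part of this commit): presign an UploadPart
# request, then declare a Content-Length larger than the bytes actually sent so the
# connection is broken mid-part, similar to the curl experiment in the commit message.
s3 = boto3.client("s3", endpoint_url="http://rgw.example.local:8080")  # placeholder endpoint
url = s3.generate_presigned_url(
    "upload_part",
    Params={
        "Bucket": "test-bucket",   # placeholder bucket
        "Key": "test-object",      # placeholder key
        "PartNumber": 2,
        "UploadId": "UPLOAD-ID",   # placeholder upload id
    },
)
body = open("/tmp/obj20MB", "rb").read()

parsed = urlparse(url)
conn = http.client.HTTPConnection(parsed.hostname, parsed.port)
conn.putrequest("PUT", f"{parsed.path}?{parsed.query}")
conn.putheader("Content-Length", str(len(body) + 1024))  # deliberately larger than the body
conn.endheaders()
conn.send(body)  # stop short of the promised length
print(conn.getresponse().status)

Because fewer bytes arrive than the Content-Length promises, the gateway waits for the rest of the body and the connection is eventually broken, which is the behaviour the commit message describes.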
303 changes: 216 additions & 87 deletions rgw/v2/tests/s3_swift/reusable.py
@@ -6,6 +6,7 @@
import sys

import boto3
import asyncio

sys.path.append(os.path.abspath(os.path.join(__file__, "../../../..")))
import logging
@@ -552,7 +553,7 @@ def upload_mutipart_object(
log.info("multipart upload complete for key: %s" % s3_object_name)


def upload_part(
async def upload_part(
rgw_client,
s3_object_name,
bucket_name,
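upload_part is now a coroutine, but boto3 client calls block, so gathering several of these coroutines only overlaps the uploads if the blocking call is pushed off the event loop. The body of upload_part is collapsed in this diff; the sketch below is therefore only one possible shape for it, using asyncio.to_thread (Python 3.9+), and the helper name and reduced argument list are illustrative.

import asyncio

async def upload_part_offloaded(
    rgw_client, s3_object_name, bucket_name, mpu, part_number, part_path
):
    # Illustrative sketch, not the committed implementation: run the blocking
    # boto3 call in a worker thread so asyncio.gather() can actually overlap
    # several part uploads instead of running them back to back.
    with open(part_path, "rb") as body:
        return await asyncio.to_thread(
            rgw_client.upload_part,
            Bucket=bucket_name,
            Key=s3_object_name,
            PartNumber=part_number,
            UploadId=mpu["UploadId"],
            Body=body,
        )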
@@ -605,7 +606,183 @@ def parallel_operation_to_stress_rgw(
break


def test_multipart_upload_failed_parts(
# def test_multipart_upload_failed_parts(
# rgw_client,
# s3_object_name,
# bucket_name,
# TEST_DATA_PATH,
# config,
# append_data=False,
# append_msg=None,
# ):
# # log.info("s3 object name: %s" % s3_object_name)
# # s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
# # log.info("s3 object path: %s" % s3_object_path)
# # s3_object_size = config.obj_size
# # split_size = config.split_size if hasattr(config, "split_size") else 5
# # log.info("split size: %s" % split_size)
# # if append_data is True:
# # data_info = manage_data.io_generator(
# # s3_object_path,
# # s3_object_size,
# # op="append",
# # **{"message": "\n%s" % append_msg},
# # )
# # else:
# # data_info = manage_data.io_generator(s3_object_path, s3_object_size)
# # if data_info is False:
# # TestExecError("data creation failed")
# # mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
# # log.info("mp part dir: %s" % mp_dir)
# # log.info("making multipart object part dir")
# # mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
# # if mkdir is False:
# # raise TestExecError("mkdir failed creating mp_dir_name")
# # utils.split_file(s3_object_path, split_size, mp_dir + "/")
# # parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
# # log.info("parts_list: %s" % parts_list)
# # log.info("uploading s3 object: %s" % s3_object_path)
#
# # global stop_threads
# # parallel_thread = Thread(
# # target=parallel_operation_to_stress_rgw,
# # args=(
# # rgw_client,
# # bucket_name
# # ),
# # )
# # parallel_thread.start()
#
# utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB")
# utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB")
# iteration_count = config.test_ops.get("iteration_count")
# for i in range(1, iteration_count + 1):
# log.info(
# f"--------------------------- iteration {i} ---------------------------"
# )
#
# part_number = 1
# parts_info = {"Parts": []}
# parts_list = []
#
# file1 = "/tmp/obj1"
# utils.exec_shell_cmd(f"fallocate -l 8MB {file1}")
# parts_list.append(file1)
#
# file2 = "/tmp/obj2"
# utils.exec_shell_cmd(f"fallocate -l 100MB {file2}")
# parts_list.append(file2)
#
# log.info("no of parts: %s" % len(parts_list))
#
# log.info("initiating multipart upload")
# mpu = rgw_client.create_multipart_upload(Bucket=bucket_name, Key=s3_object_name)
#
# for each_part in parts_list:
# log.info("trying to upload part: %s" % each_part)
# if config.test_ops.get("fail_part_upload") and part_number == 2:
# t1 = Thread(
# target=upload_part,
# args=(
# rgw_client,
# s3_object_name,
# bucket_name,
# mpu,
# part_number,
# "/tmp/obj20MB",
# 10,
# parts_info,
# ),
# )
# t2 = Thread(
# target=upload_part,
# args=(
# rgw_client,
# s3_object_name,
# bucket_name,
# mpu,
# part_number,
# "/tmp/obj30MB",
# 20,
# parts_info,
# ),
# )
# t3 = Thread(
# target=upload_part,
# args=(
# rgw_client,
# s3_object_name,
# bucket_name,
# mpu,
# part_number,
# each_part,
# os.stat(each_part).st_size,
# parts_info,
# ),
# )
#
# t1.start()
# t2.start()
# t3.start()
#
# t1.join()
# t2.join()
# t3.join()
#
# else:
# part_upload_response = rgw_client.upload_part(
# Bucket=bucket_name,
# Key=s3_object_name,
# PartNumber=part_number,
# UploadId=mpu["UploadId"],
# Body=open(each_part, mode="rb"),
# )
# log.info(f"part uploaded response {part_upload_response}")
# part_info = {
# "PartNumber": part_number,
# "ETag": part_upload_response["ETag"],
# }
# parts_info["Parts"].append(part_info)
# if each_part != parts_list[-1]:
# # increase the part number only if the current part is not the last part
# part_number += 1
# log.info("curr part_number: %s" % part_number)
#
# if len(parts_list) == part_number:
# log.info("all parts upload completed")
# response = rgw_client.complete_multipart_upload(
# Bucket=bucket_name,
# Key=s3_object_name,
# UploadId=mpu["UploadId"],
# MultipartUpload=parts_info,
# )
# log.info(f"complete multipart upload: {response}")
# log.info("multipart upload complete for key: %s" % s3_object_name)
#
# log.info(f"downlading object: {s3_object_name}")
# s3_object_download_path = f"/tmp/{s3_object_name}"
# rgw_client.download_file(
# bucket_name, s3_object_name, s3_object_download_path,
# Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True)
# )
#
# # log.info("sleeping for 1 second")
# # time.sleep(1)
#
# log.info(f"deleting local downloaded file: {s3_object_download_path}")
# os.remove(s3_object_download_path)
#
# log.info(f"deleting object: {s3_object_name}")
# rgw_client.delete_object(Bucket=bucket_name, Key=s3_object_name)
#
# # if config.local_file_delete is True:
# # log.info("deleting local file part")
# # utils.exec_shell_cmd(f"rm -rf {mp_dir}")
# # stop_threads = True
# # parallel_thread.join()


async def test_multipart_upload_failed_parts(
rgw_client,
s3_object_name,
bucket_name,
@@ -614,43 +791,6 @@ def test_multipart_upload_failed_parts(
append_data=False,
append_msg=None,
):
# log.info("s3 object name: %s" % s3_object_name)
# s3_object_path = os.path.join(TEST_DATA_PATH, s3_object_name)
# log.info("s3 object path: %s" % s3_object_path)
# s3_object_size = config.obj_size
# split_size = config.split_size if hasattr(config, "split_size") else 5
# log.info("split size: %s" % split_size)
# if append_data is True:
# data_info = manage_data.io_generator(
# s3_object_path,
# s3_object_size,
# op="append",
# **{"message": "\n%s" % append_msg},
# )
# else:
# data_info = manage_data.io_generator(s3_object_path, s3_object_size)
# if data_info is False:
# TestExecError("data creation failed")
# mp_dir = os.path.join(TEST_DATA_PATH, s3_object_name + ".mp.parts")
# log.info("mp part dir: %s" % mp_dir)
# log.info("making multipart object part dir")
# mkdir = utils.exec_shell_cmd("sudo mkdir %s" % mp_dir)
# if mkdir is False:
# raise TestExecError("mkdir failed creating mp_dir_name")
# utils.split_file(s3_object_path, split_size, mp_dir + "/")
# parts_list = sorted(glob.glob(mp_dir + "/" + "*"))
# log.info("parts_list: %s" % parts_list)
# log.info("uploading s3 object: %s" % s3_object_path)

# global stop_threads
# parallel_thread = Thread(
# target=parallel_operation_to_stress_rgw,
# args=(
# rgw_client,
# bucket_name
# ),
# )
# parallel_thread.start()

utils.exec_shell_cmd("fallocate -l 20MB /tmp/obj20MB")
utils.exec_shell_cmd("fallocate -l 30MB /tmp/obj30MB")
@@ -680,53 +820,42 @@ def test_multipart_upload_failed_parts(
for each_part in parts_list:
log.info("trying to upload part: %s" % each_part)
if config.test_ops.get("fail_part_upload") and part_number == 2:
t1 = Thread(
target=upload_part,
args=(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
"/tmp/obj20MB",
10,
parts_info,
),
)
t2 = Thread(
target=upload_part,
args=(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
"/tmp/obj30MB",
20,
parts_info,
),
)
t3 = Thread(
target=upload_part,
args=(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
each_part,
os.stat(each_part).st_size,
parts_info,
),
)

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()
await asyncio.gather(
*[
upload_part(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
"/tmp/obj20MB",
1000,
parts_info
),
upload_part(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
"/tmp/obj30MB",
2000,
parts_info
),
upload_part(
rgw_client,
s3_object_name,
bucket_name,
mpu,
part_number,
each_part,
os.stat(each_part).st_size,
parts_info
),
]
)
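# The three gathered coroutines all use the same part_number, so the server
# keeps whichever upload of that part finishes last; note that if upload_part
# still calls the blocking boto3 client directly, gather() runs the three
# uploads back to back rather than concurrently.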

else:
part_upload_response = rgw_client.upload_part(
@@ -761,8 +890,8 @@ def test_multipart_upload_failed_parts(
log.info(f"downlading object: {s3_object_name}")
s3_object_download_path = f"/tmp/{s3_object_name}"
rgw_client.download_file(
bucket_name, s3_object_name, s3_object_download_path,
Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True)
bucket_name, s3_object_name, s3_object_download_path
# Config=boto3.s3.transfer.TransferConfig(multipart_chunksize=1024*20,max_concurrency=10, use_threads=True)
)

# log.info("sleeping for 1 second")
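The download step above now passes only the bucket, key, and target path; the TransferConfig tuning from the previous revision is left commented out. For reference, a sketch of re-enabling it with the same values: for objects above boto3's multipart threshold, a 20 KiB chunk size with 10 worker threads turns each download into many small ranged GETs, which puts more pressure on RGW than a single plain GET.

from boto3.s3.transfer import TransferConfig

# Sketch of the transfer tuning this commit comments out, using the values the
# earlier revision passed; rgw_client, bucket_name, s3_object_name, and
# s3_object_download_path are the same names used in the function above.
transfer_config = TransferConfig(
    multipart_chunksize=1024 * 20,  # 20 KiB chunks
    max_concurrency=10,
    use_threads=True,
)
rgw_client.download_file(
    bucket_name, s3_object_name, s3_object_download_path, Config=transfer_config
)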
17 changes: 10 additions & 7 deletions rgw/v2/tests/s3_swift/test_Mbuckets_with_Nobjects.py
@@ -42,6 +42,7 @@
import logging
import time
import traceback
import asyncio

import v2.lib.resource_op as s3lib
import v2.utils.utils as utils
@@ -307,13 +308,15 @@ def test_exec(config, ssh_con):
abort_multipart = config.abort_multipart
log.info(f"value of abort_multipart {abort_multipart}")
if config.test_ops.get("fail_part_upload", False):
time.sleep(30)
reusable.test_multipart_upload_failed_parts(
rgw_conn2,
s3_object_name,
bucket.name,
TEST_DATA_PATH,
config,
# time.sleep(30)
asyncio.run(
reusable.test_multipart_upload_failed_parts(
rgw_conn2,
s3_object_name,
bucket.name,
TEST_DATA_PATH,
config,
)
)
else:
reusable.upload_mutipart_object(

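Since test_multipart_upload_failed_parts is now a coroutine, the synchronous test flow drives it with asyncio.run(), which builds a fresh event loop, runs the coroutine to completion, and closes the loop. A minimal sketch of the same pattern with a hypothetical stand-in coroutine:

import asyncio

async def _failed_parts_stub(bucket_name, s3_object_name):
    # Hypothetical stand-in for reusable.test_multipart_upload_failed_parts:
    # a real run would await the gathered upload_part coroutines here.
    await asyncio.sleep(0)
    return f"multipart upload exercised for {bucket_name}/{s3_object_name}"

# asyncio.run() must be called from synchronous code; it cannot be nested inside
# an already running event loop, which is why test_exec() wraps the coroutine
# instead of awaiting it.
print(asyncio.run(_failed_parts_stub("test-bucket", "test-object")))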