Rgw granular sync policy changes
Signed-off-by: Anuchaithra <[email protected]>
anrao19 committed Nov 2, 2023
1 parent 165af6b commit 91ac532
Showing 3 changed files with 214 additions and 44 deletions.
@@ -2,16 +2,21 @@
config:
user_count: 1
bucket_count: 2
objects_count: 100
objects_size_range:
min: 5K
max: 2M
multisite_global_sync_policy: true
multisite_sync_policy: true
test_ops:
create_bucket: true
create_object: true
group_create: true
group_remove: false
flow_create: true
flow_remove: false
pipe_create: true
pipe_remove: false
group_status: allowed # Enable , Allowed, Forbidden
group_status: enabled # enabled, allowed, forbidden
bucket_group_status: enabled
flow_type: symmetrical # symmetrical , directional
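
For context, a minimal sketch (not part of this commit) of the radosgw-admin sync policy calls that the group_create/flow_create/pipe_create toggles above roughly drive; the group id, zone names and the exec-helper import path are assumptions, not taken from the change itself.

import v2.utils.utils as utils  # assumed import path, as used elsewhere in this repo

group_id = "group-mybucket"  # hypothetical group id
# zonegroup- or bucket-level sync group with the configured status
utils.exec_shell_cmd(
    f"radosgw-admin sync group create --group-id={group_id} --status=enabled"
)
# symmetrical flow between two zones (zone names are placeholders)
utils.exec_shell_cmd(
    f"radosgw-admin sync group flow create --group-id={group_id} "
    f"--flow-id={group_id}flow --flow-type=symmetrical --zones=primary,secondary"
)
# pipe that allows sync between all zones
utils.exec_shell_cmd(
    f"radosgw-admin sync group pipe create --group-id={group_id} "
    f"--pipe-id={group_id}pipe --source-zones='*' --dest-zones='*'"
)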
49 changes: 45 additions & 4 deletions rgw/v2/tests/s3_swift/reusable.py
@@ -1700,23 +1700,64 @@ def group_operation(group_id, group_op, group_status="enabled", bucket_name=None
utils.exec_shell_cmd(cmd)


def flow_operation(group_id, flow_op, flow_type="symmetrical"):
def get_sync_policy(bucket_name=None):
if bucket_name is not None:
bkt = f" --bucket={bucket_name}"
else:
bkt = ""
sync_policy_resp = json.loads(
utils.exec_shell_cmd(f"radosgw-admin sync policy get{bkt}")
)
return sync_policy_resp
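
A hedged usage sketch for the new get_sync_policy helper (group and bucket names are illustrative); the test below uses it the same way to confirm the zonegroup-level group is "allowed" before creating a bucket-level group.

policy = get_sync_policy()  # zonegroup-level policy when no bucket is given
if policy["groups"][0]["status"] != "allowed":
    group_operation("group1", "modify", "allowed")
bucket_policy = get_sync_policy(bucket_name="mybucket")  # bucket-level policy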


def flow_operation(
group_id,
flow_op,
flow_type="symmetrical",
bucket_name=None,
source_zone=None,
dest_zone=None,
):
flow_id = group_id + "flow"
bkt = ""
if bucket_name is not None:
bkt = f" --bucket={bucket_name}"
zone_names, _ = get_multisite_info()
cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type} --zones={zone_names}"
cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type}"
if flow_type == "directional":
cmd += f" --source-zone={source_zone} --dest-zone={dest_zone}" + bkt
else:
cmd += f" --zones={zone_names}" + bkt
utils.exec_shell_cmd(cmd)
return zone_names
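
An illustrative call of the extended flow_operation (zone and bucket names are placeholders): a directional, bucket-scoped flow now passes --source-zone/--dest-zone instead of --zones.

zone_names = flow_operation(
    "group-mybucket",
    "create",
    flow_type="directional",
    bucket_name="mybucket",
    source_zone="primary",
    dest_zone="secondary",
)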


def pipe_operation(
group_id, pipe_op, zone_names=None, bucket_name=None, policy_detail=None
group_id,
pipe_op,
zone_names=None,
bucket_name=None,
policy_detail=None,
source_zones=None,
dest_zones=None,
):
pipe_id = group_id + "pipe"
if zone_names is not None:
zone_name = zone_names.split(",")
zn = f" --source-zones='{zone_name[0]}','{zone_name[1]}' --dest-zones='{zone_name[0]}','{zone_name[1]}'"
if source_zones is not None:
zn = f" --source-zones={source_zones}"
if dest_zones is not None:
zn += f" --dest-zones={dest_zones}"
else:
zn += " --dest-zones='*'"
else:
zn = " --source-zones='*' --dest-zones='*'"
zn = " --source-zones='*'"
if dest_zones is not None:
zn += f" --dest-zones={dest_zones}"
else:
zn += " --dest-zones='*'"
if bucket_name is not None:
bkt = f" --bucket={bucket_name}"
else:
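The rest of pipe_operation is collapsed in this view. For reference, a hedged example of calling it with the new parameters (names are placeholders): a bucket-scoped pipe pulling from a single source zone, with the destination defaulting to all zones.

pipe_id = pipe_operation(
    "group-mybucket",
    "create",
    bucket_name="mybucket",
    source_zones="primary",
)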
202 changes: 163 additions & 39 deletions rgw/v2/tests/s3_swift/test_multisite_sync_policy.py
@@ -20,6 +20,7 @@
import argparse
import json
import logging
import time
import traceback

import v2.lib.resource_op as s3lib
@@ -45,6 +46,27 @@ def test_exec(config, ssh_con):

# create user
all_users_info = s3lib.create_users(config.user_count)
for each_user in all_users_info:
# authenticate
auth = Auth(each_user, ssh_con, ssl=config.ssl)
if config.use_aws4 is True:
rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
else:
rgw_conn = auth.do_auth()
# create buckets
if config.test_ops["create_bucket"] is True:
log.info(f"no of buckets to create: {config.bucket_count}")
buckets = []
for bc in range(config.bucket_count):
bucket_name_to_create = utils.gen_bucket_name_from_userid(
each_user["user_id"], rand_no=bc
)
log.info(f"creating bucket with name: {bucket_name_to_create}")
bucket = reusable.create_bucket(
bucket_name_to_create, rgw_conn, each_user
)
buckets.append(bucket)

if config.multisite_global_sync_policy:
ceph_version_id, _ = utils.get_ceph_version()
ceph_version_id = ceph_version_id.split("-")
@@ -86,72 +108,169 @@ def test_exec(config, ssh_con):
if config.test_ops.get("group_transition", False):
reusable.group_operation(group_id2, "remove", group_status)

for each_user in all_users_info:
# authenticate
auth = Auth(each_user, ssh_con, ssl=config.ssl)
if config.use_aws4 is True:
rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
else:
rgw_conn = auth.do_auth()
# create buckets
if config.test_ops["create_bucket"] is True:
log.info("no of buckets to create: %s" % config.bucket_count)
for bc in range(config.bucket_count):
bucket_name_to_create = utils.gen_bucket_name_from_userid(
each_user["user_id"], rand_no=bc
)
log.info("creating bucket with name: %s" % bucket_name_to_create)
bucket = reusable.create_bucket(
bucket_name_to_create, rgw_conn, each_user
)
if config.test_ops["create_bucket"] is True:
for each_user in all_users_info:
# authenticate
auth = Auth(each_user, ssh_con, ssl=config.ssl)
if config.use_aws4 is True:
rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
else:
rgw_conn = auth.do_auth()
for bkt in buckets:
if config.multisite_sync_policy:
ceph_version_id, _ = utils.get_ceph_version()
ceph_version_id = ceph_version_id.split("-")
ceph_version_id = ceph_version_id[0].split(".")
if float(ceph_version_id[0]) >= 16:
if utils.is_cluster_multisite():
if config.test_ops["group_create"]:
# modifying global group status to allowed
# modify the global group status to allowed if it is not already allowed
bucket_group_status = config.test_ops[
"bucket_group_status"
]
reusable.group_operation(
group_id,
"modify",
group_status,
)
group_id1 = "group-" + bucket_name_to_create
group_info = reusable.get_sync_policy()
if group_info["groups"][0]["status"] != "allowed":
reusable.group_operation(
group_id,
"modify",
"allowed",
)
group_id1 = "group-" + bkt.name
reusable.group_operation(
group_id1,
"create",
bucket_group_status,
bucket_name_to_create,
bkt.name,
)
zone_names = None
if config.test_ops["pipe_create"]:
pipe_id = reusable.pipe_operation(
group_id1,
"create",
zone_names,
bucket_name=bucket_name_to_create,
bucket_name=bkt.name,
)

if config.test_ops["create_bucket"] is True:
for each_user in all_users_info:
# authenticate
auth = Auth(each_user, ssh_con, ssl=config.ssl)
if config.use_aws4 is True:
rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
else:
rgw_conn = auth.do_auth()

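# Find an RGW endpoint on the other site: the period lists every zone in the
# zonegroup while "radosgw-admin zone list" only returns the zones local to this
# cluster, so the first non-local zone's endpoint host is used for the remote
# policy checks below.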
period_details = json.loads(
utils.exec_shell_cmd("radosgw-admin period get")
)
zone_list = json.loads(utils.exec_shell_cmd("radosgw-admin zone list"))
for zone in period_details["period_map"]["zonegroups"][0]["zones"]:
if zone["name"] not in zone_list["zones"]:
rgw_nodes = zone["endpoints"][0].split(":")
node_rgw = rgw_nodes[1].split("//")[-1]
break

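# For each bucket: verify the bucket-level sync policy reached the other site,
# upload objects when enabled, and run the configured removal operations.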
for bkt in buckets:
if config.multisite_sync_policy:
ceph_version_id, _ = utils.get_ceph_version()
ceph_version_id = ceph_version_id.split("-")
ceph_version_id = ceph_version_id[0].split(".")
if float(ceph_version_id[0]) >= 16:
if utils.is_cluster_multisite():
if config.test_ops["group_create"]:
if config.test_ops["pipe_create"]:
rgw_ssh_con = utils.connect_remote(node_rgw)
_, stdout, stderr = rgw_ssh_con.exec_command(
f"radosgw-admin sync policy get --bucket {bkt.name}"
)
err = stderr.read().decode()
if err:
raise TestExecError(
f"Get sync policy on another site failed with error {err}"
)
cmd_output = json.loads(stdout.read().decode())
log.info(
f"sync policy get from other site: {cmd_output} for bucket {bkt.name}"
)
if len(cmd_output["groups"]) == 0:
log.info(
f"bucket sync policy for {bkt.name} not synced to another site"
)
log.info(
"sleep of 60 secs for sync to complete"
)
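# Poll the remote site for up to 20 minutes (20 retries x 60s) until the
# bucket sync policy appears there.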
for retry_count in range(20):
time.sleep(60)
_, stdout, _ = rgw_ssh_con.exec_command(
f"radosgw-admin sync policy get --bucket {bkt.name}"
)
cmd_output = json.loads(stdout.read())
if len(cmd_output["groups"]) == 0:
log.info(
f"bucket sync policy for {bkt.name} not synced to another site, so retry"
)
else:
log.info(
"bucket sync policy synced to another site"
)
break

if (retry_count == 19) and (
len(cmd_output["groups"]) == 0
):
raise TestExecError(
f"bucket sync policy for {bkt.name} not synced to another site even after 20m"
)

if config.test_ops.get("create_object", False):
# uploading data
log.info(
f"s3 objects to create: {config.objects_count}"
)
for oc, size in list(config.mapped_sizes.items()):
config.obj_size = size
s3_object_name = utils.gen_s3_object_name(
bkt.name, oc
)
log.info(f"s3 object name: {s3_object_name}")
s3_object_path = os.path.join(
TEST_DATA_PATH, s3_object_name
)
log.info(f"s3 object path: {s3_object_path}")
if config.test_ops.get("enable_version", False):
reusable.upload_version_object(
config,
each_user,
rgw_conn,
s3_object_name,
config.obj_size,
bkt,
TEST_DATA_PATH,
)
else:
log.info("upload type: normal")
reusable.upload_object(
s3_object_name,
bkt,
TEST_DATA_PATH,
config,
each_user,
)
if config.test_ops["pipe_remove"]:
pipe_id = reusable.pipe_operation(
group_id1,
"remove",
zone_names,
bucket_name=bucket_name_to_create,
)
if config.test_ops["flow_remove"]:
flow_type = config.test_ops["flow_type"]
zone_names = reusable.flow_operation(
group_id1, "remove", flow_type
bucket_name=bkt.name,
)
if config.test_ops["group_remove"]:
group_status = config.test_ops["group_status"]
group_id = reusable.group_operation(
group_id1, "remove", group_status
)

if config.test_ops["flow_remove"]:
flow_type = config.test_ops["flow_type"]
zone_names = reusable.flow_operation(group_id1, "remove", flow_type)

if config.test_ops["group_remove"]:
group_status = config.test_ops["group_status"]
group_id = reusable.group_operation(group_id1, "remove", group_status)

# check for any crashes during the execution
crash_info = reusable.check_for_crash()
if crash_info:
@@ -174,7 +293,7 @@ def test_exec(config, ssh_con):
test_data_dir = "test_data"
rgw_service = RGWService()
TEST_DATA_PATH = os.path.join(project_dir, test_data_dir)
log.info("TEST_DATA_PATH: %s" % TEST_DATA_PATH)
log.info(f"TEST_DATA_PATH: {TEST_DATA_PATH}")
if not os.path.exists(TEST_DATA_PATH):
log.info("test data dir not exists, creating.. ")
os.makedirs(TEST_DATA_PATH)
@@ -200,6 +319,8 @@ def test_exec(config, ssh_con):
config = Config(yaml_file)
ceph_conf = CephConfOp(ssh_con)
config.read(ssh_con)
if config.mapped_sizes is None:
config.mapped_sizes = utils.make_mapped_sizes(config)
test_exec(config, ssh_con)
test_info.success_status("test passed")
sys.exit(0)
@@ -209,3 +330,6 @@ def test_exec(config, ssh_con):
log.error(traceback.format_exc())
test_info.failed_status("test failed")
sys.exit(1)

finally:
utils.cleanup_test_data_path(TEST_DATA_PATH)
