From 91ac5324741257f36f655777a35ae19a09d69f3e Mon Sep 17 00:00:00 2001 From: Anuchaithra Date: Thu, 2 Nov 2023 16:55:13 +0530 Subject: [PATCH] Rgw granular sync policy changes Signed-off-by: Anuchaithra --- ...t_multisite_bucket_mirror_sync_policy.yaml | 7 +- rgw/v2/tests/s3_swift/reusable.py | 49 ++++- .../s3_swift/test_multisite_sync_policy.py | 202 ++++++++++++++---- 3 files changed, 214 insertions(+), 44 deletions(-) diff --git a/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml b/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml index e86c3ff7c..93d6a5164 100644 --- a/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml +++ b/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml @@ -2,16 +2,21 @@ config: user_count: 1 bucket_count: 2 + objects_count: 100 + objects_size_range: + min: 5K + max: 2M multisite_global_sync_policy: true multisite_sync_policy: true test_ops: create_bucket: true + create_object: true group_create: true group_remove: false flow_create: true flow_remove: false pipe_create: true pipe_remove: false - group_status: allowed # Enable , Allowed, Forbidden + group_status: enabled # Enable , Allowed, Forbidden bucket_group_status: enabled flow_type: symmetrical # symmetrical , directional diff --git a/rgw/v2/tests/s3_swift/reusable.py b/rgw/v2/tests/s3_swift/reusable.py index ff7b8325e..b4068e743 100644 --- a/rgw/v2/tests/s3_swift/reusable.py +++ b/rgw/v2/tests/s3_swift/reusable.py @@ -1700,23 +1700,64 @@ def group_operation(group_id, group_op, group_status="enabled", bucket_name=None utils.exec_shell_cmd(cmd) -def flow_operation(group_id, flow_op, flow_type="symmetrical"): +def get_sync_policy(bucket_name=None): + if bucket_name is not None: + bkt = f" --bucket={bucket_name}" + else: + bkt = "" + sync_policy_resp = json.loads( + utils.exec_shell_cmd(f"radosgw-admin sync policy get" + bkt) + ) + return sync_policy_resp + + +def flow_operation( + group_id, + flow_op, + flow_type="symmetrical", + bucket_name=None, + source_zone=None, + dest_zone=None, +): flow_id = group_id + "flow" + bkt = "" + if bucket_name is not None: + bkt = f" --bucket={bucket_name}" zone_names, _ = get_multisite_info() - cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type} --zones={zone_names}" + cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type}" + if flow_type == "directional": + cmd += f" --source-zone={source_zone} --dest-zone={dest_zone}" + bkt + else: + cmd += f" --zones={zone_names}" + bkt utils.exec_shell_cmd(cmd) return zone_names def pipe_operation( - group_id, pipe_op, zone_names=None, bucket_name=None, policy_detail=None + group_id, + pipe_op, + zone_names=None, + bucket_name=None, + policy_detail=None, + source_zones=None, + dest_zones=None, ): pipe_id = group_id + "pipe" if zone_names is not None: zone_name = zone_names.split(",") zn = f" --source-zones='{zone_name[0]}','{zone_name[1]}' --dest-zones='{zone_name[0]}','{zone_name[1]}'" + if source_zones is not None: + zn = f" --source-zones={source_zones}" + if dest_zones is not None: + zn += f" --dest-zones={dest_zones}" + else: + zn += " --dest-zones='*'" else: - zn = " --source-zones='*' --dest-zones='*'" + zn = " --source-zones='*'" + if dest_zones is not None: + zn += f" --dest-zones={dest_zones}" + else: + zn += " --dest-zones='*'" if bucket_name is not None: bkt = f" 
--bucket={bucket_name}"
     else:
diff --git a/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py b/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py
index 84c97ec8c..bbdab2465 100644
--- a/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py
+++ b/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py
@@ -20,6 +20,7 @@
 import argparse
 import json
 import logging
+import time
 import traceback
 
 import v2.lib.resource_op as s3lib
@@ -45,6 +46,27 @@ def test_exec(config, ssh_con):
 
     # create user
     all_users_info = s3lib.create_users(config.user_count)
+    for each_user in all_users_info:
+        # authenticate
+        auth = Auth(each_user, ssh_con, ssl=config.ssl)
+        if config.use_aws4 is True:
+            rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
+        else:
+            rgw_conn = auth.do_auth()
+        # create buckets
+        if config.test_ops["create_bucket"] is True:
+            log.info(f"no of buckets to create: {config.bucket_count}")
+            buckets = []
+            for bc in range(config.bucket_count):
+                bucket_name_to_create = utils.gen_bucket_name_from_userid(
+                    each_user["user_id"], rand_no=bc
+                )
+                log.info(f"creating bucket with name: {bucket_name_to_create}")
+                bucket = reusable.create_bucket(
+                    bucket_name_to_create, rgw_conn, each_user
+                )
+                buckets.append(bucket)
+
     if config.multisite_global_sync_policy:
         ceph_version_id, _ = utils.get_ceph_version()
         ceph_version_id = ceph_version_id.split("-")
@@ -86,24 +108,15 @@ def test_exec(config, ssh_con):
             if config.test_ops.get("group_transition", False):
                 reusable.group_operation(group_id2, "remove", group_status)
 
-    for each_user in all_users_info:
-        # authenticate
-        auth = Auth(each_user, ssh_con, ssl=config.ssl)
-        if config.use_aws4 is True:
-            rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
-        else:
-            rgw_conn = auth.do_auth()
-        # create buckets
-        if config.test_ops["create_bucket"] is True:
-            log.info("no of buckets to create: %s" % config.bucket_count)
-            for bc in range(config.bucket_count):
-                bucket_name_to_create = utils.gen_bucket_name_from_userid(
-                    each_user["user_id"], rand_no=bc
-                )
-                log.info("creating bucket with name: %s" % bucket_name_to_create)
-                bucket = reusable.create_bucket(
-                    bucket_name_to_create, rgw_conn, each_user
-                )
+    if config.test_ops["create_bucket"] is True:
+        for each_user in all_users_info:
+            # authenticate
+            auth = Auth(each_user, ssh_con, ssl=config.ssl)
+            if config.use_aws4 is True:
+                rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
+            else:
+                rgw_conn = auth.do_auth()
+            for bkt in buckets:
                 if config.multisite_sync_policy:
                     ceph_version_id, _ = utils.get_ceph_version()
                     ceph_version_id = ceph_version_id.split("-")
@@ -111,21 +124,23 @@ def test_exec(config, ssh_con):
                     ceph_version_id = ceph_version_id[0].split(".")
                     if float(ceph_version_id[0]) >= 16:
                         if utils.is_cluster_multisite():
                             if config.test_ops["group_create"]:
-                                # modifying global group status to allowed
+                                # modify global group status to allowed if it is not already allowed
                                 bucket_group_status = config.test_ops[
                                     "bucket_group_status"
                                 ]
-                                reusable.group_operation(
-                                    group_id,
-                                    "modify",
-                                    group_status,
-                                )
-                                group_id1 = "group-" + bucket_name_to_create
+                                group_info = reusable.get_sync_policy()
+                                if group_info["groups"][0]["status"] != "allowed":
+                                    reusable.group_operation(
+                                        group_id,
+                                        "modify",
+                                        "allowed",
+                                    )
+                                group_id1 = "group-" + bkt.name
                                 reusable.group_operation(
                                     group_id1,
                                     "create",
                                     bucket_group_status,
-                                    bucket_name_to_create,
+                                    bkt.name,
                                 )
                                 zone_names = None
                                 if config.test_ops["pipe_create"]:
@@ -133,25 +148,130 @@ def test_exec(config, ssh_con):
                                     pipe_id = reusable.pipe_operation(
                                         group_id1,
                                         "create",
                                         zone_names,
-                                        bucket_name=bucket_name_to_create,
+                                        bucket_name=bkt.name,
                                     )
+
+    if config.test_ops["create_bucket"] is True:
+        for each_user in all_users_info:
+            # authenticate
+            auth = Auth(each_user, ssh_con, ssl=config.ssl)
+            if config.use_aws4 is True:
+                rgw_conn = auth.do_auth(**{"signature_version": "s3v4"})
+            else:
+                rgw_conn = auth.do_auth()
+
+            period_details = json.loads(
+                utils.exec_shell_cmd("radosgw-admin period get")
+            )
+            zone_list = json.loads(utils.exec_shell_cmd("radosgw-admin zone list"))
+            for zone in period_details["period_map"]["zonegroups"][0]["zones"]:
+                if zone["name"] not in zone_list["zones"]:
+                    rgw_nodes = zone["endpoints"][0].split(":")
+                    node_rgw = rgw_nodes[1].split("//")[-1]
+                    break
+
+            for bkt in buckets:
+                if config.multisite_sync_policy:
+                    ceph_version_id, _ = utils.get_ceph_version()
+                    ceph_version_id = ceph_version_id.split("-")
+                    ceph_version_id = ceph_version_id[0].split(".")
+                    if float(ceph_version_id[0]) >= 16:
+                        if utils.is_cluster_multisite():
+                            if config.test_ops["group_create"]:
+                                if config.test_ops["pipe_create"]:
+                                    rgw_ssh_con = utils.connect_remote(node_rgw)
+                                    _, stdout, stderr = rgw_ssh_con.exec_command(
+                                        f"radosgw-admin sync policy get --bucket {bkt.name}"
+                                    )
+                                    stderr_data = stderr.read().decode()
+                                    if stderr_data:
+                                        raise TestExecError(
+                                            f"Get sync policy on another site failed with error {stderr_data}"
+                                        )
+                                    cmd_output = json.loads(stdout.read().decode())
+                                    log.info(
+                                        f"sync policy get from other site: {cmd_output} for bucket {bkt.name}"
+                                    )
+                                    if len(cmd_output["groups"]) == 0:
+                                        log.info(
+                                            f"bucket sync policy for {bkt.name} not synced to another site"
+                                        )
+                                        log.info(
+                                            "sleep of 60 secs for sync to complete"
+                                        )
+                                        for retry_count in range(20):
+                                            time.sleep(60)
+                                            _, stdout, _ = rgw_ssh_con.exec_command(
+                                                f"radosgw-admin sync policy get --bucket {bkt.name}"
+                                            )
+                                            cmd_output = json.loads(stdout.read())
+                                            if len(cmd_output["groups"]) == 0:
+                                                log.info(
+                                                    f"bucket sync policy for {bkt.name} not synced to another site, so retry"
+                                                )
+                                            else:
+                                                log.info(
+                                                    "bucket sync policy synced to another site"
+                                                )
+                                                break
+
+                                        if (retry_count == 19) and (
+                                            len(cmd_output["groups"]) == 0
+                                        ):
+                                            raise TestExecError(
+                                                f"bucket sync policy for {bkt.name} not synced to another site even after 20 minutes"
+                                            )
+
+                                    if config.test_ops.get("create_object", False):
+                                        # uploading data
+                                        log.info(
+                                            f"s3 objects to create: {config.objects_count}"
+                                        )
+                                        for oc, size in list(config.mapped_sizes.items()):
+                                            config.obj_size = size
+                                            s3_object_name = utils.gen_s3_object_name(
+                                                bkt.name, oc
+                                            )
+                                            log.info(f"s3 object name: {s3_object_name}")
+                                            s3_object_path = os.path.join(
+                                                TEST_DATA_PATH, s3_object_name
+                                            )
+                                            log.info(f"s3 object path: {s3_object_path}")
+                                            if config.test_ops.get("enable_version", False):
+                                                reusable.upload_version_object(
+                                                    config,
+                                                    each_user,
+                                                    rgw_conn,
+                                                    s3_object_name,
+                                                    config.obj_size,
+                                                    bkt,
+                                                    TEST_DATA_PATH,
+                                                )
+                                            else:
+                                                log.info("upload type: normal")
+                                                reusable.upload_object(
+                                                    s3_object_name,
+                                                    bkt,
+                                                    TEST_DATA_PATH,
+                                                    config,
+                                                    each_user,
+                                                )
                     if config.test_ops["pipe_remove"]:
                         pipe_id = reusable.pipe_operation(
                             group_id1,
                             "remove",
                             zone_names,
-                            bucket_name=bucket_name_to_create,
-                        )
-                    if config.test_ops["flow_remove"]:
-                        flow_type = config.test_ops["flow_type"]
-                        zone_names = reusable.flow_operation(
-                            group_id1, "remove", flow_type
+                            bucket_name=bkt.name,
                         )
-                    if config.test_ops["group_remove"]:
-                        group_status = config.test_ops["group_status"]
-                        group_id = reusable.group_operation(
-                            group_id1, "remove", group_status
-                        )
+
+    if config.test_ops["flow_remove"]:
+        flow_type = config.test_ops["flow_type"]
+        zone_names = 
reusable.flow_operation(group_id1, "remove", flow_type) + + if config.test_ops["group_remove"]: + group_status = config.test_ops["group_status"] + group_id = reusable.group_operation(group_id1, "remove", group_status) + # check for any crashes during the execution crash_info = reusable.check_for_crash() if crash_info: @@ -174,7 +293,7 @@ def test_exec(config, ssh_con): test_data_dir = "test_data" rgw_service = RGWService() TEST_DATA_PATH = os.path.join(project_dir, test_data_dir) - log.info("TEST_DATA_PATH: %s" % TEST_DATA_PATH) + log.info(f"TEST_DATA_PATH: {TEST_DATA_PATH}") if not os.path.exists(TEST_DATA_PATH): log.info("test data dir not exists, creating.. ") os.makedirs(TEST_DATA_PATH) @@ -200,6 +319,8 @@ def test_exec(config, ssh_con): config = Config(yaml_file) ceph_conf = CephConfOp(ssh_con) config.read(ssh_con) + if config.mapped_sizes is None: + config.mapped_sizes = utils.make_mapped_sizes(config) test_exec(config, ssh_con) test_info.success_status("test passed") sys.exit(0) @@ -209,3 +330,6 @@ def test_exec(config, ssh_con): log.error(traceback.format_exc()) test_info.failed_status("test failed") sys.exit(1) + + finally: + utils.cleanup_test_data_path(TEST_DATA_PATH)