diff --git a/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml b/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml index e86c3ff7c..b3b6d85a1 100644 --- a/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml +++ b/rgw/v2/tests/s3_swift/multisite_configs/test_multisite_bucket_mirror_sync_policy.yaml @@ -2,16 +2,21 @@ config: user_count: 1 bucket_count: 2 + objects_count: 10 + objects_size_range: + min: 5K + max: 2M multisite_global_sync_policy: true multisite_sync_policy: true test_ops: create_bucket: true + create_object: true group_create: true group_remove: false flow_create: true flow_remove: false pipe_create: true pipe_remove: false - group_status: allowed # Enable , Allowed, Forbidden + group_status: enabled # Enable , Allowed, Forbidden bucket_group_status: enabled flow_type: symmetrical # symmetrical , directional diff --git a/rgw/v2/tests/s3_swift/reusable.py b/rgw/v2/tests/s3_swift/reusable.py index ff7b8325e..b4068e743 100644 --- a/rgw/v2/tests/s3_swift/reusable.py +++ b/rgw/v2/tests/s3_swift/reusable.py @@ -1700,23 +1700,64 @@ def group_operation(group_id, group_op, group_status="enabled", bucket_name=None utils.exec_shell_cmd(cmd) -def flow_operation(group_id, flow_op, flow_type="symmetrical"): +def get_sync_policy(bucket_name=None): + if bucket_name is not None: + bkt = f" --bucket={bucket_name}" + else: + bkt = "" + sync_policy_resp = json.loads( + utils.exec_shell_cmd(f"radosgw-admin sync policy get" + bkt) + ) + return sync_policy_resp + + +def flow_operation( + group_id, + flow_op, + flow_type="symmetrical", + bucket_name=None, + source_zone=None, + dest_zone=None, +): flow_id = group_id + "flow" + bkt = "" + if bucket_name is not None: + bkt = f" --bucket={bucket_name}" zone_names, _ = get_multisite_info() - cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type} --zones={zone_names}" + cmd = f"radosgw-admin sync group flow {flow_op} --group-id={group_id} --flow-id={flow_id} --flow-type={flow_type}" + if flow_type == "directional": + cmd += f" --source-zone={source_zone} --dest-zone={dest_zone}" + bkt + else: + cmd += f" --zones={zone_names}" + bkt utils.exec_shell_cmd(cmd) return zone_names def pipe_operation( - group_id, pipe_op, zone_names=None, bucket_name=None, policy_detail=None + group_id, + pipe_op, + zone_names=None, + bucket_name=None, + policy_detail=None, + source_zones=None, + dest_zones=None, ): pipe_id = group_id + "pipe" if zone_names is not None: zone_name = zone_names.split(",") zn = f" --source-zones='{zone_name[0]}','{zone_name[1]}' --dest-zones='{zone_name[0]}','{zone_name[1]}'" + if source_zones is not None: + zn = f" --source-zones={source_zones}" + if dest_zones is not None: + zn += f" --dest-zones={dest_zones}" + else: + zn += " --dest-zones='*'" else: - zn = " --source-zones='*' --dest-zones='*'" + zn = " --source-zones='*'" + if dest_zones is not None: + zn += f" --dest-zones={dest_zones}" + else: + zn += " --dest-zones='*'" if bucket_name is not None: bkt = f" --bucket={bucket_name}" else: diff --git a/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py b/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py index 84c97ec8c..fa2416c8e 100644 --- a/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py +++ b/rgw/v2/tests/s3_swift/test_multisite_sync_policy.py @@ -5,6 +5,7 @@ Note: Any one of these yamls can be used test_multisite_sync_policy.yaml + test_multisite_bucket_mirror_sync_policy.yaml Operation: Creates and delete sync policy group @@ -20,6 +21,7 @@ import argparse import json import logging +import time import traceback import v2.lib.resource_op as s3lib @@ -45,6 +47,27 @@ def test_exec(config, ssh_con): # create user all_users_info = s3lib.create_users(config.user_count) + for each_user in all_users_info: + # authenticate + auth = Auth(each_user, ssh_con, ssl=config.ssl) + if config.use_aws4 is True: + rgw_conn = auth.do_auth(**{"signature_version": "s3v4"}) + else: + rgw_conn = auth.do_auth() + # create buckets + if config.test_ops["create_bucket"] is True: + log.info(f"no of buckets to create: {config.bucket_count}") + buckets = [] + for bc in range(config.bucket_count): + bucket_name_to_create = utils.gen_bucket_name_from_userid( + each_user["user_id"], rand_no=bc + ) + log.info(f"creating bucket with name: {bucket_name_to_create}") + bucket = reusable.create_bucket( + bucket_name_to_create, rgw_conn, each_user + ) + buckets.append(bucket) + if config.multisite_global_sync_policy: ceph_version_id, _ = utils.get_ceph_version() ceph_version_id = ceph_version_id.split("-") @@ -86,24 +109,15 @@ def test_exec(config, ssh_con): if config.test_ops.get("group_transition", False): reusable.group_operation(group_id2, "remove", group_status) - for each_user in all_users_info: - # authenticate - auth = Auth(each_user, ssh_con, ssl=config.ssl) - if config.use_aws4 is True: - rgw_conn = auth.do_auth(**{"signature_version": "s3v4"}) - else: - rgw_conn = auth.do_auth() - # create buckets - if config.test_ops["create_bucket"] is True: - log.info("no of buckets to create: %s" % config.bucket_count) - for bc in range(config.bucket_count): - bucket_name_to_create = utils.gen_bucket_name_from_userid( - each_user["user_id"], rand_no=bc - ) - log.info("creating bucket with name: %s" % bucket_name_to_create) - bucket = reusable.create_bucket( - bucket_name_to_create, rgw_conn, each_user - ) + if config.test_ops["create_bucket"] is True: + for each_user in all_users_info: + # authenticate + auth = Auth(each_user, ssh_con, ssl=config.ssl) + if config.use_aws4 is True: + rgw_conn = auth.do_auth(**{"signature_version": "s3v4"}) + else: + rgw_conn = auth.do_auth() + for bkt in buckets: if config.multisite_sync_policy: ceph_version_id, _ = utils.get_ceph_version() ceph_version_id = ceph_version_id.split("-") @@ -111,21 +125,23 @@ def test_exec(config, ssh_con): if float(ceph_version_id[0]) >= 16: if utils.is_cluster_multisite(): if config.test_ops["group_create"]: - # modifying global group status to allowed + # modifying global group status to allowed if its not allowed bucket_group_status = config.test_ops[ "bucket_group_status" ] - reusable.group_operation( - group_id, - "modify", - group_status, - ) - group_id1 = "group-" + bucket_name_to_create + group_info = reusable.get_sync_policy() + if group_info["groups"][0]["status"] != "allowed": + reusable.group_operation( + group_id, + "modify", + "allowed", + ) + group_id1 = "group-" + bkt.name reusable.group_operation( group_id1, "create", bucket_group_status, - bucket_name_to_create, + bkt.name, ) zone_names = None if config.test_ops["pipe_create"]: @@ -133,25 +149,185 @@ def test_exec(config, ssh_con): group_id1, "create", zone_names, - bucket_name=bucket_name_to_create, + bucket_name=bkt.name, + ) + + period_details = json.loads( + utils.exec_shell_cmd("radosgw-admin period get") + ) + zone_list = json.loads(utils.exec_shell_cmd("radosgw-admin zone list")) + for zone in period_details["period_map"]["zonegroups"][0]["zones"]: + if zone["name"] not in zone_list["zones"]: + rgw_nodes = zone["endpoints"][0].split(":") + node_rgw = rgw_nodes[1].split("//")[-1] + log.info(f"Another site is: {zone['name']} and ip {node_rgw}") + break + rgw_ssh_con = utils.connect_remote(node_rgw) + + for bkt in buckets: + if config.multisite_sync_policy: + ceph_version_id, _ = utils.get_ceph_version() + ceph_version_id = ceph_version_id.split("-") + ceph_version_id = ceph_version_id[0].split(".") + if float(ceph_version_id[0]) >= 16: + if utils.is_cluster_multisite(): + if config.test_ops["group_create"]: + if config.test_ops["pipe_create"]: + _, stdout, stderr = rgw_ssh_con.exec_command( + f"radosgw-admin sync policy get --bucket {bkt.name}" + ) + sync_policy_error = stderr.read().decode() + sync_policy_error_list = sync_policy_error.split( + "\n" ) + if sync_policy_error_list[0] != "": + raise TestExecError( + f"Get sync policy on bucket {bkt.name} another site failled :{sync_policy_error_list}" + ) + cmd_output = json.loads(stdout.read().decode()) + log.info( + f"sync policy get from other site: {cmd_output} for bucket {bkt.name}" + ) + if len(cmd_output["groups"]) == 0: + log.info( + f"bucket sync policy for {bkt.name} not synced to another site, sleep 60s and retry" + ) + for retry_count in range(20): + time.sleep(60) + _, re_stdout, _ = rgw_ssh_con.exec_command( + f"radosgw-admin sync policy get --bucket {bkt.name}" + ) + re_cmd_output = json.loads( + re_stdout.read().decode() + ) + log.info( + f"sync policy get from other site after 60s: {re_cmd_output} for bucket {bkt.name}" + ) + if len(re_cmd_output["groups"]) == 0: + log.info( + f"bucket sync policy for {bkt.name} not synced to another site, so retry" + ) + else: + log.info( + "bucket sync policy synced to another site" + ) + break + + if (retry_count > 20) and ( + len(re_cmd_output["groups"]) == 0 + ): + raise TestExecError( + f"bucket sync policy for {bkt.name} not synced to another site even after 20m" + ) + + if config.test_ops.get("create_object", False): + # uploading data + log.info( + f"s3 objects to create: {config.objects_count}" + ) + for oc, size in list(config.mapped_sizes.items()): + config.obj_size = size + s3_object_name = utils.gen_s3_object_name( + bkt.name, oc + ) + log.info(f"s3 object name: {s3_object_name}") + s3_object_path = os.path.join( + TEST_DATA_PATH, s3_object_name + ) + log.info(f"s3 object path: {s3_object_path}") + if config.test_ops.get("enable_version", False): + reusable.upload_version_object( + config, + each_user, + rgw_conn, + s3_object_name, + config.obj_size, + bkt, + TEST_DATA_PATH, + ) + else: + log.info("upload type: normal") + reusable.upload_object( + s3_object_name, + bkt, + TEST_DATA_PATH, + config, + each_user, + ) + + bucket_stats = json.loads( + utils.exec_shell_cmd( + f"radosgw-admin bucket stats --bucket {bkt.name}" + ) + ) + bkt_objects = bucket_stats["usage"]["rgw.main"][ + "num_objects" + ] + if bkt_objects != config.objects_count: + raise TestExecError( + f"Did not find {config.objects_count} in bucket {bkt.name}, but found {bkt_objects}" + ) + _, stdout, _ = rgw_ssh_con.exec_command( + f"radosgw-admin bucket stats --bucket {bkt.name}" + ) + cmd_output = json.loads(stdout.read().decode()) + log.info(cmd_output) + if "rgw.main" not in cmd_output["usage"].keys(): + for retry_count in range(20): + time.sleep(60) + _, re_stdout, _ = rgw_ssh_con.exec_command( + f"radosgw-admin bucket stats --bucket {bkt.name}" + ) + re_cmd_output = json.loads( + re_stdout.read().decode() + ) + log.info( + f"check bucket stats on other site after 60s: {re_cmd_output} for bucket {bkt.name}" + ) + if ( + "rgw.main" + not in re_cmd_output["usage"].keys() + ): + log.info( + f"bucket stats not synced: for bucket {bkt.name}, so retry" + ) + else: + log.info( + "bucket stats synced for bucket {bkt.name}" + ) + break + + if (retry_count > 20) and ( + "rgw.main" + not in re_cmd_output["usage"].keys() + ): + raise TestExecError( + f"object not synced on bucket {bkt.name} in another site even after 20m" + ) + site_bkt_objects = re_cmd_output["usage"][ + "rgw.main" + ]["num_objects"] + if bkt_objects != site_bkt_objects: + raise TestExecError( + f"object missmatch found in another site for bucket {bkt.name} : {site_bkt_objects} expected {bkt_objects}" + ) + if config.test_ops["pipe_remove"]: pipe_id = reusable.pipe_operation( group_id1, "remove", zone_names, - bucket_name=bucket_name_to_create, - ) - if config.test_ops["flow_remove"]: - flow_type = config.test_ops["flow_type"] - zone_names = reusable.flow_operation( - group_id1, "remove", flow_type + bucket_name=bkt.name, ) - if config.test_ops["group_remove"]: - group_status = config.test_ops["group_status"] - group_id = reusable.group_operation( - group_id1, "remove", group_status - ) + + if config.test_ops["flow_remove"]: + flow_type = config.test_ops["flow_type"] + zone_names = reusable.flow_operation(group_id1, "remove", flow_type) + + if config.test_ops["group_remove"]: + group_status = config.test_ops["group_status"] + group_id = reusable.group_operation(group_id1, "remove", group_status) + # check for any crashes during the execution crash_info = reusable.check_for_crash() if crash_info: @@ -174,7 +350,7 @@ def test_exec(config, ssh_con): test_data_dir = "test_data" rgw_service = RGWService() TEST_DATA_PATH = os.path.join(project_dir, test_data_dir) - log.info("TEST_DATA_PATH: %s" % TEST_DATA_PATH) + log.info(f"TEST_DATA_PATH: {TEST_DATA_PATH}") if not os.path.exists(TEST_DATA_PATH): log.info("test data dir not exists, creating.. ") os.makedirs(TEST_DATA_PATH) @@ -200,6 +376,8 @@ def test_exec(config, ssh_con): config = Config(yaml_file) ceph_conf = CephConfOp(ssh_con) config.read(ssh_con) + if config.mapped_sizes is None: + config.mapped_sizes = utils.make_mapped_sizes(config) test_exec(config, ssh_con) test_info.success_status("test passed") sys.exit(0) @@ -209,3 +387,6 @@ def test_exec(config, ssh_con): log.error(traceback.format_exc()) test_info.failed_status("test failed") sys.exit(1) + + finally: + utils.cleanup_test_data_path(TEST_DATA_PATH)