Skip to content

Commit

Permalink
code review fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Leonid Chernin <[email protected]>
  • Loading branch information
Leonid Chernin committed Nov 21, 2024
1 parent efcbc54 commit 313c07a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 39 deletions.
7 changes: 3 additions & 4 deletions control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ def get_number_created_gateways(self, pool, group):
rply = self.execute_ceph_monitor_command(str)
self.logger.debug(f"reply \"{rply}\"")
conv_str = rply[1].decode()
data = json.loads(conv_str)
pos = conv_str.find('"LB"')
if pos != -1:
self.rebalance_supported = True
match = re.search(r'"rebalance_ana_group":\s*(\d+)', conv_str)
if match:
self.rebalance_ana_group = int(match.group(1))
self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
self.rebalance_ana_group = data.get("rebalance_ana_group", None)
self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
else :
self.rebalance_supported = False
pos = conv_str.find("[")
Expand Down
14 changes: 8 additions & 6 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,13 @@ def get_all_namespaces_by_ana_group_id(self, anagrpid):
def get_ana_group_id_by_nsid_subsys(self, nqn, nsid):
if nqn not in self.namespace_list:
return 0
if nsid not in self.namespace_list[nqn]:
return 0
ns = self.namespace_list[nqn][nsid]
if ns.empty():
return 0
anagrp = ns.anagrpid
return anagrp
return ns.anagrpid


def get_subsys_namespaces_by_ana_group_id(self, nqn, anagrpid):
ns_list = []
Expand Down Expand Up @@ -375,7 +377,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.ana_grp_subs_load = defaultdict(dict)
for i in range(MAX_POSSIBLE_ANA_GRPS):
self.ana_grp_ns_load[i] = 0
self.ana_grp_state[i] = 0xff #pb2.ana_state.INACCESSIBLE
self.ana_grp_state[i] = pb2.ana_state.INACCESSIBLE
self.cluster_nonce = {}
self.bdev_cluster = {}
self.bdev_params = {}
Expand All @@ -384,7 +386,7 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self._init_cluster_context()
self.subsys_max_ns = {}
self.host_info = SubsystemHostAuth()
self.rebalance = Rebalance(self, self.config)
self.rebalance = Rebalance(self)

def get_directories_for_key_file(self, key_type : str, subsysnqn : str, create_dir : bool = False) -> []:
tmp_dirs = []
Expand Down Expand Up @@ -1562,14 +1564,14 @@ def namespace_change_load_balancing_group_safe(self, request, context):
errmsg = f"{change_lb_group_failure_prefix}: Can't find entry for namespace {request.nsid} in {request.subsystem_nqn}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
if request.auto_lb_logic == False :
if not request.auto_lb_logic:
anagrp = ns_entry["anagrpid"]
gw_id = self.ceph_utils.get_gw_id_owner_ana_group(self.gateway_pool, self.gateway_group, anagrp)
self.logger.debug(f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id}, self.name is {self.gateway_name}")
if self.gateway_name != gw_id:
errmsg = f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id} so try this command from it, this gateway name is {self.gateway_name}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
return pb2.req_status(status=errno.EEXIST, error_message=errmsg)

try:
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid)
Expand Down
56 changes: 29 additions & 27 deletions control/rebalance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ class Rebalance:
"""Miscellaneous functions which do rebalance of ANA groups
"""

def __init__(self, gateway_service, config: GatewayConfig):
self.logger = GatewayLogger(config).logger
def __init__(self, gateway_service):
self.logger = gateway_service.logger
self.gw_srv = gateway_service
self.ceph_utils = gateway_service.ceph_utils
self.rebalance_period_sec = config.getint_with_default("gateway", "rebalance_period_sec", 10)
self.rebalance_max_ns_to_change_lb_grp = config.getint_with_default("gateway", "max_ns_to_change_lb_grp", 3)
self.rebalance_period_sec = gateway_service.config.getint_with_default("gateway", "rebalance_period_sec", 10)
self.rebalance_max_ns_to_change_lb_grp = gateway_service.config.getint_with_default("gateway", "max_ns_to_change_lb_grp", 3)
self.auto_rebalance = threading.Thread(target=self.auto_rebalance_task, daemon=True)
self.auto_rebalance.start() #start the thread

Expand All @@ -40,11 +40,11 @@ def auto_rebalance_task(self ):
if rc == 1:
self.logger.info(f"Nothing found for rebalance, break at {i} iteration")
break
except AssertionError as e:
self.logger.exception(f"Got an assert while trying to do auto rebalance {e}")
except AssertionError:
self.logger.exception(f"Got an assert while trying to do auto rebalance")
raise
except Exception as e:
self.logger.exception(f"Exception in auto rebalance {e}")
except Exception:
self.logger.exception(f"Exception in auto rebalance")
raise
time.sleep(0.01) #release lock for 10ms after rebalancing each 1 NS
time.sleep(self.rebalance_period_sec)
Expand Down Expand Up @@ -94,23 +94,23 @@ def rebalance_logic(self, request, context)->int:
grps_list = self.ceph_utils.get_number_created_gateways(self.gw_srv.gateway_pool, self.gw_srv.gateway_group)
if not self.ceph_utils.is_rebalance_supported():
self.logger.info(f"Auto rebalance is not supported with the curent ceph version")
return
return 1
for ana_grp in self.gw_srv.ana_grp_state:
if self.gw_srv.ana_grp_ns_load[ana_grp] != 0 : #internally valid group
if ana_grp not in grps_list: #monitor considers it invalid since GW owner was deleted
ongoing_scale_down_rebalance = True
self.logger.info(f"Scale-down rebalance is ongoing for ANA group {ana_grp}")
self.logger.info(f"Scale-down rebalance is ongoing for ANA group {ana_grp} current load {self.gw_srv.ana_grp_ns_load[ana_grp]}")
break
num_active_ana_groups = 0;
for ana_grp in grps_list:
num_active_ana_groups += 1
num_active_ana_groups = len(grps_list)
for ana_grp in self.gw_srv.ana_grp_state:
if self.gw_srv.ana_grp_state[ana_grp] == pb2.ana_state.OPTIMIZED :
if ana_grp not in grps_list:
self.logger.info(f"Found optimized ana group {ana_grp} that handles to group of deleted GW.\
Number NS in group {self.gw_srv.ana_grp_ns_load[ana_grp]} - Start NS rebalance")
num = self.rebalance_max_ns_to_change_lb_grp if self.gw_srv.ana_grp_ns_load[ana_grp] >= \
self.rebalance_max_ns_to_change_lb_grp else self.gw_srv.ana_grp_ns_load[ana_grp]
self.logger.info(f"Found optimized ana group {ana_grp} that handles to group of deleted GW."
f"Number NS in group {self.gw_srv.ana_grp_ns_load[ana_grp]} - Start NS rebalance")
if self.gw_srv.ana_grp_ns_load[ana_grp] >= self.rebalance_max_ns_to_change_lb_grp:
num = self.rebalance_max_ns_to_change_lb_grp
else:
num = self.gw_srv.ana_grp_ns_load[ana_grp]
if num > 0 :
min_ana_grp, chosen_nqn = self.find_min_loaded_group(grps_list)
self.logger.info(f" Found destination ana group {min_ana_grp}, subsystem {chosen_nqn}")
Expand All @@ -125,16 +125,18 @@ def rebalance_logic(self, request, context)->int:
for nqn in self.gw_srv.ana_grp_subs_load[ana_grp] : #need to search all nqns not only inside the current load
num_ns_in_nqn = self.gw_srv.subsystem_nsid_bdev_and_uuid.get_namespace_count(nqn, None, 0)
target_subs_per_ana = num_ns_in_nqn/num_active_ana_groups
self.logger.debug(f"loop: nqn {nqn} ana group {ana_grp} load {self.gw_srv.ana_grp_subs_load[ana_grp][nqn]},\
num-ns in nqn {num_ns_in_nqn}, target_subs_per_ana {target_subs_per_ana} ")
self.logger.debug(f"loop: nqn {nqn} ana group {ana_grp} load {self.gw_srv.ana_grp_subs_load[ana_grp][nqn]}, "
f"num-ns in nqn {num_ns_in_nqn}, target_subs_per_ana {target_subs_per_ana} ")
if self.gw_srv.ana_grp_subs_load[ana_grp][nqn] > target_subs_per_ana:
self.logger.debug(f"max-nqn load {self.gw_srv.ana_grp_subs_load[ana_grp][nqn]} nqn {nqn} ")
min_load, min_ana_grp = self.find_min_loaded_group_in_subsys(nqn, grps_list)
if ((self.gw_srv.ana_grp_subs_load[min_ana_grp][nqn] + 1) <= target_subs_per_ana ) or \
((self.gw_srv.ana_grp_subs_load[min_ana_grp][nqn] + 1) == (self.gw_srv.ana_grp_subs_load[ana_grp][nqn] - 1)):
self.logger.debug(f" Start rebalance in subsystem {nqn}, dest ana {min_ana_grp}, dest ana load per subs {min_load}")
self.ns_rebalance(context, ana_grp, min_ana_grp, 1, nqn) #regular rebalance
return 0
if (
(self.gw_srv.ana_grp_subs_load[min_ana_grp][nqn] + 1) <= target_subs_per_ana
or (self.gw_srv.ana_grp_subs_load[min_ana_grp][nqn] + 1) == (self.gw_srv.ana_grp_subs_load[ana_grp][nqn] - 1)
):
self.logger.debug(f" Start rebalance in subsystem {nqn}, dest ana {min_ana_grp}, dest ana load per subs {min_load}")
self.ns_rebalance(context, ana_grp, min_ana_grp, 1, nqn) #regular rebalance
return 0
else:
self.logger.debug(f" Found min loaded subsystem {nqn}, ana {min_ana_grp}, load {min_load} does not fit rebalance criteria!")
continue
Expand All @@ -145,14 +147,14 @@ def ns_rebalance(self, context, ana_id, dest_ana_id, num, subs_nqn) ->int:
num_rebalanced = 0
self.logger.info(f"== rebalance started == for subsystem {subs_nqn}, anagrp {ana_id}, destination anagrp {dest_ana_id}, num ns {num} time {now} ")
ns_list = self.gw_srv.subsystem_nsid_bdev_and_uuid.get_all_namespaces_by_ana_group_id(ana_id)
self.logger.info(f"Doing loop on {ana_id} ")
self.logger.debug(f"Doing loop on {ana_id} ")
for nsid, subsys in ns_list:
self.logger.info(f"nsid {nsid} for nqn {subsys} to rebalance:")
self.logger.debug(f"nsid {nsid} for nqn {subsys} to rebalance:")
if subsys == subs_nqn or subs_nqn == "0":
self.logger.info(f"nsid for change_load_balancing : {nsid}, {subsys}, anagrpid: {ana_id}")
change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True)
ret = self.gw_srv.namespace_change_load_balancing_group_safe(change_lb_group_req, context)
self.logger.info(f" ret namespace_change_load_balancing_group {ret}")
self.logger.debug(f" ret namespace_change_load_balancing_group {ret}")
num_rebalanced += 1
if num_rebalanced >= num :
self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ")
Expand Down
4 changes: 2 additions & 2 deletions tests/ha/auto_load_balance.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
#set -xe
set -xe
# See
# - https://github.com/spdk/spdk/blob/master/doc/jsonrpc.md
# - https://spdk.io/doc/nvmf_multipath_howto.html
Expand All @@ -17,7 +17,7 @@ NQN1="nqn.2016-06.io.spdk:cnode1"
NQN2="nqn.2016-06.io.spdk:cnode2"
NQN3="nqn.2016-06.io.spdk:cnode3"
NUM_SUBSYSTEMS=3
MAX_NAMESPACE=26
MAX_NAMESPACE=58

test_ns_distribution()
{
Expand Down

0 comments on commit 313c07a

Please sign in to comment.