Skip to content

Commit

Permalink
Redesign to comply with fair rebalancing per ANA group and subsystem
Browse files Browse the repository at this point in the history
Works for 3 GWs and 1 subsystem. Only one GW at a time performs the rebalance.
  • Loading branch information
Leonid Chernin committed Nov 10, 2024
1 parent 52facad commit d3c2bc5
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 55 deletions.
11 changes: 10 additions & 1 deletion control/cephutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import rados
import time
import json
import re
from .utils import GatewayLogger

class CephUtils:
Expand All @@ -25,6 +26,7 @@ def __init__(self, config):
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.rebalance_supported = False
self.rebalance_ana_group = 0
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
Expand Down Expand Up @@ -54,6 +56,9 @@ def get_gw_id_owner_ana_group(self, pool, group, anagrp):
def is_rebalance_supported(self):
    """Tell whether the monitor advertised auto-rebalance support.

    The flag is set while parsing the monitor's gateway listing (the
    presence of an "LB" marker in the response); until that parse runs
    it stays at its constructor default.
    """
    return self.rebalance_supported

def get_rebalance_ana_group(self):
    """Return the ANA group index currently responsible for rebalance.

    The value is parsed out of the monitor response
    ("rebalance_ana_group": N); it defaults to 0 until a response has
    been seen.
    """
    return self.rebalance_ana_group

def get_number_created_gateways(self, pool, group):
now = time.time()
if (now - self.last_sent) < 10 and self.anagroup_list :
Expand All @@ -71,8 +76,12 @@ def get_number_created_gateways(self, pool, group):
pos = conv_str.find('"LB"')
if pos != -1:
self.rebalance_supported = True
match = re.search(r'"rebalance_ana_group":\s*(\d+)', conv_str)
if match:
self.rebalance_ana_group = int(match.group(1))
self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
else :
self.rebalance_supported = False
self.rebalance_supported = False #pos = conv_str.find("rebalance_ana_group")
pos = conv_str.find("[")
if pos != -1:
new_str = conv_str[pos + len("[") :]
Expand Down
159 changes: 105 additions & 54 deletions control/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,13 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.ana_map = defaultdict(dict)
self.ana_grp_state = {}
self.ana_grp_ns_load = {}
self.ana_grp_subs_load = defaultdict(dict)
self.rebalance_period_sec = self.config.getint_with_default("gateway", "rebalance_period_sec", 10)
self.rebalance_max_ns_to_change_lb_grp = self.config.getint_with_default("gateway", "max_ns_to_change_lb_grp", 3)

for i in range(MAX_POSSIBLE_ANA_GRPS):
self.ana_grp_ns_load[i] = 0
self.ana_grp_state[i] = pb2.ana_state.INACCESSIBLE
self.ana_grp_state[i] = 0xff #pb2.ana_state.INACCESSIBLE
self.cluster_nonce = {}
self.bdev_cluster = {}
self.bdev_params = {}
Expand All @@ -362,8 +363,12 @@ def auto_rebalance_task(self ):
"""Periodically calls for auto rebalance."""
if (self.rebalance_period_sec > 0):
while True:
self.rebalance_logic()
time.sleep(self.rebalance_period_sec)
try:
self.rebalance_logic()
except Exception:
self.logger.exception(f"Exception in rebalance")
return None
time.sleep(self.rebalance_period_sec)

def get_directories_for_key_file(self, key_type : str, subsysnqn : str, create_dir : bool = False) -> []:
tmp_dirs = []
Expand Down Expand Up @@ -1204,6 +1209,8 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, no_au
self.subsystem_nsid_bdev_and_uuid.add_namespace(subsystem_nqn, nsid, bdev_name, uuid, anagrpid, no_auto_visible)
self.logger.debug(f"subsystem_add_ns: {nsid}")
self.ana_grp_ns_load[anagrpid] += 1
if anagrpid in self.ana_grp_subs_load and subsystem_nqn in self.ana_grp_subs_load[anagrpid]: self.ana_grp_subs_load[anagrpid][subsystem_nqn] += 1
else : self.ana_grp_subs_load[anagrpid][subsystem_nqn] = 1
except Exception as ex:
self.logger.exception(add_namespace_error_prefix)
errmsg = f"{add_namespace_error_prefix}:\n{ex}"
Expand Down Expand Up @@ -1323,69 +1330,105 @@ def find_min_loaded_group(self, grp_list)->int:
if self.ana_grp_ns_load[ana_grp] <= min_load:
min_load = self.ana_grp_ns_load[ana_grp]
chosen_ana_group = ana_grp
return chosen_ana_group
min_load = 2000
for nqn in self.ana_grp_subs_load[chosen_ana_group] :
if self.ana_grp_subs_load[chosen_ana_group][nqn] < min_load:
min_load = self.ana_grp_subs_load[chosen_ana_group][nqn]
chosen_nqn = nqn

def find_target_ns_per_group(self, grp_list):
total_num_ns = 0
num_valid_grps = 0
target = 0
for ana_grp in grp_list:
num_valid_grps += 1
for ana_grp in self.ana_grp_ns_load :
total_num_ns += self.ana_grp_ns_load[ana_grp]
target = total_num_ns/num_valid_grps
self.logger.info(f"Total namespaces {total_num_ns} number valid groups {num_valid_grps} target = {target}")
return num_valid_grps, target
return chosen_ana_group, chosen_nqn

def find_min_loaded_group_in_subsys(self, nqn, grp_list) -> tuple:
    """Find the ANA group (restricted to *grp_list*) carrying the fewest
    namespaces of subsystem *nqn*.

    Returns a ``(min_load, chosen_ana_group)`` tuple.  (The original
    annotation said ``int``, but the function has always returned a pair.)

    A group that carries no namespaces of this subsystem at all is chosen
    immediately with load 0: no smaller load is possible, so the scan can
    stop early.
    """
    # Sentinel: any real load compares smaller.  The previous hard-coded
    # 2000 would silently mis-pick once a subsystem held >= 2000 namespaces
    # in every group.
    min_load = float("inf")
    chosen_ana_group = 0
    for ana_grp in self.ana_grp_subs_load:
        if ana_grp not in grp_list:
            continue
        subs_load = self.ana_grp_subs_load[ana_grp]
        if nqn in subs_load:
            # '<=' keeps the original tie-breaking: a later group with an
            # equal load wins.
            if subs_load[nqn] <= min_load:
                min_load = subs_load[nqn]
                chosen_ana_group = ana_grp
        else:
            # Still no load for this subsystem on this ANA group — 0 is the
            # global minimum, stop searching.
            chosen_ana_group = ana_grp
            min_load = 0
            break
    return min_load, chosen_ana_group

""" def find_max_loaded_subsys(self, ana_grp)->int:
max_load = 0
chosen_nqn = 0
for nqn in self.ana_grp_subs_load[ana_grp] :
if self.ana_grp_subs_load[ana_grp][nqn] > max_load:
max_load = self.ana_grp_subs_load[ana_grp][nqn]
chosen_nqn = nqn
return max_load, chosen_nqn """

# 1. Do not run regular rebalance while a scale-down rebalance is ongoing.
# 2. The monitor decides which GW is responsible for regular rebalance (fairness
#    logic), so GWs do not collide and rebalance results stay accurate.  The
#    monitor publishes the index of the ANA group responsible for rebalance in
#    the nvme-gw show response.
#
# NOTE(review): this text was recovered from a unified diff whose +/- markers
# were stripped; superseded pre-change statements (which would now bind the
# tuple returned by find_min_loaded_group to a scalar and crash) have been
# removed, and the nesting of the per-subsystem scan was reconstructed —
# confirm against the upstream commit.
def rebalance_logic(self):
    """One pass of the auto-rebalance state machine.

    Handles two cases:
    * scale-down: an OPTIMIZED ANA group whose owning GW was deleted gets up
      to ``rebalance_max_ns_to_change_lb_grp`` namespaces moved to the least
      loaded surviving group;
    * regular fairness: only when this GW owns the monitor-elected worker
      ANA group and no scale-down is in flight, move single namespaces
      between groups of the same subsystem while their loads differ by >= 2.
    """
    worker_ana_group = self.ceph_utils.get_rebalance_ana_group()
    self.logger.info(f"Called rebalance logic current rebalancing ana group {worker_ana_group}")
    ongoing_scale_down_rebalance = False
    grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
    if not self.ceph_utils.is_rebalance_supported():
        self.logger.info(f"Auto rebalance is not supported with the current ceph version")
        return

    # Detect an in-flight scale-down: a group that still carries namespaces
    # locally but is no longer listed by the monitor (its GW was deleted).
    for ana_grp in self.ana_grp_state:
        if self.ana_grp_ns_load[ana_grp] != 0:  # internally valid group
            if ana_grp not in grps_list:  # monitor considers it invalid since GW owner was deleted
                ongoing_scale_down_rebalance = True
                self.logger.info(f"Scale-down rebalance is ongoing for ANA group {ana_grp}")
                break

    for ana_grp in self.ana_grp_state:
        if self.ana_grp_state[ana_grp] == pb2.ana_state.OPTIMIZED:
            if ana_grp not in grps_list:
                # Scale-down: drain up to rebalance_max_ns_to_change_lb_grp
                # namespaces from the orphaned group to the least loaded one.
                self.logger.info(f"Found optimized ana group {ana_grp} belong to group of deleted GW num ms in group {self.ana_grp_ns_load[ana_grp]} - Start NS rebalance")
                num = self.rebalance_max_ns_to_change_lb_grp if self.ana_grp_ns_load[ana_grp] >= self.rebalance_max_ns_to_change_lb_grp else self.ana_grp_ns_load[ana_grp]
                if num > 0:
                    min_ana_grp, chosen_nqn = self.find_min_loaded_group(grps_list)
                    self.logger.info(f" Found destination ana group {min_ana_grp}, subsystem {chosen_nqn}")
                    # subs_nqn "0" means: move namespaces of any subsystem.
                    self.ns_rebalance(ana_grp, min_ana_grp, num, "0")
                else:
                    self.logger.info(f" warning: empty group {ana_grp} of Deleting GW still appears Optimized")
                # One scale-down action per pass.
                return
            else:
                # Regular fairness rebalance — only the GW owning the
                # monitor-elected worker ANA group performs it, and never
                # while a scale-down is in flight.
                if not ongoing_scale_down_rebalance and (worker_ana_group == ana_grp):
                    num_moved = 0
                    for i in range(self.rebalance_max_ns_to_change_lb_grp):
                        for nqn in self.ana_grp_subs_load[ana_grp]:  # need to search all nqns not only inside the current load
                            max_load = self.ana_grp_subs_load[ana_grp][nqn]  # not really max but we will try it
                            chosen_nqn = nqn
                            self.logger.info(f"total load {self.ana_grp_ns_load[ana_grp]} current-load = {max_load} for ana grp {ana_grp}, subsystem {chosen_nqn}")
                            if max_load == 0 or chosen_nqn == 0:
                                continue
                            min_load, min_ana_grp = self.find_min_loaded_group_in_subsys(chosen_nqn, grps_list)
                            self.logger.info(f" min-load = {min_load} for ana group {min_ana_grp}, same subsystem")
                            if max_load - min_load >= 2:
                                self.logger.info(f"Found destination ana group {min_ana_grp} for source ana {ana_grp} in subsystem {chosen_nqn}. Start NS rebalance")
                                self.ns_rebalance(ana_grp, min_ana_grp, 1, chosen_nqn)
                                num_moved += 1
                                if num_moved > self.rebalance_max_ns_to_change_lb_grp:
                                    break
                        # A full scan that moved nothing means we converged.
                        if num_moved == 0:
                            break
    return

def ns_rebalance(self, ana_id, dest_ana_id, num) ->int:
def ns_rebalance(self, ana_id, dest_ana_id, num, subs_nqn) -> int:
    """Move up to *num* namespaces from ANA group *ana_id* to *dest_ana_id*.

    *subs_nqn* restricts the move to one subsystem; the sentinel value "0"
    means namespaces of any subsystem qualify.  Returns 0.

    NOTE(review): recovered from a stripped unified diff — the rendering
    interleaved the pre-change loop body (unfiltered, duplicate
    change-LB call per namespace) with the post-change one; only the
    post-change body is kept here.
    """
    now = time.time()
    num_rebalanced = 0
    self.logger.info(f"== rebalance started == for anagrp {ana_id} destination anagrp {dest_ana_id} num ns {num} time {now} ")
    ns_list = self.subsystem_nsid_bdev_and_uuid.get_all_namespaces_by_ana_group_id(ana_id)
    self.logger.info(f"Doing loop on {ana_id} ")
    for nsid, subsys in ns_list:
        self.logger.info(f"nsid {nsid} for nqn {subsys} to rebalance:")
        if subsys == subs_nqn or subs_nqn == "0":
            self.logger.info(f"nsid for change_load_balancing : {nsid}, {subsys}, anagrpid: {ana_id}")
            change_lb_group_req = pb2.namespace_change_load_balancing_group_req(subsystem_nqn=subsys, nsid= nsid, anagrpid=dest_ana_id, auto_lb_logic=True)
            ret = self.namespace_change_load_balancing_group(change_lb_group_req, "context")
            self.logger.info(f" ret namespace_change_load_balancing_group {ret}")
            num_rebalanced += 1
            if num_rebalanced >= num:
                self.logger.info(f"== Completed rebalance in {time.time() - now } sec for {num} namespaces from anagrp {ana_id} to {dest_ana_id} ")
                return 0
    return 0

def namespace_add_safe(self, request, context):
Expand Down Expand Up @@ -1535,12 +1578,13 @@ def namespace_change_load_balancing_group_safe(self, request, context):
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
if context: #below checks are legal only if command is initiated by local cli or is sent from the local rebalance logic.
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

find_ret = self.subsystem_nsid_bdev_and_uuid.find_namespace(request.subsystem_nqn, request.nsid)

Expand Down Expand Up @@ -1569,21 +1613,15 @@ def namespace_change_load_balancing_group_safe(self, request, context):
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

try:
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid)
ret = rpc_nvmf.nvmf_subsystem_set_ns_ana_group(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid,
#transit_anagrpid=0, #temporary for spdk 24.05
)
self.logger.debug(f"nvmf_subsystem_set_ns_ana_group: {ret}")
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid)
self.ana_grp_ns_load[anagrpid] -= 1 #decrease loading of previous "old" ana group
self.logger.debug(f"updated load in grp {anagrpid} = {self.ana_grp_ns_load[anagrpid]} ")
self.ana_grp_ns_load[request.anagrpid] += 1
self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ")
#here update find_ret.set_ana_group_id(request.anagrpid)
if not find_ret.empty():
find_ret.set_ana_group_id(request.anagrpid)
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
Expand All @@ -1597,6 +1635,18 @@ def namespace_change_load_balancing_group_safe(self, request, context):
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)
# change LB success - need to update the data structures
self.ana_grp_ns_load[anagrpid] -= 1 #decrease loading of previous "old" ana group
self.ana_grp_subs_load[anagrpid][request.subsystem_nqn] -= 1
self.logger.debug(f"updated load in grp {anagrpid} = {self.ana_grp_ns_load[anagrpid]} ")
self.ana_grp_ns_load[request.anagrpid] += 1
if request.anagrpid in self.ana_grp_subs_load and request.subsystem_nqn in self.ana_grp_subs_load[request.anagrpid]:
self.ana_grp_subs_load[request.anagrpid][request.subsystem_nqn] += 1
else : self.ana_grp_subs_load[request.anagrpid][request.subsystem_nqn] = 1
self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ")
#here update find_ret.set_ana_group_id(request.anagrpid)
if not find_ret.empty():
find_ret.set_ana_group_id(request.anagrpid)

if context:
assert ns_entry, "Namespace entry is None for non-update call"
Expand Down Expand Up @@ -1682,6 +1732,7 @@ def remove_namespace(self, subsystem_nqn, nsid, context):
self.logger.debug(f"remove_namespace {nsid}: {ret}")
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(subsystem_nqn, nsid)
self.ana_grp_ns_load[anagrpid] -= 1
self.ana_grp_subs_load[anagrpid][subsystem_nqn] -= 1

except Exception as ex:
self.logger.exception(namespace_failure_prefix)
Expand Down

0 comments on commit d3c2bc5

Please sign in to comment.