New rebalance #909

Open
wants to merge 7 commits into devel
Changes from 6 commits
2 changes: 1 addition & 1 deletion .github/workflows/build-container.yml
@@ -538,7 +538,7 @@ jobs:
strategy:
fail-fast: false
matrix:
test: ["sanity", "no_huge", "ns_lb_change", "no_subsystems", "state_transitions", "state_transitions_both_gws", "state_transitions_loop", "state_transitions_rand_loop", "late_registration", "late_registration_loop", "4gws", "4gws_loop", "4gws_create_delete", "4gws_create_delete_loop", "namespaces", "namespaces_loop", "mtls", "notify", "ceph_status", "blocklist", "main_exit"]
test: ["sanity", "no_huge", "ns_lb_change", "no_subsystems", "auto_load_balance", "state_transitions", "state_transitions_both_gws", "state_transitions_loop", "state_transitions_rand_loop", "late_registration", "late_registration_loop", "4gws", "4gws_loop", "4gws_create_delete", "4gws_create_delete_loop", "namespaces", "namespaces_loop", "mtls", "notify", "ceph_status", "blocklist", "main_exit"]
runs-on: ubuntu-latest
env:
HUGEPAGES: 1024 # 4 spdk instances
2 changes: 2 additions & 0 deletions ceph-nvmeof.conf
@@ -17,6 +17,8 @@ state_update_notify = True
state_update_timeout_in_msec = 2000
state_update_interval_sec = 5
enable_spdk_discovery_controller = False
rebalance_period_sec = 7
max_ns_to_change_lb_grp = 8
#omap_file_lock_duration = 20
#omap_file_lock_retries = 30
#omap_file_lock_retry_sleep_interval = 1.0
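The two new settings control how often the rebalance loop runs (rebalance_period_sec) and how many namespaces it may move to another load-balancing group in one pass (max_ns_to_change_lb_grp). As a minimal sketch (not part of the diff), assuming the rebalance module reads them with the same getint_with_default helper used elsewhere in this PR; the section name, attribute names and defaults shown are illustrative:

# Sketch only: section name, attribute names and defaults are assumptions.
class RebalanceSettings:
    def __init__(self, config):
        # How often the rebalance loop wakes up, in seconds.
        self.rebalance_period_sec = config.getint_with_default(
            "gateway", "rebalance_period_sec", 7)
        # Upper bound on namespaces moved between ANA groups per cycle.
        self.max_ns_to_change_lb_grp = config.getint_with_default(
            "gateway", "max_ns_to_change_lb_grp", 8)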
18 changes: 18 additions & 0 deletions control/cephutils.py
@@ -13,6 +13,7 @@
import rados
import time
import json
import re
from .utils import GatewayLogger

class CephUtils:
@@ -24,6 +25,8 @@ def __init__(self, config):
self.ceph_conf = config.get_with_default("ceph", "config_file", "/etc/ceph/ceph.conf")
self.rados_id = config.get_with_default("ceph", "id", "")
self.anagroup_list = []
self.rebalance_supported = False
self.rebalance_ana_group = 0
self.last_sent = time.time()

def execute_ceph_monitor_command(self, cmd):
@@ -50,6 +53,12 @@ def get_gw_id_owner_ana_group(self, pool, group, anagrp):
break
return gw_id

def is_rebalance_supported(self):
return self.rebalance_supported

def get_rebalance_ana_group(self):
return self.rebalance_ana_group

def get_number_created_gateways(self, pool, group):
now = time.time()
if (now - self.last_sent) < 10 and self.anagroup_list :
@@ -64,6 +73,15 @@ def get_number_created_gateways(self, pool, group):
rply = self.execute_ceph_monitor_command(str)
self.logger.debug(f"reply \"{rply}\"")
conv_str = rply[1].decode()
pos = conv_str.find('"LB"')
if pos != -1:
self.rebalance_supported = True
match = re.search(r'"rebalance_ana_group":\s*(\d+)', conv_str)
if match:
self.rebalance_ana_group = int(match.group(1))
self.logger.debug(f"Rebalance ana_group: {self.rebalance_ana_group}")
else :
self.rebalance_supported = False
pos = conv_str.find("[")
if pos != -1:
new_str = conv_str[pos + len("[") :]
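For illustration, the detection logic added above can be exercised on its own; the sketch below mirrors it against a hypothetical monitor reply string (the reply layout shown is an assumption, not the actual monitor output):

import re

def parse_rebalance_info(conv_str):
    # Mirrors the checks above: "LB" in the reply means the monitor supports
    # rebalance; rebalance_ana_group names the group it nominated.
    if conv_str.find('"LB"') == -1:
        return False, 0
    match = re.search(r'"rebalance_ana_group":\s*(\d+)', conv_str)
    return True, int(match.group(1)) if match else 0

# Hypothetical reply fragment, for illustration only:
reply = '{"features": ["LB"], "rebalance_ana_group": 2, "created_anagrps": [1, 2]}'
print(parse_rebalance_info(reply))  # -> (True, 2)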
124 changes: 93 additions & 31 deletions control/grpc.py
@@ -43,11 +43,14 @@
from .utils import GatewayLogger
from .state import GatewayState, GatewayStateHandler, OmapLock
from .cephutils import CephUtils
from .rebalance import Rebalance

# Assuming max of 32 gateways and protocol min 1 max 65519
CNTLID_RANGE_SIZE = 2040
DEFAULT_MODEL_NUMBER = "Ceph bdev Controller"

MAX_POSSIBLE_ANA_GRPS = 100
Review comment from baum (Collaborator), Nov 17, 2024:
@leonidc Please remove MAX_POSSIBLE_ANA_GRPS arbitrary constant. Let me know if you need any assistance with this.
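One possible direction for this comment (not part of the PR) is to let the per-group maps grow on demand instead of pre-sizing them to an arbitrary bound; a sketch, assuming only groups that are actually touched need entries:

from collections import defaultdict

# Hypothetical alternative to the fixed-size initialization below:
ana_grp_ns_load = defaultdict(int)                         # group id -> namespace count
ana_grp_state = defaultdict(lambda: 0xff)                  # group id -> ANA state, default INACCESSIBLE
ana_grp_subs_load = defaultdict(lambda: defaultdict(int))  # group id -> {subsystem nqn -> count}

ana_grp_ns_load[3] += 1  # no pre-allocation of MAX_POSSIBLE_ANA_GRPS slots needed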

MONITOR_POLLING_RATE_SEC = 2 #monitor polls gw each 2 seconds
class BdevStatus:
def __init__(self, status, error_message, bdev_name = ""):
self.status = status
@@ -268,6 +271,41 @@ def get_namespace_infos_for_anagrpid(self, nqn: str, anagrpid: int) -> Iterator[
if ns_info.anagrpid == anagrpid:
yield ns_info

def get_all_namespaces_by_ana_group_id(self, anagrpid):
ns_list = []
# Loop through all nqn values in the namespace list
for nqn in self.namespace_list:
for nsid in self.namespace_list[nqn]:
ns = self.namespace_list[nqn][nsid]
if ns.empty():
continue
if ns.anagrpid == anagrpid:
ns_list.append((nsid, nqn))  # list of tuples
return ns_list

def get_ana_group_id_by_nsid_subsys(self, nqn, nsid):
if nqn not in self.namespace_list:
return 0
ns = self.namespace_list[nqn][nsid]
if ns.empty():
return 0
anagrp = ns.anagrpid
return anagrp

def get_subsys_namespaces_by_ana_group_id(self, nqn, anagrpid):
ns_list = []
if nqn not in self.namespace_list:
return ns_list

for nsid in self.namespace_list[nqn]:
ns = self.namespace_list[nqn][nsid]
if ns.empty():
continue
if ns.anagrpid == anagrpid:
ns_list.append(ns)

return ns_list

class GatewayService(pb2_grpc.GatewayServicer):
"""Implements gateway service interface.

@@ -332,6 +370,12 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self.max_hosts_per_subsystem = self.config.getint_with_default("gateway", "max_hosts_per_subsystem", GatewayService.MAX_HOSTS_PER_SUBSYS_DEFAULT)
self.gateway_pool = self.config.get_with_default("ceph", "pool", "")
self.ana_map = defaultdict(dict)
self.ana_grp_state = {}
self.ana_grp_ns_load = {}
self.ana_grp_subs_load = defaultdict(dict)
for i in range(MAX_POSSIBLE_ANA_GRPS):
self.ana_grp_ns_load[i] = 0
self.ana_grp_state[i] = 0xff #pb2.ana_state.INACCESSIBLE
self.cluster_nonce = {}
self.bdev_cluster = {}
self.bdev_params = {}
@@ -340,7 +384,8 @@ def __init__(self, config: GatewayConfig, gateway_state: GatewayStateHandler, rp
self._init_cluster_context()
self.subsys_max_ns = {}
self.host_info = SubsystemHostAuth()

self.rebalance = Rebalance(self, self.config)

def get_directories_for_key_file(self, key_type : str, subsysnqn : str, create_dir : bool = False) -> []:
tmp_dirs = []
dir_prefix = f"{key_type}_{subsysnqn}_"
@@ -1221,6 +1266,9 @@ def create_namespace(self, subsystem_nqn, bdev_name, nsid, anagrpid, uuid, no_au
)
self.subsystem_nsid_bdev_and_uuid.add_namespace(subsystem_nqn, nsid, bdev_name, uuid, anagrpid, no_auto_visible)
self.logger.debug(f"subsystem_add_ns: {nsid}")
self.ana_grp_ns_load[anagrpid] += 1
if anagrpid in self.ana_grp_subs_load and subsystem_nqn in self.ana_grp_subs_load[anagrpid]: self.ana_grp_subs_load[anagrpid][subsystem_nqn] += 1
else : self.ana_grp_subs_load[anagrpid][subsystem_nqn] = 1
except Exception as ex:
self.logger.exception(add_namespace_error_prefix)
errmsg = f"{add_namespace_error_prefix}:\n{ex}"
@@ -1261,6 +1309,7 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
nqn = nas.nqn
for gs in nas.states:
self.ana_map[nqn][gs.grp_id] = gs.state
self.ana_grp_state[gs.grp_id] = gs.state

# If this is not set the subsystem was not created yet
if not nqn in self.subsys_max_ns:
@@ -1325,27 +1374,22 @@ def set_ana_state_safe(self, ana_info: pb2.ana_info, context=None):
def choose_anagrpid_for_namespace(self, nsid) ->int:
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
for ana_grp in grps_list:
if not self.clusters[ana_grp]: # still no namespaces in this ana-group - probably the new GW added
if self.ana_grp_ns_load[ana_grp] == 0: # still no namespaces in this ana-group - probably the new GW added
self.logger.info(f"New GW created: chosen ana group {ana_grp} for ns {nsid} ")
return ana_grp
# ana_grp not found. To calculate it, find the minimum loaded ana_grp cluster
ana_load = {}
min_load = 2000
chosen_ana_group = 0
for ana_grp in self.clusters:
if ana_grp in grps_list: #to take into consideration only valid groups
ana_load[ana_grp] = 0;
for name in self.clusters[ana_grp]:
ana_load[ana_grp] += self.clusters[ana_grp][name] # accumulate the total load per ana group for all valid ana_grp clusters
for ana_grp in ana_load :
self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} ")
if ana_load[ana_grp] <= min_load:
min_load = ana_load[ana_grp]
chosen_ana_group = ana_grp
self.logger.info(f" ana group {ana_grp} load = {ana_load[ana_grp]} set as min {min_load} ")
for ana_grp in self.ana_grp_ns_load:
if ana_grp in grps_list:
self.logger.info(f" ana group {ana_grp} load = {self.ana_grp_ns_load[ana_grp]} ")
if self.ana_grp_ns_load[ana_grp] <= min_load:
min_load = self.ana_grp_ns_load[ana_grp]
chosen_ana_group = ana_grp
self.logger.info(f" ana group {ana_grp} load = {self.ana_grp_ns_load[ana_grp]} set as min {min_load} ")
self.logger.info(f"Found min loaded cluster: chosen ana group {chosen_ana_group} for ns {nsid} ")
return chosen_ana_group


def namespace_add_safe(self, request, context):
"""Adds a namespace to a subsystem."""

@@ -1481,7 +1525,7 @@ def namespace_change_load_balancing_group_safe(self, request, context):
grps_list = []
peer_msg = self.get_peer_message(context)
change_lb_group_failure_prefix = f"Failure changing load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn}"
self.logger.info(f"Received request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")
self.logger.info(f"Received auto {request.auto_lb_logic} request to change load balancing group for namespace with NSID {request.nsid} in {request.subsystem_nqn} to {request.anagrpid}, context: {context}{peer_msg}")

if not request.subsystem_nqn:
errmsg = f"Failure changing load balancing group for namespace, missing subsystem NQN"
@@ -1493,12 +1537,13 @@ def namespace_change_load_balancing_group_safe(self, request, context):
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
if context: #below checks are legal only if command is initiated by local cli or is sent from the local rebalance logic.
grps_list = self.ceph_utils.get_number_created_gateways(self.gateway_pool, self.gateway_group)
if request.anagrpid not in grps_list:
self.logger.debug(f"ANA groups: {grps_list}")
errmsg = f"{change_lb_group_failure_prefix}: Load balancing group {request.anagrpid} doesn't exist"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

find_ret = self.subsystem_nsid_bdev_and_uuid.find_namespace(request.subsystem_nqn, request.nsid)

@@ -1517,24 +1562,25 @@ def namespace_change_load_balancing_group_safe(self, request, context):
errmsg = f"{change_lb_group_failure_prefix}: Can't find entry for namespace {request.nsid} in {request.subsystem_nqn}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENOENT, error_message=errmsg)
anagrp = ns_entry["anagrpid"]
gw_id = self.ceph_utils.get_gw_id_owner_ana_group(self.gateway_pool, self.gateway_group, anagrp)
self.logger.debug(f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id}, self.name is {self.gateway_name}")
if self.gateway_name != gw_id:
errmsg = f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id} so try this command from it, this gateway name is {self.gateway_name}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)
if request.auto_lb_logic == False :
anagrp = ns_entry["anagrpid"]
gw_id = self.ceph_utils.get_gw_id_owner_ana_group(self.gateway_pool, self.gateway_group, anagrp)
self.logger.debug(f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id}, self.name is {self.gateway_name}")
if self.gateway_name != gw_id:
errmsg = f"ANA group of ns#{request.nsid} - {anagrp} is owned by gateway {gw_id} so try this command from it, this gateway name is {self.gateway_name}"
self.logger.error(errmsg)
return pb2.req_status(status=errno.ENODEV, error_message=errmsg)

try:
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(request.subsystem_nqn, request.nsid)
ret = rpc_nvmf.nvmf_subsystem_set_ns_ana_group(
self.spdk_rpc_client,
nqn=request.subsystem_nqn,
nsid=request.nsid,
anagrpid=request.anagrpid,
#transit_anagrpid=0, #temporary for spdk 24.05
)
self.logger.debug(f"nvmf_subsystem_set_ns_ana_group: {ret}")
if not find_ret.empty():
find_ret.set_ana_group_id(request.anagrpid)
except Exception as ex:
errmsg = f"{change_lb_group_failure_prefix}:\n{ex}"
resp = self.parse_json_exeption(ex)
@@ -1548,6 +1594,18 @@
if not ret:
self.logger.error(change_lb_group_failure_prefix)
return pb2.req_status(status=errno.EINVAL, error_message=change_lb_group_failure_prefix)
# change LB success - need to update the data structures
self.ana_grp_ns_load[anagrpid] -= 1 #decrease loading of previous "old" ana group
self.ana_grp_subs_load[anagrpid][request.subsystem_nqn] -= 1
self.logger.debug(f"updated load in grp {anagrpid} = {self.ana_grp_ns_load[anagrpid]} ")
self.ana_grp_ns_load[request.anagrpid] += 1
if request.anagrpid in self.ana_grp_subs_load and request.subsystem_nqn in self.ana_grp_subs_load[request.anagrpid]:
self.ana_grp_subs_load[request.anagrpid][request.subsystem_nqn] += 1
else : self.ana_grp_subs_load[request.anagrpid][request.subsystem_nqn] = 1
self.logger.debug(f"updated load in grp {request.anagrpid} = {self.ana_grp_ns_load[request.anagrpid]} ")
#here update find_ret.set_ana_group_id(request.anagrpid)
if not find_ret.empty():
find_ret.set_ana_group_id(request.anagrpid)

if context:
assert ns_entry, "Namespace entry is None for non-update call"
@@ -1631,6 +1689,10 @@ def remove_namespace(self, subsystem_nqn, nsid, context):
nsid=nsid,
)
self.logger.debug(f"remove_namespace {nsid}: {ret}")
anagrpid = self.subsystem_nsid_bdev_and_uuid.get_ana_group_id_by_nsid_subsys(subsystem_nqn, nsid)
self.ana_grp_ns_load[anagrpid] -= 1
self.ana_grp_subs_load[anagrpid][subsystem_nqn] -= 1

except Exception as ex:
self.logger.exception(namespace_failure_prefix)
errmsg = f"{namespace_failure_prefix}:\n{ex}"
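Taken out of the diff, the revised choose_anagrpid_for_namespace reduces to picking the valid ANA group with the fewest namespaces from the new per-group counters rather than walking the per-cluster bdev maps. A self-contained sketch of that selection (function and variable names here are illustrative):

def choose_min_loaded_group(ana_grp_ns_load: dict, valid_groups: list) -> int:
    """Pick the ANA group with the fewest namespaces among the valid groups.

    An empty group (a freshly added gateway) wins immediately, matching the
    early return in the PR; ties go to the last group scanned, as in the
    original '<=' comparison.
    """
    for grp in valid_groups:
        if ana_grp_ns_load.get(grp, 0) == 0:
            return grp
    chosen, min_load = 0, float("inf")
    for grp, load in ana_grp_ns_load.items():
        if grp in valid_groups and load <= min_load:
            chosen, min_load = grp, load
    return chosen

# Example: groups 1..3 exist, group 2 carries the least load
print(choose_min_loaded_group({1: 5, 2: 1, 3: 4}, [1, 2, 3]))  # -> 2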
1 change: 1 addition & 0 deletions control/proto/gateway.proto
@@ -165,6 +165,7 @@ message namespace_change_load_balancing_group_req {
uint32 nsid = 2;
optional string OBSOLETE_uuid = 3;
int32 anagrpid = 4;
optional bool auto_lb_logic = 5;
}

message namespace_delete_req {
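The new optional auto_lb_logic flag lets the gateway distinguish rebalance-driven group changes from operator-initiated ones (see the context check in namespace_change_load_balancing_group_safe above). A sketch of building such a request from the generated protobuf classes; the import path is an assumption:

# Assumed import path for the generated module; adjust to the actual package layout.
from control.proto import gateway_pb2 as pb2

req = pb2.namespace_change_load_balancing_group_req(
    subsystem_nqn="nqn.2016-06.io.spdk:cnode1",  # example NQN
    nsid=1,
    anagrpid=2,
    auto_lb_logic=True,  # mark the change as coming from the automatic rebalance logic
)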