This repository has been archived by the owner on Jan 7, 2023. It is now read-only.

An active manager is required after Ceph upgrades. #298

Open
wants to merge 7 commits into master
78 changes: 58 additions & 20 deletions analyzer/analyzer.py
@@ -430,6 +430,12 @@ def summary_result(self, data):
max_lat = 0
max_lat_95 = 0
max_lat_99 = 0
max_write_lat = 0
max_read_lat = 0
max_write_lat_95 = 0
max_read_lat_95 = 0
max_write_lat_99 = 0
max_read_lat_99 = 0
for engine_candidate in data["workload"].keys():
if engine_candidate in benchmark_tool:
engine = engine_candidate
@@ -441,28 +447,43 @@ def summary_result(self, data):
write_IOPS += float(node_data["write_iops"])
write_BW += float(node_data["write_bw"])
write_Latency += float(node_data["write_lat"])
max_lat_95 += float(node_data["95.00th%_lat"])
max_lat_99 += float(node_data["99.00th%_lat"])
max_lat += float(node_data["99.99th%_lat"])
#max_lat_95 += float(node_data["95.00th%_lat"])
#max_lat_99 += float(node_data["99.00th%_lat"])
#max_lat += float(node_data["99.99th%_lat"])
max_write_lat += float(node_data["99.99th%_write_lat"])
max_read_lat += float(node_data["99.99th%_read_lat"])
max_write_lat_95 += float(node_data["95.00th%_write_lat"])
max_read_lat_95 += float(node_data["95.00th%_read_lat"])
max_write_lat_99 += float(node_data["99.00th%_write_lat"])
max_read_lat_99 += float(node_data["99.00th%_read_lat"])
if tmp_data["Op_Type"] in ["randread", "seqread", "read"]:
tmp_data["IOPS"] = "%.3f" % read_IOPS
tmp_data["BW(MB/s)"] = "%.3f" % read_BW
if rbd_count > 0:
tmp_data["Latency(ms)"] = "%.3f" % (read_Latency/rbd_count)
tmp_data["95.00th%_lat(ms)"] = "%.3f" % (max_read_lat_95/rbd_count)
tmp_data["99.00th%_lat(ms)"] = "%.3f" % (max_read_lat_99/rbd_count)
tmp_data["99.99th%_lat(ms)"] = "%.3f" % (max_read_lat/rbd_count)
elif tmp_data["Op_Type"] in ["randwrite", "seqwrite", "write"]:
tmp_data["IOPS"] = "%.3f" % write_IOPS
tmp_data["BW(MB/s)"] = "%.3f" % write_BW
if rbd_count > 0:
tmp_data["Latency(ms)"] = "%.3f" % (write_Latency/rbd_count)
tmp_data["95.00th%_lat(ms)"] = "%.3f" % (max_write_lat_95/rbd_count)
tmp_data["99.00th%_lat(ms)"] = "%.3f" % (max_write_lat_99/rbd_count)
tmp_data["99.99th%_lat(ms)"] = "%.3f" % (max_write_lat/rbd_count)
elif tmp_data["Op_Type"] in ["randrw", "rw", "readwrite"]:
tmp_data["IOPS"] = "%.3f, %.3f" % (read_IOPS, write_IOPS)
tmp_data["BW(MB/s)"] = "%.3f, %.3f" % (read_BW, write_BW)
if rbd_count > 0:
tmp_data["Latency(ms)"] = "%.3f, %.3f" % ((read_Latency/rbd_count), (write_Latency/rbd_count))
if rbd_count > 0:
tmp_data["95.00th%_lat(ms)"] = "%.3f" % (max_lat_95/rbd_count)
tmp_data["99.00th%_lat(ms)"] = "%.3f" % (max_lat_99/rbd_count)
tmp_data["99.99th%_lat(ms)"] = "%.3f" % (max_lat/rbd_count)
tmp_data["95.00th%_lat(ms)"] = "%.3f, %.3f" % ((max_read_lat_95/rbd_count), (max_write_lat_95/rbd_count))
tmp_data["99.00th%_lat(ms)"] = "%.3f, %.3f" % ((max_read_lat_99/rbd_count), (max_write_lat_99/rbd_count))
tmp_data["99.99th%_lat(ms)"] = "%.3f, %.3f" % ((max_read_lat/rbd_count), (max_write_lat/rbd_count))
# if rbd_count > 0:
# tmp_data["95.00th%_lat(ms)"] = "%.3f" % (max_lat_95/rbd_count)
# tmp_data["99.00th%_lat(ms)"] = "%.3f" % (max_lat_99/rbd_count)
# tmp_data["99.99th%_lat(ms)"] = "%.3f" % (max_lat/rbd_count)
except:
err_log = traceback.format_exc()
common.printout("ERROR","%s" % err_log)
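
This hunk replaces the single combined percentile counters with separate read and write counters, so mixed randrw/rw/readwrite runs can report a "read, write" pair per percentile instead of one blended figure. A minimal sketch of the resulting aggregation, with illustrative names (node_results standing in for the per-node fio summaries; not a function from this repo):

def average_split_percentiles(node_results, rbd_count):
    """Sum per-node read/write percentile latencies, then average per RBD."""
    percentiles = ["95.00th", "99.00th", "99.99th"]
    totals = {p + s: 0.0 for p in percentiles
              for s in ("%_read_lat", "%_write_lat")}
    for node_data in node_results:
        for key in totals:
            totals[key] += float(node_data[key])
    if rbd_count <= 0:
        return {}
    # Mixed workloads report "read, write" pairs, as in the branch above.
    return {p + "%_lat(ms)": "%.3f, %.3f" % (totals[p + "%_read_lat"] / rbd_count,
                                             totals[p + "%_write_lat"] / rbd_count)
            for p in percentiles}
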
@@ -474,7 +495,7 @@ def summary_result(self, data):
write_SN_Latency = 0
diskformat = common.parse_disk_format( self.cluster['diskformat'] )
if len(diskformat):
typename = diskformat[0]
typename = diskformat[1]
else:
typename = "osd"
for node, node_data in data["ceph"][typename]['summary'].items():
@@ -882,8 +903,8 @@ def process_iostat_data(self, node, path):
disk_list=[]
for osd_journal in common.get_list(self.all_conf_data.get_list(node)):
tmp_dev_name = osd_journal[i].split('/')[2]
if 'nvme' in tmp_dev_name:
tmp_dev_name = common.parse_nvme( tmp_dev_name )
#if 'nvme' in tmp_dev_name:
# tmp_dev_name = common.parse_nvme( tmp_dev_name )
if tmp_dev_name not in disk_list:
disk_list.append( tmp_dev_name )
dict_diskformat[output_list[i]]=disk_list
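
With the common.parse_nvme() call commented out, NVMe partition names are kept verbatim in the iostat disk list rather than being collapsed into their parent device. Assuming parse_nvme behaved roughly as below (an assumption about the old helper, not confirmed by this diff), the effect is:

import re

def parse_nvme(dev_name):
    """Assumed old behavior: map an NVMe partition to its parent device."""
    match = re.match(r"(nvme\d+n\d+)p\d+$", dev_name)
    return match.group(1) if match else dev_name

# Old: both partitions collapse to one iostat device.
print(parse_nvme("nvme0n1p1"), parse_nvme("nvme0n1p2"))  # nvme0n1 nvme0n1
# New: 'nvme0n1p1' and 'nvme0n1p2' stay as two separate disk_list entries.
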
@@ -963,11 +984,15 @@ def process_fio_data(self, path, dirname):
result = {}
try:
stdout, stderr = common.bash("grep \" IOPS=.*BW=.*\| *io=.*bw=.*iops=.*runt=.*\|^ *lat.*min=.*max=.*avg=.*stdev=.*\" "+path, True)
stdout1, stderr1 = common.bash("grep \" *1.00th.*],\| *30.00th.*],\| *70.00th.*],\| *99.00th.*],\| *99.99th.*]\" "+path, True)
stdout1_read, stderr1_read = common.bash("grep \" *1.00th.*],\| *30.00th.*],\| *70.00th.*],\| *99.00th.*],\| *99.99th.*]\" "+path+" | head -5", True)
stdout1_write, stderr1_write = common.bash("grep \" *1.00th.*],\| *30.00th.*],\| *70.00th.*],\| *99.00th.*],\| *99.99th.*]\" "+path+" | tail -5", True)
stdout2, stderr2 = common.bash("grep \" *clat percentiles\" "+path, True)
lat_per_dict = {}
if stdout1 != '':
lat_per_dict = self.get_lat_persent_dict(stdout1)
lat_per_dict_read = {}
lat_per_dict_write = {}
if stdout1_read != '':
lat_per_dict_read = self.get_lat_persent_dict(stdout1_read)
if stdout1_write != '':
lat_per_dict_write = self.get_lat_persent_dict(stdout1_write)

fio_data_rw = {}
fio_data_rw["read"] = {}
@@ -999,17 +1024,30 @@ def process_fio_data(self, path, dirname):
output_fio_data['write_iops'] = 0
output_fio_data['write_bw'] = 0
output_fio_data['write_runtime'] = 0
if len(lat_per_dict) != 0:

if len(lat_per_dict_read) != 0:
for tmp_key in ["95.00th", "99.00th", "99.99th"]:
if tmp_key in lat_per_dict.keys():
if tmp_key in lat_per_dict_read.keys():
lat_persent_unit = re.findall(r"(?<=[\(])[^\)]+(?=[\)])", stdout2.strip('\n').strip(' ').replace(' ',''))
if len(lat_persent_unit) != 0:
output_fio_data[tmp_key+"%_lat"] = float(common.time_to_sec("%s%s" % (lat_per_dict[tmp_key], lat_persent_unit[0]),'msec'))
output_fio_data[tmp_key+"%_read_lat"] = float(common.time_to_sec("%s%s" % (lat_per_dict_read[tmp_key], lat_persent_unit[0]), 'msec'))
else:
output_fio_data[tmp_key+"%_read_lat"] = 'null'
else:
output_fio_data[tmp_key+"%_read_lat"] = 'null'

if len(lat_per_dict_write) != 0:
for tmp_key in ["95.00th", "99.00th", "99.99th"]:
if tmp_key in lat_per_dict_write.keys():
lat_persent_unit = re.findall(r"(?<=[\(])[^\)]+(?=[\)])", stdout2.strip('\n').strip(' ').replace(' ',''))
if len(lat_persent_unit) == 1:
output_fio_data[tmp_key+"%_write_lat"] = float(common.time_to_sec("%s%s" % (lat_per_dict_write[tmp_key], lat_persent_unit[0]), 'msec'))
elif len(lat_persent_unit) == 2:
output_fio_data[tmp_key+"%_write_lat"] = float(common.time_to_sec("%s%s" % (lat_per_dict_write[tmp_key], lat_persent_unit[1]), 'msec'))
else:
output_fio_data[tmp_key+"%_lat"] = 'null'
output_fio_data[tmp_key+"%_write_lat"] = 'null'
else:
output_fio_data[tmp_key+"%_lat"] = 'null'
output_fio_data[tmp_key+"%_write_lat"] = 'null'
output_fio_data['lat_unit'] = 'msec'
output_fio_data['runtime_unit'] = 'sec'
output_fio_data['bw_unit'] = 'MB/s'
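
The new parsing depends on fio's output ordering: in classic fio text output the read clat percentile table is printed before the write table, and the grep above assumes each table contributes five matched rows, so head -5 yields the read block and tail -5 the write block. The unit comes from the "clat percentiles (usec)"-style header captured into stdout2; when two headers are present, the write branch takes the second. A rough pure-Python equivalent of that split, under the same ordering assumption (helper names are illustrative):

import re

def split_percentile_blocks(fio_text):
    """Return (read_rows, write_rows) from fio's clat percentile tables."""
    rows = [line for line in fio_text.splitlines()
            if re.search(r"(1\.00th|30\.00th|70\.00th|99\.00th|99\.99th)=", line)]
    # head -5 / tail -5 equivalent: the read table is printed first, the
    # write table last; with a single table both slices return the same rows.
    return rows[:5], rows[-5:]

def percentile_units(fio_text):
    """Capture units from headers like 'clat percentiles (usec):'."""
    return re.findall(r"clat percentiles \((\w+)\)", fio_text)
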
1 change: 1 addition & 0 deletions benchmarking/mod/benchmark.py
@@ -178,6 +178,7 @@ def run(self):
common.pdsh(user, nodes, "sync && echo '%s' > /proc/sys/vm/drop_caches" % self.cluster["cache_drop_level"])

#send command to ceph cluster
common.pdsh(user, [head], "ceph osd pool application enable rbd rbd", option="console")
common.pdsh(user, nodes, "for i in `seq 1 %d`;do echo `date \"+%s\"` `ceph health` >> %s/`hostname`_ceph_health.txt; sleep %s;done" % (time_tmp/int(monitor_interval)+1, "%Y_%m_%d %H:%M:%S", dest_dir, monitor_interval), option="force")
common.pdsh(user, nodes, "ps aux | grep ceph-osd | grep -v 'grep' > %s/`hostname`_ps.txt" % (dest_dir))
common.pdsh(user, nodes, "date > %s/`hostname`_process_log.txt" % (dest_dir))
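
Since Ceph Luminous, a pool must be tagged with an application (rbd, rgw, cephfs, or a custom name) before client I/O is allowed; otherwise ceph health reports a POOL_APP_NOT_ENABLED warning. The added pdsh line enables the rbd application on the rbd pool from the head node before the run. A standalone equivalent, assuming the rbd pool already exists and the command runs on a node with admin credentials:

import subprocess

# Mirrors the pdsh line above: tag the 'rbd' pool with the 'rbd' application.
subprocess.check_call(
    ["ceph", "osd", "pool", "application", "enable", "rbd", "rbd"])
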
2 changes: 1 addition & 1 deletion conf/common.py
@@ -642,7 +642,7 @@ def get_ceph_health(user, node):
res = format_pdsh_return(stdout)
if len(res):
stdout = res[node]
output["ceph_status"] = stdout['health']['overall_status']
output["ceph_status"] = stdout['health']['status']
output["detail"] = stdout['health']['checks']
if "write_bytes_sec" in stdout['pgmap']:
str_wb = str(stdout['pgmap']['write_bytes_sec'] / 1024 / 1024) + ' MB/s wr, '
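
This tracks a JSON schema change in ceph status: starting with Luminous, health.overall_status is deprecated (and later removed) in favor of health.status, with per-check detail moving under health.checks. A tolerant variant, if output from both old and new releases may be encountered (an assumption beyond what this diff needs):

def ceph_health_status(status_json):
    """Prefer the Luminous+ key, fall back to the pre-Luminous one."""
    health = status_json.get("health", {})
    return health.get("status") or health.get("overall_status")
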
1 change: 0 additions & 1 deletion conf/handler.py
@@ -222,7 +222,6 @@ def list_required_config(self):
required_list["system"]["disk|read_ahead_kb"] = 2048
required_list["ceph_tuning"] = OrderedDict()
required_list["ceph_tuning"]["pool|rbd|size"] = 2
required_list["ceph_tuning"]["global|mon_pg_warn_max_per_osd"] = 1000
required_list["analyzer"] = OrderedDict()
required_list["analyzer"]["analyzer"] = "all"

10 changes: 6 additions & 4 deletions deploy/mod/deploy.py
@@ -95,7 +95,7 @@ def __init__(self, tunings=""):
self.cluster["mdss"][mds] = ip_handler.getIpByHostInSubnet(mds)

for osd in self.cluster["osds"]:
devices_id = self.translate_to_id(self.all_conf_data.get_list(osd))
devices_id = self.translate_to_id(osd, self.all_conf_data.get_list(osd))
self.cluster[osd] = devices_id

self.cluster["fs"] = "xfs"
@@ -913,12 +913,14 @@ def start_osd(self):
def start_mgr(self, force=False):
user = self.cluster["user"]
head = self.cluster["head"]
outStr, stderr = common.pdsh(user, [head], "ceph status --format json", "check_return")
outStr, stderr = common.pdsh(user, [head], "ceph status --format json-pretty", "check_return")
formatted_outStr = common.format_pdsh_return(outStr)
ceph_status = formatted_outStr[head]
#outList = [x.strip() for x in outStr.split('\n')]
common.pdsh(user, [head], "mkdir -p /var/lib/ceph/mgr/", option="console")
common.pdsh(user, [head], "ceph auth get-or-create mgr.admin mon 'allow profile mgr' osd 'allow *' mds 'allow *' 2>/dev/null 1>/var/lib/ceph/mgr/ceph-admin && ceph-mgr -i admin", option="console")

if "no active mgr" in outStr:
common.pdsh(user, [head], "ceph auth get-or-create mgr.admin mon 'allow *' && ceph-mgr -i %s" % ceph_status["fsid"], option="console")
common.pdsh(user, [head], "ceph auth get-or-create mgr.admin mon 'allow profile mgr' osd 'allow *' mds 'allow *' 2>/dev/null 1>/var/lib/ceph/mgr/ceph-admin && ceph-mgr -i admin", option="console")
common.printout("LOG", "create mgr success: admin")
else:
common.printout("LOG", "not need create mgr")
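
Luminous also made ceph-mgr mandatory: without an active manager, ceph status includes the string "no active mgr" and most cluster statistics stop updating. The new code therefore only provisions and starts mgr.admin when that marker appears in the status output. The guard in isolation, with run() standing in for common.pdsh on the head node (an illustrative wrapper, not a function from this repo):

def ensure_active_mgr(status_output, run):
    """Create and start mgr.admin only when no manager is active."""
    if "no active mgr" in status_output:
        run("ceph auth get-or-create mgr.admin "
            "mon 'allow profile mgr' osd 'allow *' mds 'allow *' "
            "2>/dev/null 1>/var/lib/ceph/mgr/ceph-admin && ceph-mgr -i admin")
    else:
        print("manager already active; nothing to do")
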
8 changes: 6 additions & 2 deletions tuner/tuner.py
@@ -52,9 +52,10 @@ def handle_disk(self, option="get", param={'read_ahead_kb':2048, 'max_sectors_kb
for osd in osds:
for device in self.cluster[osd]:
parsed_device_name = common.parse_device_name(device)
parsed_device_name = 'loop' + re.findall(r'\d', parsed_device_name)[0]
tmp = {}
for key, value in param.items():
stdout, stderr = common.pdsh(user, [osd], 'sh -c "cat /sys/block/%s/queue/%s"' % (parsed_device_name, key), option="check_return")
stdout, stderr = common.pdsh(user, [osd], 'sh -c "cat /sys/devices/virtual/block/%s/queue/%s"' % (parsed_device_name, key), option="check_return")
res = common.format_pdsh_return(stdout)
tmp[key] = res[osd]
stdout, stderr = common.pdsh(user, [osd], 'xfs_info %s' % (device), option="check_return")
@@ -69,8 +70,10 @@
for osd in osds:
for device in self.cluster[osd]:
parsed_device_name = common.parse_device_name(device)
parsed_device_name = 'loop' + re.findall(r'\d', parsed_device_name)[0]
for key, value in param.items():
stdout, stderr = common.pdsh(user, [osd], 'sh -c "echo %s > /sys/block/%s/queue/%s"' % (str(value), parsed_device_name, key), option="check_return")
stdout, stderr = common.pdsh(user, [osd], 'sh -c "echo %s > /sys/devices/virtual/block/%s/queue/%s"' % (str(value), parsed_device_name, key), option="check_return")
common.printout("LOG", "change the read_ahead_kb to %s, device name = %s, key = %s" % (value, parsed_device_name, key))

def get_version(self):
common.printout("LOG","<CLASS_NAME:%s> Test start running function : %s"%(self.__class__.__name__,sys._getframe().f_code.co_name),screen=False,log_level="LVL4")
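
The tuner changes hard-code loop devices: queue parameters are now read and written under /sys/devices/virtual/block/loopN/queue, the canonical sysfs location for virtual (loop) block devices, instead of /sys/block/<dev>/queue. Note that re.findall(r'\d', ...)[0] keeps only the first digit of the parsed name, so loop12 would collapse to loop1; \d+ would preserve multi-digit indices. A small sketch of the path construction under that loop-device assumption:

import re

def loop_queue_path(parsed_device_name, key):
    """Build the sysfs queue path for a loop device (multi-digit safe)."""
    index = re.findall(r"\d+", parsed_device_name)[0]
    return "/sys/devices/virtual/block/loop%s/queue/%s" % (index, key)

print(loop_queue_path("loop12", "read_ahead_kb"))
# /sys/devices/virtual/block/loop12/queue/read_ahead_kb
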
@@ -270,6 +273,7 @@ def apply_tuning(self, jobname, no_check = False):
else:
tmp_tuning_diff = ['global']

common.printout("LOG","tmp_tuning_diff = %s" % tmp_tuning_diff)
if 'disk' in tmp_tuning_diff:
param = {}
for param_name, param_data in self.worksheet[jobname]['disk'].items():