Fix Klever bridge when a job contains several duplicated tasks
vmordan committed Sep 18, 2023
1 parent 763c16e commit 34d6091
Showing 2 changed files with 78 additions and 68 deletions.
77 changes: 43 additions & 34 deletions scripts/klever_bridge/index_tasks.py
@@ -46,55 +46,64 @@ def __parse_args():
return sys.argv[1]


def _iterate_over_tasks(tasks_dir: str, jobs: dict, tasks: dict, job_id=None):
def _iterate_over_tasks(tasks_dir: str, jobs: dict, tasks: dict):
job_dir = os.path.realpath(os.path.join(tasks_dir, os.pardir, "jobs"))
tasks_dir_list = glob.glob(os.path.join(tasks_dir, "*"))
tasks_num = len(tasks_dir_list)
counter = 0
prev_percent = 0
attrs_to_tasks = {}

def _save_attrs(cur_task_id):
attrs = "&&&".join(tasks[cur_task_id])
if attrs not in attrs_to_tasks:
attrs_to_tasks[attrs] = []
attrs_to_tasks[attrs].append(cur_task_id)

def _get_job_from_attrs(str_attrs: str) -> str:
attrs = str_attrs.split("&&&")
return attrs[2]

for task_dir in tasks_dir_list:
cil_file = os.path.join(task_dir, CIL_FILE)
task_id = os.path.basename(task_dir)
if task_id in tasks:
continue
counter += 1
percent = int(100 * counter / tasks_num)
if prev_percent != percent:
prev_percent = percent
print(f"Tasks processed {counter} ({percent}%)")
if task_id in tasks:
_save_attrs(task_id)
continue
if os.path.exists(cil_file):
with open(cil_file, encoding="utf8", errors='ignore') as cil_fp:
for line in cil_fp.readlines():
if job_id and job_id not in line:
continue
if "vtg" not in line or "emg" not in line:
continue
if job_id:
res = re.search(
rf"{job_dir}/{job_id}/klever-core-work-dir/job/vtg/(.+)\.ko/(.+)/emg",
line
)
if res:
module = res.group(1) + ".ko"
prop = PROP_NAMING.get(res.group(2), res.group(2))
if job_id not in jobs:
jobs[job_id] = []
jobs[job_id].append(task_id)
tasks[task_id] = [module, prop]
break
else:
res = re.search(
rf"{job_dir}/(.+)/klever-core-work-dir/job/vtg/(.+)\.ko/(.+)/emg", line
)
if res:
new_job_id = res.group(1)
module = res.group(2) + ".ko"
prop = PROP_NAMING.get(res.group(3), res.group(3))
if new_job_id not in jobs:
jobs[new_job_id] = []
jobs[new_job_id].append(task_id)
tasks[task_id] = [module, prop]
break
res = re.search(
rf"{job_dir}/(.+)/klever-core-work-dir/job/vtg/(.+)\.ko/(.+)/emg", line
)
if res:
new_job_id = res.group(1)
module = res.group(2) + ".ko"
prop = res.group(3)
prop = PROP_NAMING.get(prop, prop)
task_attrs = [module, prop, new_job_id]
# if not _check_attrs(task_attrs):
if new_job_id not in jobs:
jobs[new_job_id] = []
jobs[new_job_id].append(task_id)
tasks[task_id] = task_attrs
_save_attrs(task_id)
break
for target_attrs, task_ids in attrs_to_tasks.items():
if len(task_ids) > 1:
target_job_id = _get_job_from_attrs(target_attrs)
transformed_list = sorted([int(t_id) for t_id in task_ids])
for elem in transformed_list[:-1]:
elem = str(elem)
if elem in jobs[target_job_id]:
jobs[target_job_id].remove(elem)


def _save_index(jobs: dict, tasks: dict):
@@ -109,19 +118,19 @@ def _upload_index() -> tuple:
def _proc_single_file(file_name: str) -> dict:
if os.path.exists(file_name):
with open(file_name, "r", encoding="ascii") as index_fp:
return json.load(index_fp)
return json.load(index_fp, parse_int=int)
return {}
jobs = _proc_single_file(JOBS_FILE)
tasks = _proc_single_file(TASKS_FILE)
return jobs, tasks


def index_klever_tasks(tasks_dir: str, job_id=None) -> tuple:
def index_klever_tasks(tasks_dir: str) -> tuple:
"""
Upload index cache and update it.
"""
jobs, tasks = _upload_index()
_iterate_over_tasks(tasks_dir, jobs, tasks, job_id=job_id)
_iterate_over_tasks(tasks_dir, jobs, tasks)
_save_index(jobs, tasks)
return jobs, tasks

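The change above replaces the old per-job_id search in _iterate_over_tasks with a single pass that records a (module, property, job) attribute triple for every task and then drops duplicates: when several task directories map to the same attributes, only the task with the largest numeric id is kept in the job's task list. Below is a minimal standalone sketch of that deduplication idea, assuming task ids are numeric strings and the attribute triple is joined with the same "&&&" separator; the function name dedup_tasks and the sample data are illustrative only and are not part of the commit.

def dedup_tasks(jobs: dict, tasks: dict) -> None:
    """Keep only the newest task id per (module, prop, job) attribute triple."""
    attrs_to_tasks = {}
    for task_id, attrs in tasks.items():
        # Join attributes into a hashable key, mirroring the "&&&" separator above.
        attrs_to_tasks.setdefault("&&&".join(attrs), []).append(task_id)
    for str_attrs, task_ids in attrs_to_tasks.items():
        if len(task_ids) > 1:
            job_id = str_attrs.split("&&&")[2]  # the third attribute is the job id
            # Drop every duplicate except the one with the largest numeric id.
            for old_id in sorted(task_ids, key=int)[:-1]:
                if old_id in jobs.get(job_id, []):
                    jobs[job_id].remove(old_id)


if __name__ == "__main__":
    jobs = {"42": ["100", "101", "205"]}
    tasks = {
        "100": ["drivers/net/a.ko", "memory safety", "42"],
        "205": ["drivers/net/a.ko", "memory safety", "42"],  # re-run of task 100
        "101": ["drivers/net/b.ko", "memory safety", "42"],
    }
    dedup_tasks(jobs, tasks)
    print(jobs)  # {'42': ['101', '205']} -- the older duplicate 100 is removed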
69 changes: 35 additions & 34 deletions scripts/klever_bridge/launcher.py
@@ -69,7 +69,7 @@ def __process_single_klever_result(self, result: VerificationResults, output_dir
queue.put(result)
sys.exit(0)

def __parse_tasks_dir(self, processed_tasks: dict):
def __parse_tasks_dir(self, processed_tasks: dict, job_id: str):
queue = multiprocessing.Queue()
process_pool = []
results = []
@@ -79,7 +79,7 @@ def __parse_tasks_dir(self, processed_tasks: dict):

for output_dir, tasks_args in processed_tasks.items():
xml_file = tasks_args[TAG_TASK_XML]
module, prop = tasks_args[TAG_TASK_ARGS]
module, prop, _ = tasks_args[TAG_TASK_ARGS]
tree = ElementTree.ElementTree()
tree.parse(xml_file)
root = tree.getroot()
@@ -129,13 +129,13 @@ def __parse_tasks_dir(self, processed_tasks: dict):
kill_launches(process_pool)
wait_for_launches(process_pool)
self._get_from_queue_into_list(queue, results)
return self._export_results(results, job_config, self.__parse_job_resource_log())
return self._export_results(results, job_config, self.__parse_job_resource_log(job_id))

def __parse_job_resource_log(self) -> dict:
def __parse_job_resource_log(self, job_id: str) -> dict:
job_resources = {TAG_CPU_TIME: 0.0, TAG_WALL_TIME: 0.0, TAG_MEMORY_USAGE: 0}
if not self.job_id:
if not job_id:
return {}
res_file = os.path.join(self.output_dir, os.path.pardir, JOBS_DIR, self.job_id,
res_file = os.path.join(self.output_dir, os.path.pardir, JOBS_DIR, job_id,
JOB_RES_FILE)
if not os.path.exists:
self.logger.warning(f"File with job resources {res_file} does not exist")
Expand All @@ -156,31 +156,32 @@ def process_results(self):
"""
# TODO: support launching klever tasks.
self.logger.info("Indexing klever tasks files")
jobs_to_tasks, tasks_to_attrs = index_klever_tasks(self.output_dir, self.job_id)
self.logger.info(f"Process job {self.job_id}")
processed_tasks = {}
self.job_name_suffix = self.job_id
for task_id in jobs_to_tasks[self.job_id]:
path_to_dir = os.path.join(self.output_dir, str(task_id), "output")
xml_files = glob.glob(os.path.join(path_to_dir, '*results.*xml'))
if len(xml_files) != 1:
self.logger.error(f"Abnormal number of xml reports {len(xml_files)} for task "
f"{task_id} in a directory {path_to_dir}")
continue
processed_tasks[path_to_dir] = {
TAG_TASK_XML: xml_files[0],
TAG_TASK_ARGS: tasks_to_attrs[task_id]
}
self.logger.info(f"Got {len(processed_tasks)} tasks")
uploader_config = self.config.get(UPLOADER, {})
is_upload = uploader_config and uploader_config.get(TAG_UPLOADER_UPLOAD_RESULTS, False)
self.process_dir = os.path.abspath(tempfile.mkdtemp(dir=self.work_dir))
result_archive = self.__parse_tasks_dir(processed_tasks)
if is_upload and result_archive:
self._upload_results(uploader_config, result_archive)
if not self.debug:
shutil.rmtree(self.process_dir, ignore_errors=True)
if not self.debug:
clear_symlink(self.tasks_dir)
for task_dir_in in glob.glob(os.path.join(self.tasks_dir, "*")):
clear_symlink(task_dir_in)
jobs_to_tasks, tasks_to_attrs = index_klever_tasks(self.output_dir)
for job_id in self.job_id.split(","):
self.logger.info(f"Process job {job_id}")
processed_tasks = {}
self.job_name_suffix = job_id
for task_id in jobs_to_tasks[job_id]:
path_to_dir = os.path.join(self.output_dir, str(task_id), "output")
xml_files = glob.glob(os.path.join(path_to_dir, '*results.*xml'))
if len(xml_files) != 1:
self.logger.error(f"Abnormal number of xml reports {len(xml_files)} for task "
f"{task_id} in a directory {path_to_dir}")
continue
processed_tasks[path_to_dir] = {
TAG_TASK_XML: xml_files[0],
TAG_TASK_ARGS: tasks_to_attrs[task_id]
}
self.logger.info(f"Got {len(processed_tasks)} tasks")
uploader_config = self.config.get(UPLOADER, {})
is_upload = uploader_config and uploader_config.get(TAG_UPLOADER_UPLOAD_RESULTS, False)
self.process_dir = os.path.abspath(tempfile.mkdtemp(dir=self.work_dir))
result_archive = self.__parse_tasks_dir(processed_tasks, job_id)
if is_upload and result_archive:
self._upload_results(uploader_config, result_archive)
if not self.debug:
shutil.rmtree(self.process_dir, ignore_errors=True)
if not self.debug:
clear_symlink(self.tasks_dir)
for task_dir_in in glob.glob(os.path.join(self.tasks_dir, "*")):
clear_symlink(task_dir_in)
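In launcher.py, process_results now treats self.job_id as a comma-separated list of Klever job identifiers and repeats the whole pipeline (index lookup, collection of per-task results XML reports, export and optional upload) for each job, passing the current job_id down to __parse_tasks_dir and __parse_job_resource_log. The following is a simplified, self-contained sketch of that per-job loop, assuming job_id may hold a value such as "42,43"; process_jobs and its return structure are illustrative stand-ins rather than the bridge's actual API.

import glob
import os


def process_jobs(job_id: str, output_dir: str, jobs_to_tasks: dict) -> dict:
    """Collect the results XML report for every task of every requested job."""
    collected = {}
    for single_job in job_id.split(","):
        processed = {}
        for task_id in jobs_to_tasks.get(single_job, []):
            task_output = os.path.join(output_dir, str(task_id), "output")
            xml_files = glob.glob(os.path.join(task_output, "*results.*xml"))
            if len(xml_files) != 1:
                # Each task is expected to produce exactly one results XML report.
                continue
            processed[task_output] = xml_files[0]
        collected[single_job] = processed
    return collected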
