diff --git a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py index 9783aae1..9c675789 100644 --- a/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py +++ b/pytest-embedded-idf/pytest_embedded_idf/unity_tester.py @@ -6,9 +6,9 @@ import time import typing as t import warnings +from collections import namedtuple from collections.abc import Iterable from dataclasses import dataclass -from threading import Semaphore, Thread import pexpect from pexpect.exceptions import TIMEOUT @@ -23,9 +23,9 @@ if t.TYPE_CHECKING: from .dut import IdfDut - DEFAULT_START_RETRY = 3 DEFAULT_TIMEOUT = 90 +WAIT_FOR_MENU_TIMEOUT = 10 READY_PATTERN_LIST = [ 'Press ENTER to see the list of tests', @@ -247,10 +247,14 @@ def wrapper(self, *args, **kwargs): _timeout = kwargs.get('timeout', 30) _case = args[0] + if _case.type not in func.__name__: + logging.warning('The %s case can\'t be executed with %s function.', _case.type, func.__name__) + return + try: # do it here since the first hard reset before test case shouldn't be counted in duration time if 'reset' in kwargs: - if kwargs.pop('reset') and self._hard_reset_func: + if kwargs.get('reset') and self._hard_reset_func: try: self._hard_reset_func() except NotImplementedError: @@ -322,6 +326,7 @@ def _add_single_unity_test_case( def _run_normal_case( self, case: UnittestMenuCase, + *, reset: bool = False, # noqa timeout: float = 30, ) -> None: @@ -347,6 +352,7 @@ def _run_normal_case( def _run_multi_stage_case( self, case: UnittestMenuCase, + *, reset: bool = False, # noqa timeout: float = 30, ) -> None: @@ -416,32 +422,9 @@ def run_all_single_board_cases( self._run_multi_stage_case(case, reset=reset, timeout=timeout) -class MultiDevResource: +class _MultiDevTestDut: """ - Resources of multi_dev dut - - Attributes: - dut (IdfDut): Object of the Device under test - sem (Semaphore): Semaphore of monitoring whether the case finished - recv_sig (t.List[str]): The list of received signals from other dut - thread (Thread): The thread of monitoring the signals - """ - - def __init__(self, dut: 'IdfDut') -> None: - self.dut = dut - self.sem = Semaphore() - self.recv_sig: t.List[str] = [] - self.thread: Thread = None # type: ignore - - -class CaseTester: - """ - The Generic tester of all the types - - Attributes: - group (t.List[MultiDevResource]): The group of the devices' resources - dut (IdfDut): The first dut if there is more than one - test_menu (t.List[UnittestMenuCase]): The list of the cases + Dut control for multidevice test case """ # The signal pattens come from 'test_utils.c' @@ -449,139 +432,311 @@ class CaseTester: WAIT_SIGNAL_PREFIX = 'Waiting for signal: ' UNITY_SEND_SIGNAL_REGEX = SEND_SIGNAL_PREFIX + r'\[(.*?)\]!' UNITY_WAIT_SIGNAL_REGEX = WAIT_SIGNAL_PREFIX + r'\[(.*?)\]!' + signal_pattern_list = [ + UNITY_SEND_SIGNAL_REGEX, # The dut send a signal + UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal + UNITY_SUMMARY_LINE_REGEX, # Means the case finished + ] - def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore - """ - Create the object for every dut and put them into the group - """ - if isinstance(dut, Iterable): - self.is_multi_dut = True - self.dut = list(dut) - self.first_dut = self.dut[0] - self.test_menu = self.first_dut.test_menu - else: - self.is_multi_dut = False - self.dut = dut - self.first_dut = dut - self.test_menu = self.dut.test_menu - - if self.is_multi_dut: - self.group: t.List[MultiDevResource] = [] - if isinstance(dut, list): - for item in dut: - dev_res = MultiDevResource(item) - self.group.append(dev_res) - - def _wait_multi_dev_case_finish(self, timeout: float = DEFAULT_TIMEOUT) -> None: - """ - Wait until all the sub-cases of this multi_device case finished - """ - for d in self.group: - if d.sem.acquire(timeout=timeout): - d.sem.release() - else: - raise TimeoutError('Wait case to finish timeout') + DevResponse = namedtuple('DevResponse', ['completed', 'data']) - def _start_sub_case_thread( + def __init__( self, - dev_res: MultiDevResource, - case: UnittestMenuCase, - sub_case_index: int, - case_start_time: float, - start_retry: int = DEFAULT_START_RETRY, - ) -> None: - """ - Start the thread monitoring on the corresponding dut of the sub-case - """ - # Allocate the kwargs that pass to '_run' - _kwargs = { - 'dut': dev_res.dut, - 'dev_res': dev_res, - 'case': case, - 'sub_case_index': sub_case_index, - 'start_retry': start_retry, - 'start_time': case_start_time, - } - - # Create the thread of the sub-case - dev_res.thread = Thread(target=self._run, kwargs=_kwargs, daemon=True) - dev_res.thread.start() - # Thread starts, acquire the semaphore to block '_wait_multi_dev_case_finish' - dev_res.sem.acquire() - - def _run(self, **kwargs) -> None: # type: ignore - """ - The thread target function - Will run for each case on each dut + dut, + case, + sub_case_index, + shared_message_query, + start_retry, + wait_for_menu_timeout=WAIT_FOR_MENU_TIMEOUT, + runtest_timeout=DEFAULT_TIMEOUT, + ): + self.dut = dut + self.case = case + self.sub_case_index = sub_case_index + self.shared_message_query = shared_message_query + self.dut_index = self.sub_case_index - 1 + self.start_retry = start_retry + self.wait_for_menu_timeout = wait_for_menu_timeout + self.runtest_timeout = runtest_timeout + + self.work = self.run_case() + self.init_time = time.perf_counter() + self.response: _MultiDevTestDut.DevResponse = _MultiDevTestDut.DevResponse(False, None) + + def __iter__(self): + return self.work + + def __next__(self): + if self.response.completed: + return self.response + else: + try: + next(self.__iter__()) + except StopIteration as e: + self.response = _MultiDevTestDut.DevResponse(True, e.value) + self.work.close() + except TIMEOUT as e: + self.response = _MultiDevTestDut.DevResponse(True, e) + self.work.close() + return self.response - Call the wrapped function to trigger the case - Then keep listening on the dut for the signal + def close(self): + self.work.close() - - If the dut send a signal, it will be put into others' recv_sig - - If the dut waits for a signal, it block and keep polling for the recv_sig until get the signal it requires - - If the dut finished running the case, it will quite the loop and terminate the thread - """ - signal_pattern_list = [ - self.UNITY_SEND_SIGNAL_REGEX, # The dut send a signal - self.UNITY_WAIT_SIGNAL_REGEX, # The dut is blocked and waiting for a signal - UNITY_SUMMARY_LINE_REGEX, # Means the case finished - ] - dut = kwargs['dut'] - dev_res = kwargs['dev_res'] - case = kwargs['case'] - sub_case_index = kwargs['sub_case_index'] - start_retry = kwargs['start_retry'] - start_time = kwargs['start_time'] - # Start the case - dut.expect_exact(READY_PATTERN_LIST) - # Retry at defined number of times if not write successfully - for retry in range(start_retry): - dut.write(str(case.index)) + def run_case(self): + yield from self._expect_exact(READY_PATTERN_LIST, self.wait_for_menu_timeout) + + for retry in range(self.start_retry): + self.dut.write(str(self.case.index)) try: - dut.expect_exact(case.name, timeout=1) + yield from self._expect_exact(self.case.name, 1) break except TIMEOUT as e: - if retry >= start_retry - 1: - dev_res.sem.release() + if retry >= self.start_retry - 1: raise e + self.dut.write(str(self.sub_case_index)) - dut.write(str(sub_case_index)) - - # Wait for the specific patterns, only exist when the sub-case finished + _start_time = time.perf_counter() while True: - pat = dut.expect(signal_pattern_list, timeout=60) + _current = time.perf_counter() + _timeout = _start_time + self.runtest_timeout - _current + + if _timeout < 0: + raise TIMEOUT('Tasks timed out, without other exception') + + pat = yield from self._expect(self.signal_pattern_list, _timeout) + if pat is not None: match_str = pat.group().decode('utf-8') # Send a signal if self.SEND_SIGNAL_PREFIX in match_str: send_sig = pat.group(1).decode('utf-8') - for d in self.group: - d.recv_sig.append(send_sig) + for i, q in enumerate(self.shared_message_query): + if i != self.dut_index: + q.append(send_sig) # Waiting for a signal elif self.WAIT_SIGNAL_PREFIX in match_str: wait_sig = pat.group(1).decode('utf-8') while True: - if wait_sig in dev_res.recv_sig: - dev_res.recv_sig.remove(wait_sig) - dut.write('') + yield + if wait_sig in self.shared_message_query[self.dut_index]: + self.shared_message_query[self.dut_index].remove(wait_sig) + self.dut.write('') break # Keep waiting the signal else: - time.sleep(0.1) + if _start_time + self.runtest_timeout < time.perf_counter(): + raise TIMEOUT('Not receive signal %r' % wait_sig) # Case finished elif 'Tests' in match_str: - case_end_time = time.perf_counter() - case_duration = case_end_time - start_time + case_duration = time.perf_counter() - _start_time additional_attrs = {'time': round(case_duration, 3)} - log = remove_asci_color_code(dut.pexpect_proc.before) - dut.testsuite.add_unity_test_cases(log, additional_attrs=additional_attrs) - break + log = remove_asci_color_code(self.dut.pexpect_proc.before) + return log, additional_attrs + + def _expect_exact(self, pattern, timeout): + start = time.perf_counter() + while True: + try: + yield 'Await for result' + res = self.dut.expect_exact(pattern, timeout=0.01) + return res + except TIMEOUT as e: + if time.perf_counter() - start > timeout: + raise e + + def _expect(self, pattern, timeout): + start = time.perf_counter() + while True: + try: + yield 'Await for result' + res = self.dut.expect(pattern, timeout=0.01) + return res + except TIMEOUT as e: + if time.perf_counter() - start > timeout: + raise e + + def process_raw_report_data(self, raw_data_to_report) -> t.Dict: + additional_attrs = {} + if isinstance(raw_data_to_report, tuple) and len(raw_data_to_report) == 2: + log = str(raw_data_to_report[0]) + additional_attrs = raw_data_to_report[1] + else: + log = str(raw_data_to_report) + + if log: + # check format + check = UNITY_FIXTURE_REGEX.search(log) + if check: + regex = UNITY_FIXTURE_REGEX + else: + regex = UNITY_BASIC_REGEX + + res = list(regex.finditer(log)) + else: + res = [] + + # real parsing + if len(res) == 0: + logging.warning(f'unity test case not found, use case {self.case.name} instead') + attrs = { + 'name': self.case.name, + 'result': 'FAIL', + 'message': self.dut.pexpect_proc.buffer_debug_str, + 'time': time.perf_counter() - self.init_time, + } + elif len(res) == 1: + attrs = {k: v for k, v in res[0].groupdict().items() if v is not None} + else: + warnings.warn('This function is for recording single unity test case only. Use the last matched one') + attrs = {k: v for k, v in res[-1].groupdict().items() if v is not None} + + if additional_attrs: + attrs.update(additional_attrs) + + if log: + attrs.update({'stdout': log}) + + return attrs + + def add_to_report(self, attrs): + testcase = TestCase(**attrs) + self.dut.testsuite.testcases.append(testcase) - # The case finished, release the semaphore to unblock the '_wait_multi_dev_case_finish' - dev_res.sem.release() + if testcase.result == 'FAIL': + self.dut.testsuite.attrs['failures'] += 1 + elif testcase.result == 'IGNORE': + self.dut.testsuite.attrs['skipped'] += 1 + else: + self.dut.testsuite.attrs['tests'] += 1 + + +class MultiDevRunTestManager: + """ + Manager for control dut generator function + """ + + def __init__(self, duts, case, start_retry, wait_for_menu_timeout, runtest_timeout): + self.case = case + self.workers: t.List[_MultiDevTestDut] = [] + shared_query = [[] for _ in case.subcases] + for sub_case in case.subcases: + index: int + if isinstance(sub_case['index'], str): + index = int(sub_case['index'], 10) + else: + index = sub_case['index'] + + self.workers.append( + _MultiDevTestDut( + dut=duts[index - 1], + case=case, + sub_case_index=index, + shared_message_query=shared_query, + start_retry=start_retry, + wait_for_menu_timeout=wait_for_menu_timeout, + runtest_timeout=runtest_timeout, + ) + ) + + def next_for_all(self): + res = [] + err = [] + for i, it in enumerate(self.workers): + try: + r = next(it) + if r.completed: + res.append(r.data) + except Exception as e: + err.append(e) + return res, err + + def gather(self): + try: + while True: + res, er = self.next_for_all() + if er: + raise Exception('There are Exception: ', er) + if len(res) == len(self.workers): + return res + finally: + for _t in self.workers: + _t.close() + + def get_processed_report_data(self, res: t.List[t.Any]) -> t.List[t.Dict]: + return [self.workers[i].process_raw_report_data(res[i]) for i in range(len(res))] + + @staticmethod + def get_merge_data(test_cases_attr: t.List[t.Dict]) -> t.Dict: + output = {} + results = set() + time_attr = 0.0 + name_attr = set() + for ind, attr in enumerate(test_cases_attr): + for k, val in attr.items(): + if k == 'result': + results.add(val) + continue + if k == 'name': + name_attr.add(val) + continue + if k == 'time': + time_attr = max(time_attr, float(val)) + continue + + if k not in output: + output[k] = [f'[dut-{ind}]: {val}'] + else: + output[k].append(f'[dut-{ind}]: {val}') + + for k, val in output.items(): + if k in ('file', 'line'): + output[k] = val[0] + else: + output[k] = '<------------------->\n'.join(val) + + output['time'] = time_attr + output['name'] = ' <---> '.join(list(name_attr)) + + if 'FAIL' in results: + output['result'] = 'FAIL' + elif 'IGNORE' in results: + output['result'] = 'IGNORE' + else: + output['result'] = (results - {'FAIL', 'IGNORE'}).pop() + + return output + + def add_report_to_first_dut(self, attrs): + self.workers[0].add_to_report(attrs) + + +class CaseTester: + """ + The Generic tester of all the types + + Attributes: + dut (IdfDut): The first dut if there is more than one + test_menu (t.List[UnittestMenuCase]): The list of the cases + """ + + def __init__(self, dut: t.Union['IdfDut', t.List['IdfDut']]) -> None: # type: ignore + """ + Create the object for every dut and put them into the group + """ + if isinstance(dut, Iterable): + self.is_multi_dut = True + self.dut: t.List['IdfDut'] = list(dut) + self.first_dut = self.dut[0] + self.test_menu = self.first_dut.test_menu + else: + self.is_multi_dut = False + self.dut = [dut] + self.first_dut = dut + self.test_menu = self.dut[0].test_menu def run_multi_dev_case( self, @@ -615,24 +770,66 @@ def run_multi_dev_case( return if reset: - for dev_res in self.group: - dev_res.dut.serial.hard_reset() + for dut in self.dut: + dut.serial.hard_reset() - start_time = time.perf_counter() - for sub_case in case.subcases: - if isinstance(sub_case['index'], str): - index = int(sub_case['index'], 10) - else: - index = sub_case['index'] - self._start_sub_case_thread( - dev_res=self.group[index - 1], - case=case, - sub_case_index=index, - case_start_time=start_time, - start_retry=start_retry, - ) - # Waiting all the devices to finish their test cases - self._wait_multi_dev_case_finish(timeout=timeout) + mdm = MultiDevRunTestManager( + duts=self.dut, case=case, start_retry=start_retry, wait_for_menu_timeout=timeout, runtest_timeout=timeout + ) + res = mdm.gather() + data_to_report = mdm.get_processed_report_data(res) + merged_data = mdm.get_merge_data(data_to_report) + mdm.add_report_to_first_dut(merged_data) + + def run_normal_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None: + """ + Run a specific normal case + + Notes: + Will skip if the case type is not normal + + Args: + case: the specific case that parsed in test menu + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + self.first_dut._run_normal_case(case, reset=reset, timeout=timeout) + + def run_multi_stage_case(self, case: UnittestMenuCase, reset: bool = False, timeout: int = 90) -> None: + """ + Run a specific multi_stage case + + Notes: + Will skip if the case type is not multi_stage + + Args: + case: the specific case that parsed in test menu + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + self.first_dut._run_multi_stage_case(case, reset=reset, timeout=timeout) + + def run_all_normal_cases(self, reset: bool = False, timeout: int = 90) -> None: + """ + Run all normal cases + + Args: + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + for case in self.test_menu: + self.run_normal_case(case, reset, timeout=timeout) + + def run_all_multi_stage_cases(self, reset: bool = False, timeout: int = 90) -> None: + """ + Run all multi_stage cases + + Args: + reset: whether do a hardware reset before running the case + timeout: timeout in second + """ + for case in self.test_menu: + self.run_multi_stage_case(case, reset=reset, timeout=timeout) def run_all_multi_dev_cases( self, @@ -682,9 +879,13 @@ def run_case( Args: case: the specific case that parsed in test menu reset: whether to perform a hardware reset before running a case - timeout: timeout in second + timeout: timeout in second, setup time excluded start_retry (int): number of retries for a single case when it is failed to start """ + + if case.attributes.get('timeout'): + timeout = int(case.attributes['timeout']) + if case.type == 'normal': self.first_dut._run_normal_case(case, reset=reset, timeout=timeout) elif case.type == 'multi_stage': diff --git a/pytest-embedded-idf/tests/test_idf.py b/pytest-embedded-idf/tests/test_idf.py index b98ac53a..ad7bb964 100644 --- a/pytest-embedded-idf/tests/test_idf.py +++ b/pytest-embedded-idf/tests/test_idf.py @@ -544,7 +544,7 @@ def test_unity_test_case_runner(unity_tester): assert junit_report.attrib['errors'] == '0' assert junit_report.attrib['failures'] == '1' assert junit_report.attrib['skipped'] == '0' - assert junit_report.attrib['tests'] == '4' + assert junit_report.attrib['tests'] == '3' case_names_one_dev = [ 'normal_case1', @@ -559,7 +559,7 @@ def test_unity_test_case_runner(unity_tester): multi_dev_duts = ['dut-0', 'dut-1'] required_names_one_dev = [f'{case_name} [{dut}]' for dut in one_dev_dut for case_name in case_names_one_dev] - required_names_multi_dev = [f'{case_name} [{dut}]' for dut in multi_dev_duts for case_name in case_names_multi_dev] + required_names_multi_dev = [f'{case_name} [{multi_dev_duts[0]}]' for case_name in case_names_multi_dev] junit_case_names = [item.attrib['name'] for item in junit_report]