From 7c6a7c887ce11fec74a2e9b275c7e432859d300b Mon Sep 17 00:00:00 2001 From: coderustic Date: Sat, 16 Nov 2024 08:57:36 -0800 Subject: [PATCH 1/2] Refactored coverage processor * A new class hierarchy for processing coverage from different tools * New interface representing CoverageReport and CoverageData * A factory to create the appropriate coverage processor --- cover_agent/CoverAgent.py | 14 +- cover_agent/CoverageProcessor.py | 413 ----------------------- cover_agent/UnitTestGenerator.py | 1 - cover_agent/UnitTestValidator.py | 117 ++----- cover_agent/coverage/processor.py | 362 ++++++++++++++++++++ tests/coverage/test_processor.py | 379 +++++++++++++++++++++ tests/test_CoverAgent.py | 17 +- tests/test_CoverageProcessor.py | 532 ------------------------------ tests/test_UnitTestGenerator.py | 1 - tests/test_UnitTestValidator.py | 37 +-- 10 files changed, 807 insertions(+), 1066 deletions(-) delete mode 100644 cover_agent/CoverageProcessor.py create mode 100644 cover_agent/coverage/processor.py create mode 100644 tests/coverage/test_processor.py delete mode 100644 tests/test_CoverageProcessor.py diff --git a/cover_agent/CoverAgent.py b/cover_agent/CoverAgent.py index 8881d08b1..266fc8d63 100644 --- a/cover_agent/CoverAgent.py +++ b/cover_agent/CoverAgent.py @@ -199,19 +199,19 @@ def run_test_gen(self, failed_test_runs: List, language: str, test_framework: st # Check if the desired coverage has been reached failed_test_runs, language, test_framework, coverage_report = self.test_validator.get_coverage() - if self.test_validator.current_coverage >= (self.test_validator.desired_coverage / 100): + if self.test_validator.get_current_coverage() >= (self.test_validator.desired_coverage / 100): break # Log the final coverage - if self.test_validator.current_coverage >= (self.test_validator.desired_coverage / 100): + if self.test_validator.get_current_coverage() >= (self.test_validator.desired_coverage / 100): self.logger.info( - f"Reached above target coverage of {self.test_validator.desired_coverage}% (Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%) in {iteration_count} iterations." + f"Reached above target coverage of {self.test_validator.desired_coverage}% (Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%) in {iteration_count} iterations." ) elif iteration_count == self.args.max_iterations: if self.args.diff_coverage: - failure_message = f"Reached maximum iteration limit without achieving desired diff coverage. Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%" + failure_message = f"Reached maximum iteration limit without achieving desired diff coverage. Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%" else: - failure_message = f"Reached maximum iteration limit without achieving desired coverage. Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%" + failure_message = f"Reached maximum iteration limit without achieving desired coverage. Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%" if self.args.strict_coverage: # User requested strict coverage (similar to "--cov-fail-under in pytest-cov"). Fail with exist code 2. self.logger.error(failure_message) @@ -237,11 +237,11 @@ def run_test_gen(self, failed_test_runs: List, language: str, test_framework: st def log_coverage(self): if self.args.diff_coverage: self.logger.info( - f"Current Diff Coverage: {round(self.test_validator.current_coverage * 100, 2)}%" + f"Current Diff Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%" ) else: self.logger.info( - f"Current Coverage: {round(self.test_validator.current_coverage * 100, 2)}%" + f"Current Coverage: {round(self.test_validator.get_current_coverage() * 100, 2)}%" ) self.logger.info(f"Desired Coverage: {self.test_validator.desired_coverage}%") diff --git a/cover_agent/CoverageProcessor.py b/cover_agent/CoverageProcessor.py deleted file mode 100644 index 08d07e8d0..000000000 --- a/cover_agent/CoverageProcessor.py +++ /dev/null @@ -1,413 +0,0 @@ -from cover_agent.CustomLogger import CustomLogger -from typing import Literal, Tuple, Union, List -import csv -import json -import os -import re -import xml.etree.ElementTree as ET - - -class CoverageProcessor: - def __init__( - self, - file_path: str, - src_file_path: str, - coverage_type: Literal["cobertura", "lcov", "jacoco"], - use_report_coverage_feature_flag: bool = False, - diff_coverage_report_path: str = None, - ): - """ - Initializes a CoverageProcessor object. - - Args: - file_path (str): The path to the coverage report file. - src_file_path (str): The fully qualified path of the file for which coverage data is being processed. - coverage_type (Literal["cobertura", "lcov"]): The type of coverage report being processed. - - Attributes: - file_path (str): The path to the coverage report file. - src_file_path (str): The fully qualified path of the file for which coverage data is being processed. - coverage_type (Literal["cobertura", "lcov"]): The type of coverage report being processed. - logger (CustomLogger): The logger object for logging messages. - - Returns: - None - """ - self.file_path = file_path - self.src_file_path = src_file_path - self.coverage_type = coverage_type - self.logger = CustomLogger.get_logger(__name__) - self.use_report_coverage_feature_flag = use_report_coverage_feature_flag - self.diff_coverage_report_path = diff_coverage_report_path - - def process_coverage_report( - self, time_of_test_command: int - ) -> Tuple[list, list, float]: - """ - Verifies the coverage report's existence and update time, and then - parses the report based on its type to extract coverage data. - - Args: - time_of_test_command (int): The time the test command was run, in milliseconds. - - Returns: - Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, and the coverage percentage. - """ - self.verify_report_update(time_of_test_command) - return self.parse_coverage_report() - - def verify_report_update(self, time_of_test_command: int): - """ - Verifies the coverage report's existence and update time. - - Args: - time_of_test_command (int): The time the test command was run, in milliseconds. - - Raises: - AssertionError: If the coverage report does not exist or was not updated after the test command. - """ - assert os.path.exists( - self.file_path - ), f'Fatal: Coverage report "{self.file_path}" was not generated.' - - # Convert file modification time to milliseconds for comparison - file_mod_time_ms = int(round(os.path.getmtime(self.file_path) * 1000)) - - assert ( - file_mod_time_ms > time_of_test_command - ), f"Fatal: The coverage report file was not updated after the test command. file_mod_time_ms: {file_mod_time_ms}, time_of_test_command: {time_of_test_command}. {file_mod_time_ms > time_of_test_command}" - - def parse_coverage_report(self) -> Tuple[list, list, float]: - """ - Parses a code coverage report to extract covered and missed line numbers for a specific file, - and calculates the coverage percentage, based on the specified coverage report type. - - Returns: - Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, and the coverage percentage. - """ - if self.use_report_coverage_feature_flag: - if self.coverage_type == "cobertura": - return self.parse_coverage_report_cobertura() - elif self.coverage_type == "lcov": - return self.parse_coverage_report_lcov() - elif self.coverage_type == "jacoco": - return self.parse_coverage_report_jacoco() - else: - raise ValueError(f"Unsupported coverage report type: {self.coverage_type}") - else: - if self.coverage_type == "cobertura": - # Default behavior is to parse out a single file from the report - return self.parse_coverage_report_cobertura(filename=os.path.basename(self.src_file_path)) - elif self.coverage_type == "lcov": - return self.parse_coverage_report_lcov() - elif self.coverage_type == "jacoco": - return self.parse_coverage_report_jacoco() - elif self.coverage_type == "diff_cover_json": - return self.parse_json_diff_coverage_report() - else: - raise ValueError(f"Unsupported coverage report type: {self.coverage_type}") - - def parse_coverage_report_cobertura(self, filename: str = None) -> Union[Tuple[list, list, float], dict]: - """ - Parses a Cobertura XML code coverage report to extract covered and missed line numbers - for a specific file or for all files (if filename is None). Aggregates coverage data from - multiple entries that share the same filename. - - Args: - filename (str, optional): Filename to process. If None, process all files. - - Returns: - If filename is provided, returns (covered_lines, missed_lines, coverage_percent). - If filename is None, returns a dict: { filename: (covered_lines, missed_lines, coverage_percent) }. - """ - tree = ET.parse(self.file_path) - root = tree.getroot() - - if filename: - # Collect coverage for all elements matching the given filename - all_covered, all_missed = [], [] - for cls in root.findall(".//class"): - name_attr = cls.get("filename") - if name_attr and name_attr.endswith(filename): - c_covered, c_missed, _ = self.parse_coverage_data_for_class(cls) - all_covered.extend(c_covered) - all_missed.extend(c_missed) - - # Deduplicate and compute coverage - covered_set = set(all_covered) - missed_set = set(all_missed) - covered_set - total_lines = len(covered_set) + len(missed_set) - coverage_percentage = (len(covered_set) / total_lines) if total_lines else 0 - - return list(covered_set), list(missed_set), coverage_percentage - - else: - # Collect coverage for every , grouping by filename - coverage_data = {} - file_map = {} # filename -> ([covered], [missed]) - - for cls in root.findall(".//class"): - cls_filename = cls.get("filename") - if cls_filename: - c_covered, c_missed, _ = self.parse_coverage_data_for_class(cls) - if cls_filename not in file_map: - file_map[cls_filename] = ([], []) - file_map[cls_filename][0].extend(c_covered) - file_map[cls_filename][1].extend(c_missed) - - # Convert raw lists to sets, compute coverage, store results - for f_name, (c_covered, c_missed) in file_map.items(): - covered_set = set(c_covered) - missed_set = set(c_missed) - covered_set - total_lines = len(covered_set) + len(missed_set) - coverage_percentage = (len(covered_set) / total_lines) if total_lines else 0 - coverage_data[f_name] = (list(covered_set), list(missed_set), coverage_percentage) - - return coverage_data - - def parse_coverage_data_for_class(self, cls) -> Tuple[list, list, float]: - """ - Parses coverage data for a single class. - - Args: - cls (Element): XML element representing the class. - - Returns: - Tuple[list, list, float]: A tuple containing lists of covered and missed line numbers, - and the coverage percentage. - """ - lines_covered, lines_missed = [], [] - - for line in cls.findall(".//line"): - line_number = int(line.get("number")) - hits = int(line.get("hits")) - if hits > 0: - lines_covered.append(line_number) - else: - lines_missed.append(line_number) - - total_lines = len(lines_covered) + len(lines_missed) - coverage_percentage = (len(lines_covered) / total_lines) if total_lines > 0 else 0 - - return lines_covered, lines_missed, coverage_percentage - - def parse_coverage_report_lcov(self): - - lines_covered, lines_missed = [], [] - filename = os.path.basename(self.src_file_path) - try: - with open(self.file_path, "r") as file: - for line in file: - line = line.strip() - if line.startswith("SF:"): - if line.endswith(filename): - for line in file: - line = line.strip() - if line.startswith("DA:"): - line_number = line.replace("DA:", "").split(",")[0] - hits = line.replace("DA:", "").split(",")[1] - if int(hits) > 0: - lines_covered.append(int(line_number)) - else: - lines_missed.append(int(line_number)) - elif line.startswith("end_of_record"): - break - - except (FileNotFoundError, IOError) as e: - self.logger.error(f"Error reading file {self.file_path}: {e}") - raise - - total_lines = len(lines_covered) + len(lines_missed) - coverage_percentage = ( - (len(lines_covered) / total_lines) if total_lines > 0 else 0 - ) - - return lines_covered, lines_missed, coverage_percentage - - def parse_coverage_report_jacoco(self) -> Tuple[list, list, float]: - """ - Parses a JaCoCo XML code coverage report to extract covered and missed line numbers for a specific file, - and calculates the coverage percentage. - - Returns: Tuple[list, list, float]: A tuple containing empty lists of covered and missed line numbers, - and the coverage percentage. The reason being the format of the report for jacoco gives the totals we do not - sum them up. to stick with the current contract of the code and to do little change returning empty arrays. - I expect this should bring up a discussion on introduce a factory for different CoverageProcessors. Where the - total coverage percentage is returned to be evaluated only. - """ - lines_covered, lines_missed = [], [] - source_file_extension = self.get_file_extension(self.src_file_path) - - package_name, class_name = "","" - if source_file_extension == 'java': - package_name, class_name = self.extract_package_and_class_java() - elif source_file_extension == 'kt': - package_name, class_name = self.extract_package_and_class_kotlin() - else: - self.logger.warn(f"Unsupported Bytecode Language: {source_file_extension}. Using default Java logic.") - package_name, class_name = self.extract_package_and_class_java() - - - file_extension = self.get_file_extension(self.file_path) - - missed, covered = 0, 0 - if file_extension == 'xml': - lines_missed, lines_covered = self.parse_missed_covered_lines_jacoco_xml( - class_name - ) - missed, covered = len(lines_missed), len(lines_covered) - elif file_extension == 'csv': - missed, covered = self.parse_missed_covered_lines_jacoco_csv( - package_name, class_name - ) - else: - raise ValueError(f"Unsupported JaCoCo code coverage report format: {file_extension}") - - total_lines = missed + covered - coverage_percentage = (float(covered) / total_lines) if total_lines > 0 else 0 - - return lines_covered, lines_missed, coverage_percentage - - def parse_missed_covered_lines_jacoco_xml( - self, class_name: str - ) -> tuple[list, list]: - """Parses a JaCoCo XML code coverage report to extract covered and missed line numbers for a specific file.""" - tree = ET.parse(self.file_path) - root = tree.getroot() - sourcefile = ( - root.find(f".//sourcefile[@name='{class_name}.java']") or - root.find(f".//sourcefile[@name='{class_name}.kt']") - ) - - if sourcefile is None: - return [], [] - - missed, covered = [], [] - for line in sourcefile.findall('line'): - if line.attrib.get('mi') == '0': - covered += [int(line.attrib.get('nr', 0))] - else : - missed += [int(line.attrib.get('nr', 0))] - - return missed, covered - - def parse_missed_covered_lines_jacoco_csv( - self, package_name: str, class_name: str - ) -> tuple[int, int]: - with open(self.file_path, "r") as file: - reader = csv.DictReader(file) - missed, covered = 0, 0 - for row in reader: - if row["PACKAGE"] == package_name and row["CLASS"] == class_name: - try: - missed = int(row["LINE_MISSED"]) - covered = int(row["LINE_COVERED"]) - break - except KeyError as e: - self.logger.error(f"Missing expected column in CSV: {str(e)}") - raise - - return missed, covered - - def extract_package_and_class_java(self): - package_pattern = re.compile(r"^\s*package\s+([\w\.]+)\s*;.*$") - class_pattern = re.compile(r"^\s*public\s+class\s+(\w+).*") - - package_name = "" - class_name = "" - try: - with open(self.src_file_path, "r") as file: - for line in file: - if not package_name: # Only match package if not already found - package_match = package_pattern.match(line) - if package_match: - package_name = package_match.group(1) - - if not class_name: # Only match class if not already found - class_match = class_pattern.match(line) - if class_match: - class_name = class_match.group(1) - - if package_name and class_name: # Exit loop if both are found - break - except (FileNotFoundError, IOError) as e: - self.logger.error(f"Error reading file {self.src_file_path}: {e}") - raise - - return package_name, class_name - def extract_package_and_class_kotlin(self): - package_pattern = re.compile(r"^\s*package\s+([\w.]+)\s*(?:;)?\s*(?://.*)?$") - class_pattern = re.compile(r"^\s*(?:public|internal|abstract|data|sealed|enum|open|final|private|protected)*\s*class\s+(\w+).*") - - package_name = "" - class_name = "" - try: - with open(self.src_file_path, "r") as file: - for line in file: - if not package_name: # Only match package if not already found - package_match = package_pattern.match(line) - if package_match: - package_name = package_match.group(1) - - if not class_name: # Only match class if not already found - class_match = class_pattern.match(line) - if class_match: - class_name = class_match.group(1) - - if package_name and class_name: # Exit loop if both are found - break - except (FileNotFoundError, IOError) as e: - self.logger.error(f"Error reading file {self.src_file_path}: {e}") - raise - - return package_name, class_name - - def parse_json_diff_coverage_report(self) -> Tuple[List[int], List[int], float]: - """ - Parses a JSON-formatted diff coverage report to extract covered lines, missed lines, - and the coverage percentage for the specified src_file_path. - Returns: - Tuple[List[int], List[int], float]: A tuple containing lists of covered and missed lines, - and the coverage percentage. - """ - with open(self.diff_coverage_report_path, "r") as file: - report_data = json.load(file) - - # Create relative path components of `src_file_path` for matching - src_relative_path = os.path.relpath(self.src_file_path) - src_relative_components = src_relative_path.split(os.sep) - - # Initialize variables for covered and missed lines - relevant_stats = None - - for file_path, stats in report_data["src_stats"].items(): - # Split the JSON's file path into components - file_path_components = file_path.split(os.sep) - - # Match if the JSON path ends with the same components as `src_file_path` - if ( - file_path_components[-len(src_relative_components) :] - == src_relative_components - ): - relevant_stats = stats - break - - # If a match is found, extract the data - if relevant_stats: - covered_lines = relevant_stats["covered_lines"] - violation_lines = relevant_stats["violation_lines"] - coverage_percentage = ( - relevant_stats["percent_covered"] / 100 - ) # Convert to decimal - else: - # Default values if the file isn't found in the report - covered_lines = [] - violation_lines = [] - coverage_percentage = 0.0 - - return covered_lines, violation_lines, coverage_percentage - - - def get_file_extension(self, filename: str) -> str | None: - """Get the file extension from a given filename.""" - return os.path.splitext(filename)[1].lstrip(".") diff --git a/cover_agent/UnitTestGenerator.py b/cover_agent/UnitTestGenerator.py index 949ace788..06e68ea82 100644 --- a/cover_agent/UnitTestGenerator.py +++ b/cover_agent/UnitTestGenerator.py @@ -6,7 +6,6 @@ import re from cover_agent.AICaller import AICaller -from cover_agent.CoverageProcessor import CoverageProcessor from cover_agent.CustomLogger import CustomLogger from cover_agent.FilePreprocessor import FilePreprocessor from cover_agent.PromptBuilder import PromptBuilder diff --git a/cover_agent/UnitTestValidator.py b/cover_agent/UnitTestValidator.py index 9421ead37..0a92220da 100644 --- a/cover_agent/UnitTestValidator.py +++ b/cover_agent/UnitTestValidator.py @@ -3,17 +3,15 @@ import json import logging import os -import re from cover_agent.AICaller import AICaller -from cover_agent.CoverageProcessor import CoverageProcessor from cover_agent.CustomLogger import CustomLogger from cover_agent.FilePreprocessor import FilePreprocessor from cover_agent.PromptBuilder import PromptBuilder from cover_agent.Runner import Runner from cover_agent.settings.config_loader import get_settings from cover_agent.utils import load_yaml - +from cover_agent.coverage.processor import process_coverage, CoverageReport, CoverageData class UnitTestValidator: def __init__( @@ -106,15 +104,6 @@ def __init__( with open(self.source_file_path, "r") as f: self.source_code = f.read() - # initialize the coverage processor - self.coverage_processor = CoverageProcessor( - file_path=self.code_coverage_report_path, - src_file_path=self.source_file_path, - coverage_type=self.coverage_type, - use_report_coverage_feature_flag=self.use_report_coverage_feature_flag, - diff_coverage_report_path=self.diff_cover_report_path, - ) - def get_coverage(self): """ Run code coverage and build the prompt to be used for generating tests. @@ -124,6 +113,9 @@ def get_coverage(self): """ # Run coverage and build the prompt self.run_coverage() + # Run diff coverage if enabled + if self.diff_coverage: + self.generate_diff_coverage_report() return self.failed_test_runs, self.language, self.testing_framework, self.code_coverage_report def get_code_language(self, source_file_path: str) -> str: @@ -294,14 +286,9 @@ def run_coverage(self): ), f'Fatal: Error running test command. Are you sure the command is correct? "{self.test_command}"\nExit code {exit_code}. \nStdout: \n{stdout} \nStderr: \n{stderr}' try: - # Process the extracted coverage metrics - coverage, coverage_percentages = self.post_process_coverage_report( - time_of_test_command - ) - self.current_coverage = coverage - self.last_coverage_percentages = coverage_percentages.copy() + self.current_coverage_report = self.post_process_coverage_report(time_of_test_command) self.logger.info( - f"Initial coverage: {round(self.current_coverage * 100, 2)}%" + f"Initial coverage: {round(self.current_coverage_report.total_coverage * 100, 2)}%" ) except AssertionError as error: @@ -503,11 +490,9 @@ def validate_test(self, generated_test: dict): # If test passed, check for coverage increase try: - new_percentage_covered, new_coverage_percentages = self.post_process_coverage_report( - time_of_test_command - ) + new_coverage_report = self.post_process_coverage_report(time_of_test_command) - if new_percentage_covered <= self.current_coverage: + if self.current_coverage_report is not None and new_coverage_report.total_coverage <= self.current_coverage_report.total_coverage: # Coverage has not increased, rollback the test by removing it from the test file with open(self.test_file_path, "w") as test_file: test_file.write(original_content) @@ -579,20 +564,20 @@ def validate_test(self, generated_test: dict): additional_imports_lines ) # this is important, otherwise the next test will be inserted at the wrong line - for key in new_coverage_percentages: - if new_coverage_percentages[key] > self.last_coverage_percentages[key] and key == self.source_file_path.split("/")[-1]: + for key in new_coverage_report.file_coverage: + new_v: CoverageData = new_coverage_report.file_coverage[key] + old_v: CoverageData = self.current_coverage_report.file_coverage[key] + if new_v.coverage > old_v.coverage and key == self.source_file_path.split("/")[-1]: self.logger.info( - f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}" + f"Coverage for provided source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_v.coverage * 100, 2)}" ) - elif new_coverage_percentages[key] > self.last_coverage_percentages[key]: + elif new_v.coverage > old_v.coverage: self.logger.info( - f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_coverage_percentages[key] * 100, 2)}" + f"Coverage for non-source file: {key} increased from {round(self.last_coverage_percentages[key] * 100, 2)} to {round(new_v.coverage * 100, 2)}" ) - self.current_coverage = new_percentage_covered - self.last_coverage_percentages = new_coverage_percentages.copy() self.logger.info( - f"Test passed and coverage increased. Current coverage: {round(new_percentage_covered * 100, 2)}%" + f"Test passed and coverage increased. Current coverage: {round(new_coverage_report.total_coverage * 100, 2)}%" ) return { "status": "PASS", @@ -689,59 +674,20 @@ def extract_error_message(self, fail_details): logging.error(f"Error extracting error message: {e}") return "" - def post_process_coverage_report(self, time_of_test_command): - coverage_percentages = {} - if self.use_report_coverage_feature_flag: - self.logger.info( - "Using the report coverage feature flag to process the coverage report" - ) - file_coverage_dict = self.coverage_processor.process_coverage_report( - time_of_test_command=time_of_test_command - ) - total_lines_covered = 0 - total_lines_missed = 0 - total_lines = 0 - for key in file_coverage_dict: - lines_covered, lines_missed, percentage_covered = ( - file_coverage_dict[key] - ) - total_lines_covered += len(lines_covered) - total_lines_missed += len(lines_missed) - total_lines += len(lines_covered) + len(lines_missed) - if key == self.source_file_path: - self.last_source_file_coverage = percentage_covered - if key not in coverage_percentages: - coverage_percentages[key] = 0 - coverage_percentages[key] = percentage_covered - try: - percentage_covered = total_lines_covered / total_lines - except ZeroDivisionError: - self.logger.error(f"ZeroDivisionError: Attempting to perform total_lines_covered / total_lines: {total_lines_covered} / {total_lines}.") - percentage_covered = 0 - - self.logger.info( - f"Total lines covered: {total_lines_covered}, Total lines missed: {total_lines_missed}, Total lines: {total_lines}" - ) - self.logger.info( - f"coverage: Percentage {round(percentage_covered * 100, 2)}%" - ) - elif self.diff_coverage: - self.generate_diff_coverage_report() - lines_covered, lines_missed, percentage_covered = ( - self.coverage_processor.process_coverage_report( - time_of_test_command=time_of_test_command - ) - ) - self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%" - else: - lines_covered, lines_missed, percentage_covered = ( - self.coverage_processor.process_coverage_report( - time_of_test_command=time_of_test_command - ) - ) - self.code_coverage_report = f"Lines covered: {lines_covered}\nLines missed: {lines_missed}\nPercentage covered: {round(percentage_covered * 100, 2)}%" - return percentage_covered, coverage_percentages - + def post_process_coverage_report(self, time_of_test_command: int): + report: CoverageReport = process_coverage( + tool_type=self.coverage_type, + time_of_test_command=time_of_test_command, + report_path=self.code_coverage_report_path, + src_file_path=self.source_file_path, + is_global_coverage_enabled=self.use_report_coverage_feature_flag, + file_pattern=None, + diff_coverage_report_path=self.diff_cover_report_path, + ) + self.logger.info( + f"coverage: Percentage {round(report.total_coverage * 100, 2)}%" + ) + return report def generate_diff_coverage_report(self): # Run the diff-cover command to generate a JSON diff coverage report @@ -758,3 +704,6 @@ def generate_diff_coverage_report(self): f'Fatal: Error running diff coverage command. Are you sure the command is correct? "{coverage_command}"' f"\nExit code {exit_code}. \nStdout: \n{stdout} \nStderr: \n{stderr}" ) + + def get_current_coverage(self): + return self.current_coverage_report.total_coverage diff --git a/cover_agent/coverage/processor.py b/cover_agent/coverage/processor.py new file mode 100644 index 000000000..7f84d8783 --- /dev/null +++ b/cover_agent/coverage/processor.py @@ -0,0 +1,362 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from cover_agent.CustomLogger import CustomLogger +from typing import Dict, Optional, List, Tuple, Union +import csv +import os +import re +import json +import xml.etree.ElementTree as ET + +@dataclass(frozen=True) +class CoverageData: + """ + A class to represent coverage data. + + Attributes: + covered_lines (int): The line numbers that are covered by tests. + covered (int) : The number of lines that are covered by tests. + missed_lines (int) : The line numbers that are not covered by tests. + missed (int) : The number of lines that are not covered by tests. + coverage (float) : The coverage percentage of the file or class. + """ + covered_lines: List[int] + covered: int + missed_lines: List[int] + missed: int + coverage: float + +@dataclass +class CoverageReport: + """ + A class to represent the coverage report of a project. + + Attributes: + ---------- + total_coverage : float + The total coverage percentage of the project. + file_coverage : Dict[str, CoverageData] + A dictionary mapping file names to their respective coverage data. + """ + total_coverage: float + file_coverage: Dict[str, CoverageData] + +class CoverageProcessor(ABC): + """ + Abstract base class for processing coverage reports. + + Attributes: + file_path (str): The path to the coverage report file. + src_file_path (str): The path to the source file. + logger (Logger): The logger object for logging messages. + Methods: + parse_coverage_report() -> Union[Tuple[list, list, float], dict]: + Abstract method to parse the coverage report. + + process_coverage_report(time_of_test_command: int) -> Union[Tuple[list, list, float], dict]: + Processes the coverage report and returns the coverage data. + + _is_report_exist(): + Checks if the coverage report file exists. + + _is_report_obsolete(time_of_test_command: int): + Checks if the coverage report file is obsolete based on the test command time. + """ + def __init__( + self, + file_path: str, + src_file_path: str, + ): + self.file_path = file_path + self.src_file_path = src_file_path + self.logger = CustomLogger.get_logger(__name__) + + @abstractmethod + def parse_coverage_report(self) -> Dict[str, CoverageData]: + pass + + def process_coverage_report(self, time_of_test_command: int) -> CoverageReport: + self._is_coverage_valid(time_of_test_command=time_of_test_command) + coverage = self.parse_coverage_report() + report = CoverageReport(0.0, coverage) + if coverage: + total_covered = sum(cov.covered for cov in coverage.values()) + total_missed = sum(cov.missed for cov in coverage.values()) + total_lines = total_covered + total_missed + report.total_coverage = (float(total_covered) / float(total_lines)) if total_lines > 0 else 0.0 + return report + + def _is_coverage_valid( + self, time_of_test_command: int + ) -> None: + if not self._is_report_exist(): + raise FileNotFoundError(f'Coverage report "{self.file_path}" not found') + if self._is_report_obsolete(time_of_test_command): + raise ValueError("Coverage report is outdated") + + def _is_report_exist(self) -> bool: + return os.path.exists(self.file_path) + + def _is_report_obsolete(self, time_of_test_command: int) -> bool: + return int(round(os.path.getmtime(self.file_path) * 1000)) < time_of_test_command + +class CoberturaProcessor(CoverageProcessor): + def parse_coverage_report(self) -> Dict[str, CoverageData]: + tree = ET.parse(self.file_path) + root = tree.getroot() + coverage = {} + for cls in root.findall(".//class"): + cls_filename = cls.get("filename") + if cls_filename: + coverage[cls_filename] = self._parse_coverage_data_for_class(cls) + return coverage + + def _parse_coverage_data_for_class(self, cls) -> CoverageData: + lines_covered, lines_missed = [], [] + for line in cls.findall(".//line"): + line_number = int(line.get("number")) + hits = int(line.get("hits")) + if hits > 0: + lines_covered.append(line_number) + else: + lines_missed.append(line_number) + total_lines = len(lines_covered) + len(lines_missed) + coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0 + return CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage) + +class LcovProcessor(CoverageProcessor): + def parse_coverage_report(self) -> Dict[str, CoverageData]: + coverage = {} + try: + with open(self.file_path, "r") as file: + for line in file: + line = line.strip() + if line.startswith("SF:"): + filename = line[3:] + lines_covered, lines_missed = [], [] + for line in file: + line = line.strip() + if line.startswith("DA:"): + line_number, hits = map(int, line[3:].split(",")) + if hits > 0: + lines_covered.append(int(line_number)) + else: + lines_missed.append(int(line_number)) + elif line.startswith("end_of_record"): + break + total_lines = len(lines_covered) + len(lines_missed) + coverage_percentage = (float(len(lines_covered)) / total_lines) if total_lines > 0 else 0.0 + coverage[filename] = CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage) + except (FileNotFoundError, IOError) as e: + self.logger.error(f"Error reading file {self.file_path}: {e}") + raise + return coverage + +class JacocoProcessor(CoverageProcessor): + def parse_coverage_report(self) -> Dict[str, CoverageData]: + coverage = {} + package_name, class_name = self._extract_package_and_class_java() + file_extension = self._get_file_extension(self.file_path) + if file_extension == 'xml': + missed, covered = self._parse_jacoco_xml(class_name=class_name) + elif file_extension == 'csv': + missed, covered = self._parse_jacoco_csv(package_name=package_name, class_name=class_name) + else: + raise ValueError(f"Unsupported JaCoCo code coverage report format: {file_extension}") + total_lines = missed + covered + coverage_percentage = (float(covered) / total_lines) if total_lines > 0 else 0.0 + coverage[class_name] = CoverageData(covered_lines=[], covered=covered, missed_lines=[], missed=missed, coverage=coverage_percentage) + return coverage + + def _get_file_extension(self, filename: str) -> str | None: + """Get the file extension from a given filename.""" + return os.path.splitext(filename)[1].lstrip(".") + + def _extract_package_and_class_java(self): + package_pattern = re.compile(r"^\s*package\s+([\w\.]+)\s*;.*$") + class_pattern = re.compile(r"^\s*public\s+class\s+(\w+).*") + + package_name = "" + class_name = "" + try: + with open(self.src_file_path, "r") as file: + for line in file: + if not package_name: # Only match package if not already found + package_match = package_pattern.match(line) + if package_match: + package_name = package_match.group(1) + + if not class_name: # Only match class if not already found + class_match = class_pattern.match(line) + if class_match: + class_name = class_match.group(1) + + if package_name and class_name: # Exit loop if both are found + break + except (FileNotFoundError, IOError) as e: + self.logger.error(f"Error reading file {self.src_file_path}: {e}") + raise + + return package_name, class_name + + def _parse_jacoco_xml( + self, class_name: str + ) -> tuple[int, int]: + """Parses a JaCoCo XML code coverage report to extract covered and missed line numbers for a specific file.""" + tree = ET.parse(self.file_path) + root = tree.getroot() + sourcefile = root.find(f".//sourcefile[@name='{class_name}.java']") + + if sourcefile is None: + return 0, 0 + + missed, covered = 0, 0 + for counter in sourcefile.findall('counter'): + if counter.attrib.get('type') == 'LINE': + missed += int(counter.attrib.get('missed', 0)) + covered += int(counter.attrib.get('covered', 0)) + break + + return missed, covered + def _parse_jacoco_csv(self, package_name, class_name) -> Dict[str, CoverageData]: + with open(self.file_path, "r") as file: + reader = csv.DictReader(file) + missed, covered = 0, 0 + for row in reader: + if row["PACKAGE"] == package_name and row["CLASS"] == class_name: + try: + missed = int(row["LINE_MISSED"]) + covered = int(row["LINE_COVERED"]) + break + except KeyError as e: + self.logger.error(f"Missing expected column in CSV: {e}") + raise + + return missed, covered + +class DiffCoverageProcessor(CoverageProcessor): + def __init__( + self, + diff_coverage_report_path: str, + file_path: str, + src_file_path: str, + ): + super().__init__(file_path, src_file_path) + self.diff_coverage_report_path = diff_coverage_report_path + + def parse_coverage_report(self) -> Dict[str, CoverageData]: + """ + Parses a JSON-formatted diff coverage report to extract covered lines, missed lines, + and the coverage percentage for the specified src_file_path. + Returns: + Tuple[List[int], List[int], float]: A tuple containing lists of covered and missed lines, + and the coverage percentage. + """ + with open(self.diff_coverage_report_path, "r") as file: + report_data = json.load(file) + + # Create relative path components of `src_file_path` for matching + src_relative_path = os.path.relpath(self.src_file_path) + src_relative_components = src_relative_path.split(os.sep) + + # Initialize variables for covered and missed lines + relevant_stats = None + coverage = {} + for file_path, stats in report_data["src_stats"].items(): + # Split the JSON's file path into components + file_path_components = file_path.split(os.sep) + + # Match if the JSON path ends with the same components as `src_file_path` + if ( + file_path_components[-len(src_relative_components) :] + == src_relative_components + ): + relevant_stats = stats + break + + # If a match is found, extract the data + if relevant_stats: + covered_lines = relevant_stats["covered_lines"] + violation_lines = relevant_stats["violation_lines"] + coverage_percentage = ( + relevant_stats["percent_covered"] / 100 + ) # Convert to decimal + else: + # Default values if the file isn't found in the report + covered_lines = [] + violation_lines = [] + coverage_percentage = 0.0 + + coverage[self.file_path] = CoverageData(covered_lines=covered_lines, covered=len(covered_lines), missed_lines=violation_lines,missed=len(violation_lines), coverage=coverage_percentage) + return coverage + +class CoverageReportFilter: + def filter_report(self, report: CoverageReport, file_pattern: str) -> CoverageReport: + filtered_coverage = { + file: coverage + for file, coverage in report.file_coverage.items() + if file_pattern in file + } + total_lines = sum(len(cov.covered_lines) + len(cov.missed_lines) for cov in filtered_coverage.values()) + total_coverage = (sum(len(cov.covered_lines) for cov in filtered_coverage.values()) / total_lines) if total_lines > 0 else 0.0 + return CoverageReport(total_coverage = total_coverage, file_coverage=filtered_coverage) + +class CoverageProcessorFactory: + """Factory for creating coverage processors based on tool type.""" + + @staticmethod + def create_processor( + tool_type: str, + report_path: str, + src_file_path: str, + diff_coverage_report_path: Optional[str] = None + ) -> CoverageProcessor: + """ + Creates appropriate coverage processor instance. + + Args: + tool_type: Coverage tool type (cobertura/jacoco/lcov) + report_path: Path to coverage report + src_file_path: Path to source file + + Returns: + CoverageProcessor instance + + Raises: + ValueError: If invalid tool type specified + """ + processors = { + 'cobertura': CoberturaProcessor, + 'jacoco': JacocoProcessor, + 'lcov': LcovProcessor, + 'diff_cover_json': DiffCoverageProcessor + } + if tool_type.lower() not in processors: + raise ValueError(f"Invalid coverage type specified: {tool_type}") + if tool_type.lower() == 'diff_cover_json': + return DiffCoverageProcessor(diff_coverage_report_path, report_path, src_file_path) + return processors[tool_type.lower()](report_path, src_file_path) + +def process_coverage( + tool_type: str, + time_of_test_command: int, + report_path: str, + src_file_path: str, + is_global_coverage_enabled: bool = True, + file_pattern: Optional[str] = None, + diff_coverage_report_path: Optional[str] = None +) -> CoverageReport: + # Create appropriate processor + processor = CoverageProcessorFactory.create_processor(tool_type, report_path, src_file_path, diff_coverage_report_path) + + # Process full report + report = processor.process_coverage_report(time_of_test_command=time_of_test_command) + + if is_global_coverage_enabled: + return report + + # Apply filtering if needed + if file_pattern: + filter = CoverageReportFilter() + report = filter.filter_report(report, file_pattern) + return report \ No newline at end of file diff --git a/tests/coverage/test_processor.py b/tests/coverage/test_processor.py new file mode 100644 index 000000000..7b967f6d5 --- /dev/null +++ b/tests/coverage/test_processor.py @@ -0,0 +1,379 @@ +import pytest +import json +import xml.etree.ElementTree as ET +from cover_agent.coverage.processor import ( + CoverageProcessor, + CoverageProcessorFactory, + JacocoProcessor, + CoberturaProcessor, + LcovProcessor, + CoverageData, + CoverageReport, + CoverageReportFilter, + DiffCoverageProcessor +) +from unittest.mock import patch, MagicMock + +class TestCoverageProcessorFactory: + def test_create_processor_returns_correct_instance(self): + processor = CoverageProcessorFactory.create_processor( + tool_type='cobertura', + report_path='dummy_path.xml', + src_file_path='dummy_src.java' + ) + assert isinstance(processor, CoberturaProcessor) + + processor = CoverageProcessorFactory.create_processor( + tool_type='jacoco', + report_path='dummy_path.xml', + src_file_path='dummy_src.java' + ) + assert isinstance(processor, JacocoProcessor) + + processor = CoverageProcessorFactory.create_processor( + tool_type='lcov', + report_path='dummy_path.info', + src_file_path='dummy_src.java' + ) + assert isinstance(processor, LcovProcessor) + + processor = CoverageProcessorFactory.create_processor( + tool_type='diff_cover_json', + report_path='dummy_path.json', + src_file_path='dummy_src.java', + diff_coverage_report_path='dummy_diff.json' + ) + assert isinstance(processor, DiffCoverageProcessor) + +class TestCoverageProcessor: + @patch('os.path.exists', return_value=False) + def test_process_coverage_report_file_not_found(self, mock_exists): + processor = CoberturaProcessor('non_existent_file.xml', 'dummy_src.java') + with pytest.raises(FileNotFoundError): + processor.process_coverage_report(time_of_test_command=1234567890) + +class TestCoverageReportFilter: + def test_filter_report_with_file_pattern(self): + coverage_data = { + 'file1.java': CoverageData([1, 2, 3], 3, [4, 5], 2, 0.6), + 'file2.java': CoverageData([1, 2], 2, [3, 4, 5], 3, 0.4), + 'test_file.java': CoverageData([1], 1, [2, 3, 4, 5], 4, 0.2) + } + report = CoverageReport(total_coverage=0.5, file_coverage=coverage_data) + filter = CoverageReportFilter() + filtered_report = filter.filter_report(report, 'test_file') + + assert len(filtered_report.file_coverage) == 1 + assert 'test_file.java' in filtered_report.file_coverage + assert filtered_report.total_coverage == 0.2 + +@pytest.fixture +def mock_xml_tree(monkeypatch): + """ + Creates a mock function to simulate the ET.parse method, returning a mocked XML tree structure. + """ + def mock_parse(file_path): + # Mock XML structure for the test + xml_str = """ + + + + + + + + + + + + + """ + root = ET.ElementTree(ET.fromstring(xml_str)) + return root + + monkeypatch.setattr(ET, "parse", mock_parse) + +class TestCoverageProcessorFactory: + def test_create_processor_cobertura(self): + processor = CoverageProcessorFactory.create_processor("cobertura", "fake_path", "app.py") + assert isinstance(processor, CoberturaProcessor), "Expected CoberturaProcessor instance" + + def test_create_processor_jacoco(self): + processor = CoverageProcessorFactory.create_processor("jacoco", "fake_path", "app.py") + assert isinstance(processor, JacocoProcessor), "Expected JacocoProcessor instance" + + def test_create_processor_lcov(self): + processor = CoverageProcessorFactory.create_processor("lcov", "fake_path", "app.py") + assert isinstance(processor, LcovProcessor), "Expected LcovProcessor instance" + + def test_create_processor_unsupported_type(self): + with pytest.raises(ValueError, match="Invalid coverage type specified: unsupported_type"): + CoverageProcessorFactory.create_processor("unsupported_type", "fake_path", "app.py") + +class TestCoverageProcessor: + def test_is_report_obsolete(self, mocker): + mocker.patch("os.path.exists", return_value=True) + mocker.patch("os.path.getmtime", return_value=1234567.0) + processor = CoverageProcessorFactory.create_processor( + "cobertura", "fake_path", "app.py" + ) + with pytest.raises( + ValueError, + match="Coverage report is outdated", + ): + processor._is_coverage_valid(1234567890) + + def test_is_report_exist(self, mocker): + mocker.patch("os.path.exists", return_value=False) + + processor = CoverageProcessorFactory.create_processor( + "cobertura", "fake_path", "app.py" + ) + with pytest.raises( + FileNotFoundError, + match='Coverage report "fake_path" not found', + ): + processor._is_coverage_valid(1234567890) + + # Process valid coverage data and calculate correct total coverage percentage + def test_process_valid_coverage_data(self, mocker): + # Arrange + time_of_test = 123456 + coverage_data = { + "file1.py": CoverageData(covered_lines=[], covered=80, missed_lines=[], missed=20, coverage=0.8), + "file2.py": CoverageData(covered_lines=[], covered=60, missed_lines=[], missed=40, coverage=0.6) + } + + processor = CoverageProcessorFactory.create_processor("cobertura", "fake_path", "app.py") + mocker.patch.object(processor, '_is_coverage_valid') + mocker.patch.object(processor, 'parse_coverage_report', return_value=coverage_data) + + # Act + report = processor.process_coverage_report(time_of_test) + + # Assert + assert report.total_coverage == 0.7 # (140 covered)/(200 total) = 0.7 + assert report.file_coverage == coverage_data + processor._is_coverage_valid.assert_called_once_with(time_of_test_command=time_of_test) + + # Handle coverage data with zero total lines + def test_process_zero_lines_coverage(self, mocker): + # Arrange + time_of_test = 123456 + coverage_data = { + "file1.py": CoverageData(covered_lines=[], covered=0, missed_lines=[], missed=0, coverage=0.0), + "file2.py": CoverageData(covered_lines=[], covered=0, missed_lines=[], missed=0, coverage=0.0) + } + + processor = CoverageProcessorFactory.create_processor("cobertura", "fake_path", "app.py") + mocker.patch.object(processor, '_is_coverage_valid') + mocker.patch.object(processor, 'parse_coverage_report', return_value=coverage_data) + + # Act + report = processor.process_coverage_report(time_of_test) + + # Assert + assert report.total_coverage == 0.0 + assert report.file_coverage == coverage_data + processor._is_coverage_valid.assert_called_once_with(time_of_test_command=time_of_test) + +class TestCoberturaProcessor: + @pytest.fixture + def processor(self): + # Initializes CoberturaProcessor with cobertura coverage type for each test + return CoverageProcessorFactory.create_processor("cobertura", "fake_path", "app.py") + + def test_parse_coverage_report_cobertura(self, mock_xml_tree, processor): + """ + Tests the parse_coverage_report method for correct line number and coverage calculation with Cobertura reports. + """ + coverage = processor.parse_coverage_report() + assert len(coverage) == 1, "Expected coverage data for one file" + assert coverage["app.py"].covered_lines == [1], "Should list line 1 as covered" + assert coverage["app.py"].covered == 1, "Should have 1 line as covered" + assert coverage["app.py"].missed_lines == [2], "Should list line 2 as missed" + assert coverage["app.py"].missed == 1, "Should have 1 line as missed" + assert coverage["app.py"].coverage == 0.5, "Coverage should be 50 percent" + +class TestLcovProcessor: + # Parse LCOV file with single source file containing covered and uncovered lines + def test_parse_lcov_file_with_covered_and_uncovered_lines(self, tmp_path): + # Arrange + lcov_content = """SF:src/file1.py + DA:1,1 + DA:2,0 + DA:3,1 + end_of_record""" + lcov_file = tmp_path / "coverage.lcov" + lcov_file.write_text(lcov_content) + + processor = LcovProcessor(str(lcov_file), "src/file1.py") + + # Act + result = processor.parse_coverage_report() + + # Assert + assert len(result) == 1 + assert "src/file1.py" in result + coverage_data = result["src/file1.py"] + assert coverage_data.covered_lines == [1, 3] + assert coverage_data.missed_lines == [2] + assert coverage_data.covered == 2 + assert coverage_data.missed == 1 + assert coverage_data.coverage == 2/3 + + # Handle malformed LCOV file with missing end_of_record + def test_parse_malformed_lcov_missing_end_record(self, tmp_path): + # Arrange + lcov_content = """SF:src/file1.py + DA:1,1 + DA:2,0 + DA:3,1""" + lcov_file = tmp_path / "coverage.lcov" + lcov_file.write_text(lcov_content) + + processor = LcovProcessor(str(lcov_file), "src/file1.py") + + # Act + result = processor.parse_coverage_report() + + # Assert + assert len(result) == 1 + assert "src/file1.py" in result + coverage_data = result["src/file1.py"] + assert coverage_data.covered_lines == [1, 3] + assert coverage_data.missed_lines == [2] + assert coverage_data.covered == 2 + assert coverage_data.missed == 1 + assert coverage_data.coverage == 2/3 + +class TestJacocoProcessor: + # Successfully parse XML JaCoCo report and extract coverage data + def test_parse_xml_coverage_report_success(self, mocker): + # Arrange + xml_content = ''' + + + + + + + ''' + + mock_file = mocker.mock_open(read_data='package com.example;\npublic class MyClass {') + mocker.patch('builtins.open', mock_file) + mocker.patch('xml.etree.ElementTree.parse', return_value=ET.ElementTree(ET.fromstring(xml_content))) + + processor = JacocoProcessor('coverage.xml', 'MyClass.java') + + # Act + coverage_data = processor.parse_coverage_report() + + # Assert + assert len(coverage_data) == 1 + assert 'MyClass' in coverage_data + assert coverage_data['MyClass'].missed == 5 + assert coverage_data['MyClass'].covered == 15 + assert coverage_data['MyClass'].coverage == 0.75 + + # Handle empty or malformed XML/CSV coverage reports + def test_parse_empty_xml_coverage_report(self, mocker): + # Arrange + xml_content = ''' + + + + ''' + + mock_file = mocker.mock_open(read_data='package com.example;\npublic class MyClass {') + mocker.patch('builtins.open', mock_file) + mocker.patch('xml.etree.ElementTree.parse', return_value=ET.ElementTree(ET.fromstring(xml_content))) + + processor = JacocoProcessor('coverage.xml', 'MyClass.java') + + # Act + coverage_data = processor.parse_coverage_report() + + # Assert + assert len(coverage_data) == 1 + assert 'MyClass' in coverage_data + assert coverage_data['MyClass'].missed == 0 + assert coverage_data['MyClass'].covered == 0 + assert coverage_data['MyClass'].coverage == 0.0 + +class TestDiffCoverageProcessor: + # Successfully parse JSON diff coverage report and extract coverage data for matching file path + def test_parse_coverage_report_with_matching_file(self, mocker): + # Arrange + test_file_path = "test/file.py" + test_src_path = "src/test/file.py" + test_diff_report = "diff_coverage.json" + + mock_json_data = { + "src_stats": { + "src/test/file.py": { + "covered_lines": [1, 2, 3], + "violation_lines": [4, 5], + "percent_covered": 60.0 + } + } + } + + mock_open = mocker.mock_open(read_data=json.dumps(mock_json_data)) + mocker.patch("builtins.open", mock_open) + + processor = DiffCoverageProcessor( + diff_coverage_report_path=test_diff_report, + file_path=test_file_path, + src_file_path=test_src_path + ) + + # Act + result = processor.parse_coverage_report() + + # Assert + assert test_file_path in result + coverage_data = result[test_file_path] + assert coverage_data.covered_lines == [1, 2, 3] + assert coverage_data.missed_lines == [4, 5] + assert coverage_data.covered == 3 + assert coverage_data.missed == 2 + assert coverage_data.coverage == 0.6 + + # Handle case when file is not found in coverage report + def test_parse_coverage_report_with_no_matching_file(self, mocker): + # Arrange + test_file_path = "test/file.py" + test_src_path = "src/test/file.py" + test_diff_report = "diff_coverage.json" + + mock_json_data = { + "src_stats": { + "src/other/file.py": { + "covered_lines": [1, 2], + "violation_lines": [3], + "percent_covered": 66.7 + } + } + } + + mock_open = mocker.mock_open(read_data=json.dumps(mock_json_data)) + mocker.patch("builtins.open", mock_open) + + processor = DiffCoverageProcessor( + diff_coverage_report_path=test_diff_report, + file_path=test_file_path, + src_file_path=test_src_path + ) + + # Act + result = processor.parse_coverage_report() + + # Assert + assert test_file_path in result + coverage_data = result[test_file_path] + assert coverage_data.covered_lines == [] + assert coverage_data.missed_lines == [] + assert coverage_data.covered == 0 + assert coverage_data.missed == 0 + assert coverage_data.coverage == 0.0 \ No newline at end of file diff --git a/tests/test_CoverAgent.py b/tests/test_CoverAgent.py index 6ff0a5cfd..85ce219ba 100644 --- a/tests/test_CoverAgent.py +++ b/tests/test_CoverAgent.py @@ -1,4 +1,5 @@ from cover_agent.CoverAgent import CoverAgent +from cover_agent.coverage.processor import CoverageReport from cover_agent.main import parse_args from unittest.mock import patch, MagicMock import argparse @@ -129,13 +130,12 @@ def test_duplicate_test_file_with_output_path(self, mock_isfile, mock_copy): run_tests_multiple_times=1, ) - with pytest.raises(AssertionError) as exc_info: + with pytest.raises(FileNotFoundError) as exc_info: agent = CoverAgent(args) failed_test_runs = agent.test_validator.get_coverage() agent.test_gen.build_prompt(failed_test_runs) agent._duplicate_test_file() - - assert "Fatal: Coverage report" in str(exc_info.value) + assert "Coverage report" in str(exc_info.value) mock_copy.assert_called_once_with(args.test_file_path, args.test_file_output_path) # Clean up the temp files @@ -169,13 +169,12 @@ def test_duplicate_test_file_without_output_path(self, mock_isfile): run_tests_multiple_times=1, ) - with pytest.raises(AssertionError) as exc_info: + with pytest.raises(FileNotFoundError) as exc_info: agent = CoverAgent(args) failed_test_runs = agent.test_validator.get_coverage() agent.test_gen.build_prompt(failed_test_runs) agent._duplicate_test_file() - - assert "Fatal: Coverage report" in str(exc_info.value) + assert "Coverage report" in str(exc_info.value) assert args.test_file_output_path == args.test_file_path # Clean up the temp files @@ -215,7 +214,7 @@ def test_run_max_iterations_strict_coverage(self, mock_test_db, mock_unit_test_v ) # Mock the methods used in run validator = mock_unit_test_validator.return_value - validator.current_coverage = 0.5 # below desired coverage + validator.get_current_coverage.return_value = 0.5 # below desired coverage validator.desired_coverage = 90 validator.get_coverage.return_value = [{}, "python", "pytest", ""] generator = mock_unit_test_generator.return_value @@ -279,14 +278,14 @@ def test_run_diff_coverage(self, mock_logger, mock_test_db, mock_test_gen, mock_ diff_coverage=True, branch="main" ) - mock_test_validator.return_value.current_coverage = 0.5 + mock_test_validator.return_value.get_current_coverage.return_value = 0.5 mock_test_validator.return_value.desired_coverage = 90 mock_test_validator.return_value.get_coverage.return_value = [{}, "python", "pytest", ""] mock_test_gen.return_value.generate_tests.return_value = {"new_tests": [{}]} agent = CoverAgent(args) agent.run() mock_logger.get_logger.return_value.info.assert_any_call( - f"Current Diff Coverage: {round(mock_test_validator.return_value.current_coverage * 100, 2)}%" + f"Current Diff Coverage: {round(mock_test_validator.return_value.get_current_coverage() * 100, 2)}%" ) # Clean up the temp files diff --git a/tests/test_CoverageProcessor.py b/tests/test_CoverageProcessor.py deleted file mode 100644 index 0cfca1bf4..000000000 --- a/tests/test_CoverageProcessor.py +++ /dev/null @@ -1,532 +0,0 @@ -import pytest -import xml.etree.ElementTree as ET -from cover_agent.CoverageProcessor import CoverageProcessor - - -@pytest.fixture -def mock_xml_tree(monkeypatch): - """ - Creates a mock function to simulate the ET.parse method, returning a mocked XML tree structure. - """ - - def mock_parse(file_path): - # Mock XML structure for the test - xml_str = """ - - - - - - - - - - - - - - - - - - - """ - root = ET.ElementTree(ET.fromstring(xml_str)) - return root - - monkeypatch.setattr(ET, "parse", mock_parse) - - -class TestCoverageProcessor: - @pytest.fixture - def processor(self): - # Initializes CoverageProcessor with cobertura coverage type for each test - return CoverageProcessor("fake_path", "app.py", "cobertura") - - def test_parse_coverage_report_cobertura(self, mock_xml_tree, processor): - """ - Tests the parse_coverage_report method for correct line number and coverage calculation with Cobertura reports. - """ - covered_lines, missed_lines, coverage_pct = processor.parse_coverage_report() - - assert covered_lines == [1, 3], "Should list lines 1 and 3 as covered" - assert missed_lines == [2, 4], "Should list lines 2 and 4 as missed" - assert coverage_pct == 0.5, "Coverage should be 50 percent" - - def test_correct_parsing_for_matching_package_and_class(self, mocker): - # Setup - mock_open = mocker.patch( - "builtins.open", - mocker.mock_open( - read_data="PACKAGE,CLASS,LINE_MISSED,LINE_COVERED\ncom.example,MyClass,5,10" - ), - ) - mocker.patch( - "csv.DictReader", - return_value=[ - { - "PACKAGE": "com.example", - "CLASS": "MyClass", - "LINE_MISSED": "5", - "LINE_COVERED": "10", - } - ], - ) - processor = CoverageProcessor( - "path/to/coverage_report.csv", "path/to/MyClass.java", "jacoco" - ) - - # Action - missed, covered = processor.parse_missed_covered_lines_jacoco_csv( - "com.example", "MyClass" - ) - - # Assert - assert missed == 5 - assert covered == 10 - - def test_returns_empty_lists_and_float(self, mocker): - # Mocking the necessary methods - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.extract_package_and_class_java", - return_value=("com.example", "Example"), - ) - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.parse_missed_covered_lines_jacoco_xml", - return_value=([], []), - ) - - # Initialize the CoverageProcessor object - coverage_processor = CoverageProcessor( - file_path="path/to/coverage.xml", - src_file_path="path/to/example.java", - coverage_type="jacoco", - ) - - # Invoke the parse_coverage_report_jacoco method - lines_covered, lines_missed, coverage_percentage = ( - coverage_processor.parse_coverage_report_jacoco() - ) - - # Assert the results - assert lines_covered == [], "Expected lines_covered to be an empty list" - assert lines_missed == [], "Expected lines_missed to be an empty list" - assert coverage_percentage == 0, "Expected coverage percentage to be 0" - - def test_parse_coverage_report_unsupported_type(self): - processor = CoverageProcessor("fake_path", "app.py", "unsupported_type") - with pytest.raises( - ValueError, match="Unsupported coverage report type: unsupported_type" - ): - processor.parse_coverage_report() - - def test_extract_package_and_class_java_file_error(self, mocker): - mocker.patch("builtins.open", side_effect=FileNotFoundError("File not found")) - processor = CoverageProcessor("fake_path", "path/to/MyClass.java", "jacoco") - with pytest.raises(FileNotFoundError, match="File not found"): - processor.extract_package_and_class_java() - - def test_extract_package_and_class_kotlin(self, mocker): - kotlin_file_content = """ - package com.madrapps.playground - - import androidx.lifecycle.ViewModel - - class MainViewModel : ViewModel() { - - fun validate(userId: String): Boolean { - return userId == "admin" - } - - fun verifyAccess1(userId: String): Boolean { - return userId == "super-admin" - } - - fun verifyPassword(password: String): Boolean { - return password.isNotBlank() - } - } - """ - mocker.patch("builtins.open", mocker.mock_open(read_data=kotlin_file_content)) - processor = CoverageProcessor("fake_path", "path/to/MainViewModel.kt", "jacoco") - package_name, class_name = processor.extract_package_and_class_kotlin() - assert ( - package_name == "com.madrapps.playground" - ), "Expected package name to be 'com.madrapps.playground'" - assert class_name == "MainViewModel", "Expected class name to be 'MainViewModel'" - - def test_extract_package_and_class_java(self, mocker): - java_file_content = """ - package com.example; - - public class MyClass { - // class content - } - """ - mocker.patch("builtins.open", mocker.mock_open(read_data=java_file_content)) - processor = CoverageProcessor("fake_path", "path/to/MyClass.java", "jacoco") - package_name, class_name = processor.extract_package_and_class_java() - assert ( - package_name == "com.example" - ), "Expected package name to be 'com.example'" - assert class_name == "MyClass", "Expected class name to be 'MyClass'" - - def test_verify_report_update_file_not_updated(self, mocker): - mocker.patch("os.path.exists", return_value=True) - mocker.patch("os.path.getmtime", return_value=1234567.0) - - processor = CoverageProcessor("fake_path", "app.py", "cobertura") - with pytest.raises( - AssertionError, - match="Fatal: The coverage report file was not updated after the test command.", - ): - processor.verify_report_update(1234567890) - - def test_verify_report_update_file_not_exist(self, mocker): - mocker.patch("os.path.exists", return_value=False) - - processor = CoverageProcessor("fake_path", "app.py", "cobertura") - with pytest.raises( - AssertionError, - match='Fatal: Coverage report "fake_path" was not generated.', - ): - processor.verify_report_update(1234567890) - - def test_process_coverage_report(self, mocker): - mock_verify = mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.verify_report_update" - ) - mock_parse = mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report", - return_value=([], [], 0.0), - ) - - processor = CoverageProcessor("fake_path", "app.py", "cobertura") - result = processor.process_coverage_report(1234567890) - - mock_verify.assert_called_once_with(1234567890) - mock_parse.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - - def test_parse_missed_covered_lines_jacoco_csv_key_error(self, mocker): - mock_open = mocker.patch( - "builtins.open", - mocker.mock_open( - read_data="PACKAGE,CLASS,LINE_MISSED,LINE_COVERED\ncom.example,MyClass,5,10" - ), - ) - mocker.patch( - "csv.DictReader", - return_value=[ - {"PACKAGE": "com.example", "CLASS": "MyClass", "LINE_MISSED": "5"} - ], - ) # Missing 'LINE_COVERED' - - processor = CoverageProcessor( - "path/to/coverage_report.csv", "path/to/MyClass.java", "jacoco" - ) - - with pytest.raises(KeyError): - processor.parse_missed_covered_lines_jacoco_csv("com.example", "MyClass") - - def test_parse_coverage_report_lcov_no_coverage_data(self, mocker): - """ - Test parse_coverage_report_lcov returns empty lists and 0 coverage when the lcov report contains no relevant data. - """ - mocker.patch("builtins.open", mocker.mock_open(read_data="")) - processor = CoverageProcessor("empty_report.lcov", "app.py", "lcov") - covered_lines, missed_lines, coverage_pct = processor.parse_coverage_report_lcov() - assert covered_lines == [], "Expected no covered lines" - assert missed_lines == [], "Expected no missed lines" - assert coverage_pct == 0, "Expected 0% coverage" - - def test_parse_coverage_report_lcov_with_coverage_data(self, mocker): - """ - Test parse_coverage_report_lcov correctly parses coverage data from an lcov report. - """ - lcov_data = """ - SF:app.py - DA:1,1 - DA:2,0 - DA:3,1 - end_of_record - """ - mocker.patch("builtins.open", mocker.mock_open(read_data=lcov_data)) - processor = CoverageProcessor("report.lcov", "app.py", "lcov") - covered_lines, missed_lines, coverage_pct = processor.parse_coverage_report_lcov() - assert covered_lines == [1, 3], "Expected lines 1 and 3 to be covered" - assert missed_lines == [2], "Expected line 2 to be missed" - assert coverage_pct == 2/3, "Expected 66.67% coverage" - - def test_parse_coverage_report_lcov_with_multiple_files(self, mocker): - """ - Test parse_coverage_report_lcov correctly parses coverage data for the target file among multiple files in the lcov report. - """ - lcov_data = """ - SF:other.py - DA:1,1 - DA:2,0 - end_of_record - SF:app.py - DA:1,1 - DA:2,0 - DA:3,1 - end_of_record - SF:another.py - DA:1,1 - end_of_record - """ - mocker.patch("builtins.open", mocker.mock_open(read_data=lcov_data)) - processor = CoverageProcessor("report.lcov", "app.py", "lcov") - covered_lines, missed_lines, coverage_pct = processor.parse_coverage_report_lcov() - assert covered_lines == [1, 3], "Expected lines 1 and 3 to be covered for app.py" - assert missed_lines == [2], "Expected line 2 to be missed for app.py" - assert coverage_pct == 2/3, "Expected 66.67% coverage for app.py" - - def test_parse_coverage_report_unsupported_type(self, mocker): - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.extract_package_and_class_java", - return_value=("com.example", "Example"), - ) - - processor = CoverageProcessor( - "path/to/coverage_report.html", "path/to/MyClass.java", "jacoco" - ) - with pytest.raises( - ValueError, match="Unsupported JaCoCo code coverage report format: html" - ): - processor.parse_coverage_report_jacoco() - - def test_parse_missed_covered_lines_jacoco_xml_no_source_file(self, mocker): - #, mock_xml_tree - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.extract_package_and_class_java", - return_value=("com.example", "Example"), - ) - - xml_str = """ - - - - - - - - - - - - - - - - - - """ - - mocker.patch( - "xml.etree.ElementTree.parse", - return_value=ET.ElementTree(ET.fromstring(xml_str)) - ) - - processor = CoverageProcessor( - "path/to/coverage_report.xml", "path/to/MySecondClass.java", "jacoco" - ) - - # Action - missed, covered = processor.parse_missed_covered_lines_jacoco_xml( - "MySecondClass" - ) - - # Assert - assert missed == [] - assert covered == [] - - def test_parse_missed_covered_lines_jacoco_xml(self, mocker): - #, mock_xml_tree - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.extract_package_and_class_java", - return_value=("com.example", "Example"), - ) - - xml_str = """ - - - - - - - - - - - - - - - - - - """ - - mocker.patch( - "xml.etree.ElementTree.parse", - return_value=ET.ElementTree(ET.fromstring(xml_str)) - ) - - processor = CoverageProcessor( - "path/to/coverage_report.xml", "path/to/MyClass.java", "jacoco" - ) - - # Action - missed, covered = processor.parse_missed_covered_lines_jacoco_xml( - "MyClass" - ) - - # Assert - assert missed == [39, 40, 41] - assert covered == [35, 36, 37, 38] - - def test_parse_missed_covered_lines_kotlin_jacoco_xml(self, mocker): - #, mock_xml_tree - mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.extract_package_and_class_kotlin", - return_value=("com.example", "Example"), - ) - - xml_str = """ - - - - - - - - - - - - - - - - - - """ - - mocker.patch( - "xml.etree.ElementTree.parse", - return_value=ET.ElementTree(ET.fromstring(xml_str)) - ) - - processor = CoverageProcessor( - "path/to/coverage_report.xml", "path/to/MyClass.kt", "jacoco" - ) - - # Action - missed, covered = processor.parse_missed_covered_lines_jacoco_xml( - "MyClass" - ) - - # Assert - assert missed == [39, 40, 41] - assert covered == [35, 36, 37, 38] - - def test_get_file_extension_with_valid_file_extension(self): - processor = CoverageProcessor( - "path/to/coverage_report.xml", "path/to/MyClass.java", "jacoco" - ) - - file_extension = processor.get_file_extension("coverage_report.xml") - - # Assert - assert file_extension == 'xml' - - def test_get_file_extension_with_no_file_extension(self): - processor = CoverageProcessor( - "path/to/coverage_report", "path/to/MyClass.java", "jacoco" - ) - - file_extension = processor.get_file_extension("coverage_report") - - # Assert - assert file_extension is '' - - def test_parse_coverage_report_lcov_with_feature_flag(self, mocker): - mock_parse_lcov = mocker.patch("cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report_lcov", return_value=([], [], 0.0)) - processor = CoverageProcessor("fake_path", "app.py", "lcov", use_report_coverage_feature_flag=True) - result = processor.parse_coverage_report() - mock_parse_lcov.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - - - def test_parse_coverage_report_cobertura_with_feature_flag(self, mocker): - mock_parse_cobertura = mocker.patch("cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report_cobertura", return_value=([], [], 0.0)) - processor = CoverageProcessor("fake_path", "app.py", "cobertura", use_report_coverage_feature_flag=True) - result = processor.parse_coverage_report() - mock_parse_cobertura.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - - - def test_parse_coverage_report_jacoco(self, mocker): - mock_parse_jacoco = mocker.patch("cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report_jacoco", return_value=([], [], 0.0)) - processor = CoverageProcessor("fake_path", "app.py", "jacoco", use_report_coverage_feature_flag=True) - result = processor.parse_coverage_report() - mock_parse_jacoco.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - - - def test_parse_coverage_report_cobertura_filename_not_found(self, mock_xml_tree, processor): - covered_lines, missed_lines, coverage_pct = processor.parse_coverage_report_cobertura("non_existent_file.py") - assert covered_lines == [], "Expected no covered lines" - assert missed_lines == [], "Expected no missed lines" - assert coverage_pct == 0.0, "Expected 0% coverage" - - - def test_parse_coverage_report_lcov_file_read_error(self, mocker): - mocker.patch("builtins.open", side_effect=IOError("File read error")) - processor = CoverageProcessor("report.lcov", "app.py", "lcov") - with pytest.raises(IOError, match="File read error"): - processor.parse_coverage_report_lcov() - - - def test_parse_coverage_report_cobertura_all_files(self, mock_xml_tree, processor): - coverage_data = processor.parse_coverage_report_cobertura() - expected_data = { - "app.py": ([1, 3], [2, 4], 0.5) - } - assert coverage_data == expected_data, "Expected coverage data for all files" - - - def test_parse_coverage_report_unsupported_type_with_feature_flag(self): - processor = CoverageProcessor("fake_path", "app.py", "unsupported_type", use_report_coverage_feature_flag=True) - with pytest.raises(ValueError, match="Unsupported coverage report type: unsupported_type"): - processor.parse_coverage_report() - - def test_parse_coverage_report_jacoco_without_feature_flag(self, mocker): - mock_parse_jacoco = mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report_jacoco", - return_value=([], [], 0.0) - ) - processor = CoverageProcessor("fake_path", "app.py", "jacoco", use_report_coverage_feature_flag=False) - result = processor.parse_coverage_report() - mock_parse_jacoco.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - - def test_parse_coverage_report_unsupported_type_without_feature_flag(self): - processor = CoverageProcessor("fake_path", "app.py", "unsupported_type", use_report_coverage_feature_flag=False) - with pytest.raises(ValueError, match="Unsupported coverage report type: unsupported_type"): - processor.parse_coverage_report() - - - - def test_parse_coverage_report_lcov_without_feature_flag(self, mocker): - mock_parse_lcov = mocker.patch( - "cover_agent.CoverageProcessor.CoverageProcessor.parse_coverage_report_lcov", - return_value=([], [], 0.0) - ) - processor = CoverageProcessor("fake_path", "app.py", "lcov", use_report_coverage_feature_flag=False) - result = processor.parse_coverage_report() - mock_parse_lcov.assert_called_once() - assert result == ([], [], 0.0), "Expected result to be ([], [], 0.0)" - diff --git a/tests/test_UnitTestGenerator.py b/tests/test_UnitTestGenerator.py index c220d3274..838459272 100644 --- a/tests/test_UnitTestGenerator.py +++ b/tests/test_UnitTestGenerator.py @@ -1,4 +1,3 @@ -from cover_agent.CoverageProcessor import CoverageProcessor from cover_agent.ReportGenerator import ReportGenerator from cover_agent.Runner import Runner from cover_agent.UnitTestGenerator import UnitTestGenerator diff --git a/tests/test_UnitTestValidator.py b/tests/test_UnitTestValidator.py index 06c5dc7b1..3e34023f4 100644 --- a/tests/test_UnitTestValidator.py +++ b/tests/test_UnitTestValidator.py @@ -1,13 +1,10 @@ -from cover_agent.CoverageProcessor import CoverageProcessor -from cover_agent.ReportGenerator import ReportGenerator +from cover_agent.coverage.processor import process_coverage, CoverageProcessor, CoverageReport, CoverageData from cover_agent.Runner import Runner from cover_agent.UnitTestValidator import UnitTestValidator from unittest.mock import patch, mock_open from unittest.mock import MagicMock import datetime -import os -import pytest import tempfile class TestUnitValidator: @@ -27,6 +24,7 @@ def test_extract_error_message_exception_handling(self): error_message = generator.extract_error_message(fail_details) assert '' in error_message + # TODO: Is this still valid test? def test_run_coverage_with_report_coverage_flag(self): with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_source_file: generator = UnitTestValidator( @@ -38,10 +36,10 @@ def test_run_coverage_with_report_coverage_flag(self): use_report_coverage_feature_flag=True ) with patch.object(Runner, 'run_command', return_value=("", "", 0, datetime.datetime.now())): - with patch.object(CoverageProcessor, 'process_coverage_report', return_value={'test.py': ([], [], 1.0)}): + with patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=1.0, file_coverage={'test.py': CoverageData([], 0, [], 0, 1.0)})): generator.run_coverage() # Dividing by zero so we're expecting a logged error and a return of 0 - assert generator.current_coverage == 0 + assert generator.current_coverage_report.total_coverage == 1.0 def test_extract_error_message_with_prompt_builder(self): @@ -82,7 +80,7 @@ def test_validate_test_pass_no_coverage_increase_with_prompt(self): ) # Setup initial state - generator.current_coverage = 0.5 + generator.current_coverage_report = CoverageReport(total_coverage=0.5, file_coverage={'test.py': CoverageData([], 0, [], 0, 0.0)}) generator.test_headers_indentation = 4 generator.relevant_line_number_to_insert_tests_after = 100 generator.relevant_line_number_to_insert_imports_after = 10 @@ -99,7 +97,7 @@ def test_validate_test_pass_no_coverage_increase_with_prompt(self): with patch("builtins.open", mock_file), \ patch.object(Runner, 'run_command', return_value=("", "", 0, datetime.datetime.now())), \ - patch.object(CoverageProcessor, 'process_coverage_report', return_value=([], [], 0.4)): + patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=0.4, file_coverage={'test.py': CoverageData([], 0, [], 0, 0.0)})): result = generator.validate_test(test_to_validate) @@ -139,7 +137,7 @@ def test_initial_test_suite_analysis_with_prompt_builder(self): assert generator.relevant_line_number_to_insert_imports_after == 10 assert generator.testing_framework == "pytest" - + # TODO: Is this still valid test? def test_post_process_coverage_report_with_report_coverage_flag(self): with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_source_file: generator = UnitTestValidator( @@ -150,10 +148,11 @@ def test_post_process_coverage_report_with_report_coverage_flag(self): llm_model="gpt-3", use_report_coverage_feature_flag=True ) - with patch.object(CoverageProcessor, 'process_coverage_report', return_value={'test.py': ([1], [1], 1.0)}): - percentage_covered, coverage_percentages = generator.post_process_coverage_report(datetime.datetime.now()) - assert percentage_covered == 0.5 - assert coverage_percentages == {'test.py': 1.0} + # patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=0.4, file_coverage={'test.py': CoverageData([], 0, [], 0, 0.0)})): + with patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=1.0, file_coverage={'test.py': CoverageData([1], 1, [1], 1, 1.0)})): + coverage_report = generator.post_process_coverage_report(datetime.datetime.now()) + assert coverage_report.total_coverage == 1.0 + assert coverage_report.file_coverage == {'test.py': CoverageData([1], 1, [1], 1, 1.0)} def test_post_process_coverage_report_with_diff_coverage(self): with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_source_file: @@ -166,9 +165,9 @@ def test_post_process_coverage_report_with_diff_coverage(self): diff_coverage=True ) with patch.object(generator, 'generate_diff_coverage_report'), \ - patch.object(CoverageProcessor, 'process_coverage_report', return_value=([], [], 0.8)): - percentage_covered, coverage_percentages = generator.post_process_coverage_report(datetime.datetime.now()) - assert percentage_covered == 0.8 + patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=0.8, file_coverage={'test.py': CoverageData([1], 1, [1], 1, 1.0)})): + coverage_report = generator.post_process_coverage_report(datetime.datetime.now()) + assert coverage_report.total_coverage == 0.8 def test_post_process_coverage_report_without_flags(self): with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_source_file: @@ -179,9 +178,9 @@ def test_post_process_coverage_report_without_flags(self): test_command="pytest", llm_model="gpt-3" ) - with patch.object(CoverageProcessor, 'process_coverage_report', return_value=([], [], 0.7)): - percentage_covered, coverage_percentages = generator.post_process_coverage_report(datetime.datetime.now()) - assert percentage_covered == 0.7 + with patch.object(CoverageProcessor, 'process_coverage_report', return_value=CoverageReport(total_coverage=0.7, file_coverage={'test.py': CoverageData([1], 1, [1], 1, 1.0)})): + coverage_report = generator.post_process_coverage_report(datetime.datetime.now()) + assert coverage_report.total_coverage == 0.7 def test_generate_diff_coverage_report(self): with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as temp_source_file: From 883fa6f285a8f17a33d512fe562795ae065837c2 Mon Sep 17 00:00:00 2001 From: coderustic Date: Wed, 1 Jan 2025 21:42:20 -0800 Subject: [PATCH 2/2] Added missing documentation --- cover_agent/coverage/processor.py | 59 +++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/cover_agent/coverage/processor.py b/cover_agent/coverage/processor.py index 7f84d8783..42f15fbaa 100644 --- a/cover_agent/coverage/processor.py +++ b/cover_agent/coverage/processor.py @@ -13,6 +13,12 @@ class CoverageData: """ A class to represent coverage data. + This class is used to encapsulate information about code coverage + for a file or class, such as the line numbers that are covered by + tests, the number of lines that are covered, the line numbers that + are not covered by tests, the number of lines that are not covered, + and the coverage percentage. + Attributes: covered_lines (int): The line numbers that are covered by tests. covered (int) : The number of lines that are covered by tests. @@ -31,6 +37,10 @@ class CoverageReport: """ A class to represent the coverage report of a project. + This class is used to encapsulate information about the coverage + of a project, such as the total coverage percentage and the coverage + data for each file in the project. + Attributes: ---------- total_coverage : float @@ -73,9 +83,24 @@ def __init__( @abstractmethod def parse_coverage_report(self) -> Dict[str, CoverageData]: + """ + Parses the coverage report and extracts coverage data. + + This method should be implemented by subclasses to parse the specific + coverage report format and return a dictionary mapping file names to + their respective coverage data. + + Returns: + Dict[str, CoverageData]: A dictionary where keys are file names and + values are CoverageData instances containing + coverage information for each file. + """ pass def process_coverage_report(self, time_of_test_command: int) -> CoverageReport: + """ + Processes the coverage report and returns the coverage data. + """ self._is_coverage_valid(time_of_test_command=time_of_test_command) coverage = self.parse_coverage_report() report = CoverageReport(0.0, coverage) @@ -89,6 +114,9 @@ def process_coverage_report(self, time_of_test_command: int) -> CoverageReport: def _is_coverage_valid( self, time_of_test_command: int ) -> None: + """ + Checks if the coverage report is valid and up-to-date. + """ if not self._is_report_exist(): raise FileNotFoundError(f'Coverage report "{self.file_path}" not found') if self._is_report_obsolete(time_of_test_command): @@ -101,6 +129,11 @@ def _is_report_obsolete(self, time_of_test_command: int) -> bool: return int(round(os.path.getmtime(self.file_path) * 1000)) < time_of_test_command class CoberturaProcessor(CoverageProcessor): + """ + A class to process Cobertura code coverage reports. + Inherits from CoverageProcessor class and implements + the parse_coverage_report method. + """ def parse_coverage_report(self) -> Dict[str, CoverageData]: tree = ET.parse(self.file_path) root = tree.getroot() @@ -125,6 +158,11 @@ def _parse_coverage_data_for_class(self, cls) -> CoverageData: return CoverageData(lines_covered, len(lines_covered), lines_missed, len(lines_missed), coverage_percentage) class LcovProcessor(CoverageProcessor): + """ + A class to process LCOV code coverage reports. + Inherits from CoverageProcessor class and implements + the parse_coverage_report method. + """ def parse_coverage_report(self) -> Dict[str, CoverageData]: coverage = {} try: @@ -153,6 +191,14 @@ def parse_coverage_report(self) -> Dict[str, CoverageData]: return coverage class JacocoProcessor(CoverageProcessor): + """ + A class to process JaCoCo code coverage reports. + Inherits from CoverageProcessor class and implements + the parse_coverage_report method. + + This class supports parsing JaCoCo code coverage + reports in both XML and CSV formats. + """ def parse_coverage_report(self) -> Dict[str, CoverageData]: coverage = {} package_name, class_name = self._extract_package_and_class_java() @@ -235,6 +281,13 @@ def _parse_jacoco_csv(self, package_name, class_name) -> Dict[str, CoverageData] return missed, covered class DiffCoverageProcessor(CoverageProcessor): + """ + A class to process diff coverage reports. + Inherits from CoverageProcessor class and implements + the parse_coverage_report method. + + This class is used to process diff coverage reports in JSON format. + """ def __init__( self, diff_coverage_report_path: str, @@ -291,6 +344,12 @@ def parse_coverage_report(self) -> Dict[str, CoverageData]: return coverage class CoverageReportFilter: + """ + A class to filter coverage reports based on + file patterns. This class abstracts the logic + for filtering coverage reports based on file + patterns. + """ def filter_report(self, report: CoverageReport, file_pattern: str) -> CoverageReport: filtered_coverage = { file: coverage