diff --git a/.env-example b/.env-example index ebf4b8f..751e0b7 100644 --- a/.env-example +++ b/.env-example @@ -73,6 +73,9 @@ SURVEY_BACK_API_URL=http://localhost:8000/ # The name of the project for Survey API SURVEY_PROJECT_NAME= +# Whether TOPSIS method should be used or not to calculate coefficients for deployment risk +# If not, static coefficients are used +OTTM_USE_TOPSIS=True # The correlation name for TOPSIS # Available values are pearson, spearman, kendall, weighted # Let it blank for all of them diff --git a/configuration.py b/configuration.py index 8d54bc7..220387b 100644 --- a/configuration.py +++ b/configuration.py @@ -77,6 +77,7 @@ def __init__(self): self.legacy_minimum_days = self.__get_int("OTTM_LEGACY_MINIMUM_DAYS", 365) + self.use_topsis = self.__get_bool("OTTM_USE_TOPSIS", True) self.topsis_corr_method = self.__get_str_list("OTTM_CORR_METHOD") self.topsis_criteria = self.__get_str_list("OTTM_CRITERIA") self.topsis_alternatives = self.__get_str_list("OTTM_ALTERNATIVES") diff --git a/docs/Next Version Risk Assessment Method.md b/docs/Next Version Risk Assessment Method.md new file mode 100644 index 0000000..daa2b41 --- /dev/null +++ b/docs/Next Version Risk Assessment Method.md @@ -0,0 +1,165 @@ +# Next Version Risk Assessment Method + +This document presents a method specifically designed to assess risks associated with deploying the next version of software. This method utilizes weighted metrics collected from previous versions of the software to provide a quantitative assessment of potential risks. + +## Overview + +Assessing the risks of the next software version is a critical step in software project management. It enables development teams to make informed decisions about the opportune time to release a new version by identifying the risks in the version and proposing key performance indicators (KPIs) for these risks. 
+ +The implemented method is based on the principles of multi-criteria analysis, notably employing the TOPSIS (Technique for Order of Preference by Similarity to Ideal Solution) method. This approach allows for the comparison of versions based on multiple criteria and determining the impact value of different metrics. + +## How It Works + +The calculation of the risk score is accomplished using the TOPSIS algorithm. To achieve this, we have implemented both criteria and basic alternatives. The criteria include the number of bugs, while the alternatives encompass bug velocity, the number of changes, the average experience of the development team, cyclomatic complexity, churn rate, and the number of legacy files. The process of adding additional alternatives and criteria will be explained in another section of this document. + +The underlying concept of this algorithm is to determine the weights for each of these alternatives to perform the risk score calculation. To determine these weights, we create a correlation matrix between the alternatives and criteria. However, it's possible that there may not be a linear correlation among them. Therefore, we need to determine this by testing different correlation methods and selecting the one with the highest correlation coefficient. We take the absolute values of this correlation matrix to adhere to the necessary monotonicity principle for the TOPSIS algorithm. + +At this stage, we have two key hypotheses: strongly correlated values impact each other, and the alternatives impact the criteria. Using this decision matrix, we can execute the TOPSIS algorithm, which involves several steps. + +1. **Normalization**: First, the data for each criterion and alternative are normalized. This step ensures that all criteria are on the same scale and avoids bias due to differences in measurement units. + +2. 
**Weight Assignment**: As mentioned earlier, the correlation matrix helps determine the weights for each alternative with respect to the criteria. These weights reflect the relative importance of each criterion in the risk assessment process. + +3. **Ideal and Anti-Ideal Solutions**: The ideal and anti-ideal solutions are calculated for each criterion. The ideal solution represents the best possible value for each criterion, while the anti-ideal solution represents the worst possible value. The ideal solution is calculated by taking the maximum value for benefit criteria and the minimum value for cost criteria, while the anti-ideal solution is the opposite. + +4. **Similarity Scores**: For each alternative, TOPSIS calculates the similarity score (closeness) to the ideal solution and the anti-ideal solution for all criteria. This is done using distance metrics such as the Euclidean distance or Minkowski distance. + +Once the algorithm completes, the normalized distances represent the impact of each alternative on all the criteria, effectively serving as the weightings for the risk score calculation. To calculate the risk score, we simply sum the products of the metrics with their corresponding weights. + +## How to Use This Method + +### Add New Criteria or Alternative + +To extend the functionality of this risk assessment method by adding new criteria or alternatives, follow these steps: + +#### Adding a New Criterion + +1. In the project's root directory, navigate to the "utils" folder. + +2. Inside the "utils" folder, you will find two Python files named "criterion.py" and "alternatives.py." These files are structured to allow easy extension of criteria and alternatives. + +3. To add a new criterion, create a new Python class that inherits from the abstract class Criterion defined in "criterion.py." 
The class should implement the following methods: + - `get_name()`: This method should return the name used to retrieve the criterion's values using the `get_data` method. + - `get_direction()`: Return a constant that indicates whether the criterion should be maximized or minimized. You can use `mt.Math.TOPSIS.MAX` or `mt.Math.TOPSIS.MIN` from the provided mt module. A criterion that minimizes the deployment risk should be minimized. Contrariwise, a criterion that maximizes the deployment risk should be maximized. For example, a high number of bugs maximizes the risk of deployment, so get_direction should return `mt.Math.TOPSIS.MAX` for the bugs criterion. + +4. After implementing the new criterion class, add an instance of it to the `criteria_map` dictionary in the `CriterionParser` class within "criterion.py." This dictionary maps criterion names to their respective classes. + +#### Adding a New Alternative + +1. Similar to adding a new criterion, create a new Python class that inherits from the abstract class `Alternative` defined in "alternatives.py." + +2. Implement the `get_name()` method in your new alternative class. This method should return the name used to identify the alternative. + +3. After implementing the new alternative class, add an instance of it to the `alternatives_map` dictionary in the `AlternativesParser` class within "alternatives.py." This dictionary maps alternative names to their respective classes. + +By following these steps, you can seamlessly expand the set of criteria and alternatives available for risk assessment within the method, enabling a more comprehensive evaluation of your software's next version. + +### Example of Retrieving Criteria and Alternatives + +To retrieve criteria and alternatives from environment variables defined in a .env file, follow this example code. This step is essential to prepare the data for use in the TOPSIS algorithm. 
+
+```python
+# Prepare data for TOPSIS
+criteria_parser = CriterionParser()
+alternative_parser = AlternativesParser()
+
+criteria_names = configuration.topsis_criteria
+criteria_weights = configuration.topsis_weigths
+alternative_names = configuration.topsis_alternatives
+
+try:
+    criteria = criteria_parser.parse_criteria(criteria_names, criteria_weights)
+except (InvalidCriterionError, MissingWeightError, NoCriteriaProvidedError) as e:
+    print(f"Error: {e}")
+    return
+
+try:
+    alternatives = alternative_parser.parse_alternatives(alternative_names)
+except (InvalidAlternativeError, NoAlternativeProvidedError) as e:
+    print(f"Error: {e}")
+    return
+```
+
+#### Code Explanation
+
+1. We use the `CriterionParser` and `AlternativesParser` classes to parse and convert the names of criteria and alternatives from environment variables defined in a `.env` file.
+
+2. Criterion names are extracted from `configuration.topsis_criteria`, criterion weights from `configuration.topsis_weigths`, and alternative names from `configuration.topsis_alternatives`. These values are typically stored in a configuration file or a `.env` file for more flexible management.
+
+3. We use `try` and `except` blocks to handle potential errors related to retrieving criteria and alternatives. These errors include `InvalidCriterionError`, `MissingWeightError`, `NoCriteriaProvidedError`, `InvalidAlternativeError`, and `NoAlternativeProvidedError`.
+
+Once this code is successfully executed, you have retrieved the necessary criteria and alternatives from your environment variables. You can then use the `get_data` method of the criteria and alternatives to obtain the corresponding data from your dataset, thus preparing the essential information for the TOPSIS algorithm.
+
+### Example of Decision Matrix Construction
+
+Constructing the decision matrix is a crucial step in the TOPSIS-based risk assessment method. 
This matrix serves as the foundation for evaluating the alternatives against the defined criteria. Here's an example code illustrating the process: + +```python +# Create the decision matrix +decision_matrix_builder = mt.Math.DecisionMatrixBuilder() + +# Add criteria to the decision matrix +for criterion in criteria: + decision_matrix_builder.add_criteria(criterion.get_data(df), criterion.get_name()) + +# Add alternatives to the decision matrix +for alternative in alternatives: + decision_matrix_builder.add_alternative(alternative.get_data(df), alternative.get_name()) + +# Set correlation methods if provided in the configuration +methods = [] +for method in configuration.topsis_corr_method: + methods.append(mt.Math.get_correlation_methods_from_name(method)) +if len(methods) > 0: + decision_matrix_builder.set_correlation_methods(methods) + +# Build the decision matrix +decision_matrix = decision_matrix_builder.build() +``` + +#### Code Explanation + +1. We begin by creating an instance of the `DecisionMatrixBuilder` from the `mt.Math` module. This builder will help us construct the decision matrix. + +2. Next, we iterate through the criteria selected earlier, which were parsed and retrieved from environment variables. For each criterion, we call `criterion.get_data(df)` to obtain the normalized data from the DataFrame `df`, and `criterion.get_name()` to get the criterion's name. We then add this data and name to the decision matrix builder using `decision_matrix_builder.add_criteria()`. + +3. Similarly, we iterate through the alternatives and add their normalized data and names to the decision matrix builder using `decision_matrix_builder.add_alternative()`. + +4. If correlation methods have been provided in the configuration, we set these methods using `decision_matrix_builder.set_correlation_methods()`. This step is crucial for determining the relationships between the criteria and alternatives. + +5. 
Finally, we build the decision matrix by calling `decision_matrix_builder.build()`. This results in a fully constructed decision matrix that includes all the selected criteria and alternatives, allowing us to proceed with the TOPSIS algorithm for risk assessment. + +By following this code example, you'll have successfully created the decision matrix required for the subsequent steps in the risk assessment process using TOPSIS. + +### Example of Using the Decision Matrix in TOPSIS + +Once the decision matrix has been constructed, you can proceed to perform the TOPSIS analysis. Here's an example code illustrating how to use the decision matrix in the TOPSIS algorithm: + +```python +# Compute TOPSIS +ts = mt.Math.TOPSIS( + decision_matrix, + [criterion.get_weight() for criterion in criteria], + [criterion.get_direction() for criterion in criteria] +) +ts.topsis() +``` + +#### Code Explanation + +1. We create an instance of the `mt.Math.TOPSIS` class, which is responsible for performing the TOPSIS analysis. We pass three essential parameters to initialize it: + - `decision_matrix`: This is the decision matrix constructed earlier, containing all the criteria and alternatives. + - `[criterion.get_weight() for criterion in criteria]`: Here, we provide a list of weights for each criterion. These weights reflect the relative importance of each criterion in the risk assessment process. We retrieve these weights from the `criteria` list, which contains the parsed criteria objects. + - `[criterion.get_direction() for criterion in criteria]`: This list specifies whether each criterion should be maximized or minimized. We retrieve this information from the criteria list as well. + +2. After initializing the TOPSIS class, we call the `ts.topsis()` method to execute the TOPSIS algorithm. This method will perform all the necessary calculations and store the results for further analysis. 
+ +Once the TOPSIS analysis is completed, you can utilize the following methods available in the TOPSIS class: + +- `get_closeness()`: This method returns an array of relative closeness values for each alternative. These values represent the degree of preference for each alternative based on the TOPSIS analysis. + +- `get_ranking()`: It returns the ranking of alternatives based on their relative closeness values. The ranking is determined by sorting the relative closeness values in descending order and assigning ranks to the alternatives accordingly. + +- `get_coef_from_label(label)`: This method allows you to retrieve the coefficient value associated with a specific alternative, given its label. + +By using these methods, you can gain insights into the rankings and preferences of alternatives based on the TOPSIS analysis, helping you make informed decisions regarding the risk assessment for your software's next version. diff --git a/main.py b/main.py index 7a21184..bf39b95 100644 --- a/main.py +++ b/main.py @@ -525,7 +525,6 @@ def datasetgen(dataset_dir, output_file, configuration: Configuration = Provide[ configuration.source_repo = env_var.get("OTTM_SOURCE_REPO") configuration.source_project = env_var.get("OTTM_SOURCE_PROJECT") configuration.target_database = f"sqlite:///{project_path}/{configuration.source_project}.sqlite3" - print("Current db : ", configuration.source_project) print(configuration.target_database) # Créer un nouveau moteur SQLAlchemy @@ -534,17 +533,34 @@ def datasetgen(dataset_dir, output_file, configuration: Configuration = Provide[ # Créer une nouvelle session à partir du nouveau moteur Session = sessionmaker() Session.configure(bind=new_engine) + session = Session() + + # Query to get the id of the version with the name "Next Release" + next_release_version = ( + session.query(Version) + .filter(Version.project_id == project.project_id) + .filter(Version.name == "Next Release") + .first() + ) + + # Check if "Next Release" version exists for 
this project + if next_release_version is not None: + next_release_version_id = next_release_version.version_id + num_lines_query = session.query(Metric.lizard_total_nloc).filter(Metric.version_id == next_release_version_id) + num_lines = num_lines_query.scalar() + else: + # Handle case when "Next Release" version does not exist + num_lines = np.nan + print(num_lines) # Remplacer l'ancienne session par la nouvelle dans le conteneur container.session.override(providers.Singleton(Session)) - # Call the topsis command with the project-specific configuration - configuration.topsis_corr_method = [] topsis_output = topsis() # Write topsis_output to CSV file - write_output_to_csv(configuration.source_repo, topsis_output, output_file) + write_output_to_csv(configuration.source_repo, num_lines, topsis_output, output_file) @cli.command() @inject @@ -584,9 +600,9 @@ def display_topsis_weight(): @inject def topsis( - session = Provide[Container.session], - configuration: Configuration = Provide[Container.configuration] - ): + session = Provide[Container.session], + configuration: Configuration = Provide[Container.configuration] +): """ Perform TOPSIS analysis on a dataset. 
@@ -668,8 +684,7 @@ def topsis( methods = [] for method in configuration.topsis_corr_method: methods.append(mt.Math.get_correlation_methods_from_name(method)) - - if methods or len(methods) > 0: + if len(methods) > 0: decision_matrix_builder.set_correlation_methods(methods) # Build the decision matrix @@ -683,9 +698,15 @@ def topsis( ) ts.topsis() + # Calculate the weights of alternatives after TOPSIS analysis weight = ts.get_closeness() - weight = weight / sum(weight) + total_weight = sum(weight) + + if total_weight != 0: + weight = weight / total_weight + else: + weight = np.full(len(weight), np.nan) # Prepare the output dictionary containing the weights of alternatives output = {} @@ -695,7 +716,7 @@ def topsis( return output -def write_output_to_csv(project_name, output_dict, output_file_path): +def write_output_to_csv(project_name, num_lines, output_dict, output_file_path): """ Write the dictionary and project name to a CSV file. @@ -715,7 +736,7 @@ def write_output_to_csv(project_name, output_dict, output_file_path): # Open the file in 'a' mode (append mode) to create if it doesn't exist with open(output_file_path, mode="a", newline="") as output_file: - fieldnames = ["Project"] + list(output_dict.keys()) + fieldnames = ["Project", "num_lines"] + list(output_dict.keys()) writer = csv.DictWriter(output_file, fieldnames=fieldnames) # Write header only if the file is newly created @@ -723,7 +744,7 @@ def write_output_to_csv(project_name, output_dict, output_file_path): writer.writeheader() # Create a new row with the project name and dictionary values - row = {"Project": project_name} + row = {"Project": project_name, "num_lines": num_lines} row.update(output_dict) writer.writerow(row) diff --git a/metrics/versions.py b/metrics/versions.py index 2e460c5..da67722 100644 --- a/metrics/versions.py +++ b/metrics/versions.py @@ -3,6 +3,7 @@ from datetime import datetime, timedelta import pandas as pd +from exceptions.topsis_configuration import 
InvalidAlternativeError, InvalidCriterionError, MissingWeightError, NoAlternativeProvidedError, NoCriteriaProvidedError from sqlalchemy.sql import func from sklearn import preprocessing import pandas as pd @@ -14,6 +15,8 @@ from models.metric import Metric from models.commit import Commit from models.issue import Issue +from utils.alternatives import AlternativesParser +from utils.criterion import CriterionParser from utils.database import get_included_and_current_versions_filter from utils.timeit import timeit import utils.math as mt @@ -153,31 +156,124 @@ def assess_next_release_risk(session, configuration: Configuration, project_id:i .order_by(Version.start_date.asc()).statement logging.debug(metrics_statement) df = pd.read_sql(metrics_statement, session.get_bind()) + + if configuration.use_topsis: + scaled_df = get_scaled_df_topsis(configuration, df) + else: + scaled_df = get_scaled_static(df) - # TODO : we should Remove outliers in the dataframe - # while preserving the "Next Release" row - # cols = ['pdays', 'campaign', 'previous'] # The columns you want to search for outliers in - # # Calculate quantiles and IQR - # Q1 = df[cols].quantile(0.25) # Same as np.percentile but maps (0,1) and not (0,100) - # Q3 = df[cols].quantile(0.75) - # IQR = Q3 - Q1 - # # Return a boolean array of the rows with (any) non-outlier column values - # condition = ~((df[cols] < (Q1 - 1.5 * IQR)) | (df[cols] > (Q3 + 1.5 * IQR))).any(axis=1) - # ---> or (df['name'] == 'Next Release') - # # Filter our dataframe based on condition - # filtered_df = df[condition] - - bugs = df['bugs'].to_numpy() - bugs = preprocessing.normalize([bugs]) - bug_velocity = df['bug_velocity'].to_numpy() + # Return risk assessment along with median and max risk scores for all versions + median_risk = scaled_df["risk_assessment"].median() + max_risk = scaled_df["risk_assessment"].max() + risk_score = scaled_df.loc[(scaled_df["name"] == configuration.next_version_name)] + + output = { + "median": median_risk, + 
"max": max_risk, + "score": risk_score.iloc[0]['risk_assessment']} + print("risk asseeement = ", output) + return output + +def get_scaled_df_topsis(configuration: Configuration, df): + """ + Get the scaled dataframe using topsis method + + Parameters: + ----------- + - configuration : Configuration + Project configuration + - df : Dataframe + dataframe red from sqlite database + """ + + # Prepare data for topsis + criteria_parser = CriterionParser() + alternative_parser = AlternativesParser() + + criteria_names = configuration.topsis_criteria + criteria_weights = configuration.topsis_weigths + alternative_names = configuration.topsis_alternatives + + try: + criteria = criteria_parser.parse_criteria(criteria_names, criteria_weights) + except (InvalidCriterionError, MissingWeightError, NoCriteriaProvidedError) as e: + print(f"Error: {e}") + return + + try: + alternatives = alternative_parser.parse_alternatives(alternative_names) + except (InvalidAlternativeError, NoAlternativeProvidedError) as e: + print(f"Error: {e}") + return + + # Create the decision matrix + decision_matrix_builder = mt.Math.DecisionMatrixBuilder() + + # Add criteria to the decision matrix + for criterion in criteria: + decision_matrix_builder.add_criteria(criterion.get_data(df), criterion.get_name()) + + # Add alternatives to the decision matrix + for alternative in alternatives: + decision_matrix_builder.add_alternative(alternative.get_data(df), alternative.get_name()) + + # Set correlation methods if provided in the configuration + methods = [] + for method in configuration.topsis_corr_method: + methods.append(mt.Math.get_correlation_methods_from_name(method)) + if len(methods) > 0: + decision_matrix_builder.set_correlation_methods(methods) + + # Build the decision matrix + decision_matrix = decision_matrix_builder.build() + + # Compute topsis + ts = mt.Math.TOPSIS( + decision_matrix, + [criterion.get_weight() for criterion in criteria], + [criterion.get_direction() for criterion in criteria] 
+    )
+    ts.topsis()
+
+    # Create a scaled dataframe for alternatives
+    scaled_df = pd.DataFrame()
+    for alternative in alternatives:
+        scaled_df[alternative.get_name()] = alternative.get_data(df)[0]
+
+    # Join original columns back to the scaled dataframe
+    old_cols = df[["name", "bugs"]]
+    scaled_df = scaled_df.join(old_cols)
+
+    # Set XP to 1 day for all versions that are too short (avoid inf values in dataframe)
+    scaled_df['avg_team_xp'] = scaled_df['avg_team_xp'].replace({0:1})
+    scaled_df["risk_assessment"] = 0
+
+    # Calculate risk assessment for each alternative
+    for alternative in alternatives:
+        scaled_df["risk_assessment"] += scaled_df[alternative.get_name()] * ts.get_coef_from_label(alternative.get_name())
+    scaled_df["risk_assessment"] = scaled_df["risk_assessment"] * 100
+
+    return scaled_df
+
+def get_scaled_static(df):
+    """
+    Get the scaled dataframe using static coefficients
+
+    Parameters:
+    -----------
+    - df : Dataframe
+        dataframe read from sqlite database
+    """
+
+    bug_velocity = np.array(df['bug_velocity'])
     bug_velocity = preprocessing.normalize([bug_velocity])
-    changes = df['changes'].to_numpy()
+    changes = np.array(df['changes'])
     changes = preprocessing.normalize([changes])
-    avg_team_xp = df['avg_team_xp'].to_numpy()
+    avg_team_xp = np.array(df['avg_team_xp'])
     avg_team_xp = preprocessing.normalize([avg_team_xp])
-    lizard_avg_complexity = df['lizard_avg_complexity'].to_numpy()
+    lizard_avg_complexity = np.array(df['lizard_avg_complexity'])
     lizard_avg_complexity = preprocessing.normalize([lizard_avg_complexity])
-    code_churn_avg = df['code_churn_avg'].to_numpy()
+    code_churn_avg = np.array(df['code_churn_avg'])
     code_churn_avg = preprocessing.normalize([code_churn_avg])
 
     scaled_df = pd.DataFrame({
@@ -201,14 +297,7 @@ def assess_next_release_risk(session, configuration: Configuration, project_id:i
         (scaled_df["code_churn_avg"] * 20)
     )
 
-    # Return risk assessment along with median and max risk scores for all versions
-    median_risk = 
scaled_df["risk_assessment"].median() - max_risk = scaled_df["risk_assessment"].max() - risk_score = scaled_df.loc[(scaled_df["name"] == configuration.next_version_name)] - return { - "median": math.ceil(median_risk), - "max": math.ceil(max_risk), - "score": math.ceil(risk_score.iloc[0]['risk_assessment'])} + return scaled_df @timeit def compute_bugvelocity_last_30_days(session, project_id:int)->pd.DataFrame: