From 6359dd76a6128aeb31698628adf3622ce6f4e371 Mon Sep 17 00:00:00 2001 From: marc-vdm Date: Fri, 22 Sep 2023 10:48:39 +0200 Subject: [PATCH] Add first iteration of product contributions --- .../layouts/tabs/LCA_results_tabs.py | 264 +++++++++++++++++- 1 file changed, 260 insertions(+), 4 deletions(-) diff --git a/activity_browser/layouts/tabs/LCA_results_tabs.py b/activity_browser/layouts/tabs/LCA_results_tabs.py index 63cc3f214..e09f71ff4 100644 --- a/activity_browser/layouts/tabs/LCA_results_tabs.py +++ b/activity_browser/layouts/tabs/LCA_results_tabs.py @@ -6,8 +6,11 @@ from collections import namedtuple import traceback -from typing import List, Optional, Union +from typing import List, Tuple, Optional, Union + +import numpy as np import pandas as pd +import warnings from PySide2.QtWidgets import ( QWidget, QTabWidget, QVBoxLayout, QHBoxLayout, QScrollArea, QRadioButton, @@ -18,6 +21,7 @@ ) from PySide2 import QtGui, QtCore from stats_arrays.errors import InvalidParamsError +import brightway2 as bw from ...bwutils import ( Contributions, MonteCarloLCA, MLCA, @@ -34,6 +38,7 @@ from ...ui.tables import ContributionTable, InventoryTable, LCAResultsTable from ...ui.widgets import CutoffMenu, SwitchComboBox from ...ui.web import SankeyNavigatorWidget +from ...bwutils.superstructure.graph_traversal_with_scenario import GraphTraversalWithScenario from .base import BaseRightTab import logging @@ -69,7 +74,7 @@ def get_unit(method: tuple, relative: bool = False) -> str: # Special namedtuple for the LCAResults TabWidget. Tabs = namedtuple( - "tabs", ("inventory", "results", "ef", "process", "sankey", "mc", "gsa") + "tabs", ("inventory", "results", "ef", "process", "product", "sankey", "mc", "gsa") ) Relativity = namedtuple("relativity", ("relative", "absolute")) ExportTable = namedtuple("export_table", ("label", "copy", "csv", "excel")) @@ -107,7 +112,6 @@ def __init__(self, data: dict, parent=None): self.setMovable(True) self.setVisible(False) - self.visible = False QApplication.setOverrideCursor(QtCore.Qt.WaitCursor) self.mlca, self.contributions, self.mc = calculations.do_LCA_calculations(data) @@ -120,6 +124,7 @@ def __init__(self, data: dict, parent=None): results=LCAResultsTab(self), ef=ElementaryFlowContributionTab(self), process=ProcessContributionsTab(self), + product=ProductContributionsTab(self.cs_name, self), sankey=SankeyNavigatorWidget(self.cs_name, parent=self), mc=MonteCarloTab(self), # mc=None if self.mc is None else MonteCarloTab(self), gsa=GSATab(self), @@ -129,6 +134,7 @@ def __init__(self, data: dict, parent=None): results="LCA Results", ef="EF Contributions", process="Process Contributions", + product="Product Contributions", sankey="Sankey", mc="Monte Carlo", gsa="Sensitivity Analysis", @@ -521,7 +527,6 @@ def elementary_flows_contributing_to_IA_methods(self, contributary: bool = True, return data.loc[data['code'].isin(new_flows)] - def update_table(self): """Update the table.""" inventory = "biosphere" if self.radio_button_biosphere.isChecked() else "technosphere" @@ -1034,6 +1039,257 @@ def update_dataframe(self, *args, **kwargs): ) +class ProductContributionsTab(ContributionTab): + """Class for the 'Product Contributions' sub-tab. + + This tab allows for analysis of first-level product contributions. + The direct impact (from biosphere exchanges from the FU) + and cumulative impacts from all exchange inputs to the FU (first level) are calculated. + + e.g. the direct emissions from steel production and the cumulative impact for all electricity input + into that activity. This works on the basis of input products and their total (cumulative) impact, scaled to + how much of that product is needed in the FU. + + Example questions that can be answered by this tab: + What is the contribution of electricity (product) to reference flow XXX? + Which input product contributes the most to impact category YYY? + What products contribute most to reference flow ZZZ? + + Shows: + Compare options button to change between 'Reference Flows' and 'Impact Categories' + 'Impact Category'/'Reference Flow' chooser with aggregation method + Plot/Table on/off and Relative/Absolute options for data + Plot/Table + Export options + """ + + def __init__(self, cs_name, parent=None): + super().__init__(parent) + + self.cache = {'totals': {}} # We cache the calculated data, as it can take some time to generate. + # We cache the individual calculation results, as they are re-used in multiple views + # e.g. FU1 x method1 x scenario1 + # may be seen in both 'Reference Flows' and 'Impact Categories', just with different axes. + # we also cache totals, not for calculation speed, but to be able to easily convert for relative results + self.caching = True # set to False to disable caching for debug + + self.layout.addLayout(get_header_layout('Product Contributions')) + combobox = self.build_combobox(has_method=True, has_func=True) + self.layout.addLayout(combobox) + self.layout.addWidget(horizontal_line()) + self.layout.addWidget(self.build_main_space()) + self.layout.addLayout(self.build_export(True, True)) + + # get relevant data from calculation setup + self.cs = cs_name + func_units = bw.calculation_setups[self.cs]['inv'] + self.func_keys = [list(fu.keys())[0] for fu in func_units] # extract a list of keys from the functional units + self.func_units = [ + {bw.get_activity(k): v for k, v in fu.items()} + for fu in func_units + ] + self.methods = bw.calculation_setups[self.cs]['ia'] + + self.contribution_fn = 'Product contributions' + self.switches.configure(self.has_func, self.has_method) + self.connect_signals() + self.toggle_comparisons(self.switches.indexes.func) + + self.combobox_menu.agg_label.setVisible(False) + self.combobox_menu.agg.setVisible(False) + + def update_dataframe(self, *args, **kwargs): + """Retrieve the product contributions.""" + # get the right data + if self.has_scenarios: + scenario_index = self.combobox_menu.scenario.currentIndex() + else: + scenario_index = None + method_index = self.combobox_menu.method.currentIndex() + method = self.methods[method_index] + demand_index = self.combobox_menu.func.currentIndex() + demand = self.func_units[demand_index] + demand_key = self.func_keys[demand_index] + + all_data = [] + compare = self.switches.currentText() + if compare == 'Reference Flows': + # run the analysis for every reference flow + for demand_index, demand in enumerate(self.func_units): + demand_key = self.func_keys[demand_index] + cache_key = (demand_key, method_index, scenario_index) + if self.caching and self.cache.get(cache_key, False): + all_data.append([demand_key, self.cache[cache_key]]) + continue + data = self.calculate_contributions(demand, demand_key, + method=method, method_index=method_index, + scenario_lca=self.has_scenarios, scenario_index=scenario_index, + ) + if data: + data = self.reformat_data(data) + else: + data = {'Total': 0} + all_data.append([demand_key, data]) + self.cache[cache_key] = data + elif compare == 'Impact Categories': + # run the analysis for every method + for method_index, method in enumerate(self.methods): + cache_key = (demand_key, method_index, scenario_index) + if self.caching and self.cache.get(cache_key, False): + all_data.append([method, self.cache[cache_key]]) + continue + data = self.calculate_contributions(demand, demand_key, + method=method, method_index=method_index, + scenario_lca=self.has_scenarios, scenario_index=scenario_index, + ) + if data: + data = self.reformat_data(data) + else: + data = {'Total': 0} + all_data.append([method, data]) + self.cache[cache_key] = data + elif compare == 'Scenarios': + # run the analysis for every scenario + orig_idx = self.combobox_menu.scenario.currentIndex() + for scenario_index in range(self.combobox_menu.scenario.count()): + self.combobox_menu.scenario.setCurrentIndex(scenario_index) + scenario = self.combobox_menu.scenario.currentText() + + cache_key = (demand_key, method_index, scenario_index) + if self.caching and self.cache.get(cache_key, False): + all_data.append([scenario, self.cache[cache_key]]) + continue + data = self.calculate_contributions(demand, demand_key, + method=method, method_index=method_index, + scenario_lca=self.has_scenarios, scenario_index=scenario_index, + ) + if data: + data = self.reformat_data(data) + else: + data = {'Total': 0} + all_data.append([scenario, data]) + self.cache[cache_key] = data + self.combobox_menu.scenario.setCurrentIndex(orig_idx) + df = self.data_to_df(all_data, compare) + # TODO manage empty dataframe so overall calculation doesn't fail with plot generation + return df + + def calculate_contributions(self, demand, demand_key, + method, method_index: int = None, + scenario_lca: bool = False, scenario_index: int = None) -> Optional[dict]: + """Calculate LCA, do graph traversal on the first level of activities.""" + technosphere = bw.get_activity(demand_key).technosphere() + + max_calc = len([exch for exch in technosphere if + exch.input.key != exch.output.key]) # get the amount of exchanges that are not to self + + with warnings.catch_warnings(): + # Ignore the calculation count warning, as we don't care about it in this situation + warnings.filterwarnings("ignore", message="Stopping traversal due to calculation count.") + try: + if scenario_lca: + self.parent.mlca.update_lca_calculation_for_sankey(scenario_index, demand, method_index) + data = GraphTraversalWithScenario(self.parent.mlca).calculate(demand, method, cutoff=0, + max_calc=max_calc) + else: + data = bw.GraphTraversal().calculate(demand, method, cutoff=0, max_calc=max_calc) + except (ValueError, ZeroDivisionError) as e: + log.info('{}, no results calculated for {}'.format(e, method)) + return + return data + + def reformat_data(self, data: dict) -> dict: + """Reformat the data into a useable format.""" + lca = data["lca"] + demand = list(lca.demand.items())[0] + demand_key = list(lca.demand.keys())[0].key + total = lca.score + reverse_activity_dict = {v: k for k, v in lca.activity_dict.items()} + + # get the impact data per key + contributions = { + bw.get_activity(reverse_activity_dict[edge['from']]).key: edge['impact'] for edge in data["edges"] + if all(i != -1 for i in (edge["from"], edge["to"])) + } + # get impact for the total and demand + diff = total - sum([v for v in contributions.values()]) # for the demand process, calculate by the difference + contributions[demand_key] = diff + contributions['Total'] = total + return contributions + + def data_to_df(self, all_data: List[Tuple[object, dict]], compare: str) -> pd.DataFrame: + """Convert the provided data into a dataframe.""" + unique_keys = [] + # get all the unique keys: + d = {'index': ['Total'], 'reference product': [''], 'name': [''], + 'location': [''], 'unit': [''], 'database': ['']} + meta_cols = set(d.keys()) + for i, (item, data) in enumerate(all_data): + print('++ COL:', item, data) + unique_keys += list(data.keys()) + # already add the total with right column formatting depending on compares + if compare == 'Reference Flows': + col_name = self.metadata_to_index(self.key_to_metadata(item)) + elif compare == 'Impact Categories': + col_name = self.metadata_to_index(list(item)) + elif compare == 'Scenarios': + col_name = item + + self.cache['totals'][col_name] = data['Total'] + if self.relative: + d[col_name] = [1] + else: + d[col_name] = [data['Total']] + + all_data[i] = item, data, col_name + unique_keys = set(unique_keys) + + # convert to dict format to feed into dataframe + for key in unique_keys: + if key == 'Total': + continue + # get metadata + metadata = self.key_to_metadata(key) + d['index'].append(self.metadata_to_index(metadata)) + d['reference product'].append(metadata[0]) + d['name'].append(metadata[1]) + d['location'].append(metadata[2]) + d['unit'].append(metadata[3]) + d['database'].append(metadata[4]) + # check for each dataset if we have values, otherwise add np.nan + for item, data, col_name in all_data: + if val := data.get(key, False): + if self.relative: + value = val / self.cache['totals'][col_name] + else: + value = val + else: + value = np.nan + d[col_name].append(value) + df = pd.DataFrame(d) + check_cols = list(set(df.columns) - meta_cols) + df = df.dropna(subset=check_cols, how='all') + df.sort_values(by=col_name, ascending=False) # temporary sorting solution, just sort on the last column. + return df + + def key_to_metadata(self, key: tuple) -> list: + """Convert the key information to list with metadata. + + format: + [reference product, activity name, location, unit, database] + """ + act = bw.get_activity(key) + return [act.get('reference product'), act.get('name'), act.get('location'), act.get('unit'), key[0]] + + def metadata_to_index(self, data: list) -> str: + """Convert list to formatted index. + + format: + reference product | activity name | location | unit | database + """ + return ' | '.join(data) + + class CorrelationsTab(NewAnalysisTab): def __init__(self, parent): super().__init__(parent)