From 79c3fbe9613662d535afe6ac4391af4a8933cdab Mon Sep 17 00:00:00 2001 From: Chris Mutel Date: Thu, 12 Sep 2024 12:59:50 +0200 Subject: [PATCH] Linting --- bw2io/export/excel.py | 15 +- bw2io/importers/base_lci.py | 11 +- bw2io/importers/simapro_block_csv.py | 4 +- bw2io/strategies/generic.py | 6 +- bw2io/strategies/products.py | 74 ++-- bw2io/strategies/simapro.py | 10 +- bw2io/utils.py | 16 +- tests/base_lci_importer.py | 3 +- tests/simapro_block.py | 5 +- tests/strategies/generic.py | 7 +- tests/strategies/simapro.py | 45 +-- tests/strategies/simapro_name_splitting.py | 2 +- tests/strategies/test_products.py | 418 ++++++++++++--------- 13 files changed, 339 insertions(+), 277 deletions(-) diff --git a/bw2io/export/excel.py b/bw2io/export/excel.py index ecc2865..b46c118 100644 --- a/bw2io/export/excel.py +++ b/bw2io/export/excel.py @@ -1,8 +1,8 @@ -from pathlib import Path -from typing import Optional, List import collections import numbers import os +from pathlib import Path +from typing import List, Optional import xlsxwriter from bw2data import Database, projects @@ -289,7 +289,11 @@ def write_lci_excel(database_name, objs=None, sections=None, dirpath=None): def write_lci_matching( - db: List[dict], database_name: str, only_unlinked: bool=False, only_activity_names: bool=False, output_dir: Optional[Path] = None + db: List[dict], + database_name: str, + only_unlinked: bool = False, + only_activity_names: bool = False, + output_dir: Optional[Path] = None, ): """ Write matched and unmatched exchanges to Excel file @@ -357,7 +361,10 @@ def write_row(sheet, row, data, exc=True): safe_name = safe_filename(database_name, False) suffix = "-unlinked" if only_unlinked else ("-names" if only_activity_names else "") - filepath = Path(output_dir or projects.output_dir) / f"db-matching-{safe_name}{suffix}.xlsx" + filepath = ( + Path(output_dir or projects.output_dir) + / f"db-matching-{safe_name}{suffix}.xlsx" + ) workbook = xlsxwriter.Workbook(filepath) bold = workbook.add_format({"bold": True}) diff --git a/bw2io/importers/base_lci.py b/bw2io/importers/base_lci.py index 187a8a9..3453eb7 100644 --- a/bw2io/importers/base_lci.py +++ b/bw2io/importers/base_lci.py @@ -2,20 +2,20 @@ import functools import itertools import warnings -from typing import Optional, Tuple, Callable, List, Set, Union from pathlib import Path +from typing import Callable, List, Optional, Set, Tuple, Union -from bw2data import Database, config, databases, labels, parameters, get_node, projects +import randonneur as rn +import randonneur_data as rd +from bw2data import Database, config, databases, get_node, labels, parameters, projects from bw2data.data_store import ProcessedDataStore +from bw2data.errors import UnknownObject from bw2data.parameters import ( ActivityParameter, DatabaseParameter, ParameterizedExchange, ProjectParameter, ) -from bw2data.errors import UnknownObject -import randonneur as rn -import randonneur_data as rd from ..errors import NonuniqueCode, StrategyError, WrongDatabase from ..export.excel import write_lci_matching @@ -35,7 +35,6 @@ from ..utils import activity_hash from .base import ImportBase - EXCHANGE_SPECIFIC_KEYS = ( "amount", "functional", diff --git a/bw2io/importers/simapro_block_csv.py b/bw2io/importers/simapro_block_csv.py index 27e0058..c29f1bb 100644 --- a/bw2io/importers/simapro_block_csv.py +++ b/bw2io/importers/simapro_block_csv.py @@ -5,7 +5,7 @@ from typing import Optional, Union from uuid import uuid4 -from bw2data import Database, config, databases, labels, get_node +from bw2data import Database, config, databases, get_node, labels from bw2data.errors import UnknownObject from bw_simapro_csv import SimaProCSV @@ -25,6 +25,7 @@ normalize_biosphere_names, normalize_simapro_biosphere_categories, normalize_simapro_biosphere_names, + normalize_simapro_labels_to_brightway_standard, normalize_units, override_process_name_using_single_functional_exchange, set_code_by_activity_hash, @@ -32,7 +33,6 @@ split_simapro_name_geo, strip_biosphere_exc_locations, update_ecoinvent_locations, - normalize_simapro_labels_to_brightway_standard, ) from ..utils import activity_hash from .base_lci import LCIImporter diff --git a/bw2io/strategies/generic.py b/bw2io/strategies/generic.py index af5bfb8..6eae915 100644 --- a/bw2io/strategies/generic.py +++ b/bw2io/strategies/generic.py @@ -1,6 +1,6 @@ -from collections import defaultdict import numbers import pprint +from collections import defaultdict from copy import deepcopy from typing import Iterable, List, Optional, Union @@ -797,7 +797,9 @@ def match_against_only_available_in_given_context_tree( try: exc["input"] = mapping[ - tuple([exc.get(field) for field in ffields] + [exc['categories'][0]]) + tuple( + [exc.get(field) for field in ffields] + [exc["categories"][0]] + ) ] except KeyError: continue diff --git a/bw2io/strategies/products.py b/bw2io/strategies/products.py index 710055d..675a8db 100644 --- a/bw2io/strategies/products.py +++ b/bw2io/strategies/products.py @@ -1,8 +1,8 @@ from pprint import pformat -from uuid import uuid4 from typing import List -import bw2data as bd +from uuid import uuid4 +import bw2data as bd EDGE_CORE_COLUMNS = [ "name", @@ -23,51 +23,63 @@ def create_products_as_new_nodes(data: List[dict]) -> List[dict]: """ -Create new product nodes and link to them if needed. + Create new product nodes and link to them if needed. -We create new `product` if the following conditions are met: + We create new `product` if the following conditions are met: -* The dataset is not multifunctional ( - `dataset.get("type") != bd.labels.multifunctional_node_default`). Multifunctional datasets - handle product creation separately. -* The edge is functional (`obj.get("functional") is True`) -* The edge is unlinked (`obj.get("input")` is falsey) -* The given edge has a `name`, and that `name` is different than the dataset `name` -* The combination of `name` and `location` is not present in the other dataset nodes. If no - `location` attribute is given for the edge under consideration, we use the `location` of the - dataset. + * The dataset is not multifunctional ( + `dataset.get("type") != bd.labels.multifunctional_node_default`). Multifunctional datasets + handle product creation separately. + * The edge is functional (`obj.get("functional") is True`) + * The edge is unlinked (`obj.get("input")` is falsey) + * The given edge has a `name`, and that `name` is different than the dataset `name` + * The combination of `name` and `location` is not present in the other dataset nodes. If no + `location` attribute is given for the edge under consideration, we use the `location` of the + dataset. -Create new nodes, and links the originating edges to the new product nodes. + Create new nodes, and links the originating edges to the new product nodes. -Modifies data in-place, and returns the modified `data`. + Modifies data in-place, and returns the modified `data`. """ combos = {(ds.get("name"), ds.get("location")) for ds in data} nodes = [] for ds in data: - if ds.get('type') == bd.labels.multifunctional_node_default: + if ds.get("type") == bd.labels.multifunctional_node_default: # Has its own product handling continue - for edge in ds.get('exchanges', []): - if edge.get('functional') and not edge.get('input') and edge.get('name') and edge['name'] != ds.get('name'): + for edge in ds.get("exchanges", []): + if ( + edge.get("functional") + and not edge.get("input") + and edge.get("name") + and edge["name"] != ds.get("name") + ): if not ds.get("database"): - raise KeyError(""" + raise KeyError( + """ Can't create a new `product` node, as dataset is missing `database` attribute: -{}""".format(pformat(ds))) - key = (edge['name'], edge.get('location') or ds.get('location')) +{}""".format( + pformat(ds) + ) + ) + key = (edge["name"], edge.get("location") or ds.get("location")) if key not in combos: code = uuid4().hex - nodes.append({ - 'name': edge['name'], - 'location': key[1] or bd.config.global_location, - 'unit': edge.get('unit') or ds.get('unit'), - 'exchanges': [], - 'code': code, - 'type': bd.labels.product_node_default, - 'database': ds['database'], - } | {k: v for k, v in edge.items() if k not in EDGE_CORE_COLUMNS}) - edge['input'] = (ds['database'], code) + nodes.append( + { + "name": edge["name"], + "location": key[1] or bd.config.global_location, + "unit": edge.get("unit") or ds.get("unit"), + "exchanges": [], + "code": code, + "type": bd.labels.product_node_default, + "database": ds["database"], + } + | {k: v for k, v in edge.items() if k not in EDGE_CORE_COLUMNS} + ) + edge["input"] = (ds["database"], code) combos.add(key) if nodes: diff --git a/bw2io/strategies/simapro.py b/bw2io/strategies/simapro.py index 2bad989..86865a3 100644 --- a/bw2io/strategies/simapro.py +++ b/bw2io/strategies/simapro.py @@ -920,9 +920,9 @@ def normalize_simapro_labels_to_brightway_standard(db: List[dict]) -> List[dict] labels. """ for ds in db: - for exc in filter(lambda x: 'input' not in x, ds.get('exchanges', [])): - if 'context' in exc and 'categories' not in exc: - exc['categories'] = tuple(exc['context']) - if 'identifier' in exc and 'code' not in exc: - exc['code'] = exc['identifier'] + for exc in filter(lambda x: "input" not in x, ds.get("exchanges", [])): + if "context" in exc and "categories" not in exc: + exc["categories"] = tuple(exc["context"]) + if "identifier" in exc and "code" not in exc: + exc["code"] = exc["identifier"] return db diff --git a/bw2io/utils.py b/bw2io/utils.py index 4755f32..0ee1583 100644 --- a/bw2io/utils.py +++ b/bw2io/utils.py @@ -1,6 +1,6 @@ -import math import hashlib import json +import math import os import pprint from numbers import Number @@ -123,8 +123,8 @@ def rescale_exchange(exc: dict, factor: float) -> dict: exc.update( { "uncertainty type": UndefinedUncertainty.id, - "loc": exc['amount'] * factor, - "amount": exc['amount'] * factor, + "loc": exc["amount"] * factor, + "amount": exc["amount"] * factor, } ) for field in ("scale", "shape", "minimum", "maximum", "negative"): @@ -138,16 +138,16 @@ def rescale_exchange(exc: dict, factor: float) -> dict: exc.update( { "scale": abs(exc["scale"] * factor), - "loc": exc['amount'] * factor, - "amount": exc['amount'] * factor, + "loc": exc["amount"] * factor, + "amount": exc["amount"] * factor, } ) elif exc["uncertainty type"] == LognormalUncertainty.id: exc.update( { - "loc": math.log(abs(exc['amount'] * factor)), - "negative": (exc['amount'] * factor) < 0, - "amount": exc['amount'] * factor, + "loc": math.log(abs(exc["amount"] * factor)), + "negative": (exc["amount"] * factor) < 0, + "amount": exc["amount"] * factor, } ) elif exc["uncertainty type"] in (TriangularUncertainty.id, UniformUncertainty.id): diff --git a/tests/base_lci_importer.py b/tests/base_lci_importer.py index 2775c07..1ac119f 100644 --- a/tests/base_lci_importer.py +++ b/tests/base_lci_importer.py @@ -6,8 +6,7 @@ from bw2data.parameters import * from bw2data.tests import bw2test -from bw2io.errors import StrategyError -from bw2io.errors import NonuniqueCode, WrongDatabase +from bw2io.errors import NonuniqueCode, StrategyError, WrongDatabase from bw2io.importers.base_lci import LCIImporter DATA = [ diff --git a/tests/simapro_block.py b/tests/simapro_block.py index 6235f45..dfc7d37 100644 --- a/tests/simapro_block.py +++ b/tests/simapro_block.py @@ -1,6 +1,7 @@ -from bw2io.importers import SimaProBlockCSVImporter -from bw2data.tests import bw2test from bw2data import Database, get_node +from bw2data.tests import bw2test + +from bw2io.importers import SimaProBlockCSVImporter class Mock(SimaProBlockCSVImporter): diff --git a/tests/strategies/generic.py b/tests/strategies/generic.py index 3e0dac2..a3eec4e 100644 --- a/tests/strategies/generic.py +++ b/tests/strategies/generic.py @@ -720,12 +720,7 @@ def test_match_against_top_level_context_custom_kinds(): ] } ] - assert ( - match_against_top_level_context( - given, "foo", kinds=["other"] - ) - == expected - ) + assert match_against_top_level_context(given, "foo", kinds=["other"]) == expected @bw2test diff --git a/tests/strategies/simapro.py b/tests/strategies/simapro.py index 3887805..7f932cc 100644 --- a/tests/strategies/simapro.py +++ b/tests/strategies/simapro.py @@ -306,26 +306,27 @@ def test_override_process_name_using_single_functional_exchange(): def test_normalize_simapro_labels_to_brightway_standard(): - given = [{ - 'exchanges': [{ - 'input': True, - 'context': 'something' - }, { - 'context': ['foo'], - }, { - 'identifier': 'abcde' - }] - }] - expected = [{ - 'exchanges': [{ - 'input': True, - 'context': 'something' - }, { - 'categories': ('foo',), - 'context': ['foo'], - }, { - 'code': 'abcde', - 'identifier': 'abcde' - }] - }] + given = [ + { + "exchanges": [ + {"input": True, "context": "something"}, + { + "context": ["foo"], + }, + {"identifier": "abcde"}, + ] + } + ] + expected = [ + { + "exchanges": [ + {"input": True, "context": "something"}, + { + "categories": ("foo",), + "context": ["foo"], + }, + {"code": "abcde", "identifier": "abcde"}, + ] + } + ] assert normalize_simapro_labels_to_brightway_standard(given) == expected diff --git a/tests/strategies/simapro_name_splitting.py b/tests/strategies/simapro_name_splitting.py index 628d289..1a02b74 100644 --- a/tests/strategies/simapro_name_splitting.py +++ b/tests/strategies/simapro_name_splitting.py @@ -107,7 +107,7 @@ def test_splitting_exchanges(): {"name": "Absorption chiller 100kW /CH/I U"}, {"name": "Cheese/CH"}, ], - } + }, ] result = [ { diff --git a/tests/strategies/test_products.py b/tests/strategies/test_products.py index 645f26d..03db240 100644 --- a/tests/strategies/test_products.py +++ b/tests/strategies/test_products.py @@ -1,250 +1,296 @@ -import bw2data as bd from copy import deepcopy + +import bw2data as bd import pytest + from bw2io.strategies import create_products_as_new_nodes def test_create_products_as_new_nodes_basic(): - data = [{ - 'name': 'epsilon', - 'location': 'there', - }, { - 'name': 'alpha', - 'database': 'foo', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "epsilon", + "location": "there", + }, + { + "name": "alpha", + "database": "foo", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + }, + ] original = deepcopy(data) result = create_products_as_new_nodes(data) assert len(data) == 3 - original[1]['exchanges'][0]['input'] = (result[2]['database'], result[2]['code']) + original[1]["exchanges"][0]["input"] = (result[2]["database"], result[2]["code"]) assert result[:2] == original[:2] product = { - 'database': 'foo', - 'code': result[2]['code'], - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'exchanges': [], - 'type': bd.labels.product_node_default, - 'extra': True, + "database": "foo", + "code": result[2]["code"], + "name": "beta", + "unit": "kg", + "location": "here", + "exchanges": [], + "type": bd.labels.product_node_default, + "extra": True, } assert result[2] == product def test_create_products_as_new_nodes_ignore_multifunctional(): - data = [{ - 'name': 'alpha', - 'database': 'foo', - 'type': bd.labels.multifunctional_node_default, - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "alpha", + "database": "foo", + "type": bd.labels.multifunctional_node_default, + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + } + ] create_products_as_new_nodes(data) assert len(data) == 1 def test_create_products_as_new_nodes_skip_nonqualifying(): - data = [{ - 'name': 'epsilon', - 'location': 'there', - }, { - 'name': 'alpha', - 'database': 'foo', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }, { - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }, { - 'name': 'gamma', - 'unit': 'kg', - 'location': 'here', - 'functional': False, - 'type': 'production', - 'extra': True, - }, { - 'name': 'delta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'input': ("foo", "bar"), - }, { - 'name': 'epsilon', - 'unit': 'kg', - 'location': 'there', - 'functional': True, - 'type': 'technosphere', - }] - }] + data = [ + { + "name": "epsilon", + "location": "there", + }, + { + "name": "alpha", + "database": "foo", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + }, + { + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + }, + { + "name": "gamma", + "unit": "kg", + "location": "here", + "functional": False, + "type": "production", + "extra": True, + }, + { + "name": "delta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "input": ("foo", "bar"), + }, + { + "name": "epsilon", + "unit": "kg", + "location": "there", + "functional": True, + "type": "technosphere", + }, + ], + }, + ] original = deepcopy(data) result = create_products_as_new_nodes(data) assert len(data) == 3 - original[1]['exchanges'][0]['input'] = (result[2]['database'], result[2]['code']) + original[1]["exchanges"][0]["input"] = (result[2]["database"], result[2]["code"]) assert result[:2] == original[:2] - assert result[2]['name'] == 'beta' + assert result[2]["name"] == "beta" def test_create_products_as_new_nodes_duplicate_exchanges(): - data = [{ - 'name': 'alpha', - 'database': 'foo', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - 'amount': 7, - }, { - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - 'amount': 17, - }] - }] + data = [ + { + "name": "alpha", + "database": "foo", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + "amount": 7, + }, + { + "name": "beta", + "unit": "kg", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + "amount": 17, + }, + ], + } + ] result = create_products_as_new_nodes(data) assert len(data) == 2 - assert result[1]['name'] == 'beta' + assert result[1]["name"] == "beta" def test_create_products_as_new_nodes_inherit_process_location(): - data = [{ - 'name': 'alpha', - 'database': 'foo', - 'location': 'here', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "alpha", + "database": "foo", + "location": "here", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + } + ] result = create_products_as_new_nodes(data) assert len(data) == 2 product = { - 'database': 'foo', - 'code': result[1]['code'], - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'exchanges': [], - 'type': bd.labels.product_node_default, - 'extra': True, + "database": "foo", + "code": result[1]["code"], + "name": "beta", + "unit": "kg", + "location": "here", + "exchanges": [], + "type": bd.labels.product_node_default, + "extra": True, } assert result[1] == product def test_create_products_as_new_nodes_inherit_process_unit(): - data = [{ - 'name': 'alpha', - 'database': 'foo', - 'unit': 'kg', - 'exchanges': [{ - 'name': 'beta', - 'location': 'here', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "alpha", + "database": "foo", + "unit": "kg", + "exchanges": [ + { + "name": "beta", + "location": "here", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + } + ] result = create_products_as_new_nodes(data) assert len(data) == 2 product = { - 'database': 'foo', - 'code': result[1]['code'], - 'name': 'beta', - 'unit': 'kg', - 'location': 'here', - 'exchanges': [], - 'type': bd.labels.product_node_default, - 'extra': True, + "database": "foo", + "code": result[1]["code"], + "name": "beta", + "unit": "kg", + "location": "here", + "exchanges": [], + "type": bd.labels.product_node_default, + "extra": True, } assert result[1] == product def test_create_products_as_new_nodes_inherit_process_location_when_searching(): - data = [{ - 'name': 'beta', - 'location': 'here', - }, { - 'name': 'alpha', - 'database': 'foo', - 'location': 'here', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "beta", + "location": "here", + }, + { + "name": "alpha", + "database": "foo", + "location": "here", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + }, + ] create_products_as_new_nodes(data) assert len(data) == 2 def test_create_products_as_new_nodes_get_default_global_location(): - data = [{ - 'name': 'alpha', - 'database': 'foo', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'functional': True, - 'type': 'technosphere', - 'extra': True, - }] - }] + data = [ + { + "name": "alpha", + "database": "foo", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "functional": True, + "type": "technosphere", + "extra": True, + } + ], + } + ] result = create_products_as_new_nodes(data) assert len(data) == 2 product = { - 'database': 'foo', - 'code': result[1]['code'], - 'name': 'beta', - 'unit': 'kg', - 'location': bd.config.global_location, - 'exchanges': [], - 'type': bd.labels.product_node_default, - 'extra': True, + "database": "foo", + "code": result[1]["code"], + "name": "beta", + "unit": "kg", + "location": bd.config.global_location, + "exchanges": [], + "type": bd.labels.product_node_default, + "extra": True, } assert result[1] == product def test_create_products_as_new_nodes_dataset_must_have_database_key(): - data = [{ - 'name': 'alpha', - 'exchanges': [{ - 'name': 'beta', - 'unit': 'kg', - 'functional': True, - 'type': 'technosphere', - }] - }] + data = [ + { + "name": "alpha", + "exchanges": [ + { + "name": "beta", + "unit": "kg", + "functional": True, + "type": "technosphere", + } + ], + } + ] with pytest.raises(KeyError): create_products_as_new_nodes(data)