From 4f45f90045b29067a12202b15fd2e59f23fa25e6 Mon Sep 17 00:00:00 2001 From: andersonfrailey Date: Sat, 2 Nov 2019 18:23:52 -0400 Subject: [PATCH] Create individual calculators --- taxbrain/taxbrain.py | 81 +++++++++++++++++++++++++++++------- taxbrain/tests/conftest.py | 18 ++++++-- taxbrain/tests/test_brain.py | 13 ++++++ 3 files changed, 92 insertions(+), 20 deletions(-) diff --git a/taxbrain/taxbrain.py b/taxbrain/taxbrain.py index 6bce5a3..7eb978c 100644 --- a/taxbrain/taxbrain.py +++ b/taxbrain/taxbrain.py @@ -91,7 +91,8 @@ def __init__(self, start_year: int, end_year: int = LAST_BUDGET_YEAR, self.has_run = False - def run(self, varlist: list = DEFAULT_VARIABLES): + def run(self, varlist: list = DEFAULT_VARIABLES, + cs_run: bool = False): """ Run the calculators. TaxBrain will determine whether to do a static or partial equilibrium run based on the user's inputs when initializing @@ -99,25 +100,42 @@ def run(self, varlist: list = DEFAULT_VARIABLES): Parameters ---------- varlist: list of variables from the microdata to be stored in each year + cs_run: Boolean indicator for whether or not Tax-Brain is being run on + the Compute Studio servers. If True, TaxBrain will create + calculators in each process that is running, rather than + create one set of calculators and pass them to each process Returns ------- None """ - base_calc, reform_calc = self._make_calculators() if not isinstance(varlist, list): msg = f"'varlist' is of type {type(varlist)}. Must be a list." raise TypeError(msg) - if self.params["behavior"]: - if self.verbose: - print("Running dynamic simulations") - self._dynamic_run(varlist, base_calc, reform_calc) + if "s006" not in varlist: + varlist.append("s006") + if cs_run: + if self.params["behavior"]: + run_func = self._cs_dynamic_run + else: + run_func = self._cs_static_run + delay = [ + delayed(self._cs_run(varlist, run_func, year)) + for year in range(self.start_year, self.end_year + 1) + ] + compute(*delay) else: - if self.verbose: - print("Running static simulations") - self._static_run(varlist, base_calc, reform_calc) - setattr(self, "has_run", True) + base_calc, reform_calc = self._make_calculators() + if self.params["behavior"]: + if self.verbose: + print("Running dynamic simulations") + self._dynamic_run(varlist, base_calc, reform_calc) + else: + if self.verbose: + print("Running static simulations") + self._static_run(varlist, base_calc, reform_calc) - del base_calc, reform_calc + del base_calc, reform_calc + setattr(self, "has_run", True) def weighted_totals(self, var: str) -> pd.DataFrame: """ @@ -246,13 +264,45 @@ def differences_table(self, year: int, groupby: str, tax_to_diff: str, return table # ----- private methods ----- + def _cs_run(self, varlist, run_func, year): + """ + Function for improving the memory usage of TaxBrain on Compute Studio + Parameters + ---------- + varlist: Variables from Tax-Calculator that will be saved + year: year the calculator needs to run + """ + base_calc, reform_calc = self._make_calculators() + base_calc.advance_to_year(year) + reform_calc.advance_to_year(year) + run_func(varlist, base_calc, reform_calc, year) + del base_calc, reform_calc + + def _cs_static_run(self, varlist, base_calc, reform_calc, year): + """ + Function for running a static simulation on the Compute Studio servers + """ + delay = [delayed(base_calc.calc_all()), + delayed(reform_calc.calc_all())] + compute(*delay) + self.base_data[year] = base_calc.dataframe(varlist) + self.reform_data[year] = reform_calc.dataframe(varlist) + + def _cs_dynamic_run(self, varlist, base_calc, reform_calc, year): + """ + Function for runnnig a dynamic simulation on the Compute Studio servers + """ + base, reform = behresp.response(base_calc, reform_calc, + self.params["behavior"], + dump=True) + self.base_data[year] = base[varlist] + self.reform_data[year] = reform[varlist] + del base, reform + def _static_run(self, varlist, base_calc, reform_calc): """ Run the calculator for a static analysis """ - if "s006" not in varlist: # ensure weight is always included - varlist.append("s006") - for yr in range(self.start_year, self.end_year + 1): base_calc.advance_to_year(yr) reform_calc.advance_to_year(yr) @@ -267,8 +317,6 @@ def _dynamic_run(self, varlist, base_calc, reform_calc): """ Run a dynamic response """ - if "s006" not in varlist: # ensure weight is always included - varlist.append("s006") for year in range(self.start_year, self.end_year + 1): base_calc.advance_to_year(year) reform_calc.advance_to_year(year) @@ -277,6 +325,7 @@ def _dynamic_run(self, varlist, base_calc, reform_calc): dump=True) self.base_data[year] = base[varlist] self.reform_data[year] = reform[varlist] + del base, reform def _process_user_mods(self, reform, assump): """ diff --git a/taxbrain/tests/conftest.py b/taxbrain/tests/conftest.py index ba119ac..7588598 100644 --- a/taxbrain/tests/conftest.py +++ b/taxbrain/tests/conftest.py @@ -7,6 +7,16 @@ CUR_PATH = os.path.abspath(os.path.dirname(__file__)) +@pytest.fixture(scope="session") +def start_year(): + return 2018 + + +@pytest.fixture(scope="session") +def end_year(): + return 2019 + + @pytest.fixture(scope="session") def reform_json_str(): reform = """ @@ -43,13 +53,13 @@ def assump_json_str(): @pytest.fixture(scope="session",) -def tb_static(reform_json_str): - return TaxBrain(2018, 2019, use_cps=True, reform=reform_json_str) +def tb_static(reform_json_str, start_year, end_year): + return TaxBrain(start_year, end_year, use_cps=True, reform=reform_json_str) @pytest.fixture(scope="session") -def tb_dynamic(reform_json_str): - return TaxBrain(2018, 2019, use_cps=True, reform=reform_json_str, +def tb_dynamic(reform_json_str, start_year, end_year): + return TaxBrain(start_year, end_year, use_cps=True, reform=reform_json_str, behavior={"sub": 0.25}) diff --git a/taxbrain/tests/test_brain.py b/taxbrain/tests/test_brain.py index dd516a6..e9f54b3 100644 --- a/taxbrain/tests/test_brain.py +++ b/taxbrain/tests/test_brain.py @@ -2,6 +2,7 @@ import pytest import pandas as pd import numpy as np +from pandas.util.testing import assert_frame_equal from taxbrain import TaxBrain @@ -120,3 +121,15 @@ def test_user_input(reform_json_str, assump_json_str): TaxBrain(2018, 2020, use_cps=True, reform=True) with pytest.raises(TypeError): TaxBrain(2018, 2020, use_cps=True, assump=True) + + +def test_cs_run(tb_static, reform_json_str, start_year, end_year): + """ + Test the Compute Studio run methods to ensure we get the same results as + with the traditional methods + """ + tb = TaxBrain(start_year, end_year, use_cps=True, reform=reform_json_str) + tb.run(cs_run=True) + for year in range(start_year, end_year + 1): + assert_frame_equal(tb.base_data[year], tb_static.base_data[year]) + assert_frame_equal(tb.reform_data[year], tb_static.reform_data[year])