Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Create individual calculators in Compute Studio runs #95

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 65 additions & 16 deletions taxbrain/taxbrain.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,51 @@ def __init__(self, start_year: int, end_year: int = LAST_BUDGET_YEAR,

self.has_run = False

def run(self, varlist: list = DEFAULT_VARIABLES,
        cs_run: bool = False):
    """
    Run the calculators. TaxBrain will determine whether to do a static or
    partial equilibrium run based on the user's inputs when initializing
    the TaxBrain object.
    Parameters
    ----------
    varlist: list of variables from the microdata to be stored in each year
    cs_run: Boolean indicator for whether or not Tax-Brain is being run on
        the Compute Studio servers. If True, TaxBrain will create
        a fresh set of calculators for each year's task, rather than
        create one set of calculators and reuse them for every year
    Returns
    -------
    None
    Raises
    ------
    TypeError: if varlist is not a list
    """
    if not isinstance(varlist, list):
        msg = f"'varlist' is of type {type(varlist)}. Must be a list."
        raise TypeError(msg)
    # Work on a copy so that adding the weight variable never mutates the
    # caller's list or the shared DEFAULT_VARIABLES default argument.
    varlist = list(varlist)
    if "s006" not in varlist:  # ensure weight is always included
        varlist.append("s006")
    if cs_run:
        if self.params["behavior"]:
            run_func = self._cs_dynamic_run
        else:
            run_func = self._cs_static_run
        # delayed() has to wrap the callable itself. Wrapping the *result*
        # of self._cs_run(...) runs every year eagerly while this list is
        # being built and leaves compute() with nothing to schedule.
        # NOTE(review): _cs_run stores its results on self, so this
        # presumably relies on a shared-memory dask scheduler - confirm.
        delay = [
            delayed(self._cs_run)(varlist, run_func, year)
            for year in range(self.start_year, self.end_year + 1)
        ]
        compute(*delay)
    else:
        base_calc, reform_calc = self._make_calculators()
        if self.params["behavior"]:
            if self.verbose:
                print("Running dynamic simulations")
            self._dynamic_run(varlist, base_calc, reform_calc)
        else:
            if self.verbose:
                print("Running static simulations")
            self._static_run(varlist, base_calc, reform_calc)
        # drop the calculators as soon as the results have been stored
        del base_calc, reform_calc
    self.has_run = True

def weighted_totals(self, var: str) -> pd.DataFrame:
"""
Expand Down Expand Up @@ -246,13 +264,45 @@ def differences_table(self, year: int, groupby: str, tax_to_diff: str,
return table

# ----- private methods -----
def _cs_run(self, varlist, run_func, year):
    """
    Function for improving the memory usage of TaxBrain on Compute Studio.
    Builds a new pair of calculators, advances both to the requested year
    and hands them to the supplied run function.
    Parameters
    ----------
    varlist: Variables from Tax-Calculator that will be saved
    run_func: callable invoked as
        run_func(varlist, base_calc, reform_calc, year)
    year: year the calculator needs to run
    Returns
    -------
    None
    """
    baseline, reformed = self._make_calculators()
    # baseline first, then reform, both moved to the same year
    for calculator in (baseline, reformed):
        calculator.advance_to_year(year)
    run_func(varlist, baseline, reformed, year)

def _cs_static_run(self, varlist, base_calc, reform_calc, year):
    """
    Function for running a static simulation on the Compute Studio servers
    Parameters
    ----------
    varlist: variables to pull from each calculator once it has run
    base_calc: baseline calculator, already advanced to `year`
    reform_calc: reform calculator, already advanced to `year`
    year: key under which the results are stored
    Returns
    -------
    None
    """
    # Hand the bound methods to delayed() and call the resulting Delayed
    # objects. The previous form, delayed(calc.calc_all()), ran calc_all
    # immediately and only wrapped its return value, so compute() had
    # nothing left to schedule.
    # NOTE(review): calc_all() presumably updates the calculators in place,
    # which needs a shared-memory dask scheduler - confirm.
    delay = [delayed(base_calc.calc_all)(),
             delayed(reform_calc.calc_all)()]
    compute(*delay)
    self.base_data[year] = base_calc.dataframe(varlist)
    self.reform_data[year] = reform_calc.dataframe(varlist)

def _cs_dynamic_run(self, varlist, base_calc, reform_calc, year):
    """
    Function for running a dynamic simulation on the Compute Studio servers
    Parameters
    ----------
    varlist: columns kept from each result returned by behresp.response
    base_calc: baseline calculator passed to behresp.response
    reform_calc: reform calculator passed to behresp.response
    year: key under which the results are stored
    Returns
    -------
    None
    """
    behavior_params = self.params["behavior"]
    base_df, reform_df = behresp.response(
        base_calc, reform_calc, behavior_params, dump=True
    )
    # keep only the requested columns for this year
    self.base_data[year] = base_df[varlist]
    self.reform_data[year] = reform_df[varlist]

def _static_run(self, varlist, base_calc, reform_calc):
"""
Run the calculator for a static analysis
"""
if "s006" not in varlist: # ensure weight is always included
varlist.append("s006")

for yr in range(self.start_year, self.end_year + 1):
base_calc.advance_to_year(yr)
reform_calc.advance_to_year(yr)
Expand All @@ -267,8 +317,6 @@ def _dynamic_run(self, varlist, base_calc, reform_calc):
"""
Run a dynamic response
"""
if "s006" not in varlist: # ensure weight is always included
varlist.append("s006")
for year in range(self.start_year, self.end_year + 1):
base_calc.advance_to_year(year)
reform_calc.advance_to_year(year)
Expand All @@ -277,6 +325,7 @@ def _dynamic_run(self, varlist, base_calc, reform_calc):
dump=True)
self.base_data[year] = base[varlist]
self.reform_data[year] = reform[varlist]
del base, reform

def _process_user_mods(self, reform, assump):
"""
Expand Down
18 changes: 14 additions & 4 deletions taxbrain/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@
CUR_PATH = os.path.abspath(os.path.dirname(__file__))


@pytest.fixture(scope="session")
def start_year():
    """Start year handed to TaxBrain by the shared fixtures and tests."""
    first_year = 2018
    return first_year


@pytest.fixture(scope="session")
def end_year():
    """End year handed to TaxBrain by the shared fixtures and tests."""
    last_year = 2019
    return last_year


@pytest.fixture(scope="session")
def reform_json_str():
reform = """
Expand Down Expand Up @@ -43,13 +53,13 @@ def assump_json_str():


@pytest.fixture(scope="session")
def tb_static(reform_json_str, start_year, end_year):
    """
    Session-wide TaxBrain instance built on CPS data from the reform
    fixture, with no behavior argument supplied.
    """
    return TaxBrain(
        start_year,
        end_year,
        use_cps=True,
        reform=reform_json_str,
    )


@pytest.fixture(scope="session")
def tb_dynamic(reform_json_str, start_year, end_year):
    """
    Session-wide TaxBrain instance built on CPS data from the reform
    fixture, with a behavior argument of {"sub": 0.25}.
    """
    behavior_args = {"sub": 0.25}
    return TaxBrain(
        start_year,
        end_year,
        use_cps=True,
        reform=reform_json_str,
        behavior=behavior_args,
    )


Expand Down
13 changes: 13 additions & 0 deletions taxbrain/tests/test_brain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
import pandas as pd
import numpy as np
from pandas.util.testing import assert_frame_equal
from taxbrain import TaxBrain


Expand Down Expand Up @@ -120,3 +121,15 @@ def test_user_input(reform_json_str, assump_json_str):
TaxBrain(2018, 2020, use_cps=True, reform=True)
with pytest.raises(TypeError):
TaxBrain(2018, 2020, use_cps=True, assump=True)


def test_cs_run(tb_static, reform_json_str, start_year, end_year):
    """
    Test the Compute Studio run methods to ensure we get the same results as
    with the traditional methods
    """
    # Use the public pandas.testing module; the pandas.util.testing path
    # is deprecated.
    from pandas.testing import assert_frame_equal

    # tb_static is a shared session fixture and only holds results once
    # run() has been called on it. Run it here if no earlier test has, so
    # this test does not depend on execution order.
    if not tb_static.has_run:
        tb_static.run()
    tb = TaxBrain(start_year, end_year, use_cps=True, reform=reform_json_str)
    tb.run(cs_run=True)
    for year in range(start_year, end_year + 1):
        assert_frame_equal(tb.base_data[year], tb_static.base_data[year])
        assert_frame_equal(tb.reform_data[year], tb_static.reform_data[year])