diff --git a/openapscontrib/predict/__init__.py b/openapscontrib/predict/__init__.py index 9881863..e18363f 100644 --- a/openapscontrib/predict/__init__.py +++ b/openapscontrib/predict/__init__.py @@ -18,6 +18,7 @@ from predict import Schedule from predict import calculate_momentum_effect from predict import calculate_carb_effect +from predict import calculate_cob from predict import calculate_glucose_from_effects from predict import calculate_insulin_effect from predict import calculate_iob @@ -48,7 +49,15 @@ def display_device(device): # agp as a vendor. Return a list of classes which inherit from Use, # or are compatible with it: def get_uses(device, config): - return [glucose, glucose_from_effects, glucose_momentum_effect, scheiner_carb_effect, walsh_insulin_effect, walsh_iob] + return [ + glucose, + glucose_from_effects, + glucose_momentum_effect, + scheiner_carb_effect, + scheiner_cob, + walsh_insulin_effect, + walsh_iob + ] def _opt_date(timestamp): @@ -239,6 +248,73 @@ def main(self, args, app): return calculate_carb_effect(*args, **kwargs) +# noinspection PyPep8Naming +class scheiner_cob(Use): + """Predict unabsorbed carbohydrates, using the Scheiner GI curve + + """ + @staticmethod + def configure_app(app, parser): + parser.add_argument( + 'history', + help='JSON-encoded pump history data file, normalized by openapscontrib.mmhistorytools' + ) + + parser.add_argument( + '--absorption-time', + type=int, + nargs=argparse.OPTIONAL, + help='The total length of carbohydrate absorption in minutes' + ) + + parser.add_argument( + '--absorption-delay', + type=int, + nargs=argparse.OPTIONAL, + help='The delay time between a dosing event and when absorption begins' + ) + + def get_params(self, args): + params = super(scheiner_cob, self).get_params(args) + + args_dict = dict(**args.__dict__) + + for key in ('history', 'absorption_time', 'absorption_delay'): + value = args_dict.get(key) + if value is not None: + params[key] = value + + return params + + @staticmethod + def get_program(params): + """Parses params into history parser constructor arguments + + :param params: + :type params: dict + :return: + :rtype: tuple(list, dict) + """ + args = ( + _json_file(params['history']), + ) + + kwargs = dict() + + if params.get('absorption_time'): + kwargs.update(absorption_duration=int(params.get('absorption_time'))) + + if params.get('absorption_delay'): + kwargs.update(absorption_delay=int(params.get('absorption_delay'))) + + return args, kwargs + + def main(self, args, app): + args, kwargs = self.get_program(self.get_params(args)) + + return calculate_cob(*args, **kwargs) + + # noinspection PyPep8Naming class walsh_insulin_effect(Use): """Predict insulin effect on glucose, using Walsh's IOB algorithm diff --git a/openapscontrib/predict/predict.py b/openapscontrib/predict/predict.py index 1dee3ed..0e18423 100644 --- a/openapscontrib/predict/predict.py +++ b/openapscontrib/predict/predict.py @@ -2,6 +2,7 @@ import datetime from dateutil.parser import parse import math +from numpy import arange from operator import add from scipy.stats import linregress @@ -91,7 +92,7 @@ def carb_effect_curve(t, absorption_time): See: https://github.com/kenstack/GlucoDyn - :param t: The time in t since the carbs were eaten + :param t: The time in minutes since the carbs were eaten :type t: float :param absorption_time: The total absorption time of the carbohydrates in minutes :type absorption_time: int @@ -175,25 +176,33 @@ def integrate_iob(t0, t1, insulin_action_duration, t): return integral * dx / 3.0 -def sum_iob(t0, t1, insulin_action_duration, t, dt): +def sum_iob(t0, t1, insulin_action_duration, t, dt, absorption_delay=0): """Sums the percent IOB activity at a given time for a temp basal dose :param t0: The start time in minutes of the dose - :type t0: int + :type t0: float :param t1: The end time in minutes of the dose - :type t1: int + :type t1: float :param insulin_action_duration: The duration of insulin action (DIA) of the patient, in minutes :type insulin_action_duration: int :param t: The current time in minutes :type t: float + :param absorption_delay: The delay time before a dose begins absorption in minutes. Any value greater than 0 will + result in an IOB value appearing immediately after dosing that remains constant for the + specified time before decaying. + :type absorption_delay: int :param dt: The segment size over which to sum :return: The sum of IOB at time t, in percent """ - return reduce( - add, - [walsh_iob_curve(t - i, insulin_action_duration) for i in range(t0, t1 + dt, dt) if t - i >= 0], - 0 - ) + iob = 0 + + # Divide the dose into equal segments of dt, from t0 to t1 + for i in arange(t0, t1 + dt, dt): + if t + absorption_delay >= i: + segment = max(0, min(i + dt, t1) - i) / (t1 - t0) + iob += segment * walsh_iob_curve(t - i, insulin_action_duration) + + return iob def cumulative_bolus_effect_at_time(event, t, insulin_sensitivity, insulin_action_duration): @@ -249,7 +258,7 @@ def cumulative_temp_basal_effect_at_time(event, t, t0, t1, insulin_sensitivity, :return: :rtype: float """ - if t < 0: + if t < t0: return 0 int_iob = integrate_iob(t0, t1, insulin_action_duration * 60, t) @@ -274,6 +283,8 @@ def calculate_momentum_effect( :type dt: int :param prediction_time: The total length of forward trend extrapolation in minutes :type prediction_time: int + :param fit_points: The number of historical values to use to create the trend + :type fit_points: int :return: A list of relative blood glucose values and their timestamps :rtype: list(dict) """ @@ -397,11 +408,39 @@ def calculate_cob( :type absorption_duration: int :param absorption_delay: The delay time before a meal begins absorption in minutes :type absorption_delay: int - :return: A list of relative blood glucose values and their timestamps + :return: A list of remaining carbohydrate values and their timestamps :rtype: list(dict) """ - # TODO: - pass + if len(normalized_history) == 0: + return [] + + first_history_event = sorted(normalized_history, key=lambda e: e['start_at'])[0] + last_history_event = sorted(normalized_history, key=lambda e: e['end_at'])[-1] + last_history_datetime = ceil_datetime_at_minute_interval(parse(last_history_event['end_at']), dt) + simulation_start = floor_datetime_at_minute_interval(parse(first_history_event['start_at']), dt) + simulation_end = last_history_datetime + datetime.timedelta(minutes=(absorption_duration + absorption_delay)) + + simulation_minutes = range(0, int(math.ceil((simulation_end - simulation_start).total_seconds() / 60.0)) + dt, dt) + simulation_timestamps = [simulation_start + datetime.timedelta(minutes=m) for m in simulation_minutes] + simulation_count = len(simulation_minutes) + + carbs = [0.0] * simulation_count + + for history_event in normalized_history: + if history_event['unit'] == Unit.grams: + start_at = parse(history_event['start_at']) + + for i, timestamp in enumerate(simulation_timestamps): + t = (timestamp - start_at).total_seconds() / 60.0 - absorption_delay + + if t >= 0 - absorption_delay: + carbs[i] += history_event['amount'] * (1 - carb_effect_curve(t, absorption_duration)) + + return [{ + 'date': timestamp.isoformat(), + 'amount': carbs[i], + 'unit': Unit.grams + } for i, timestamp in enumerate(simulation_timestamps)] def calculate_insulin_effect( @@ -469,7 +508,7 @@ def calculate_insulin_effect( end_at = basal_dosing_end t0 = 0 - t1 = math.ceil((end_at - start_at).total_seconds() / 60.0) + t1 = (end_at - start_at).total_seconds() / 60.0 effect = cumulative_temp_basal_effect_at_time( history_event, @@ -498,7 +537,8 @@ def calculate_iob( absorption_delay=10, basal_dosing_end=None, start_at=None, - end_at=None + end_at=None, + visual_iob_only=True ): """Calculates insulin on board degradation according to Walsh's algorithm, from the latest history entry until 0 @@ -516,6 +556,9 @@ def calculate_iob( :type start_at: datetime.datetime :param end_at: A datetime override at which to end the output :type end_at: datetime.datetime + :param visual_iob_only: Whether the dose should appear as IOB immediately after delivery rather than waiting for the + absorption delay. You might want this to be False if you plan to integrate the area under + the resulting curve. :return: A list of IOB values and their timestamps :rtype: list(dict) """ @@ -547,15 +590,23 @@ def calculate_iob( if t < 0 - absorption_delay: continue elif history_event['unit'] == Unit.units: - effect = history_event['amount'] * walsh_iob_curve(t, insulin_duration_minutes) + if visual_iob_only or t >= 0: + effect = history_event['amount'] * walsh_iob_curve(t, insulin_duration_minutes) elif history_event['unit'] == Unit.units_per_hour: if history_event['type'] == 'TempBasal' and basal_dosing_end and end_at > basal_dosing_end: end_at = basal_dosing_end t0 = 0 - t1 = int(math.ceil((end_at - start_at).total_seconds() / 60.0)) + t1 = (end_at - start_at).total_seconds() / 60.0 - effect = history_event['amount'] / (60.0 / dt) * sum_iob(t0, t1, insulin_duration_minutes, t, dt) + effect = history_event['amount'] * (t1 - t0) / 60.0 * sum_iob( + t0, + t1, + insulin_duration_minutes, + t, + dt, + absorption_delay=(absorption_delay if visual_iob_only else 0) + ) else: continue diff --git a/openapscontrib/predict/version.py b/openapscontrib/predict/version.py index fa9c4ec..2792152 100644 --- a/openapscontrib/predict/version.py +++ b/openapscontrib/predict/version.py @@ -1 +1 @@ -__version__ = '0.0.6' +__version__ = '0.0.7' diff --git a/tests/predict_tests.py b/tests/predict_tests.py index 6bedfbc..29a8b74 100644 --- a/tests/predict_tests.py +++ b/tests/predict_tests.py @@ -605,6 +605,51 @@ def test_datetime_rounding(self): self.assertAlmostEqual(-0.12, effect[3]['amount'], delta=0.01) self.assertDictEqual({'date': '2015-07-13T16:15:00', 'amount': -40.0, 'unit': 'mg/dL'}, effect[-1]) + def test_datetime_rounding_basal(self): + normalized_history = [ + { + "type": "TempBasal", + "start_at": "2015-07-13T12:02:37", + "end_at": "2015-07-13T12:07:37", + "amount": 12.0, + "unit": "U/hour" + } + ] + + effect = calculate_insulin_effect( + normalized_history, + 4, + Schedule(self.insulin_sensitivities['sensitivities']) + ) + + self.assertDictEqual({'date': '2015-07-13T12:00:00', 'amount': 0.0, 'unit': 'mg/dL'}, effect[0]) + self.assertDictContainsSubset({'date': '2015-07-13T12:15:00', 'unit': 'mg/dL'}, effect[3]) + self.assertAlmostEqual(-1.10, effect[3]['amount'], delta=0.01) + self.assertDictEqual({'date': '2015-07-13T16:20:00', 'amount': -40.0, 'unit': 'mg/dL'}, effect[-1]) + + def test_irregular_basal_duration(self): + normalized_history = [ + { + "type": "TempBasal", + "start_at": "2015-07-13T12:02:37", + "end_at": "2015-07-13T12:05:16", + "amount": 22.641509433962263, + "unit": "U/hour" + } + ] + + effect = calculate_insulin_effect( + normalized_history, + 4, + Schedule(self.insulin_sensitivities['sensitivities']) + ) + + self.assertDictEqual({'date': '2015-07-13T12:00:00', 'amount': 0.0, 'unit': 'mg/dL'}, effect[0]) + self.assertDictContainsSubset({'date': '2015-07-13T12:15:00', 'unit': 'mg/dL'}, effect[3]) + self.assertAlmostEqual(-1.13, effect[3]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-07-13T16:20:00', 'unit': 'mg/dL'}, effect[-1]) + self.assertAlmostEqual(-40.0, effect[-1]['amount'], delta=0.01) + def test_multiple_bolus(self): normalized_history = [ { @@ -889,7 +934,7 @@ def test_complicated_history(self): self.assertDictContainsSubset({'date': '2015-10-15T21:05:00', 'unit': 'mg/dL'}, effect[36]) self.assertAlmostEqual(-96.69, effect[36]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-16T02:40:00', 'unit': 'mg/dL'}, effect[-1]) - self.assertAlmostEqual(-591.15, effect[-1]['amount'], delta=0.01) + self.assertAlmostEqual(-589.61, effect[-1]['amount'], delta=0.01) class CalculateIOBTestCase(unittest.TestCase): @@ -980,14 +1025,57 @@ def test_square_bolus(self): iob = calculate_iob( normalized_history, - 4 + 4, + visual_iob_only=False ) + self.assertDictEqual({'date': '2015-07-13T12:00:00', 'amount': 0.0, 'unit': 'U'}, iob[0]) self.assertDictContainsSubset({'date': '2015-07-13T12:10:00', 'unit': 'U'}, iob[2]) self.assertAlmostEqual(0.083, iob[2]['amount'], delta=0.01) self.assertDictEqual({'date': '2015-07-13T17:10:00', 'amount': 0.0, 'unit': 'U'}, iob[-1]) self.assertEqual('2015-07-13T13:50:00', iob[22]['date']) - self.assertAlmostEqual(0.75, iob[24]['amount'], delta=0.01) + self.assertAlmostEqual(0.67, iob[24]['amount'], delta=0.01) + + iob = calculate_iob( + normalized_history, + 4, + visual_iob_only=True + ) + + self.assertDictContainsSubset({'date': '2015-07-13T12:00:00', 'unit': 'U'}, iob[0]) + self.assertAlmostEqual(0.083, iob[0]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-07-13T12:10:00', 'unit': 'U'}, iob[2]) + self.assertAlmostEqual(0.25, iob[2]['amount'], delta=0.01) + self.assertDictEqual({'date': '2015-07-13T17:10:00', 'amount': 0.0, 'unit': 'U'}, iob[-1]) + self.assertEqual('2015-07-13T13:50:00', iob[22]['date']) + self.assertAlmostEqual(0.67, iob[24]['amount'], delta=0.01) + + def test_irregular_basal_duration(self): + normalized_history = [ + { + "type": "TempBasal", + "start_at": "2015-07-13T12:02:37", + "end_at": "2015-07-13T12:05:16", + "amount": 22.641509433962263, + "unit": "U/hour" + } + ] + + effect = calculate_iob( + normalized_history, + 4, + visual_iob_only=False + ) + + self.assertDictEqual({'date': '2015-07-13T12:00:00', 'amount': 0.0, 'unit': 'U'}, effect[0]) + self.assertDictEqual({'date': '2015-07-13T12:05:00', 'amount': 0.0, 'unit': 'U'}, effect[1]) + self.assertDictEqual({'date': '2015-07-13T12:10:00', 'amount': 0.0, 'unit': 'U'}, effect[2]) + self.assertDictContainsSubset({'date': '2015-07-13T12:15:00', 'unit': 'U'}, effect[3]) + self.assertAlmostEqual(1.00, effect[3]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-07-13T14:10:00', 'unit': 'U'}, effect[26]) + self.assertAlmostEqual(0.48, effect[26]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-07-13T16:20:00', 'unit': 'U'}, effect[-1]) + self.assertAlmostEqual(0.0, effect[-1]['amount'], delta=0.01) def test_carb_completion_with_ratio_change(self): normalized_history = [ @@ -1022,14 +1110,15 @@ def test_basal_dosing_end(self): iob = calculate_iob( normalized_history, 4, - basal_dosing_end=datetime(2015, 7, 17, 12, 30) + basal_dosing_end=datetime(2015, 7, 17, 12, 30), + visual_iob_only=False ) self.assertDictEqual({'date': '2015-07-17T17:10:00', 'amount': 0.0, 'unit': 'U'}, iob[-1]) self.assertDictEqual({'date': '2015-07-17T12:00:00', 'amount': 0.0, 'unit': 'U'}, iob[0]) self.assertDictEqual({'date': '2015-07-17T16:40:00', 'amount': 0.0, 'unit': 'U'}, iob[-7]) self.assertDictContainsSubset({'date': '2015-07-17T12:40:00', 'unit': 'U'}, iob[8]) - self.assertAlmostEqual(0.56, iob[8]['amount'], delta=0.01) + self.assertAlmostEqual(0.48, iob[8]['amount'], delta=0.01) def test_no_input_history(self): normalized_history = [] @@ -1068,7 +1157,8 @@ def test_complicated_history(self): effect = calculate_iob( normalized_history, - 4 + 4, + visual_iob_only=False ) self.assertDictEqual({'date': '2015-10-15T18:05:00', 'amount': 0.0, 'unit': 'U'}, effect[0]) @@ -1077,11 +1167,11 @@ def test_complicated_history(self): self.assertDictContainsSubset({'date': '2015-10-15T18:20:00', 'unit': 'U'}, effect[3]) self.assertAlmostEqual(-0.02, effect[3]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-15T19:05:00', 'unit': 'U'}, effect[12]) - self.assertAlmostEqual(-0.47, effect[12]['amount'], delta=0.01) + self.assertAlmostEqual(-0.39, effect[12]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-15T20:05:00', 'unit': 'U'}, effect[24]) - self.assertAlmostEqual(5.70, effect[24]['amount'], delta=0.01) + self.assertAlmostEqual(5.61, effect[24]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-15T21:05:00', 'unit': 'U'}, effect[36]) - self.assertAlmostEqual(7.27, effect[36]['amount'], delta=0.01) + self.assertAlmostEqual(6.92, effect[36]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-16T02:40:00', 'unit': 'U'}, effect[-1]) self.assertAlmostEqual(0, effect[-1]['amount'], delta=0.01) @@ -1092,11 +1182,12 @@ def test_start_at(self): effect = calculate_iob( normalized_history, 4, - start_at=datetime(2015, 10, 15, 22, 11, 00) + start_at=datetime(2015, 10, 15, 22, 11, 00), + visual_iob_only=False ) self.assertDictContainsSubset({'date': '2015-10-15T22:11:00', 'unit': 'U'}, effect[0]) - self.assertAlmostEqual(8.30, effect[0]['amount'], delta=0.01) + self.assertAlmostEqual(7.73, effect[0]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-16T02:41:00', 'unit': 'U'}, effect[-1]) self.assertAlmostEqual(0, effect[-1]['amount'], delta=0.01) @@ -1107,7 +1198,27 @@ def test_end_at(self): effect = calculate_iob( normalized_history, 4, - end_at=datetime(2015, 10, 15, 20, 11, 00) + end_at=datetime(2015, 10, 15, 20, 16, 00), + visual_iob_only=True + ) + + self.assertDictEqual({'date': '2015-10-15T18:05:00', 'amount': 0.0, 'unit': 'U'}, effect[0]) + self.assertDictContainsSubset({'date': '2015-10-15T18:10:00', 'unit': 'U'}, effect[1]) + self.assertAlmostEqual(-0.02, effect[1]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T18:20:00', 'unit': 'U'}, effect[3]) + self.assertAlmostEqual(-0.15, effect[3]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T19:05:00', 'unit': 'U'}, effect[12]) + self.assertAlmostEqual(-0.46, effect[12]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T20:15:00', 'unit': 'U'}, effect[-2]) + self.assertAlmostEqual(9.27, effect[-2]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T20:20:00', 'unit': 'U'}, effect[-1]) + self.assertAlmostEqual(9.13, effect[-1]['amount'], delta=0.01) + + effect = calculate_iob( + normalized_history, + 4, + end_at=datetime(2015, 10, 15, 20, 16, 00), + visual_iob_only=False ) self.assertDictEqual({'date': '2015-10-15T18:05:00', 'amount': 0.0, 'unit': 'U'}, effect[0]) @@ -1116,9 +1227,11 @@ def test_end_at(self): self.assertDictContainsSubset({'date': '2015-10-15T18:20:00', 'unit': 'U'}, effect[3]) self.assertAlmostEqual(-0.02, effect[3]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-15T19:05:00', 'unit': 'U'}, effect[12]) - self.assertAlmostEqual(-0.47, effect[12]['amount'], delta=0.01) - self.assertDictContainsSubset({'date': '2015-10-15T20:15:00', 'unit': 'U'}, effect[-1]) - self.assertAlmostEqual(9.26, effect[-1]['amount'], delta=0.01) + self.assertAlmostEqual(-0.39, effect[12]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T20:15:00', 'unit': 'U'}, effect[-2]) + self.assertAlmostEqual(5.94, effect[-2]['amount'], delta=0.01) + self.assertDictContainsSubset({'date': '2015-10-15T20:20:00', 'unit': 'U'}, effect[-1]) + self.assertAlmostEqual(9.10, effect[-1]['amount'], delta=0.01) def test_start_at_end_at(self): with open(get_file_at_path("fixtures/normalize_history.json")) as fp: @@ -1128,13 +1241,14 @@ def test_start_at_end_at(self): normalized_history, 4, start_at=datetime(2015, 10, 15, 22, 11, 00), - end_at=datetime(2015, 10, 16, 00, 01, 50) + end_at=datetime(2015, 10, 16, 00, 01, 50), + visual_iob_only=False ) self.assertDictContainsSubset({'date': '2015-10-15T22:11:00', 'unit': 'U'}, effect[0]) - self.assertAlmostEqual(8.30, effect[0]['amount'], delta=0.01) + self.assertAlmostEqual(7.73, effect[0]['amount'], delta=0.01) self.assertDictContainsSubset({'date': '2015-10-16T00:06:00', 'unit': 'U'}, effect[-1]) - self.assertAlmostEqual(2.53, effect[-1]['amount'], delta=0.01) + self.assertAlmostEqual(2.16, effect[-1]['amount'], delta=0.01) def test_single_entry(self): with open(get_file_at_path("fixtures/normalize_history.json")) as fp: @@ -1144,14 +1258,15 @@ def test_single_entry(self): normalized_history, 4, start_at=datetime(2015, 10, 15, 22, 11, 00), - end_at=datetime(2015, 10, 15, 22, 11, 00) + end_at=datetime(2015, 10, 15, 22, 11, 00), + visual_iob_only=False ) self.assertListEqual( [{ 'date': '2015-10-15T22:11:00', 'unit': 'U', - 'amount': 8.306 + 'amount': 7.729 }], [e.update({'amount': round(e['amount'], 3)}) or e for e in effect] )