From a8fb0488da759408d010f7b05726038059fcf70f Mon Sep 17 00:00:00 2001 From: dgboss Date: Thu, 5 Dec 2024 10:22:54 -0800 Subject: [PATCH] Hourly FFMC (#4153) Co-authored-by: Conor Brady --- api/app/jobs/rdps_sfms.py | 2 +- api/app/jobs/sfms_calculations.py | 42 ++++- api/app/sfms/daily_fwi_processor.py | 5 +- api/app/sfms/fwi_processor.py | 6 +- api/app/sfms/hourly_ffmc_processor.py | 92 +++++++++++ api/app/sfms/raster_addresser.py | 64 +++++++- api/app/tests/dataset_common.py | 39 +++++ api/app/tests/jobs/test_sfms_calculations.py | 2 + .../tests/sfms/test_daily_fwi_processor.py | 48 +----- .../tests/sfms/test_hourly_ffmc_processor.py | 152 ++++++++++++++++++ api/app/tests/sfms/test_raster_addresser.py | 40 +++++ .../rdps_filename_marshaller.py | 15 ++ 12 files changed, 452 insertions(+), 55 deletions(-) create mode 100644 api/app/sfms/hourly_ffmc_processor.py create mode 100644 api/app/tests/sfms/test_hourly_ffmc_processor.py diff --git a/api/app/jobs/rdps_sfms.py b/api/app/jobs/rdps_sfms.py index 2538a103b..3e8ee931f 100644 --- a/api/app/jobs/rdps_sfms.py +++ b/api/app/jobs/rdps_sfms.py @@ -37,7 +37,7 @@ DAYS_TO_RETAIN = 7 -MAX_MODEL_RUN_HOUR = 37 +MAX_MODEL_RUN_HOUR = 45 GRIB_LAYERS = {"temp": "TMP_TGL_2", "rh": "RH_TGL_2", "precip": "APCP_SFC_0", "wind_speed": "WIND_TGL_10"} diff --git a/api/app/jobs/sfms_calculations.py b/api/app/jobs/sfms_calculations.py index 2deb590e7..37d42b5d9 100644 --- a/api/app/jobs/sfms_calculations.py +++ b/api/app/jobs/sfms_calculations.py @@ -6,8 +6,10 @@ from app import configure_logging from app.geospatial.wps_dataset import multi_wps_dataset_context +from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR from app.rocketchat_notifications import send_rocketchat_notification from app.sfms.daily_fwi_processor import DailyFWIProcessor +from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor from app.sfms.raster_addresser import RasterKeyAddresser from app.utils.s3_client import S3Client from app.utils.time import get_utc_now @@ 
-18,12 +20,44 @@ class SFMSCalcJob: - async def calculate_daily_fwi(self, start_time: datetime): + async def calculate_fwi_rasters(self, start_time: datetime) -> None: """ - Entry point for processing SFMS daily FWI rasters. To run from a specific date manually in openshift, + Entry point for processing SFMS daily FWI rasters and hFFMC rasters. To run from a specific date manually in openshift, see openshift/sfms-calculate/README.md + + :param start_time: The RDPS model run time to use for processing. + """ + + await self.calculate_daily_fwi(start_time) + await self.calculate_hffmc(start_time) + + async def calculate_hffmc(self, start_time: datetime) -> None: + """ + Entry point for calculating hourly FFMC rasters. Uses a 04:00 or 16:00 PST (12:00 or 24:00 UTC) hFFMC raster from SFMS as a base input. + + :param start_time: The date time to use for processing. Calculations will begin at the most recent RDPS model run (00Z or 12Z). + """ + logger.info("Begin hFFMC raster calculations.") + + start_exec = get_utc_now() + + hffmc_processor = HourlyFFMCProcessor(start_time, RasterKeyAddresser()) + + async with S3Client() as s3_client: + await hffmc_processor.process(s3_client, multi_wps_dataset_context, MAX_MODEL_RUN_HOUR) + + # calculate the execution time. + execution_time = get_utc_now() - start_exec + hours, remainder = divmod(execution_time.seconds, 3600) + minutes, seconds = divmod(remainder, 60) + + logger.info(f"hFFMC raster processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds") + + async def calculate_daily_fwi(self, start_time: datetime): + """ + Entry point for processing SFMS daily FWI rasters. 
""" - logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") + logger.info(f"Begin FWI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward") start_exec = get_utc_now() @@ -55,7 +89,7 @@ def main(): job = SFMSCalcJob() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - loop.run_until_complete(job.calculate_daily_fwi(start_time)) + loop.run_until_complete(job.calculate_fwi_rasters(start_time)) except Exception as e: logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e) rc_message = ":scream: Encountered an error while processing SFMS raster data." diff --git a/api/app/sfms/daily_fwi_processor.py b/api/app/sfms/daily_fwi_processor.py index b958ff677..9d645175d 100644 --- a/api/app/sfms/daily_fwi_processor.py +++ b/api/app/sfms/daily_fwi_processor.py @@ -46,7 +46,7 @@ async def process( # Get and check existence of weather s3 keys temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour) - weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key) + weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key) if not weather_keys_exist: logging.warning(f"Missing weather keys for {model_run_for_hour(self.start_datetime.hour):02} model run") break @@ -76,6 +76,7 @@ async def process( precip_ds.close() rh_ds.close() temp_ds.close() + wind_speed_ds.close() # Create latitude and month arrays needed for calculations latitude_array = dmc_ds.generate_latitude_array() month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month) @@ -105,7 +106,7 @@ async def process( ) # Create and store FFMC dataset - ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds) + ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, 
warped_rh_ds, warped_precip_ds, warped_wind_speed_ds) new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC) new_ffmc_path = await s3_client.persist_raster_data( temp_dir, diff --git a/api/app/sfms/fwi_processor.py b/api/app/sfms/fwi_processor.py index 2f2ca2ead..0d1ff459e 100644 --- a/api/app/sfms/fwi_processor.py +++ b/api/app/sfms/fwi_processor.py @@ -65,8 +65,12 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS precip_array, _ = precip_ds.replace_nodata_with(0) wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0) + # Due to warping of the rh dataset, rh values can exceed 100 which breaks the ffmc calculation. + # Set rh values greater than 100 to the max allowable which is 100. + rh_array[rh_array > 100] = 100 + start = perf_counter() - ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array) + ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array) logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start) nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask() diff --git a/api/app/sfms/hourly_ffmc_processor.py b/api/app/sfms/hourly_ffmc_processor.py new file mode 100644 index 000000000..13ed119c7 --- /dev/null +++ b/api/app/sfms/hourly_ffmc_processor.py @@ -0,0 +1,92 @@ +import logging +import os +import tempfile +from datetime import datetime, timedelta +from osgeo import gdal +from typing import List, cast + +from app.weather_models.rdps_filename_marshaller import model_run_for_hour + +from app.geospatial.wps_dataset import WPSDataset +from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR +from app.sfms.daily_fwi_processor import MultiDatasetContext +from app.sfms.fwi_processor import calculate_ffmc +from app.sfms.raster_addresser import RasterKeyAddresser +from app.utils.geospatial import GDALResamplingMethod +from app.utils.s3 import set_s3_gdal_config 
+from app.utils.s3_client import S3Client + + +logger = logging.getLogger(__name__) + + +class HourlyFFMCProcessor: + """ + Class for calculating/generating forecasted hourly FFMC rasters. + """ + + def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser): + self.start_datetime = start_datetime + self.addresser = addresser + + async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR): + set_s3_gdal_config() + + # hFFMC general process + # 1. Cron job kicks off the job and we use current UTC time as start time + # 2. Create HourlyFFMCProcessor with the start time and begin processing + # 3. Use job start time to determine most recent RDPS model run start time (date and 00z or 12z) + # 4. Use most recent RDPS model run start time to determine most recent hFFMC key to use as source which is always one hour before the RDPS start time (04 or 16 PDT) + # 5. Start calculating hFFMC from model run hour 0 through to 47. Save the calculated hFFMCs to S3. Most recently calculated hFFMC is used as input to the next hour's hFFMC calculation. + # 6. hFFMC rasters are saved to S3 with UTC based keys. + + # Determine most recent RDPS model run + rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour) + rdps_model_run_start = datetime( + year=self.start_datetime.year, month=self.start_datetime.month, day=self.start_datetime.day, hour=rdps_model_run_hour, tzinfo=self.start_datetime.tzinfo + ) + + # Determine key to the initial/seed hFFMC from SFMS and check if it exists. Initial hffmc will be a 04 or 16 hour hffmc from SFMS. + hffmc_key = self.addresser.get_uploaded_hffmc_key(rdps_model_run_start) + hffmc_key_exists = await s3_client.all_objects_exist(hffmc_key) + if not hffmc_key_exists: + logger.warning(f"Missing initial hFFMC raster from SFMS for date {self.start_datetime}. 
Missing key is {hffmc_key}.") + return + + for hour in range(0, hours_to_process): + with tempfile.TemporaryDirectory() as temp_dir: + # Get and check existence of weather s3 keys + temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour) + weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key) + if not weather_keys_exist: + logging.warning(f"Missing weather keys for model run: {rdps_model_run_start} and hour {hour}") + break + + # Prefix our S3 keys for access via gdal + temp_key, rh_key, wind_speed_key, precip_key, hffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, hffmc_key) + with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, hffmc_key]) as input_datasets: + input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference + temp_ds, rh_ds, wind_speed_ds, precip_ds, hffmc_ds = input_datasets + # Warp weather datasets to match hffmc + warped_temp_ds = temp_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR) + warped_rh_ds = rh_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR) + warped_wind_speed_ds = wind_speed_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR) + warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR) + + # Create and store new hFFMC dataset + hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds) + new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour) + hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime) + geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform() + projection = hffmc_ds.as_gdal_ds().GetProjection() + 
hffmc_ds.close() + await s3_client.persist_raster_data( + temp_dir, + hffmc_key, + geotransform, + projection, + hffmc_values, + hffmc_no_data_value, + ) + # Clear gdal virtual file system cache of S3 metadata in order to allow newly uploaded hffmc rasters to be opened immediately. + gdal.VSICurlClearCache() diff --git a/api/app/sfms/raster_addresser.py b/api/app/sfms/raster_addresser.py index 3a99bd90a..c87f14609 100644 --- a/api/app/sfms/raster_addresser.py +++ b/api/app/sfms/raster_addresser.py @@ -1,10 +1,11 @@ import os import enum -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from zoneinfo import ZoneInfo from app import config +from app.utils.time import convert_utc_to_pdt from app.weather_models import ModelEnum -from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key +from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key, compose_rdps_key_hffmc class WeatherParameter(enum.Enum): @@ -36,15 +37,16 @@ class RasterKeyAddresser: def __init__(self): self.sfms_calculated_prefix = "sfms/calculated" self.s3_prefix = f"/vsis3/{config.get('OBJECT_STORE_BUCKET')}" + self.smfs_hourly_upload_prefix = "sfms/uploads/hourlies" self.sfms_upload_prefix = "sfms/uploads/actual" self.weather_model_prefix = f"weather_models/{ModelEnum.RDPS.lower()}" def get_uploaded_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): assert_all_utc(datetime_utc) iso_date = datetime_utc.date().isoformat() - return f"{self.sfms_upload_prefix}/{iso_date}/{fwi_param.value}{iso_date.replace('-', '')}.tif" + def get_calculated_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter): """ Generates the calculated fire weather index key that points to the associated raster artifact in the object store. 
@@ -105,3 +107,59 @@ def gdal_prefix_keys(self, *keys): :return: A tuple of all strings provided, prefixed with vsis3/{bucket} """ return tuple(f"{self.s3_prefix}/{key}" for key in keys) + + def get_uploaded_hffmc_key(self, datetime_utc: datetime): + """ + Given the start time of an RDPS model run, return a key to the most recent hFFMC raster which will be + equivalent to RDPS model run start time minus one hour in PDT. Note that the hFFMC rasters are stored according + to PDT times. hFFMC keys will end with 04 or 16 for their hour. + + :param datetime_utc: The RDPS model run start date and time. + :return: A key to the most recent hFFMC raster. + """ + assert_all_utc(datetime_utc) + + # Convert utc into pdt and subtract one hour to get hFFMC source raster time. sfms only produces hFFMC from Apr - Oct which is always PDT + datetime_pdt = convert_utc_to_pdt(datetime_utc) - timedelta(hours=1) + iso_date = datetime_pdt.date().isoformat() + return f"{self.smfs_hourly_upload_prefix}/{iso_date}/fine_fuel_moisture_code{iso_date.replace('-', '')}{datetime_pdt.hour:02d}.tif" + + def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hour): + """ + Gets temp, rh, wind speed and calculated accumulated precip for the specified RDPS model run start date and hour. + + :param rdps_model_run_start: The RDPS model run start date and time. + :param offset_hour: The hour offset from the RDPS model run start hour. + :return: Keys to rasters in S3 storage for temp, rh, wind speed and calculated precip rasters. 
+ """ + assert_all_utc(rdps_model_run_start) + non_precip_keys = tuple(self.get_model_data_key_hffmc(rdps_model_run_start, offset_hour, param) for param in WeatherParameter) + datetime_to_calculate_utc = rdps_model_run_start + timedelta(hours=offset_hour) + precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc) + all_weather_data_keys = non_precip_keys + (precip_key,) + return all_weather_data_keys + + def get_model_data_key_hffmc(self, rdps_model_run_start: datetime, offset_hour: int, weather_param: WeatherParameter): + """ + Gets a S3 key for the weather parameter of interest for the specified RDPS model run start date and time at the provided offset. + + :param rdps_model_run_start: The RDPS model run start date and time. + :param offset_hour: The hour offset from the RDPS model run start hour. + :param weather_param: The weather parameter of interest (temp, rh, or wind speed). + :return: A key to a raster in S3 storage. + """ + assert_all_utc(rdps_model_run_start) + weather_model_date_prefix = f"{self.weather_model_prefix}/{rdps_model_run_start.date().isoformat()}/" + return os.path.join(weather_model_date_prefix, compose_rdps_key_hffmc(rdps_model_run_start, offset_hour, weather_param.value)) + + def get_calculated_hffmc_index_key(self, datetime_utc: datetime): + """ + Given a UTC datetime return a calculated key based on PDT time as hFFMC rasters are named according to PDT. + + :param datetime_utc: A UTC datetime. + :return: An S3 key for hFFMC using PDT time. 
+ """ + assert_all_utc(datetime_utc) + iso_date = datetime_utc.date().isoformat() + weather_param_prefix = "fine_fuel_moisture_code" + return f"{self.sfms_calculated_prefix}/hourlies/{iso_date}/{weather_param_prefix}{iso_date.replace('-', '')}{datetime_utc.hour:02d}.tif" \ No newline at end of file diff --git a/api/app/tests/dataset_common.py b/api/app/tests/dataset_common.py index 32b735378..961f51ac1 100644 --- a/api/app/tests/dataset_common.py +++ b/api/app/tests/dataset_common.py @@ -1,5 +1,7 @@ +from contextlib import ExitStack, contextmanager import numpy as np from osgeo import osr, gdal +from typing import List import uuid from app.geospatial.wps_dataset import WPSDataset @@ -47,3 +49,40 @@ def create_mock_gdal_dataset(): def create_mock_wps_dataset(): mock_ds = create_mock_gdal_dataset() return WPSDataset(ds=mock_ds, ds_path=None) + +def create_mock_wps_datasets(num: int) -> List[WPSDataset]: + return [create_mock_wps_dataset() for _ in range(num)] + + +def create_mock_input_dataset_context(num: int): + input_datasets = create_mock_wps_datasets(num) + + @contextmanager + def mock_input_dataset_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in input_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in input_datasets: + ds.close() + + return input_datasets, mock_input_dataset_context + + +def create_mock_new_ds_context(number_of_datasets: int): + new_datasets = create_mock_wps_datasets(number_of_datasets) + + @contextmanager + def mock_new_datasets_context(_: List[str]): + try: + # Enter each dataset's context and yield the list of instances + with ExitStack() as stack: + yield [stack.enter_context(ds) for ds in new_datasets] + finally: + # Close all datasets to ensure cleanup + for ds in new_datasets: + ds.close() + + return new_datasets, mock_new_datasets_context diff --git a/api/app/tests/jobs/test_sfms_calculations.py 
b/api/app/tests/jobs/test_sfms_calculations.py index 478cf104d..2e68ad43e 100644 --- a/api/app/tests/jobs/test_sfms_calculations.py +++ b/api/app/tests/jobs/test_sfms_calculations.py @@ -28,6 +28,7 @@ async def mock_job_error(): def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture): daily_fwi_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None) + hffmc_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_hffmc", return_value=None) test_datetime = "2024-10-10 5" monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime]) @@ -35,6 +36,7 @@ def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture): sfms_calculations.main() daily_fwi_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)) + hffmc_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)) @pytest.mark.anyio diff --git a/api/app/tests/sfms/test_daily_fwi_processor.py b/api/app/tests/sfms/test_daily_fwi_processor.py index 0f0bf8dc1..db22c9425 100644 --- a/api/app/tests/sfms/test_daily_fwi_processor.py +++ b/api/app/tests/sfms/test_daily_fwi_processor.py @@ -1,6 +1,4 @@ -from contextlib import ExitStack, contextmanager from datetime import datetime, timedelta, timezone -from typing import List from unittest.mock import AsyncMock import pytest @@ -10,7 +8,7 @@ from app.sfms import daily_fwi_processor from app.sfms.daily_fwi_processor import DailyFWIProcessor from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser -from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_wps_dataset +from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_input_dataset_context, create_mock_new_ds_context from app.utils.geospatial import GDALResamplingMethod from app.utils.s3_client import S3Client @@ -19,44 +17,6 @@ EXPECTED_SECOND_DAY = TEST_DATETIME.replace(hour=20, minute=0, second=0, 
microsecond=0) + timedelta(days=1) -def create_mock_wps_datasets(num: int) -> List[WPSDataset]: - return [create_mock_wps_dataset() for _ in range(num)] - - -def create_mock_input_dataset_context(): - input_datasets = create_mock_wps_datasets(7) - - @contextmanager - def mock_input_dataset_context(_: List[str]): - try: - # Enter each dataset's context and yield the list of instances - with ExitStack() as stack: - yield [stack.enter_context(ds) for ds in input_datasets] - finally: - # Close all datasets to ensure cleanup - for ds in input_datasets: - ds.close() - - return input_datasets, mock_input_dataset_context - - -def create_mock_new_ds_context(number_of_datasets: int): - new_datasets = create_mock_wps_datasets(number_of_datasets) - - @contextmanager - def mock_new_datasets_context(_: List[str]): - try: - # Enter each dataset's context and yield the list of instances - with ExitStack() as stack: - yield [stack.enter_context(ds) for ds in new_datasets] - finally: - # Close all datasets to ensure cleanup - for ds in new_datasets: - ds.close() - - return new_datasets, mock_new_datasets_context - - @pytest.mark.anyio async def test_daily_fwi_processor(mocker: MockerFixture): mock_key_addresser = RasterKeyAddresser() @@ -64,11 +24,11 @@ async def test_daily_fwi_processor(mocker: MockerFixture): get_weather_data_key_spy = mocker.spy(mock_key_addresser, "get_weather_data_keys") gdal_prefix_keys_spy = mocker.spy(mock_key_addresser, "gdal_prefix_keys") get_calculated_index_key_spy = mocker.spy(mock_key_addresser, "get_calculated_index_key") + fwi_processor = DailyFWIProcessor(TEST_DATETIME, 2, mock_key_addresser) - # mock/spy dataset storage # mock weather index, param datasets used for calculations - input_datasets, mock_input_dataset_context = create_mock_input_dataset_context() + input_datasets, mock_input_dataset_context = create_mock_input_dataset_context(7) mock_temp_ds, mock_rh_ds, mock_precip_ds, mock_wind_speed_ds, mock_dc_ds, mock_dmc_ds, mock_ffmc_ds = 
input_datasets temp_ds_spy = mocker.spy(mock_temp_ds, "warp_to_match") rh_ds_spy = mocker.spy(mock_rh_ds, "warp_to_match") @@ -234,7 +194,7 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[side_effect_1, side_effect_2]) - _, mock_input_dataset_context = create_mock_input_dataset_context() + _, mock_input_dataset_context = create_mock_input_dataset_context(7) _, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2) _, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1) diff --git a/api/app/tests/sfms/test_hourly_ffmc_processor.py b/api/app/tests/sfms/test_hourly_ffmc_processor.py new file mode 100644 index 000000000..fbead32f0 --- /dev/null +++ b/api/app/tests/sfms/test_hourly_ffmc_processor.py @@ -0,0 +1,152 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock + +import pytest +from pytest_mock import MockerFixture + +from app.geospatial.wps_dataset import WPSDataset +from app.sfms import hourly_ffmc_processor +from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor +from app.sfms.raster_addresser import RasterKeyAddresser +from app.tests.dataset_common import create_mock_gdal_dataset, create_mock_input_dataset_context, create_mock_new_ds_context +from app.utils.geospatial import GDALResamplingMethod +from app.utils.s3_client import S3Client + +TEST_DATETIME = datetime(2024, 10, 10, 10, tzinfo=timezone.utc) +RDPS_MODEL_RUN_DATETIME = datetime(2024, 10, 10, 0, tzinfo=timezone.utc) + + +@pytest.mark.anyio +async def test_source_hffmc_key_not_exist(mocker: MockerFixture): + mock_s3_client = S3Client() + mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[False]) + _, mock_input_dataset_context = create_mock_new_ds_context(2) + + # calculation spies + calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, 
RasterKeyAddresser()) + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context) + + calculate_hffmc_spy.assert_not_called() + + +@pytest.mark.anyio +async def test_no_weather_keys_exist(mocker: MockerFixture): + mock_s3_client = S3Client() + mocker.patch.object(mock_s3_client, "all_objects_exist", side_effect=[True, False]) + _, mock_input_dataset_context = create_mock_new_ds_context(2) + + # calculation spy + calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, RasterKeyAddresser()) + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context) + + calculate_hffmc_spy.assert_not_called() + + +@pytest.mark.anyio +async def test_hourly_ffmc_processor(mocker: MockerFixture): + num_hours_to_process = 2 + mock_key_addresser = RasterKeyAddresser() + # key address spies + get_weather_data_keys_hffmc_spy = mocker.spy(mock_key_addresser, "get_weather_data_keys_hffmc") + gdal_prefix_keys_spy = mocker.spy(mock_key_addresser, "gdal_prefix_keys") + get_uploaded_hffmc_key_spy = mocker.spy(mock_key_addresser, "get_uploaded_hffmc_key") + get_calculated_hffmc_index_key_spy = mocker.spy(mock_key_addresser, "get_calculated_hffmc_index_key") + + # The processor instance + hffmc_processor = HourlyFFMCProcessor(TEST_DATETIME, mock_key_addresser) + + # mock weather index and source hffmc dataset used for calculations + input_datasets, mock_input_dataset_context = create_mock_input_dataset_context(5) + mock_temp_ds, mock_rh_ds, mock_precip_ds, mock_wind_speed_ds, mock_hffmc_ds = input_datasets + + # dataset spies + temp_ds_spy = mocker.spy(mock_temp_ds, "warp_to_match") + rh_ds_spy = mocker.spy(mock_rh_ds, "warp_to_match") + wind_speed_ds_spy = mocker.spy(mock_wind_speed_ds, "warp_to_match") + precip_ds_spy = mocker.spy(mock_precip_ds, "warp_to_match") + + # mock gdal open + mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset()) + + # calculation spy + 
calculate_hffmc_spy = mocker.spy(hourly_ffmc_processor, "calculate_ffmc") + + async with S3Client() as mock_s3_client: + # mock s3 client + mock_all_objects_exist = AsyncMock(return_value=True) + mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist) + persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif") + + await hffmc_processor.process(mock_s3_client, mock_input_dataset_context, num_hours_to_process) + + # Verify weather model keys and actual keys are checked for each hour + assert mock_all_objects_exist.call_count == num_hours_to_process + 1 + + # Verify retrieval of hFFMC + assert get_uploaded_hffmc_key_spy.call_args_list == [mocker.call(RDPS_MODEL_RUN_DATETIME)] + + # Verify the arguments for each call for get_weather_data_keys + assert get_weather_data_keys_hffmc_spy.call_args_list == [ + mocker.call(RDPS_MODEL_RUN_DATETIME, 0), + mocker.call(RDPS_MODEL_RUN_DATETIME, 1), + ] + + # Verify the arguments for each call for gdal_prefix_keys + assert gdal_prefix_keys_spy.call_args_list == [ + # first hour weather models and source hffmc + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P000.grib2", + "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_00z.tif", + "sfms/uploads/hourlies/2024-10-09/fine_fuel_moisture_code2024100916.tif", + ), + mocker.call( + "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P001.grib2", + "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_01z.tif", + 
"sfms/calculated/hourlies/2024-10-10/fine_fuel_moisture_code2024101000.tif", + ), + ] + + # Verify calculated keys are generated in order + assert get_calculated_hffmc_index_key_spy.call_args_list == [ + # first day + mocker.call(RDPS_MODEL_RUN_DATETIME + timedelta(hours=0)), + mocker.call(RDPS_MODEL_RUN_DATETIME + timedelta(hours=1)), + ] + + # Verify weather inputs are warped to match source hffmc raster + assert temp_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert rh_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert wind_speed_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + assert precip_ds_spy.call_args_list == [ + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + mocker.call(mock_hffmc_ds, mocker.ANY, GDALResamplingMethod.BILINEAR), + ] + + for hffmc_calls in calculate_hffmc_spy.call_args_list: + hffmc_ds = hffmc_calls.args[0] + assert hffmc_ds == mock_hffmc_ds + wps_datasets = hffmc_calls[0] # Extract dataset arguments + assert all(isinstance(ds, WPSDataset) for ds in wps_datasets) + + # 1 hffmc per day + assert persist_raster_spy.call_count == 2 \ No newline at end of file diff --git a/api/app/tests/sfms/test_raster_addresser.py b/api/app/tests/sfms/test_raster_addresser.py index fcf24aee2..130b62c07 100644 --- a/api/app/tests/sfms/test_raster_addresser.py +++ b/api/app/tests/sfms/test_raster_addresser.py @@ -1,12 +1,23 @@ from app.sfms.raster_addresser import RasterKeyAddresser, FWIParameter import pytest from datetime import datetime, timezone +from app.sfms.raster_addresser import WeatherParameter TEST_DATETIME_1 = datetime(2024, 
10, 10, 23, tzinfo=timezone.utc) TEST_DATE_1_ISO = TEST_DATETIME_1.date().isoformat() +TEST_DATETIME_2 = datetime(2024, 10, 10, 11, tzinfo=timezone.utc) +TEST_DATE_2_ISO = TEST_DATETIME_2.date().isoformat() + TEST_DATETIME_TO_CALC = TEST_DATETIME_1.replace(hour=20) +RDPS_MODEL_RUN_HOUR = 0 +RDPS_MODEL_RUN_00_START = datetime(2024, 10, 10, 0, tzinfo=timezone.utc) +RDPS_MODEL_RUN_12_START = datetime(2024, 10, 10, 12, tzinfo=timezone.utc) +HOUR_OFFSET = 3 +HFFMC_DATETIME = datetime(2024, 10, 10, 5, tzinfo=timezone.utc) +HFFMC_DATETIME_ISO = HFFMC_DATETIME.date().isoformat() + @pytest.fixture def raster_key_addresser(): @@ -27,3 +38,32 @@ def test_get_weather_data_keys(raster_key_addresser): result = raster_key_addresser.get_weather_data_keys(TEST_DATETIME_1, TEST_DATETIME_TO_CALC, 20) assert len(result) == 4 + +def test_get_uploaded_hffmc_key_00_hour(raster_key_addresser): + result = raster_key_addresser.get_uploaded_hffmc_key(RDPS_MODEL_RUN_00_START) + assert result == "sfms/uploads/hourlies/2024-10-09/fine_fuel_moisture_code2024100916.tif" + + +def test_get_uploaded_hffmc_key_afternoon(raster_key_addresser): + result = raster_key_addresser.get_uploaded_hffmc_key(RDPS_MODEL_RUN_12_START) + assert result == "sfms/uploads/hourlies/2024-10-10/fine_fuel_moisture_code2024101004.tif" + + +def test_get_weather_data_keys_hffmc(raster_key_addresser: RasterKeyAddresser): + result = raster_key_addresser.get_weather_data_keys_hffmc(RDPS_MODEL_RUN_00_START, HOUR_OFFSET) + assert len(result) == 4 + assert result[0] == "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P003.grib2" + assert result[1] == "weather_models/rdps/2024-10-10/00/rh/CMC_reg_RH_TGL_2_ps10km_2024101000_P003.grib2" + assert result[2] == "weather_models/rdps/2024-10-10/00/wind_speed/CMC_reg_WIND_TGL_10_ps10km_2024101000_P003.grib2" + assert result[3] == "weather_models/rdps/2024-10-10/00/precip/COMPUTED_reg_APCP_SFC_0_ps10km_20241010_03z.tif" + + +def 
test_get_model_data_key_hffmc(raster_key_addresser): + weather_param = WeatherParameter.TEMP + result = raster_key_addresser.get_model_data_key_hffmc(RDPS_MODEL_RUN_00_START, HOUR_OFFSET, weather_param) + assert result == "weather_models/rdps/2024-10-10/00/temp/CMC_reg_TMP_TGL_2_ps10km_2024101000_P003.grib2" + + +def test_get_calculated_hffmc_index_key(raster_key_addresser: RasterKeyAddresser): + result = raster_key_addresser.get_calculated_hffmc_index_key(HFFMC_DATETIME) + assert result == f"sfms/calculated/hourlies/{HFFMC_DATETIME_ISO}/fine_fuel_moisture_code{HFFMC_DATETIME_ISO.replace('-','')}{HFFMC_DATETIME.hour:02d}.tif" diff --git a/api/app/weather_models/rdps_filename_marshaller.py b/api/app/weather_models/rdps_filename_marshaller.py index c2f420338..a2f14a3a1 100644 --- a/api/app/weather_models/rdps_filename_marshaller.py +++ b/api/app/weather_models/rdps_filename_marshaller.py @@ -144,3 +144,18 @@ def compose_computed_precip_rdps_key(accumulation_end_datetime: datetime): """Compose and return a computed RDPS url given the datetime that precip is being accumulated to.""" model_hour = model_run_for_hour(accumulation_end_datetime.hour) return f"{model_hour:02d}/precip/{compose_computed_rdps_filename(accumulation_end_datetime)}" + + +def compose_rdps_key_hffmc(model_run_start: datetime, offset_hour: int, weather_parameter: str): + """Compose and return a computed RDPS url given a forecast start date and hour offset.""" + model_hour = model_run_for_hour(model_run_start.hour) + return f"{model_hour:02d}/{weather_parameter}/{compose_rdps_filename_hffmc(model_run_start, offset_hour, weather_parameter)}" + + +def compose_rdps_filename_hffmc(model_run_start: datetime, offset_hour: int, weather_parameter: str): + key_params = get_weather_key_params(weather_parameter) + file_ext = ".grib2" + return ( + f"{SourcePrefix.CMC.value}{DELIMITER}{REG}{DELIMITER}{key_params.variable}{DELIMITER}{key_params.level_type}{DELIMITER}{key_params.level}{DELIMITER}{PS10KM}{DELIMITER}" 
+ f"{model_run_start.date().isoformat().replace('-','')}{model_run_start.hour:02d}{DELIMITER}P{offset_hour:03d}{file_ext}" + )