Skip to content

Commit

Permalink
SFMS: Daily FFMC (#4081)
Browse files Browse the repository at this point in the history
- Adds logic for calculating daily FFMC rasters.
- Renames bui job to account for ffmc and bui logic
- Updates `coverage` and adds `pytest-cov` dev dependencies for running coverage in VSCode locally
  • Loading branch information
conbrad authored Nov 13, 2024
1 parent f00bc9f commit 6ef7b1c
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 110 deletions.
3 changes: 2 additions & 1 deletion api/app/auto_spatial_advisory/sfms.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from cffdrs import bui, dc, dmc, ffmc
from numba import vectorize

# Numba-vectorized wrappers around the scalar cffdrs point functions so each
# index can be applied element-wise across entire raster arrays.
# (The stale pre-FFMC import line duplicated here has been removed — the
# `ffmc`-inclusive import subsumes it.)
vectorized_bui = vectorize(bui)
vectorized_dc = vectorize(dc)
vectorized_dmc = vectorize(dmc)
vectorized_ffmc = vectorize(ffmc)
14 changes: 7 additions & 7 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from app import configure_logging
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.date_range_processor import BUIDateRangeProcessor
from app.sfms.daily_fwi_processor import DailyFWIProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
Expand All @@ -19,7 +19,7 @@


class SFMSCalcJob:
async def calculate_bui(self, start_time: datetime):
async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift,
see openshift/sfms-calculate/README.md
Expand All @@ -28,17 +28,17 @@ async def calculate_bui(self, start_time: datetime):

start_exec = get_utc_now()

bui_processor = BUIDateRangeProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())
daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await bui_processor.process_bui(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context)

# calculate the execution time.
execution_time = get_utc_now() - start_exec
hours, remainder = divmod(execution_time.seconds, 3600)
minutes, seconds = divmod(remainder, 60)

logger.info(f"BUI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")
logger.info(f"Daily FWI processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")


def main():
Expand All @@ -56,9 +56,9 @@ def main():
job = SFMSCalcJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_bui(start_time))
loop.run_until_complete(job.calculate_daily_fwi(start_time))
except Exception as e:
logger.error("An exception occurred while processing DMC/DC/BUI raster calculations", exc_info=e)
logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
send_rocketchat_notification(rc_message, e)
sys.exit(os.EX_SOFTWARE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client
Expand All @@ -20,48 +20,49 @@
MultiDatasetContext = Callable[[List[str]], Iterator[List["WPSDataset"]]]


class BUIDateRangeProcessor:
class DailyFWIProcessor:
"""
Class for calculating/generating forecasted DMC/DC/BUI rasters for a date range
Class for calculating/generating forecasted daily FWI rasters for a date range
"""

def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddresser):
    """
    :param start_datetime: UTC datetime the calculation range starts from
    :param days: number of consecutive days to calculate
    :param addresser: resolver for the S3 keys of weather and FWI rasters
    """
    self.addresser = addresser
    self.days = days
    self.start_datetime = start_datetime

async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext):
set_s3_gdal_config()

for day in range(self.days):
datetime_to_calculate_utc, previous_fwi_datetime, prediction_hour = self._get_calculate_dates(day)
logger.info(f"Calculating DMC/DC/BUI for {datetime_to_calculate_utc.isoformat()}")

# Get and check existence of weather s3 keys
temp_key, rh_key, _, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
if not weather_keys_exist:
logging.warning(f"No weather keys found for {model_run_for_hour(self.start_datetime.hour):02} model run")
break

# get and check existence of fwi s3 keys
dc_key, dmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
dc_key, dmc_key, ffmc_key = self._get_previous_fwi_keys(day, previous_fwi_datetime)
fwi_keys_exist = await s3_client.all_objects_exist(dc_key, dmc_key)
if not fwi_keys_exist:
logging.warning(f"No previous DMC/DC keys found for {previous_fwi_datetime.date().isoformat()}")
break

temp_key, rh_key, precip_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, precip_key)
temp_key, rh_key, wind_speed_key, precip_key, ffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, ffmc_key)
dc_key, dmc_key = self.addresser.gdal_prefix_keys(dc_key, dmc_key)

with tempfile.TemporaryDirectory() as temp_dir:
with input_dataset_context([temp_key, rh_key, precip_key, dc_key, dmc_key]) as input_datasets:
with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, dc_key, dmc_key, ffmc_key]) as input_datasets:
input_datasets = cast(List[WPSDataset], input_datasets) # Ensure correct type inference
temp_ds, rh_ds, precip_ds, dc_ds, dmc_ds = input_datasets
temp_ds, rh_ds, wind_speed_ds, precip_ds, dc_ds, dmc_ds, ffmc_ds = input_datasets

# Warp weather datasets to match fwi
warped_temp_ds = temp_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
warped_rh_ds = rh_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
warped_wind_speed_ds = wind_speed_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR)
warped_precip_ds = precip_ds.warp_to_match(dmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

# close unneeded datasets to reduce memory usage
Expand Down Expand Up @@ -96,6 +97,18 @@ async def process_bui(self, s3_client: S3Client, input_dataset_context: MultiDat
dc_nodata_value,
)

# Create and store FFMC dataset
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds)
new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC)
await s3_client.persist_raster_data(
temp_dir,
new_ffmc_key,
dc_ds.as_gdal_ds().GetGeoTransform(),
dc_ds.as_gdal_ds().GetProjection(),
ffmc_values,
ffmc_no_data_value,
)

# Open new DMC and DC datasets and calculate BUI
new_bui_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.BUI)
with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
Expand Down Expand Up @@ -137,7 +150,10 @@ def _get_previous_fwi_keys(self, day_to_calculate: int, previous_fwi_datetime: d
if day_to_calculate == 0: # if we're running the first day of the calculation, use previously uploaded actuals
dc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.DMC)
ffmc_key = self.addresser.get_uploaded_index_key(previous_fwi_datetime, FWIParameter.FFMC)
else: # otherwise use the last calculated key
dc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DC)
dmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.DMC)
return dc_key, dmc_key
ffmc_key = self.addresser.get_calculated_index_key(previous_fwi_datetime, FWIParameter.FFMC)

return dc_key, dmc_key, ffmc_key
20 changes: 19 additions & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui
from app.auto_spatial_advisory.sfms import vectorized_dmc, vectorized_dc, vectorized_bui, vectorized_ffmc

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -55,3 +55,21 @@ def calculate_bui(dmc_ds: WPSDataset, dc_ds: WPSDataset):
bui_values[nodata_mask] = nodata_value

return bui_values, nodata_value


def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPSDataset, wind_speed_ds: WPSDataset, precip_ds: WPSDataset):
    """
    Calculate a Fine Fuel Moisture Code (FFMC) raster from the previous day's FFMC and today's weather.

    Fix: the only call site passes (ffmc, temp, rh, wind speed, precip) positionally, but the previous
    signature declared ``precip_ds`` before ``wind_speed_ds``, so the wind-speed and precipitation
    arrays were bound to swapped, misleading parameter names. The two parameter names (and matching
    locals) are swapped here to reflect the actual data flow; the positional interface — and therefore
    runtime behavior — is unchanged.

    :param previous_ffmc_ds: previous day's FFMC dataset
    :param temp_ds: temperature dataset, warped to match the FFMC grid
    :param rh_ds: relative humidity dataset, warped to match the FFMC grid
    :param wind_speed_ds: wind speed dataset, warped to match the FFMC grid
    :param precip_ds: precipitation dataset, warped to match the FFMC grid
    :return: tuple of (FFMC values array, nodata value of the previous FFMC raster)
    """
    # Replace nodata pixels with 0 so the vectorized kernel only ever sees real numbers;
    # the affected pixels are re-masked below.
    previous_ffmc_array, _ = previous_ffmc_ds.replace_nodata_with(0)
    temp_array, _ = temp_ds.replace_nodata_with(0)
    rh_array, _ = rh_ds.replace_nodata_with(0)
    wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)
    precip_array, _ = precip_ds.replace_nodata_with(0)

    start = perf_counter()
    # NOTE(review): positions 4 and 5 receive exactly the values the call site supplies there
    # (wind speed, then precip) — same as before this rename; assumed to match the cffdrs
    # ffmc argument order (ffmc_yda, temp, rh, ws, prec) — confirm against the cffdrs package.
    ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array)
    logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start)

    # Restore nodata where the previous FFMC raster had no data.
    nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask()
    if nodata_mask is not None:
        ffmc_values[nodata_mask] = nodata_value

    return ffmc_values, nodata_value
2 changes: 1 addition & 1 deletion api/app/sfms/raster_addresser.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import enum
from datetime import datetime, timezone, timedelta
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from app import config
from app.weather_models import ModelEnum
Expand Down
7 changes: 3 additions & 4 deletions api/app/tests/jobs/test_sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_sfms_calc_job_fail_default(monkeypatch, mocker: MockerFixture):
async def mock_job_error():
raise OSError("Error")

monkeypatch.setattr(SFMSCalcJob, "calculate_bui", mock_job_error)
monkeypatch.setattr(SFMSCalcJob, "calculate_daily_fwi", mock_job_error)

monkeypatch.setattr("sys.argv", ["sfms_calculations.py"])

Expand All @@ -27,15 +27,14 @@ async def mock_job_error():


def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture):
calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_bui", return_value=None)
daily_fwi_calc_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None)

test_datetime = "2024-10-10 5"
monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime])

sfms_calculations.main()

called_args, _ = calc_spy.call_args
assert called_args[0] == datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)
daily_fwi_calc_spy.assert_called_once_with(datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc))


@pytest.mark.anyio
Expand Down
Loading

0 comments on commit 6ef7b1c

Please sign in to comment.