Skip to content

Commit

Permalink
Hourly FFMC (#4153)
Browse files Browse the repository at this point in the history
Co-authored-by: Conor Brady <[email protected]>
  • Loading branch information
dgboss and conbrad authored Dec 5, 2024
1 parent 0ee473d commit a8fb048
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 55 deletions.
2 changes: 1 addition & 1 deletion api/app/jobs/rdps_sfms.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


DAYS_TO_RETAIN = 7
MAX_MODEL_RUN_HOUR = 37
MAX_MODEL_RUN_HOUR = 45
GRIB_LAYERS = {"temp": "TMP_TGL_2", "rh": "RH_TGL_2", "precip": "APCP_SFC_0", "wind_speed": "WIND_TGL_10"}


Expand Down
42 changes: 38 additions & 4 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

from app import configure_logging
from app.geospatial.wps_dataset import multi_wps_dataset_context
from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR
from app.rocketchat_notifications import send_rocketchat_notification
from app.sfms.daily_fwi_processor import DailyFWIProcessor
from app.sfms.hourly_ffmc_processor import HourlyFFMCProcessor
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.s3_client import S3Client
from app.utils.time import get_utc_now
Expand All @@ -18,12 +20,44 @@


class SFMSCalcJob:
async def calculate_daily_fwi(self, start_time: datetime):
async def calculate_fwi_rasters(self, start_time: datetime) -> None:
    """
    Entry point for processing SFMS daily FWI rasters and hFFMC rasters. To run from a specific date manually in openshift,
    see openshift/sfms-calculate/README.md
    :param start_time: The RDPS model run time to use for processing.
    """
    # Run the daily FWI pass first, then the hourly FFMC pass, both from the same start time.
    for calculation in (self.calculate_daily_fwi, self.calculate_hffmc):
        await calculation(start_time)

async def calculate_hffmc(self, start_time: datetime) -> None:
    """
    Entry point for calculating hourly FFMC rasters. Uses a 04:00 or 16:00 PST (12:00 or 24:00 UTC) hFFMC raster from SFMS as a base input.
    :param start_time: The date time to use for processing. Calculations will begin at the most recent RDPS model run (00Z or 12Z).
    """
    logger.info("Begin hFFMC raster calculations.")

    begin = get_utc_now()

    processor = HourlyFFMCProcessor(start_time, RasterKeyAddresser())
    async with S3Client() as s3_client:
        await processor.process(s3_client, multi_wps_dataset_context, MAX_MODEL_RUN_HOUR)

    # Report wall-clock duration of the whole run.
    elapsed = get_utc_now() - begin
    hours, remainder = divmod(elapsed.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    logger.info(f"hFFMC raster processing finished -- time elapsed {hours} hours, {minutes} minutes, {seconds:.2f} seconds")

async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS daily FWI rasters.
"""
logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")
logger.info(f"Begin FWI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")

start_exec = get_utc_now()

Expand Down Expand Up @@ -55,7 +89,7 @@ def main():
job = SFMSCalcJob()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(job.calculate_daily_fwi(start_time))
loop.run_until_complete(job.calculate_fwi_rasters(start_time))
except Exception as e:
logger.error("An exception occurred while processing SFMS raster calculations", exc_info=e)
rc_message = ":scream: Encountered an error while processing SFMS raster data."
Expand Down
5 changes: 3 additions & 2 deletions api/app/sfms/daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def process(

# Get and check existence of weather s3 keys
temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys(self.start_datetime, datetime_to_calculate_utc, prediction_hour)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, precip_key)
weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key)
if not weather_keys_exist:
logging.warning(f"Missing weather keys for {model_run_for_hour(self.start_datetime.hour):02} model run")
break
Expand Down Expand Up @@ -76,6 +76,7 @@ async def process(
precip_ds.close()
rh_ds.close()
temp_ds.close()
wind_speed_ds.close()
# Create latitude and month arrays needed for calculations
latitude_array = dmc_ds.generate_latitude_array()
month_array = np.full(latitude_array.shape, datetime_to_calculate_utc.month)
Expand Down Expand Up @@ -105,7 +106,7 @@ async def process(
)

# Create and store FFMC dataset
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_wind_speed_ds, warped_precip_ds)
ffmc_values, ffmc_no_data_value = calculate_ffmc(ffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)
new_ffmc_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FFMC)
new_ffmc_path = await s3_client.persist_raster_data(
temp_dir,
Expand Down
6 changes: 5 additions & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS
precip_array, _ = precip_ds.replace_nodata_with(0)
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)

# Due to warping of the rh dataset, rh values can exceed 100 which breaks the ffmc calculation.
# Set rh values greater than 100 to the max allowable which is 100.
rh_array[rh_array > 100] = 100

start = perf_counter()
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, precip_array, wind_speed_array)
ffmc_values = vectorized_ffmc(previous_ffmc_array, temp_array, rh_array, wind_speed_array, precip_array)
logger.info("%f seconds to calculate vectorized ffmc", perf_counter() - start)

nodata_mask, nodata_value = previous_ffmc_ds.get_nodata_mask()
Expand Down
92 changes: 92 additions & 0 deletions api/app/sfms/hourly_ffmc_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
import os
import tempfile
from datetime import datetime, timedelta
from osgeo import gdal
from typing import List, cast

from app.weather_models.rdps_filename_marshaller import model_run_for_hour

from app.geospatial.wps_dataset import WPSDataset
from app.jobs.rdps_sfms import MAX_MODEL_RUN_HOUR
from app.sfms.daily_fwi_processor import MultiDatasetContext
from app.sfms.fwi_processor import calculate_ffmc
from app.sfms.raster_addresser import RasterKeyAddresser
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
from app.utils.s3_client import S3Client


logger = logging.getLogger(__name__)


class HourlyFFMCProcessor:
    """
    Class for calculating/generating forecasted hourly FFMC rasters.

    Seeds from the most recent SFMS-uploaded hFFMC raster, then walks forward one
    RDPS forecast hour at a time: each hour's output raster is persisted to S3 and
    used as the input for the next hour's calculation.
    """

    def __init__(self, start_datetime: datetime, addresser: RasterKeyAddresser):
        # start_datetime: job kick-off time (UTC); determines the RDPS model run used.
        self.start_datetime = start_datetime
        self.addresser = addresser

    async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, hours_to_process: int = MAX_MODEL_RUN_HOUR):
        """
        Calculate and persist hFFMC rasters for each forecast hour of the most recent RDPS model run.

        :param s3_client: S3 client used to check key existence and persist output rasters.
        :param input_dataset_context: Context manager that opens WPSDatasets for a list of gdal-prefixed keys.
        :param hours_to_process: Number of forecast hours to process, starting at hour 0.
        """
        set_s3_gdal_config()

        # hFFMC general process
        # 1. Cron job kicks off the job and we use current UTC time as start time
        # 2. Create HourlyFFMCProcessor with the start time and begin processing
        # 3. Use job start time to determine most recent RDPS model run start time (date and 00z or 12z)
        # 4. Use most recent RDPS model run start time to determine most recent hFFMC key to use as source which is always one hour before the RDPS start time (04 or 16 PDT)
        # 5. Start calculating hFFMC from model run hour 0 up to hours_to_process. Save the calculated hFFMCs to S3. Most recently calculated hFFMC is used as input to the next hour's hFFMC calculation.
        # 6. hFFMC rasters are saved to S3 with UTC based keys.

        # Determine most recent RDPS model run
        rdps_model_run_hour = model_run_for_hour(self.start_datetime.hour)
        rdps_model_run_start = datetime(
            year=self.start_datetime.year, month=self.start_datetime.month, day=self.start_datetime.day, hour=rdps_model_run_hour, tzinfo=self.start_datetime.tzinfo
        )

        # Determine key to the initial/seed hFFMC from SFMS and check if it exists. Initial hffmc will be a 04 or 16 hour hffmc from SFMS.
        hffmc_key = self.addresser.get_uploaded_hffmc_key(rdps_model_run_start)
        hffmc_key_exists = await s3_client.all_objects_exist(hffmc_key)
        if not hffmc_key_exists:
            logger.warning(f"Missing initial hFFMC raster from SFMS for date {self.start_datetime}. Missing key is {hffmc_key}.")
            return

        for hour in range(0, hours_to_process):
            with tempfile.TemporaryDirectory() as temp_dir:
                # Get and check existence of weather s3 keys
                temp_key, rh_key, wind_speed_key, precip_key = self.addresser.get_weather_data_keys_hffmc(rdps_model_run_start, hour)
                weather_keys_exist = await s3_client.all_objects_exist(temp_key, rh_key, wind_speed_key, precip_key)
                if not weather_keys_exist:
                    # Fix: use the module-level logger (was `logging.warning`), consistent
                    # with the `logger.warning` call above, so the record carries __name__.
                    logger.warning(f"Missing weather keys for model run: {rdps_model_run_start} and hour {hour}")
                    break

                # Prefix our S3 keys for access via gdal
                temp_key, rh_key, wind_speed_key, precip_key, hffmc_key = self.addresser.gdal_prefix_keys(temp_key, rh_key, wind_speed_key, precip_key, hffmc_key)
                with input_dataset_context([temp_key, rh_key, wind_speed_key, precip_key, hffmc_key]) as input_datasets:
                    input_datasets = cast(List[WPSDataset], input_datasets)  # Ensure correct type inference
                    temp_ds, rh_ds, wind_speed_ds, precip_ds, hffmc_ds = input_datasets
                    # Warp weather datasets to match hffmc
                    warped_temp_ds = temp_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(temp_key)}", GDALResamplingMethod.BILINEAR)
                    warped_rh_ds = rh_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(rh_key)}", GDALResamplingMethod.BILINEAR)
                    warped_wind_speed_ds = wind_speed_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(wind_speed_key)}", GDALResamplingMethod.BILINEAR)
                    warped_precip_ds = precip_ds.warp_to_match(hffmc_ds, f"{temp_dir}/{os.path.basename(precip_key)}", GDALResamplingMethod.BILINEAR)

                    # Create and store new hFFMC dataset
                    hffmc_values, hffmc_no_data_value = calculate_ffmc(hffmc_ds, warped_temp_ds, warped_rh_ds, warped_precip_ds, warped_wind_speed_ds)
                    new_hffmc_datetime = rdps_model_run_start + timedelta(hours=hour)
                    # Re-point hffmc_key at the freshly calculated raster so the next loop
                    # iteration consumes this hour's output as its input.
                    hffmc_key = self.addresser.get_calculated_hffmc_index_key(new_hffmc_datetime)
                    geotransform = hffmc_ds.as_gdal_ds().GetGeoTransform()
                    projection = hffmc_ds.as_gdal_ds().GetProjection()
                    hffmc_ds.close()
                    await s3_client.persist_raster_data(
                        temp_dir,
                        hffmc_key,
                        geotransform,
                        projection,
                        hffmc_values,
                        hffmc_no_data_value,
                    )
                    # Clear gdal virtual file system cache of S3 metadata in order to allow newly uploaded hffmc rasters to be opened immediately.
                    gdal.VSICurlClearCache()
64 changes: 61 additions & 3 deletions api/app/sfms/raster_addresser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import enum
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from app import config
from app.utils.time import convert_utc_to_pdt
from app.weather_models import ModelEnum
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key
from app.weather_models.rdps_filename_marshaller import compose_computed_precip_rdps_key, compose_rdps_key, compose_rdps_key_hffmc


class WeatherParameter(enum.Enum):
Expand Down Expand Up @@ -36,15 +37,16 @@ class RasterKeyAddresser:
def __init__(self):
self.sfms_calculated_prefix = "sfms/calculated"
self.s3_prefix = f"/vsis3/{config.get('OBJECT_STORE_BUCKET')}"
self.smfs_hourly_upload_prefix = "sfms/uploads/hourlies"
self.sfms_upload_prefix = "sfms/uploads/actual"
self.weather_model_prefix = f"weather_models/{ModelEnum.RDPS.lower()}"

def get_uploaded_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter):
assert_all_utc(datetime_utc)
iso_date = datetime_utc.date().isoformat()

return f"{self.sfms_upload_prefix}/{iso_date}/{fwi_param.value}{iso_date.replace('-', '')}.tif"


def get_calculated_index_key(self, datetime_utc: datetime, fwi_param: FWIParameter):
"""
Generates the calculated fire weather index key that points to the associated raster artifact in the object store.
Expand Down Expand Up @@ -105,3 +107,59 @@ def gdal_prefix_keys(self, *keys):
:return: A tuple of all strings provided, prefixed with vsis3/{bucket}
"""
return tuple(f"{self.s3_prefix}/{key}" for key in keys)

def get_uploaded_hffmc_key(self, datetime_utc: datetime):
    """
    Given the start time of an RDPS model run, return the key of the most recent
    SFMS-uploaded hFFMC raster: the model run start time minus one hour, expressed in
    PDT. hFFMC rasters are stored under PDT-based names, so the hour component of the
    key is always 04 or 16.
    :param datetime_utc: The RDPS model run start date and time.
    :return: A key to the most recent hFFMC raster.
    """
    assert_all_utc(datetime_utc)

    # SFMS only produces hFFMC April through October, so PDT (not PST) always applies;
    # step back one hour from the model run start to land on the source raster's time.
    source_time_pdt = convert_utc_to_pdt(datetime_utc) - timedelta(hours=1)
    date_part = source_time_pdt.date().isoformat()
    return f"{self.smfs_hourly_upload_prefix}/{date_part}/fine_fuel_moisture_code{date_part.replace('-', '')}{source_time_pdt.hour:02d}.tif"

def get_weather_data_keys_hffmc(self, rdps_model_run_start: datetime, offset_hour: int):
    """
    Gets temp, rh, wind speed and calculated accumulated precip for the specified RDPS model run start date and hour.
    :param rdps_model_run_start: The RDPS model run start date and time.
    :param offset_hour: The hour offset from the RDPS model run start hour.
    :return: Keys to rasters in S3 storage for temp, rh, wind speed and calculated precip rasters.
    """
    assert_all_utc(rdps_model_run_start)
    # One key per WeatherParameter member (temp, rh, wind speed), in enum declaration order.
    non_precip_keys = tuple(self.get_model_data_key_hffmc(rdps_model_run_start, offset_hour, param) for param in WeatherParameter)
    # Precip is keyed by the absolute datetime being calculated, not the model run + offset.
    datetime_to_calculate_utc = rdps_model_run_start + timedelta(hours=offset_hour)
    precip_key = self.get_calculated_precip_key(datetime_to_calculate_utc)
    all_weather_data_keys = non_precip_keys + (precip_key,)
    return all_weather_data_keys

def get_model_data_key_hffmc(self, rdps_model_run_start: datetime, offset_hour: int, weather_param: WeatherParameter):
    """
    Gets a S3 key for the weather parameter of interest for the specified RDPS model run start date and time at the provided offset.
    :param rdps_model_run_start: The RDPS model run start date and time.
    :param offset_hour: The hour offset from the RDPS model run start hour.
    :param weather_param: The weather parameter of interest (temp, rh, or wind speed).
    :return: A key to a raster in S3 storage.
    """
    assert_all_utc(rdps_model_run_start)
    # Build the per-date prefix, then append the composed RDPS filename for this parameter/offset.
    date_prefix = f"{self.weather_model_prefix}/{rdps_model_run_start.date().isoformat()}/"
    filename = compose_rdps_key_hffmc(rdps_model_run_start, offset_hour, weather_param.value)
    return os.path.join(date_prefix, filename)

def get_calculated_hffmc_index_key(self, datetime_utc: datetime):
    """
    Given a UTC datetime, return the S3 key for a calculated hFFMC raster. The key's
    date and hour components are taken directly from the UTC datetime — calculated
    hFFMC rasters are stored under UTC-based keys, unlike the SFMS-uploaded source
    rasters, which are keyed by PDT time.
    :param datetime_utc: A UTC datetime.
    :return: An S3 key for the calculated hFFMC raster.
    """
    assert_all_utc(datetime_utc)
    iso_date = datetime_utc.date().isoformat()
    weather_param_prefix = "fine_fuel_moisture_code"
    return f"{self.sfms_calculated_prefix}/hourlies/{iso_date}/{weather_param_prefix}{iso_date.replace('-', '')}{datetime_utc.hour:02d}.tif"
39 changes: 39 additions & 0 deletions api/app/tests/dataset_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from contextlib import ExitStack, contextmanager
import numpy as np
from osgeo import osr, gdal
from typing import List
import uuid
from app.geospatial.wps_dataset import WPSDataset

Expand Down Expand Up @@ -47,3 +49,40 @@ def create_mock_gdal_dataset():
def create_mock_wps_dataset():
    """Create a single WPSDataset backed by an in-memory mock GDAL dataset (no file path)."""
    mock_ds = create_mock_gdal_dataset()
    return WPSDataset(ds=mock_ds, ds_path=None)

def create_mock_wps_datasets(num: int) -> List[WPSDataset]:
    """Create *num* independent mock WPSDataset instances."""
    datasets: List[WPSDataset] = []
    for _ in range(num):
        datasets.append(create_mock_wps_dataset())
    return datasets


def create_mock_input_dataset_context(num: int):
    """
    Create *num* mock WPSDatasets plus a context-manager factory that yields them.
    Returns (datasets, context_manager) so tests can both patch the dataset context
    and inspect the individual datasets afterwards.
    """
    input_datasets = create_mock_wps_datasets(num)

    @contextmanager
    def mock_input_dataset_context(_: List[str]):
        # Signature mirrors the real dataset context: it accepts a list of keys/paths,
        # which the mock ignores — it always yields the pre-built datasets.
        try:
            # Enter each dataset's context and yield the list of instances
            with ExitStack() as stack:
                yield [stack.enter_context(ds) for ds in input_datasets]
        finally:
            # Close all datasets to ensure cleanup
            # NOTE(review): ExitStack already exits each dataset's context on the way out;
            # this explicit close() presumably tolerates double-closing — confirm
            # WPSDataset.close() is idempotent.
            for ds in input_datasets:
                ds.close()

    return input_datasets, mock_input_dataset_context


def create_mock_new_ds_context(number_of_datasets: int):
    """
    Create *number_of_datasets* mock WPSDatasets plus a context-manager factory that
    yields them. Behaviourally identical to create_mock_input_dataset_context; kept as
    a separate name so call sites read naturally for "new" (output) datasets.
    :return: (datasets, context_manager) tuple.
    """
    # Deduplicated: the original body was a byte-for-byte copy of
    # create_mock_input_dataset_context's ExitStack/cleanup logic — delegate instead.
    return create_mock_input_dataset_context(number_of_datasets)
2 changes: 2 additions & 0 deletions api/app/tests/jobs/test_sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ async def mock_job_error():

def test_sfms_calc_job_cli_arg(monkeypatch, mocker: MockerFixture):
    """The CLI entry point parses its datetime argument and triggers both calculation jobs with it."""
    daily_spy = mocker.patch.object(SFMSCalcJob, "calculate_daily_fwi", return_value=None)
    hourly_spy = mocker.patch.object(SFMSCalcJob, "calculate_hffmc", return_value=None)

    test_datetime = "2024-10-10 5"
    monkeypatch.setattr("sys.argv", ["sfms_calculations.py", test_datetime])

    sfms_calculations.main()

    expected_start = datetime.strptime(test_datetime, "%Y-%m-%d %H").replace(tzinfo=timezone.utc)
    daily_spy.assert_called_once_with(expected_start)
    hourly_spy.assert_called_once_with(expected_start)


@pytest.mark.anyio
Expand Down
Loading

0 comments on commit a8fb048

Please sign in to comment.