Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFMS: Daily FWI #4126

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/app/jobs/sfms_calculations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class SFMSCalcJob:
async def calculate_daily_fwi(self, start_time: datetime):
"""
Entry point for processing SFMS DMC/DC/BUI rasters. To run from a specific date manually in openshift,
Entry point for processing SFMS daily FWI rasters. To run from a specific date manually in openshift,
see openshift/sfms-calculate/README.md
"""
logger.info(f"Begin BUI raster calculations -- calculating {DAYS_TO_CALCULATE} days forward")
Expand All @@ -30,7 +30,7 @@
daily_processor = DailyFWIProcessor(start_time, DAYS_TO_CALCULATE, RasterKeyAddresser())

async with S3Client() as s3_client:
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context)
await daily_processor.process(s3_client, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context, multi_wps_dataset_context)

Check warning on line 33 in api/app/jobs/sfms_calculations.py

View check run for this annotation

Codecov / codecov/patch

api/app/jobs/sfms_calculations.py#L33

Added line #L33 was not covered by tests

# calculate the execution time.
execution_time = get_utc_now() - start_exec
Expand Down
31 changes: 25 additions & 6 deletions api/app/sfms/daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi
from app.sfms.raster_addresser import FWIParameter, RasterKeyAddresser
from app.utils.geospatial import GDALResamplingMethod
from app.utils.s3 import set_s3_gdal_config
Expand All @@ -30,7 +30,14 @@ def __init__(self, start_datetime: datetime, days: int, addresser: RasterKeyAddr
self.days = days
self.addresser = addresser

async def process(self, s3_client: S3Client, input_dataset_context: MultiDatasetContext, new_dmc_dc_context: MultiDatasetContext, new_ffmc_context: MultiDatasetContext):
async def process(
self,
s3_client: S3Client,
input_dataset_context: MultiDatasetContext,
new_dmc_dc_context: MultiDatasetContext,
new_ffmc_context: MultiDatasetContext,
new_isi_bui_context: MultiDatasetContext,
):
set_s3_gdal_config()

for day in range(self.days):
Expand Down Expand Up @@ -114,16 +121,16 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
with new_dmc_dc_context([new_dmc_path, new_dc_path]) as new_dmc_dc_datasets:
new_ds = cast(List[WPSDataset], new_dmc_dc_datasets) # Ensure correct type inference
new_dmc_ds, new_dc_ds = new_ds
bui_values, nodata = calculate_bui(new_dmc_ds, new_dc_ds)
bui_values, bui_nodata = calculate_bui(new_dmc_ds, new_dc_ds)

# Store the new BUI dataset
await s3_client.persist_raster_data(
new_bui_path = await s3_client.persist_raster_data(
temp_dir,
new_bui_key,
dmc_ds.as_gdal_ds().GetGeoTransform(),
dmc_ds.as_gdal_ds().GetProjection(),
bui_values,
nodata,
bui_nodata,
)

# Open new FFMC dataset and calculate ISI
Expand All @@ -134,7 +141,7 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
isi_values, isi_nodata = calculate_isi(new_ffmc_ds, warped_wind_speed_ds)

# Store the new ISI dataset
await s3_client.persist_raster_data(
new_isi_path = await s3_client.persist_raster_data(
temp_dir,
new_isi_key,
new_ffmc_ds.as_gdal_ds().GetGeoTransform(),
Expand All @@ -143,6 +150,18 @@ async def process(self, s3_client: S3Client, input_dataset_context: MultiDataset
isi_nodata,
)

# Open new ISI and BUI datasets to calculate FWI
new_fwi_key = self.addresser.get_calculated_index_key(datetime_to_calculate_utc, FWIParameter.FWI)
with new_isi_bui_context([new_isi_path, new_bui_path]) as new_isi_bui_datasets:
new_ds = cast(List[WPSDataset], new_isi_bui_datasets) # Ensure correct type inference
new_isi_ds, new_bui_ds = new_ds

fwi_values, fwi_nodata = calculate_fwi(new_isi_ds, new_bui_ds)

await s3_client.persist_raster_data(
temp_dir, new_fwi_key, new_isi_ds.as_gdal_ds().GetGeoTransform(), new_isi_ds.as_gdal_ds().GetProjection(), fwi_values, fwi_nodata
)

def _get_calculate_dates(self, day: int):
"""
Calculate the UTC date and times based on the provided day offset.
Expand Down
17 changes: 16 additions & 1 deletion api/app/sfms/fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import numpy as np

from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_isi
from app.auto_spatial_advisory.sfms import vectorized_bui, vectorized_dc, vectorized_dmc, vectorized_ffmc, vectorized_fwi, vectorized_isi
from app.geospatial.wps_dataset import WPSDataset

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -76,6 +76,21 @@ def calculate_ffmc(previous_ffmc_ds: WPSDataset, temp_ds: WPSDataset, rh_ds: WPS
return ffmc_values, nodata_value


def calculate_fwi(isi_ds: WPSDataset, bui_ds: WPSDataset):
isi_array, _ = isi_ds.replace_nodata_with(0)
bui_array, _ = bui_ds.replace_nodata_with(0)

start = perf_counter()
fwi_values = vectorized_fwi(isi_array, bui_array)
logger.info("%f seconds to calculate vectorized fwi", perf_counter() - start)

nodata_mask, nodata_value = isi_ds.get_nodata_mask()
if nodata_mask is not None:
fwi_values[nodata_mask] = nodata_value

return fwi_values, nodata_value


def calculate_isi(ffmc_ds: WPSDataset, wind_speed_ds: WPSDataset):
ffmc_array, _ = ffmc_ds.replace_nodata_with(0)
wind_speed_array, _ = wind_speed_ds.replace_nodata_with(0)
Expand Down
22 changes: 19 additions & 3 deletions api/app/tests/sfms/test_daily_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
new_datasets, mock_new_ffmc_datasets_context = create_mock_new_ds_context(1)
mock_new_ffmc_ds = new_datasets[0]

# mock the isi and bui datasets
new_datasets, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2)
mock_new_isi_ds, mock_new_bui_ds = new_datasets

# mock gdal open
mocker.patch("osgeo.gdal.Open", return_value=create_mock_gdal_dataset())

Expand All @@ -92,14 +96,15 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")
calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi")

async with S3Client() as mock_s3_client:
# mock s3 client
mock_all_objects_exist = AsyncMock(return_value=True)
mocker.patch.object(mock_s3_client, "all_objects_exist", new=mock_all_objects_exist)
persist_raster_spy = mocker.patch.object(mock_s3_client, "persist_raster_data", return_value="test_key.tif")

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_datasets_context, mock_new_isi_bui_datasets_context)

# Verify weather model keys and actual keys are checked for both days
assert mock_all_objects_exist.call_count == 4
Expand Down Expand Up @@ -142,6 +147,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.ISI),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.FWI),
# second day, previous days' dc and dmc are looked up first
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DC),
mocker.call(EXPECTED_FIRST_DAY, FWIParameter.DMC),
Expand All @@ -151,6 +157,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FFMC),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.BUI),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.ISI),
mocker.call(EXPECTED_SECOND_DAY, FWIParameter.FWI),
]

# Verify weather inputs are warped to match dmc raster
Expand Down Expand Up @@ -192,6 +199,7 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
wps_datasets = ffmc_calls[0][1:4] # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)


assert calculate_bui_spy.call_args_list == [
mocker.call(mock_new_dmc_ds, mock_new_dc_ds),
mocker.call(mock_new_dmc_ds, mock_new_dc_ds),
Expand All @@ -203,8 +211,13 @@ async def test_daily_fwi_processor(mocker: MockerFixture):
wps_datasets = isi_calls.args # Extract dataset arguments
assert all(isinstance(ds, WPSDataset) for ds in wps_datasets)

assert calculate_fwi_spy.call_args_list == [
mocker.call(mock_new_isi_ds, mock_new_bui_ds),
mocker.call(mock_new_isi_ds, mock_new_bui_ds),
]

# 5 each day, new dmc, dc, ffmc, bui, and isi rasters
assert persist_raster_spy.call_count == 10
assert persist_raster_spy.call_count == 12


@pytest.mark.parametrize(
Expand All @@ -225,20 +238,23 @@ async def test_no_weather_keys_exist(side_effect_1: bool, side_effect_2: bool, m

_, mock_new_dmc_dc_datasets_context = create_mock_new_ds_context(2)
_, mock_new_ffmc_dataset_context = create_mock_new_ds_context(1)
_, mock_new_isi_bui_datasets_context = create_mock_new_ds_context(2)

# calculation spies
calculate_dmc_spy = mocker.spy(daily_fwi_processor, "calculate_dmc")
calculate_dc_spy = mocker.spy(daily_fwi_processor, "calculate_dc")
calculate_bui_spy = mocker.spy(daily_fwi_processor, "calculate_bui")
calculate_ffmc_spy = mocker.spy(daily_fwi_processor, "calculate_ffmc")
calculate_isi_spy = mocker.spy(daily_fwi_processor, "calculate_isi")
calculate_fwi_spy = mocker.spy(daily_fwi_processor, "calculate_fwi")

fwi_processor = DailyFWIProcessor(TEST_DATETIME, 1, RasterKeyAddresser())

await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context)
await fwi_processor.process(mock_s3_client, mock_input_dataset_context, mock_new_dmc_dc_datasets_context, mock_new_ffmc_dataset_context, mock_new_isi_bui_datasets_context)

calculate_dmc_spy.assert_not_called()
calculate_dc_spy.assert_not_called()
calculate_bui_spy.assert_not_called()
calculate_ffmc_spy.assert_not_called()
calculate_isi_spy.assert_not_called()
calculate_fwi_spy.assert_not_called()
50 changes: 48 additions & 2 deletions api/app/tests/sfms/test_fwi_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@

import numpy as np
import pytest
from cffdrs import bui, dc, dmc, ffmc, isi
from cffdrs import bui, dc, dmc, ffmc, fwi, isi
from osgeo import osr

from app.geospatial.wps_dataset import WPSDataset
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_isi
from app.sfms.fwi_processor import calculate_bui, calculate_dc, calculate_dmc, calculate_ffmc, calculate_fwi, calculate_isi

FWI_ARRAY = np.array([[12, 20], [-999, -999]])
WEATHER_ARRAY = np.array([[12, 20], [0, 0]])


@dataclass
class InputDatasets:
bui: WPSDataset
dc: WPSDataset
dmc: WPSDataset
ffmc: WPSDataset
isi: WPSDataset
temp: WPSDataset
rh: WPSDataset
precip: WPSDataset
Expand All @@ -31,9 +33,11 @@ def input_datasets():
transform = (-2, 1, 0, 2, 0, -1)

return InputDatasets(
bui=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
dc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
dmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
ffmc=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
isi=WPSDataset.from_array(FWI_ARRAY, transform, srs.ExportToWkt(), nodata_value=-999),
temp=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
rh=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
precip=WPSDataset.from_array(WEATHER_ARRAY, transform, srs.ExportToWkt()),
Expand Down Expand Up @@ -152,6 +156,20 @@ def test_calculate_bui_values(input_datasets):
assert math.isclose(static_bui, bui_values[0, 0], abs_tol=0.01)


def test_calculate_isi_masked_correctly(input_datasets):
ffmc_ds = input_datasets.ffmc
windspeed_ds = input_datasets.wind_speed

isi_values, nodata_value = calculate_isi(ffmc_ds, windspeed_ds)

# validate output shape and nodata masking
assert isi_values.shape == (2, 2)
assert isi_values[1, 0] == nodata_value
assert isi_values[1, 1] == nodata_value
assert isi_values[0, 0] != nodata_value
assert isi_values[0, 1] != nodata_value


def test_calculate_isi_values(input_datasets):
ffmc_ds = input_datasets.ffmc
wind_speed_ds = input_datasets.wind_speed
Expand Down Expand Up @@ -197,3 +215,31 @@ def test_calculate_ffmc_masked_correctly(input_datasets):
assert daily_ffmc_values[1, 1] == nodata_value
assert daily_ffmc_values[0, 0] != nodata_value
assert daily_ffmc_values[0, 1] != nodata_value


def test_calculate_fwi_masked_correctly(input_datasets):
isi_ds = input_datasets.isi
bui_ds = input_datasets.bui

fwi_values, nodata_value = calculate_fwi(isi_ds, bui_ds)

# validate output shape and nodata masking
assert fwi_values.shape == (2, 2)
assert fwi_values[1, 0] == nodata_value
assert fwi_values[1, 1] == nodata_value
assert fwi_values[0, 0] != nodata_value
assert fwi_values[0, 1] != nodata_value


def test_calculate_fwi_values(input_datasets):
isi_ds = input_datasets.isi
bui_ds = input_datasets.bui

isi_sample = FWI_ARRAY[0, 0]
bui_sample = FWI_ARRAY[0, 0]

fwi_values, _ = calculate_fwi(isi_ds, bui_ds)

static_fwi = fwi(isi_sample, bui_sample)

assert math.isclose(static_fwi, fwi_values[0, 0], abs_tol=0.01)
Loading