Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store snow masked hfi rasters #3807

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,22 @@
"GDPS",
"GEOGCS",
"geospatial",
"geotiff",
"grib",
"gribs",
"hourlies",
"HRDPS",
"luxon",
"maxx",
"maxy",
"miny",
"luxon",
"ndarray",
"numba",
"osgeo",
"PMTILES",
"polygonize",
"Polygonized",
"polygonizing",
"PRECIP",
"PRIMEM",
"PROJCS",
Expand Down
55 changes: 55 additions & 0 deletions api/app/auto_spatial_advisory/hfi_filepath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import os
from app.auto_spatial_advisory.run_type import RunType
from datetime import date


def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str:
"""
Get the file path for both reading and writing the pmtiles from/to the object store.
Example: {bucket}/psu/pmtiles/hfi/actual/[issue/run_date]/hfi[for_date].pmtiles


:param run_date: The date of the run to process. (when was the hfi file created?)
:param run_type: forecast or actual
:param filename: hfi[for_date].pmtiles -> hfi20230821.pmtiles
:return: s3 bucket key for pmtiles file
"""
pmtiles_filepath = os.path.join("psu", "pmtiles", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename)

return pmtiles_filepath


def get_pmtiles_filename(for_date: date):
"""
Returns the object store filename for a pmtiles file based on a given for_date.

:param for_date: the date the hfi pmtiles is forecasted for
:return: filename string
"""
return f'hfi{for_date.strftime("%Y%m%d")}.pmtiles'


def get_raster_filepath(run_date: date, run_type: RunType, filename: str) -> str:
"""
Get the file path for both reading and writing the tif raster from/to the object store.
Example: {bucket}/psu/rasters/hfi/actual/[issue/run_date]/hfi[for_date].tif


:param run_date: The date of the run to process. (when was the hfi file created?)
:param run_type: forecast or actual
:param filename: hfi[for_date].tif -> hfi20230821.tif
:return: s3 bucket key for raster file
"""
raster_filepath = os.path.join("psu", "rasters", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename)

return raster_filepath


def get_raster_tif_filename(for_date: date) -> str:
"""
Returns the object store filename for a raster tif based on a given for_date.

:param for_date: the date the hfi tif is forecasted for
:return: filename string
"""
return f'snow_masked_hfi{for_date.strftime("%Y%m%d")}.tif'
24 changes: 0 additions & 24 deletions api/app/auto_spatial_advisory/hfi_pmtiles.py

This file was deleted.

101 changes: 56 additions & 45 deletions api/app/auto_spatial_advisory/process_hfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
from app.auto_spatial_advisory.run_type import RunType
from app.auto_spatial_advisory.snow import apply_snow_mask
from app.geospatial import NAD83_BC_ALBERS
from app.auto_spatial_advisory.hfi_pmtiles import get_pmtiles_filepath
from app.auto_spatial_advisory.hfi_filepath import get_pmtiles_filename, get_pmtiles_filepath, get_raster_filepath, get_raster_tif_filename
from app.utils.polygonize import polygonize_in_memory
from app.utils.pmtiles import tippecanoe_wrapper, write_geojson
from app.utils.s3 import get_client


logger = logging.getLogger(__name__)

HFI_PMTILES_PERMISSIONS = "public-read"
HFI_GEOSPATIAL_PERMISSIONS = "public-read"
HFI_PMTILES_MIN_ZOOM = 4
HFI_PMTILES_MAX_ZOOM = 11

Expand Down Expand Up @@ -98,57 +98,68 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime,

hfi_key = get_s3_key(run_type, run_date, for_date)
logger.info(f"Key to HFI in object storage: {hfi_key}")
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, "classified.tif")
classify_hfi(hfi_key, temp_filename)
# If something has gone wrong with the collection of snow coverage data and it has not been collected
# within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is
if last_processed_snow is None or last_processed_snow[0].for_date + datetime.timedelta(days=7) < datetime.now():
logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.")
working_hfi_path = temp_filename
else:
working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir)
# Create a snow coverage mask from previously downloaded snow data.
with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer:
# We need a geojson file to pass to tippecanoe
temp_geojson = write_geojson(layer, temp_dir)

pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles'
temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename)
logger.info(f"Writing pmtiles -- {pmtiles_filename}")
tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)

async with get_client() as (client, bucket):
async with get_client() as (client, bucket):
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, "classified.tif")
classify_hfi(hfi_key, temp_filename)
# If something has gone wrong with the collection of snow coverage data and it has not been collected
# within the past 7 days, don't apply an old snow mask, work with the classified hfi data as is
if last_processed_snow is None or last_processed_snow[0].for_date + datetime.timedelta(days=7) < datetime.now():
logger.info("No recently processed snow data found. Proceeding with non-masked hfi data.")
working_hfi_path = temp_filename
else:
# Create a snow coverage mask from previously downloaded snow data.
working_hfi_path = await apply_snow_mask(temp_filename, last_processed_snow[0], temp_dir)

raster_filename = get_raster_tif_filename(for_date)
raster_key = get_raster_filepath(run_date, run_type, raster_filename)
logger.info(f"Uploading file {raster_filename} to {raster_key}")
await client.put_object(
Bucket=bucket,
Key=raster_key,
ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone
Body=open(working_hfi_path, "rb"),
)
logger.info("Done uploading %s", raster_key)
with polygonize_in_memory(working_hfi_path, "hfi", "hfi") as layer:
# We need a geojson file to pass to tippecanoe
temp_geojson = write_geojson(layer, temp_dir)

pmtiles_filename = get_pmtiles_filename(for_date)
temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename)
logger.info(f"Writing pmtiles -- {pmtiles_filename}")
tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)

key = get_pmtiles_filepath(run_date, run_type, pmtiles_filename)
logger.info(f"Uploading file {pmtiles_filename} to {key}")

await client.put_object(
Bucket=bucket,
Key=key,
ACL=HFI_PMTILES_PERMISSIONS, # We need these to be accessible to everyone
ACL=HFI_GEOSPATIAL_PERMISSIONS, # We need these to be accessible to everyone
Body=open(temp_pmtiles_filepath, "rb"),
)
logger.info("Done uploading file")

spatial_reference: osr.SpatialReference = layer.GetSpatialRef()
target_srs = osr.SpatialReference()
target_srs.ImportFromEPSG(NAD83_BC_ALBERS)
target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs)

async with get_async_write_session_scope() as session:
advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY)
warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING)

logger.info("Writing HFI advisory zones to API database...")
for i in range(layer.GetFeatureCount()):
# https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature
feature: ogr.Feature = layer.GetFeature(i)
obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date)
await save_hfi(session, obj)

# Store the unique combination of run type, run datetime and for date in the run_parameters table
await save_run_parameters(session, run_type, run_datetime, for_date)
logger.info("Done uploading %s", key)

spatial_reference: osr.SpatialReference = layer.GetSpatialRef()
target_srs = osr.SpatialReference()
target_srs.ImportFromEPSG(NAD83_BC_ALBERS)
target_srs.SetAxisMappingStrategy(osr.OAMS_TRADITIONAL_GIS_ORDER)
coordinate_transform = osr.CoordinateTransformation(spatial_reference, target_srs)

async with get_async_write_session_scope() as session:
advisory = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.ADVISORY)
warning = await get_hfi_classification_threshold(session, HfiClassificationThresholdEnum.WARNING)

logger.info("Writing HFI advisory zones to API database...")
for i in range(layer.GetFeatureCount()):
# https://gdal.org/api/python/osgeo.ogr.html#osgeo.ogr.Feature
feature: ogr.Feature = layer.GetFeature(i)
obj = create_model_object(feature, advisory, warning, coordinate_transform, run_type, run_datetime, for_date)
await save_hfi(session, obj)

# Store the unique combination of run type, run datetime and for date in the run_parameters table
await save_run_parameters(session, run_type, run_datetime, for_date)

perf_end = perf_counter()
delta = perf_end - perf_start
Expand Down
31 changes: 15 additions & 16 deletions api/app/utils/polygonize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Code for polygonizing a geotiff file. """
"""Code for polygonizing a geotiff file."""

import logging
from contextlib import contextmanager
from osgeo import gdal, ogr, osr
Expand All @@ -9,13 +10,13 @@


def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransform):
""" Create an in memory data band to represent a single raster layer.
"""Create an in memory data band to represent a single raster layer.
See https://gdal.org/user/raster_data_model.html#raster-band for a complete
description of what a raster band is.
"""
mem_driver = gdal.GetDriverByName('MEM')
mem_driver = gdal.GetDriverByName("MEM")

dataset = mem_driver.Create('memory', cols, rows, 1, gdal.GDT_Byte)
dataset = mem_driver.Create("memory", cols, rows, 1, gdal.GDT_Byte)
dataset.SetProjection(projection)
dataset.SetGeoTransform(geotransform)
band = dataset.GetRasterBand(1)
Expand All @@ -26,7 +27,7 @@ def _create_in_memory_band(data: np.ndarray, cols, rows, projection, geotransfor

@contextmanager
def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer:
""" Given some tiff file, return a polygonized version of it, in memory, as an ogr layer. """
"""Given some tiff file, return a polygonized version of it, in memory, as an ogr layer."""
source: gdal.Dataset = gdal.Open(geotiff_filename, gdal.GA_ReadOnly)

source_band = source.GetRasterBand(1)
Expand All @@ -37,9 +38,7 @@ def polygonize_in_memory(geotiff_filename, layer, field) -> ogr.Layer:

# generate mask data
mask_data = np.where(source_data == nodata_value, False, True)
mask_ds, mask_band = _create_in_memory_band(
mask_data, source_band.XSize, source_band.YSize, source.GetProjection(),
source.GetGeoTransform())
mask_ds, mask_band = _create_in_memory_band(mask_data, source_band.XSize, source_band.YSize, source.GetProjection(), source.GetGeoTransform())

# Create a memory OGR datasource to put results in.
# https://gdal.org/drivers/vector/memory.html#vector-memory
Expand Down Expand Up @@ -71,15 +70,15 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename
<vector_dest_filename>.shp, and inserts polygonized contents of source
file into destination file.
"""
if raster_source_filename[-3:] != '.tif':
return f'{raster_source_filename} is an invalid file format for raster source'
if vector_dest_filename[-3:] != '.shp':
vector_dest_filename += '.shp'
if raster_source_filename[-3:] != ".tif":
return f"{raster_source_filename} is an invalid file format for raster source"
if vector_dest_filename[-3:] != ".shp":
vector_dest_filename += ".shp"

source_data = gdal.Open(raster_source_filename, gdal.GA_ReadOnly)
source_band = source_data.GetRasterBand(1)
value = ogr.FieldDefn('Band 1', ogr.OFTInteger)
logger.info('%s raster count %s', raster_source_filename, source_data.RasterCount)
value = ogr.FieldDefn("Band 1", ogr.OFTInteger)
logger.info("%s raster count %s", raster_source_filename, source_data.RasterCount)

driver = ogr.GetDriverByName("ESRI Shapefile")
destination = driver.CreateDataSource(vector_dest_filename)
Expand All @@ -88,7 +87,7 @@ def polygonize_geotiff_to_shapefile(raster_source_filename, vector_dest_filename
dest_layer = destination.CreateLayer(vector_dest_filename, geom_type=ogr.wkbPolygon, srs=dest_srs)
dest_layer.CreateField(value)
# 'Band 1' is the field name on the layer for Fuel Type ID
dest_field = dest_layer.GetLayerDefn().GetFieldIndex('Band 1')
dest_field = dest_layer.GetLayerDefn().GetFieldIndex("Band 1")
gdal.Polygonize(source_band, None, dest_layer, dest_field, [])

return f'Polygonized {raster_source_filename} to {vector_dest_filename}'
return f"Polygonized {raster_source_filename} to {vector_dest_filename}"
Loading