Skip to content

Commit

Permalink
Create and display pmtiles for snow coverage (#3444)
Browse files Browse the repository at this point in the history
Co-authored-by: Conor Brady <[email protected]>
  • Loading branch information
dgboss and conbrad authored Mar 5, 2024
1 parent 38b29d3 commit e21ea54
Show file tree
Hide file tree
Showing 20 changed files with 381 additions and 77 deletions.
4 changes: 2 additions & 2 deletions api/app/auto_spatial_advisory/fuel_type_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from osgeo import ogr, osr, gdal
from shapely import wkt, wkb
from app import config
from app.auto_spatial_advisory.polygonize import polygonize_in_memory
from app.db.models.auto_spatial_advisory import FuelType
from app.db.database import get_async_write_session_scope
from app.db.crud.auto_spatial_advisory import save_fuel_type
from app.geospatial import NAD83_BC_ALBERS
from app.utils.polygonize import polygonize_in_memory


logger = logging.getLogger(__name__)
Expand All @@ -33,7 +33,7 @@ def fuel_type_iterator() -> Generator[Tuple[int, str], None, None]:
# that gdal is able to read.
filename = f'/vsis3/{bucket}/sfms/static/fbp2021.tif'
logger.info('Polygonizing %s...', filename)
with polygonize_in_memory(filename) as layer:
with polygonize_in_memory(filename, 'fuel', 'fuel') as layer:

spatial_reference: osr.SpatialReference = layer.GetSpatialRef()
target_srs = osr.SpatialReference()
Expand Down
58 changes: 0 additions & 58 deletions api/app/auto_spatial_advisory/hfi_pmtiles.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,8 @@
from osgeo import gdal, ogr
import os
import subprocess
from app.auto_spatial_advisory.run_type import RunType
from datetime import date


def tippecanoe_wrapper(geojson_filepath: str, output_pmtiles_filepath: str, min_zoom: int = 4, max_zoom: int = 11):
"""
Wrapper for the tippecanoe cli tool
:param geojson_filepath: Path to input geojson (must be in EPSG:4326)
:type geojson_filepath: str
:param output_pmtile: Path to output pmtiles file
:type output_pmtiles_filepath: str
:param min_zoom: pmtiles zoom out level
:type min_zoom: int
:param max_zoom: pmtiles zoom in level
:type max_zoom: int
"""
subprocess.run([
'tippecanoe',
f'--minimum-zoom={min_zoom}',
f'--maximum-zoom={max_zoom}',
'--projection=EPSG:4326',
f'--output={output_pmtiles_filepath}',
f'{geojson_filepath}',
'--force',
'--quiet'
], check=True
)


def write_hfi_geojson(hfi_polygons: ogr.Layer, output_dir: str) -> str:
"""
Write geojson file, projected in EPSG:4326, from ogr.Layer object
:param hfi_polygons: HFI polygon layer
:type hfi_polygons: ogr.Layer
:param output_dir: Output directory
:type output_dir: str
:return: Path to hfi geojson file
:rtype: str
"""
# We can't use an in-memory layer for translating, so we'll create a temp layer
# Using a geopackage since it supports all projections and doesn't limit field name lengths.
# This matters because the hfi data is distributed in an odd projection that doesn't have an EPSG code
temp_gpkg = os.path.join(output_dir, 'temp_hfi_polys.gpkg')
driver = ogr.GetDriverByName('GPKG')
temp_data_source = driver.CreateDataSource(temp_gpkg)
temp_data_source.CopyLayer(hfi_polygons, 'hfi_layer')

# We need a geojson file to pass to tippecanoe
temp_geojson = os.path.join(output_dir, 'temp_hfi_polys.geojson')

# tippecanoe recommends the input geojson be in EPSG:4326 [https://github.com/felt/tippecanoe#projection-of-input]
gdal.VectorTranslate(destNameOrDestDS=temp_geojson, srcDS=temp_gpkg,
format='GeoJSON', dstSRS='EPSG:4326', reproject=True)

del temp_gpkg

return temp_geojson


def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str:
"""
Expand Down
9 changes: 5 additions & 4 deletions api/app/auto_spatial_advisory/process_hfi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
save_hfi, get_hfi_classification_threshold, HfiClassificationThresholdEnum, save_run_parameters,
get_run_parameters_id)
from app.auto_spatial_advisory.classify_hfi import classify_hfi
from app.auto_spatial_advisory.polygonize import polygonize_in_memory
from app.auto_spatial_advisory.run_type import RunType
from app.geospatial import NAD83_BC_ALBERS
from app.auto_spatial_advisory.hfi_pmtiles import write_hfi_geojson, tippecanoe_wrapper, get_pmtiles_filepath
from app.auto_spatial_advisory.hfi_pmtiles import get_pmtiles_filepath
from app.utils.polygonize import polygonize_in_memory
from app.utils.pmtiles import tippecanoe_wrapper, write_geojson
from app.utils.s3 import get_client


Expand Down Expand Up @@ -105,10 +106,10 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime,
with tempfile.TemporaryDirectory() as temp_dir:
temp_filename = os.path.join(temp_dir, 'classified.tif')
classify_hfi(key, temp_filename)
with polygonize_in_memory(temp_filename) as layer:
with polygonize_in_memory(temp_filename, 'hfi', 'hfi') as layer:

# We need a geojson file to pass to tippecanoe
temp_geojson = write_hfi_geojson(layer, temp_dir)
temp_geojson = write_geojson(layer, temp_dir)

pmtiles_filename = f'hfi{for_date.strftime("%Y%m%d")}.pmtiles'
temp_pmtiles_filepath = os.path.join(temp_dir, pmtiles_filename)
Expand Down
20 changes: 19 additions & 1 deletion api/app/db/crud/snow.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
""" CRUD operations relating to processing snow coverage
"""
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.models.snow import ProcessedSnow, SnowSourceEnum
Expand All @@ -14,7 +15,7 @@ async def save_processed_snow(session: AsyncSession, processed_snow: ProcessedSn
session.add(processed_snow)


async def get_last_processed_snow_by_source(session: AsyncSession, snow_source: SnowSourceEnum) -> ProcessedSnow:
async def get_last_processed_snow_by_source(session: AsyncSession, snow_source: SnowSourceEnum = SnowSourceEnum.viirs) -> ProcessedSnow:
""" Retrieve the record with the most recent for_date of the specified snow source.
:param snow_source: The source of snow data of interest.
Expand All @@ -26,4 +27,21 @@ async def get_last_processed_snow_by_source(session: AsyncSession, snow_source:
.where(ProcessedSnow.snow_source == snow_source)\
.order_by(ProcessedSnow.for_date.desc())
result = await session.execute(stmt)
return result.first()

async def get_most_recent_processed_snow_by_date(session: AsyncSession, target_date: datetime, snow_source: SnowSourceEnum = SnowSourceEnum.viirs) -> ProcessedSnow:
""" Retreive the most recent record prior or equal to the provided date.
:param target_date: The date of interest
:type target_date: datetime
:param snow_source: The source of snow data of interest.
:type snow_source: SnowSourceEnum
:return: A record containing the last date for which snow data from the specified source was successfully processed.
:rtype: ProcessedSnow
"""
stmt = select(ProcessedSnow)\
.where(ProcessedSnow.snow_source == snow_source)\
.where(ProcessedSnow.for_date <= target_date)\
.order_by(ProcessedSnow.for_date.desc())
result = await session.execute(stmt)
return result.first()
78 changes: 75 additions & 3 deletions api/app/jobs/viirs_snow.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from osgeo import gdal
import asyncio
import glob
import logging
import numpy as np
import os
import requests
import shutil
Expand All @@ -13,7 +14,10 @@
from app.db.database import get_async_read_session_scope, get_async_write_session_scope
from app.db.models.snow import ProcessedSnow, SnowSourceEnum
from app.rocketchat_notifications import send_rocketchat_notification
from app.utils.polygonize import polygonize_in_memory
from app.utils.pmtiles import tippecanoe_wrapper, write_geojson
from app.utils.s3 import get_client
from app.utils.time import vancouver_tz

logger = logging.getLogger(__name__)

Expand All @@ -24,8 +28,28 @@
LAYER_VARIABLE = "/VIIRS_Grid_IMG_2D/CGF_NDSI_Snow_Cover"
RAW_SNOW_COVERAGE_NAME = 'raw_snow_coverage.tif'
RAW_SNOW_COVERAGE_CLIPPED_NAME = 'raw_snow_coverage_clipped.tif'
BINARY_SNOW_COVERAGE_CLASSIFICATION_NAME = 'binary_snow_coverage.tif'
SNOW_COVERAGE_PMTILES_MIN_ZOOM = 4
SNOW_COVERAGE_PMTILES_MAX_ZOOM = 11
SNOW_COVERAGE_PMTILES_PERMISSIONS = 'public-read'


def get_pmtiles_filepath(for_date: date, filename: str) -> str:
"""
Returns the S3 storage key for storing the snow coverage pmtiles for the given date and file name.
:param for_date: The date of snow coverage imagery.
:type run_date: date
:param filename: snowCoverage[for_date].pmtiles -> snowCoverage20230821.pmtiles
:type filename: str
:return: s3 bucket key for pmtiles file
:rtype: str
"""
pmtiles_filepath = os.path.join('psu', 'pmtiles', 'snow', for_date.strftime('%Y-%m-%d'), filename)

return pmtiles_filepath

class ViirsSnowJob():
""" Job that downloads and processed VIIRS snow coverage data from the NSIDC (https://nsidc.org).
"""
Expand Down Expand Up @@ -135,6 +159,48 @@ async def _save_clipped_snow_coverage_mosaic_to_s3(self, for_date: date, path: s
await client.put_object(Bucket=bucket,
Key=key,
Body=file)

def _classify_snow_coverage(self, path: str):
source_path = os.path.join(path, RAW_SNOW_COVERAGE_CLIPPED_NAME)
source = gdal.Open(source_path, gdal.GA_ReadOnly)
source_band = source.GetRasterBand(1)
source_data = source_band.ReadAsArray()
# Classify the data. Snow coverage in the source data is indicated by values in the range of 0-100. I'm using a range of
# 10 - 100 to increase confidence. In the classified data 1 is assigned to snow covered pixels and all other pixels are 0.
classified = np.where((source_data > 10) & (source_data <= 100), 1, 0)
output_driver = gdal.GetDriverByName("GTiff")
classified_snow_path = os.path.join(path, BINARY_SNOW_COVERAGE_CLASSIFICATION_NAME)
classified_snow = output_driver.Create(classified_snow_path, xsize=source_band.XSize, ysize=source_band.YSize, bands=1, eType=gdal.GDT_Byte)
classified_snow.SetGeoTransform(source.GetGeoTransform())
classified_snow.SetProjection(source.GetProjection())
classified_snow_band = classified_snow.GetRasterBand(1)
classified_snow_band.WriteArray(classified)
source_data = None
source_band = None
source = None
classified_snow_band = None
classified_snow = None

async def _create_pmtiles_layer(self, path: str, for_date: date):
filename = os.path.join(path, BINARY_SNOW_COVERAGE_CLASSIFICATION_NAME)
with polygonize_in_memory(filename, 'snow', 'snow') as layer:
# We need a geojson file to pass to tippecanoe
temp_geojson = write_geojson(layer, path)
pmtiles_filename = f'snowCoverage{for_date.strftime("%Y%m%d")}.pmtiles'
temp_pmtiles_filepath = os.path.join(path, pmtiles_filename)
logger.info(f'Writing snow coverage pmtiles -- {pmtiles_filename}')
tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath,
min_zoom=SNOW_COVERAGE_PMTILES_MIN_ZOOM, max_zoom=SNOW_COVERAGE_PMTILES_MAX_ZOOM)

async with get_client() as (client, bucket):
key = get_pmtiles_filepath(for_date, pmtiles_filename)
logger.info(f'Uploading snow coverage file {pmtiles_filename} to {key}')

await client.put_object(Bucket=bucket,
Key=key,
ACL=SNOW_COVERAGE_PMTILES_PERMISSIONS, # We need these to be accessible to everyone
Body=open(temp_pmtiles_filepath, 'rb'))
logger.info('Done uploading snow coverage file')


async def _process_viirs_snow(self, for_date: date, path: str):
Expand All @@ -152,6 +218,11 @@ async def _process_viirs_snow(self, for_date: date, path: str):
self._create_snow_coverage_mosaic(sub_dir)
await self._clip_snow_coverage_mosaic(sub_dir, path)
await self._save_clipped_snow_coverage_mosaic_to_s3(for_date, sub_dir)
# Reclassify the clipped snow coverage mosaic to 1 for snow and 0 for all other cells
self._classify_snow_coverage(sub_dir)
# Create pmtiles file and save to S3
await self._create_pmtiles_layer(sub_dir, for_date)



async def _run_viirs_snow(self):
Expand All @@ -162,7 +233,7 @@ async def _run_viirs_snow(self):
today = date.today()
if last_processed_date is None:
# Case to cover the initial run of VIIRS snow processing (ie. start processing one week ago)
next_date = today - timedelta(days=7)
next_date = today - timedelta(days=10)
else:
# Start processing the day after the last record of a successful job.
next_date = last_processed_date + timedelta(days=1)
Expand All @@ -176,10 +247,11 @@ async def _run_viirs_snow(self):
while next_date < today:
date_string = next_date.strftime('%Y-%m-%d')
logger.info(f"Processing snow coverage data for date: {date_string}")
tz_aware_datetime = vancouver_tz.localize(datetime.combine(next_date, datetime.min.time()))
try:
await self._process_viirs_snow(next_date, temp_dir)
async with get_async_write_session_scope() as session:
processed_snow = ProcessedSnow(for_date=next_date, processed_date=today, snow_source=SnowSourceEnum.viirs)
processed_snow = ProcessedSnow(for_date=tz_aware_datetime, processed_date=today, snow_source=SnowSourceEnum.viirs)
await save_processed_snow(session, processed_snow)
logger.info(f"Successfully processed VIIRS snow coverage data for date: {date_string}")
except requests.exceptions.HTTPError as http_error:
Expand Down
3 changes: 2 additions & 1 deletion api/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from app import hourlies
from app.rocketchat_notifications import send_rocketchat_notification
from app.routers import (fba, forecasts, weather_models, c_haines, stations, hfi_calc,
fba_calc, sfms, morecast_v2)
fba_calc, sfms, morecast_v2, snow)
from app.fire_behaviour.cffdrs import CFFDRS


Expand Down Expand Up @@ -110,6 +110,7 @@ async def catch_exception_middleware(request: Request, call_next):
api.include_router(fba.router, tags=["Auto Spatial Advisory"])
api.include_router(sfms.router, tags=["SFMS", "Auto Spatial Advisory"])
api.include_router(morecast_v2.router, tags=["Morecast v2"])
api.include_router(snow.router, tags=['Snow'])


@api.get('/ready')
Expand Down
30 changes: 30 additions & 0 deletions api/app/routers/snow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
""" Routers for Snow related data
"""

import logging
from datetime import date, datetime
from fastapi import APIRouter, Depends
from app.auth import authentication_required, audit
from app.db.crud.snow import get_most_recent_processed_snow_by_date
from app.db.database import get_async_read_session_scope
from app.schemas.snow import ProcessedSnowModel, ProcessedSnowResponse
from app.utils.time import vancouver_tz

logger = logging.getLogger(__name__)

router = APIRouter(
prefix="/snow",
dependencies=[Depends(authentication_required), Depends(audit)],
)


@router.get('/most-recent-by-date/{for_date}', response_model=ProcessedSnowResponse | None)
async def get_most_recent_by_date(for_date: date, _=Depends(authentication_required)):
""" Returns the most recent processed snow record before or equal to the provided date. """
logger.info('/snow/most-recent-by-date/')
tz_aware_datetime = vancouver_tz.localize(datetime.combine(for_date, datetime.min.time()))
async with get_async_read_session_scope() as session:
result = await get_most_recent_processed_snow_by_date(session, tz_aware_datetime)
if result is not None:
processed_snow = result[0]
return ProcessedSnowResponse(processed_snow=ProcessedSnowModel(for_date=processed_snow.for_date, processed_date=processed_snow.processed_date, snow_source=processed_snow.snow_source))
16 changes: 16 additions & 0 deletions api/app/schemas/snow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
""" This module contains pydantic models related to snow data. """

from datetime import datetime
from pydantic import BaseModel
from app.db.models.snow import SnowSourceEnum

class ProcessedSnowModel(BaseModel):
""" The content of a processed snow object"""
for_date: datetime
processed_date: datetime
snow_source: SnowSourceEnum

class ProcessedSnowResponse(BaseModel):
""" A processed snow response """
processed_snow: ProcessedSnowModel

Loading

0 comments on commit e21ea54

Please sign in to comment.