diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml
index 16350d45f..8fa0d2afa 100644
--- a/.github/workflows/deployment.yml
+++ b/.github/workflows/deployment.yml
@@ -237,7 +237,7 @@ jobs:
       shell: bash
       run: |
         oc login "${{ secrets.OPENSHIFT_CLUSTER }}" --token="${{ secrets.OC4_DEV_TOKEN }}"
-        PROJ_DEV="e1e498-dev" bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply
+        PROJ_DEV="e1e498-dev" MEMORY_REQUEST=250Mi MEMORY_LIMIT=500Mi bash openshift/scripts/oc_provision_nats.sh ${SUFFIX} apply
 
   scan-dev:
     name: ZAP Baseline Scan Dev
diff --git a/.vscode/settings.json b/.vscode/settings.json
index 8f5c6b4c0..355bd29e8 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -64,15 +64,20 @@
     "Behaviour",
     "botocore",
     "cffdrs",
+    "cutline",
     "determinates",
     "excinfo",
+    "FBAN",
     "fastapi",
     "fireweather",
     "firezone",
     "GDPS",
+    "geoalchemy",
     "GEOGCS",
+    "geopackage",
     "geospatial",
     "geotiff",
+    "gpkg",
     "grib",
     "gribs",
     "HAINES",
@@ -85,8 +90,9 @@
     "maxy",
     "miny",
     "morecast",
     "nats",
     "ndarray",
+    "Neighbour",
     "numba",
     "ORJSON",
     "osgeo",
@@ -99,6 +105,7 @@
     "PROJCS",
     "pydantic",
     "RDPS",
+    "reproject",
     "rocketchat",
     "sfms",
     "sqlalchemy",
diff --git a/api/alembic/versions/6910d017b626_add_advisory_tpi_table.py b/api/alembic/versions/6910d017b626_add_advisory_tpi_table.py
new file mode 100644
index 000000000..b14d7223a
--- /dev/null
+++ b/api/alembic/versions/6910d017b626_add_advisory_tpi_table.py
@@ -0,0 +1,54 @@
+"""add advisory tpi table
+
+Revision ID: 6910d017b626
+Revises: be128a7bb4fd
+Create Date: 2024-07-31 16:27:31.642156
+
+"""
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "6910d017b626" +down_revision = "be128a7bb4fd" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic ### + op.create_table( + "advisory_tpi_stats", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("advisory_shape_id", sa.Integer(), nullable=False), + sa.Column("run_parameters", sa.Integer(), nullable=False), + sa.Column("valley_bottom", sa.Integer(), nullable=False), + sa.Column("mid_slope", sa.Integer(), nullable=False), + sa.Column("upper_slope", sa.Integer(), nullable=False), + sa.Column("pixel_size_metres", sa.Integer(), nullable=False), + sa.ForeignKeyConstraint( + ["advisory_shape_id"], + ["advisory_shapes.id"], + ), + sa.ForeignKeyConstraint( + ["run_parameters"], + ["run_parameters.id"], + ), + sa.PrimaryKeyConstraint("id"), + comment="Elevation TPI stats per fire shape", + ) + op.create_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), "advisory_tpi_stats", ["advisory_shape_id"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_id"), "advisory_tpi_stats", ["id"], unique=False) + op.create_index(op.f("ix_advisory_tpi_stats_run_parameters"), "advisory_tpi_stats", ["run_parameters"], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic ### + op.drop_index(op.f("ix_advisory_tpi_stats_run_parameters"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_id"), table_name="advisory_tpi_stats") + op.drop_index(op.f("ix_advisory_tpi_stats_advisory_shape_id"), table_name="advisory_tpi_stats") + op.drop_table("advisory_tpi_stats") + # ### end Alembic commands ### diff --git a/api/app/.env.example b/api/app/.env.example index d9f8cfebf..ec235c083 100644 --- a/api/app/.env.example +++ b/api/app/.env.example @@ -60,4 +60,5 @@ OBJECT_STORE_SECRET=object_store_secret OBJECT_STORE_BUCKET=object_store_bucket DEM_NAME=dem_mosaic_250_max.tif TPI_DEM_NAME=bc_dem_50m_tpi.tif +CLASSIFIED_TPI_DEM_NAME=bc_dem_50m_tpi_win100_classified.tif SENTRY_DSN=some_dsn \ No newline at end of file diff --git a/api/app/auto_spatial_advisory/elevation.py b/api/app/auto_spatial_advisory/elevation.py index e97b4fdeb..572c624a0 100644 --- a/api/app/auto_spatial_advisory/elevation.py +++ b/api/app/auto_spatial_advisory/elevation.py @@ -1,10 +1,12 @@ """Takes a classified HFI image and calculates basic elevation statistics associated with advisory areas per fire zone.""" +from dataclasses import dataclass from datetime import date, datetime from time import perf_counter import logging import os import tempfile +from typing import Dict import numpy as np from osgeo import gdal from sqlalchemy.ext.asyncio import AsyncSession @@ -13,16 +15,47 @@ from app import config from app.auto_spatial_advisory.classify_hfi import classify_hfi from app.auto_spatial_advisory.run_type import RunType -from app.db.crud.auto_spatial_advisory import get_run_parameters_id, save_advisory_elevation_stats +from app.db.crud.auto_spatial_advisory import get_run_parameters_id, save_advisory_elevation_stats, save_advisory_elevation_tpi_stats from app.db.database import get_async_read_session_scope, get_async_write_session_scope, DB_READ_STRING -from app.db.models.auto_spatial_advisory import AdvisoryElevationStats +from app.db.models.auto_spatial_advisory import AdvisoryElevationStats, AdvisoryTPIStats +from app.auto_spatial_advisory.hfi_filepath import get_raster_filepath, get_raster_tif_filename from app.utils.s3 import get_client +from app.utils.geospatial import raster_mul, 
warp_to_match_extent
 
 logger = logging.getLogger(__name__)
 
 DEM_GDAL_SOURCE = None
 
 
+async def process_elevation_tpi(run_type: RunType, run_datetime: datetime, for_date: date):
+    """
+    Create new elevation statistics records for the given parameters.
+    Looks up the run's snow-masked HFI raster and the classified TPI raster, then counts high HFI pixels per TPI class for each fire zone.
+
+    :param run_type: The type of run to process. (is it a forecast or actual run?)
+    :param run_datetime: The date and time of the run to process. (when was the hfi file created?)
+    :param for_date: The date of the hfi to process. (when is the hfi for?)
+    """
+    logger.info("Processing elevation stats %s for run datetime: %s, for date: %s", run_type, run_datetime, for_date)
+    perf_start = perf_counter()
+    # Get the id from run_parameters associated with the provided run_type, run_datetime and for_date
+    async with get_async_write_session_scope() as session:
+        run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)
+
+        stmt = select(AdvisoryTPIStats).where(AdvisoryTPIStats.run_parameters == run_parameters_id)
+
+        exists = (await session.execute(stmt)).scalars().first() is not None
+        if not exists:
+            fire_zone_stats = await process_tpi_by_firezone(run_type, run_datetime.date(), for_date)
+            await store_elevation_tpi_stats(session, run_parameters_id, fire_zone_stats)
+        else:
+            logger.info("Elevation stats already computed")
+
+    perf_end = perf_counter()
+    delta = perf_end - perf_start
+    logger.info("%f delta count before and after processing elevation stats", delta)
+
+
 async def process_elevation(source_path: str, run_type: RunType, run_datetime: datetime, for_date: date):
     """Create new elevation statistics records for the given parameters.
 
@@ -178,6 +211,79 @@ def apply_threshold_mask_to_dem(threshold: int, mask_path: str, temp_dir: str):
     return masked_dem_path
 
 
+@dataclass(frozen=True)
+class FireZoneTPIStats:
+    """
+    Captures fire zone stats of TPI pixels meeting the >4K HFI threshold via
+    a dictionary, fire_zone_stats, of {source_identifier: {1: X, 2: Y, 3: Z}}, where 1 = valley bottom, 2 = mid slope, 3 = upper slope
+    and X, Y, Z are pixel counts at each of those elevation classes respectively.
+
+    Also includes the TPI raster's pixel size in metres.
+    """
+
+    fire_zone_stats: Dict[int, Dict[int, int]]
+    pixel_size_metres: int
+
+
+async def process_tpi_by_firezone(run_type: RunType, run_date: date, for_date: date) -> FireZoneTPIStats:
+    """
+    Given run parameters, look up the associated snow-masked HFI and static classified TPI geospatial data.
+    Cut out each fire zone shape from the above and intersect the TPI and HFI pixels, counting each pixel contributing to the TPI class.
+    Capture all fire zone stats keyed by source_identifier.
+
+    :param run_type: forecast or actual
+    :param run_date: date the computation ran
+    :param for_date: date the computation is for
+    :return: fire zone TPI stats
+    """
+
+    gdal.SetConfigOption("AWS_SECRET_ACCESS_KEY", config.get("OBJECT_STORE_SECRET"))
+    gdal.SetConfigOption("AWS_ACCESS_KEY_ID", config.get("OBJECT_STORE_USER_ID"))
+    gdal.SetConfigOption("AWS_S3_ENDPOINT", config.get("OBJECT_STORE_SERVER"))
+    gdal.SetConfigOption("AWS_VIRTUAL_HOSTING", "FALSE")
+    bucket = config.get("OBJECT_STORE_BUCKET")
+    dem_file = config.get("CLASSIFIED_TPI_DEM_NAME")
+
+    key = f"/vsis3/{bucket}/dem/tpi/{dem_file}"
+    tpi_source: gdal.Dataset = gdal.Open(key, gdal.GA_ReadOnly)
+    pixel_size_metres = int(tpi_source.GetGeoTransform()[1])
+
+    hfi_raster_filename = get_raster_tif_filename(for_date)
+    hfi_raster_key = get_raster_filepath(run_date, run_type, hfi_raster_filename)
+    hfi_key = f"/vsis3/{bucket}/{hfi_raster_key}"
+    hfi_source: gdal.Dataset = gdal.Open(hfi_key, gdal.GA_ReadOnly)
+
+    warped_mem_path = f"/vsimem/warp_{hfi_raster_filename}"
+    resized_hfi_source: gdal.Dataset = warp_to_match_extent(hfi_source, tpi_source, warped_mem_path)
+    hfi_masked_tpi = raster_mul(tpi_source, resized_hfi_source)
+    resized_hfi_source = None
+    hfi_source = None
+    tpi_source = None
+    gdal.Unlink(warped_mem_path)
+
+    fire_zone_stats: Dict[int, Dict[int, int]] = {}
+    async with get_async_write_session_scope() as session:
+        stmt = text("SELECT id, source_identifier FROM advisory_shapes;")
+        result = await session.execute(stmt)
+
+        for row in result:
+            output_path = f"/vsimem/firezone_{row[1]}.tif"
+            warp_options = gdal.WarpOptions(format="GTiff", cutlineDSName=DB_READ_STRING, cutlineSQL=f"SELECT geom FROM advisory_shapes WHERE id={row[0]}", cropToCutline=True)
+            cut_hfi_masked_tpi: gdal.Dataset = gdal.Warp(output_path, hfi_masked_tpi, options=warp_options)
+            # Get unique values and their counts
+            tpi_classes, counts = np.unique(cut_hfi_masked_tpi.GetRasterBand(1).ReadAsArray(), return_counts=True)
+            cut_hfi_masked_tpi = None
+            gdal.Unlink(output_path)
+            tpi_class_freq_dist = dict(zip(tpi_classes, counts))
+
+            # Drop TPI class 4; it is the no data value from the TPI raster
+            tpi_class_freq_dist.pop(4, None)
+            fire_zone_stats[row[1]] = tpi_class_freq_dist
+
+    hfi_masked_tpi = None
+    return FireZoneTPIStats(fire_zone_stats=fire_zone_stats, pixel_size_metres=pixel_size_metres)
+
+
 async def process_elevation_by_firezone(threshold: int, masked_dem_path: str, run_parameters_id: int):
     """
     Given a tif that only contains elevations values at pixels where HFI exceeds the threshold, calculate statistics
@@ -205,7 +311,7 @@ def intersect_raster_by_firezone(threshold: int, advisory_shape_id: int, source_
 
     :param threshold: The current threshold being processed, 1 = 4k-10k, 2 = > 10k
     :param advisory_shape_id: The id of the fire zone (aka advisory_shape object) to clip with
-    :param source_identifier: The source identifer of the fire zone.
+    :param source_identifier: The source identifier of the fire zone.
     :param raster_path: The path to the raster to be clipped.
     :param temp_dir: A temporary location for storing intermediate files
     """
@@ -261,3 +367,26 @@ async def store_elevation_stats(session: AsyncSession, threshold: int, shape_id:
         threshold=threshold,
     )
     await save_advisory_elevation_stats(session, advisory_elevation_stats)
+
+
+async def store_elevation_tpi_stats(session: AsyncSession, run_parameters_id: int, fire_zone_tpi_stats: FireZoneTPIStats):
+    """
+    Writes elevation TPI statistics to the database.
+
+    :param session: The async database session
+    :param run_parameters_id: The RunParameter object id associated with this run_type, for_date and run_datetime
+    :param fire_zone_tpi_stats: FireZoneTPIStats keying each shape id to its classified TPI high-HFI pixel counts
+    """
+    advisory_tpi_stats_list = []
+    for shape_id, tpi_freq_count in fire_zone_tpi_stats.fire_zone_stats.items():
+        advisory_tpi_stats = AdvisoryTPIStats(
+            advisory_shape_id=int(shape_id),
+            run_parameters=run_parameters_id,
+            valley_bottom=tpi_freq_count.get(1, 0),
+            mid_slope=tpi_freq_count.get(2, 0),
+            upper_slope=tpi_freq_count.get(3, 0),
+            pixel_size_metres=fire_zone_tpi_stats.pixel_size_metres,
+        )
+        advisory_tpi_stats_list.append(advisory_tpi_stats)
+
+    await save_advisory_elevation_tpi_stats(session, advisory_tpi_stats_list)
diff --git a/api/app/auto_spatial_advisory/process_elevation_hfi.py b/api/app/auto_spatial_advisory/process_elevation_hfi.py
index a1a800be8..52b5abe23 100644
--- a/api/app/auto_spatial_advisory/process_elevation_hfi.py
+++ b/api/app/auto_spatial_advisory/process_elevation_hfi.py
@@ -1,31 +1,27 @@
-""" Code relating to processing HFI data related to elevation
-"""
+"""Code relating to processing HFI data related to elevation"""
+
 import logging
 from datetime import date, datetime
 from time import perf_counter
-from app.auto_spatial_advisory.common import get_s3_key
-from app.auto_spatial_advisory.elevation import process_elevation
+from app.auto_spatial_advisory.elevation import process_elevation_tpi
 from app.auto_spatial_advisory.run_type import RunType
 
 logger = logging.getLogger(__name__)
 
 
 async def process_hfi_elevation(run_type: RunType, run_date: date, run_datetime: datetime, for_date: date):
-    """ Create a new elevation based hfi analysis records for the given date.
+    """Create new elevation-based HFI analysis records for the given date.
 
     :param run_type: The type of run to process. (is it a forecast or actual run?)
    :param run_date: The date of the run to process. (when was the hfi file created?)
    :param for_date: The date of the hfi to process. (when is the hfi for?)
""" - logger.info('Processing HFI elevation %s for run date: %s, for date: %s', run_type, run_date, for_date) + logger.info("Processing HFI elevation %s for run date: %s, for date: %s", run_type, run_date, for_date) perf_start = perf_counter() - key = get_s3_key(run_type, run_date, for_date) - logger.info(f'Key to HFI in object storage: {key}') - - await process_elevation(key, run_type, run_datetime, for_date) + await process_elevation_tpi(run_type, run_datetime, for_date) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after processing HFI elevation', delta) + logger.info("%f delta count before and after processing HFI elevation", delta) diff --git a/api/app/db/crud/auto_spatial_advisory.py b/api/app/db/crud/auto_spatial_advisory.py index fb46a57c4..a6831261c 100644 --- a/api/app/db/crud/auto_spatial_advisory.py +++ b/api/app/db/crud/auto_spatial_advisory.py @@ -9,23 +9,33 @@ from sqlalchemy.engine.row import Row from app.auto_spatial_advisory.run_type import RunType from app.db.models.auto_spatial_advisory import ( - AdvisoryFuelStats, Shape, ClassifiedHfi, HfiClassificationThreshold, SFMSFuelType, RunTypeEnum, - FuelType, HighHfiArea, RunParameters, AdvisoryElevationStats, ShapeType) + AdvisoryFuelStats, + Shape, + ClassifiedHfi, + HfiClassificationThreshold, + SFMSFuelType, + RunTypeEnum, + FuelType, + HighHfiArea, + RunParameters, + AdvisoryElevationStats, + AdvisoryTPIStats, + ShapeType, +) from app.db.models.hfi_calc import FireCentre logger = logging.getLogger(__name__) class HfiClassificationThresholdEnum(Enum): - """ Enum for the different HFI classification thresholds. """ - ADVISORY = 'advisory' - WARNING = 'warning' + """Enum for the different HFI classification thresholds.""" + ADVISORY = "advisory" + WARNING = "warning" -async def get_hfi_classification_threshold(session: AsyncSession, - name: HfiClassificationThresholdEnum) -> HfiClassificationThreshold: - stmt = select(HfiClassificationThreshold).where( - HfiClassificationThreshold.name == name.value) + +async def get_hfi_classification_threshold(session: AsyncSession, name: HfiClassificationThresholdEnum) -> HfiClassificationThreshold: + stmt = select(HfiClassificationThreshold).where(HfiClassificationThreshold.name == name.value) result = await session.execute(stmt) return result.scalars().first() @@ -39,7 +49,7 @@ async def save_fuel_type(session: AsyncSession, fuel_type: FuelType): async def get_fire_zone_unit_shape_type_id(session: AsyncSession): - statement = select(ShapeType).where(ShapeType.name == 'fire_zone_unit') + statement = select(ShapeType).where(ShapeType.name == "fire_zone_unit") result = await session.execute(statement) return result.scalars().first().id @@ -51,7 +61,7 @@ async def get_fire_zone_units(session: AsyncSession, fire_zone_type_id: int): async def get_combustible_area(session: AsyncSession): - """ Get the combustible area for each "shape". This is slow, and we don't expect it to run + """Get the combustible area for each "shape". This is slow, and we don't expect it to run in real time. 
This method isn't being used right now, but you can calculate the combustible area for each @@ -70,20 +80,19 @@ async def get_combustible_area(session: AsyncSession): ``` """ - logger.info('starting zone/combustible area intersection query') + logger.info("starting zone/combustible area intersection query") perf_start = perf_counter() - stmt = select(Shape.id, - Shape.source_identifier, - Shape.geom.ST_Area().label('zone_area'), - FuelType.geom.ST_Union().ST_Intersection(Shape.geom).ST_Area().label('combustible_area'))\ - .join(FuelType, FuelType.geom.ST_Intersects(Shape.geom))\ - .where(FuelType.fuel_type_id.not_in((-10000, 99, 100, 102, 103)))\ + stmt = ( + select(Shape.id, Shape.source_identifier, Shape.geom.ST_Area().label("zone_area"), FuelType.geom.ST_Union().ST_Intersection(Shape.geom).ST_Area().label("combustible_area")) + .join(FuelType, FuelType.geom.ST_Intersects(Shape.geom)) + .where(FuelType.fuel_type_id.not_in((-10000, 99, 100, 102, 103))) .group_by(Shape.id) + ) result = await session.execute(stmt) all_combustible = result.all() perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after hfi area + zone/area intersection query', delta) + logger.info("%f delta count before and after hfi area + zone/area intersection query", delta) return all_combustible @@ -91,7 +100,7 @@ async def get_all_hfi_thresholds(session: AsyncSession) -> List[HfiClassificatio """ Retrieve all records from advisory_hfi_classification_threshold table. """ - logger.info('retrieving HFI classification threshold info...') + logger.info("retrieving HFI classification threshold info...") stmt = select(HfiClassificationThreshold) result = await session.execute(stmt) @@ -99,9 +108,7 @@ async def get_all_hfi_thresholds(session: AsyncSession) -> List[HfiClassificatio for row in result.all(): threshold_object = row[0] - thresholds.append(HfiClassificationThreshold(id=threshold_object.id, - description=threshold_object.description, - name=threshold_object.name)) + thresholds.append(HfiClassificationThreshold(id=threshold_object.id, description=threshold_object.description, name=threshold_object.name)) return thresholds @@ -110,7 +117,7 @@ async def get_all_sfms_fuel_types(session: AsyncSession) -> List[SFMSFuelType]: """ Retrieve all records from sfms_fuel_types table """ - logger.info('retrieving SFMS fuel types info...') + logger.info("retrieving SFMS fuel types info...") stmt = select(SFMSFuelType) result = await session.execute(stmt) @@ -118,9 +125,7 @@ async def get_all_sfms_fuel_types(session: AsyncSession) -> List[SFMSFuelType]: for row in result.all(): fuel_type_object = row[0] - fuel_types.append(SFMSFuelType(fuel_type_id=fuel_type_object.fuel_type_id, - fuel_type_code=fuel_type_object.fuel_type_code, - description=fuel_type_object.description)) + fuel_types.append(SFMSFuelType(fuel_type_id=fuel_type_object.fuel_type_id, fuel_type_code=fuel_type_object.fuel_type_code, description=fuel_type_object.description)) return fuel_types @@ -144,87 +149,86 @@ async def get_precomputed_high_hfi_fuel_type_areas_for_shape(session: AsyncSessi all_results = result.all() perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone query", delta) return all_results - -async def get_high_hfi_fuel_types_for_shape(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date, - shape_id: int) -> 
List[Row]: +async def get_high_hfi_fuel_types_for_shape(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date, shape_id: int) -> List[Row]: """ Union of fuel types by fuel_type_id (1 multipolygon for each fuel type) Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date for both 4K-10K and 10K+ HFI values Intersected with fire zone geom for a specific fire zone identified by ID """ - logger.info('starting fuel types/high hfi/zone intersection query for fire zone %s', str(shape_id)) + logger.info("starting fuel types/high hfi/zone intersection query for fire zone %s", str(shape_id)) perf_start = perf_counter() - stmt = select(Shape.source_identifier, FuelType.fuel_type_id, ClassifiedHfi.threshold, - func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\ - .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\ - .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\ - .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, - ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(shape_id))\ - .group_by(Shape.source_identifier)\ - .group_by(FuelType.fuel_type_id)\ - .group_by(ClassifiedHfi.threshold)\ - .order_by(FuelType.fuel_type_id)\ + stmt = ( + select( + Shape.source_identifier, + FuelType.fuel_type_id, + ClassifiedHfi.threshold, + func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label("area"), + ) + .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom)) + .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom)) + .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, ClassifiedHfi.run_datetime == run_datetime, Shape.source_identifier == str(shape_id)) + .group_by(Shape.source_identifier) + .group_by(FuelType.fuel_type_id) + .group_by(ClassifiedHfi.threshold) + .order_by(FuelType.fuel_type_id) .order_by(ClassifiedHfi.threshold) + ) result = await session.execute(stmt) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone intersection query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone intersection query", delta) return result.all() -async def get_high_hfi_fuel_types(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: +async def get_high_hfi_fuel_types(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]: """ Union of fuel types by fuel_type_id (1 multipolygon for each fuel type) Intersected with union of ClassifiedHfi for given run_type, run_datetime, and for_date for both 4K-10K and 10K+ HFI values """ - logger.info('starting fuel types/high hfi/zone intersection query') + logger.info("starting fuel types/high hfi/zone intersection query") perf_start = perf_counter() - stmt = select(Shape.source_identifier, FuelType.fuel_type_id, ClassifiedHfi.threshold, - func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label('area'))\ - .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\ - .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom))\ - .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, - 
ClassifiedHfi.run_datetime == run_datetime)\ - .group_by(Shape.source_identifier)\ - .group_by(FuelType.fuel_type_id)\ - .group_by(ClassifiedHfi.threshold)\ - .order_by(FuelType.fuel_type_id)\ + stmt = ( + select( + Shape.source_identifier, + FuelType.fuel_type_id, + ClassifiedHfi.threshold, + func.sum(FuelType.geom.ST_Intersection(ClassifiedHfi.geom.ST_Intersection(Shape.geom)).ST_Area()).label("area"), + ) + .join_from(ClassifiedHfi, Shape, ClassifiedHfi.geom.ST_Intersects(Shape.geom)) + .join_from(ClassifiedHfi, FuelType, ClassifiedHfi.geom.ST_Intersects(FuelType.geom)) + .where(ClassifiedHfi.run_type == run_type.value, ClassifiedHfi.for_date == for_date, ClassifiedHfi.run_datetime == run_datetime) + .group_by(Shape.source_identifier) + .group_by(FuelType.fuel_type_id) + .group_by(ClassifiedHfi.threshold) + .order_by(FuelType.fuel_type_id) .order_by(ClassifiedHfi.threshold) + ) logger.info(str(stmt)) result = await session.execute(stmt) perf_end = perf_counter() delta = perf_end - perf_start - logger.info('%f delta count before and after fuel types/high hfi/zone intersection query', delta) + logger.info("%f delta count before and after fuel types/high hfi/zone intersection query", delta) return result.all() -async def get_hfi_area(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: - logger.info('gathering hfi area data') - stmt = select(Shape.id, Shape.source_identifier, Shape.combustible_area, HighHfiArea.id, - HighHfiArea.advisory_shape_id, HighHfiArea.threshold, HighHfiArea.area.label('hfi_area'))\ - .join(HighHfiArea, HighHfiArea.advisory_shape_id == Shape.id)\ - .join(RunParameters, RunParameters.id == HighHfiArea.run_parameters)\ - .where(cast(RunParameters.run_type, String) == run_type.value, - RunParameters.for_date == for_date, - RunParameters.run_datetime == run_datetime) + +async def get_hfi_area(session: AsyncSession, run_type: RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]: + logger.info("gathering hfi area data") + stmt = ( + select(Shape.id, Shape.source_identifier, Shape.combustible_area, HighHfiArea.id, HighHfiArea.advisory_shape_id, HighHfiArea.threshold, HighHfiArea.area.label("hfi_area")) + .join(HighHfiArea, HighHfiArea.advisory_shape_id == Shape.id) + .join(RunParameters, RunParameters.id == HighHfiArea.run_parameters) + .where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.for_date == for_date, RunParameters.run_datetime == run_datetime) + ) result = await session.execute(stmt) return result.all() @@ -234,29 +238,20 @@ async def get_run_datetimes(session: AsyncSession, run_type: RunTypeEnum, for_da Retrieve all distinct available run_datetimes for a given run_type and for_date, and return the run_datetimes in descending order (most recent is first) """ - stmt = select(RunParameters.run_datetime)\ - .where(RunParameters.run_type == run_type.value, RunParameters.for_date == for_date)\ - .distinct()\ - .order_by(RunParameters.run_datetime.desc()) + stmt = select(RunParameters.run_datetime).where(RunParameters.run_type == run_type.value, RunParameters.for_date == for_date).distinct().order_by(RunParameters.run_datetime.desc()) result = await session.execute(stmt) return result.all() -async def get_high_hfi_area(session: AsyncSession, - run_type: RunTypeEnum, - run_datetime: datetime, - for_date: date) -> List[Row]: - """ For each fire zone, get the area of HFI polygons in that zone that fall within the +async def get_high_hfi_area(session: AsyncSession, run_type: 
RunTypeEnum, run_datetime: datetime, for_date: date) -> List[Row]:
+    """For each fire zone, get the area of HFI polygons in that zone that fall within the
     4000 - 10000 range and the area of HFI polygons that exceed the 10000 threshold.
     """
-    stmt = select(HighHfiArea.id,
-                  HighHfiArea.advisory_shape_id,
-                  HighHfiArea.area,
-                  HighHfiArea.threshold)\
-        .join(RunParameters)\
-        .where(cast(RunParameters.run_type, String) == run_type.value,
-               RunParameters.for_date == for_date,
-               RunParameters.run_datetime == run_datetime)
+    stmt = (
+        select(HighHfiArea.id, HighHfiArea.advisory_shape_id, HighHfiArea.area, HighHfiArea.threshold)
+        .join(RunParameters)
+        .where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.for_date == for_date, RunParameters.run_datetime == run_datetime)
+    )
     result = await session.execute(stmt)
     return result.all()
 
@@ -286,53 +281,43 @@ async def save_advisory_fuel_stats(session: AsyncSession, advisory_fuel_stats: L
     session.add_all(advisory_fuel_stats)
 
 
-async def calculate_high_hfi_areas(session: AsyncSession, run_type: RunType, run_datetime: datetime,
-                                   for_date: date) -> List[Row]:
+async def calculate_high_hfi_areas(session: AsyncSession, run_type: RunType, run_datetime: datetime, for_date: date) -> List[Row]:
     """
-    Given a 'run_parameters_id', which represents a unqiue combination of run_type, run_datetime
-    and for_date, individually sum the areas in each firezone with:
-    1. 4000 <= HFI < 10000 (aka 'advisory_area')
-    2. HFI >= 10000 (aka 'warn_area')
+    Given a 'run_parameters_id', which represents a unique combination of run_type, run_datetime
+    and for_date, individually sum the areas in each firezone with:
+    1. 4000 <= HFI < 10000 (aka 'advisory_area')
+    2. HFI >= 10000 (aka 'warn_area')
     """
-    logger.info('starting high HFI by zone intersection query')
+    logger.info("starting high HFI by zone intersection query")
     perf_start = perf_counter()
-    stmt = select(Shape.id.label('shape_id'),
-                  ClassifiedHfi.threshold.label('threshold'),
-                  func.sum(ClassifiedHfi.geom.ST_Intersection(Shape.geom).ST_Area()).label('area'))\
-        .join_from(Shape, ClassifiedHfi, ClassifiedHfi.geom.ST_Intersects(Shape.geom))\
-        .where(ClassifiedHfi.run_type == run_type.value)\
-        .where(ClassifiedHfi.run_datetime == run_datetime)\
-        .where(ClassifiedHfi.for_date == for_date)\
-        .group_by(Shape.id)\
+    stmt = (
+        select(Shape.id.label("shape_id"), ClassifiedHfi.threshold.label("threshold"), func.sum(ClassifiedHfi.geom.ST_Intersection(Shape.geom).ST_Area()).label("area"))
+        .join_from(Shape, ClassifiedHfi, ClassifiedHfi.geom.ST_Intersects(Shape.geom))
+        .where(ClassifiedHfi.run_type == run_type.value)
+        .where(ClassifiedHfi.run_datetime == run_datetime)
+        .where(ClassifiedHfi.for_date == for_date)
+        .group_by(Shape.id)
         .group_by(ClassifiedHfi.threshold)
+    )
     result = await session.execute(stmt)
     all_high_hfi = result.all()
     perf_end = perf_counter()
     delta = perf_end - perf_start
-    logger.info('%f delta count before and after calculate high HFI by zone intersection query', delta)
+    logger.info("%f delta count before and after calculate high HFI by zone intersection query", delta)
 
     return all_high_hfi
 
 
-async def get_run_parameters_id(session: AsyncSession,
-                                run_type: RunType,
-                                run_datetime: datetime,
-                                for_date: date) -> List[Row]:
-    stmt = select(RunParameters.id)\
-        .where(cast(RunParameters.run_type, String) == run_type.value,
-               RunParameters.run_datetime == run_datetime,
-               RunParameters.for_date == for_date)
+async def get_run_parameters_id(session: AsyncSession, run_type: RunType, 
run_datetime: datetime, for_date: date) -> List[Row]: + stmt = select(RunParameters.id).where(cast(RunParameters.run_type, String) == run_type.value, RunParameters.run_datetime == run_datetime, RunParameters.for_date == for_date) result = await session.execute(stmt) return result.scalar() async def save_run_parameters(session: AsyncSession, run_type: RunType, run_datetime: datetime, for_date: date): - logger.info( - f'Writing run parameters. RunType: {run_type.value}; run_datetime: {run_datetime.isoformat()}; for_date: {for_date.isoformat()}') - stmt = insert(RunParameters)\ - .values(run_type=run_type.value, run_datetime=run_datetime, for_date=for_date)\ - .on_conflict_do_nothing() + logger.info(f"Writing run parameters. RunType: {run_type.value}; run_datetime: {run_datetime.isoformat()}; for_date: {for_date.isoformat()}") + stmt = insert(RunParameters).values(run_type=run_type.value, run_datetime=run_datetime, for_date=for_date).on_conflict_do_nothing() await session.execute(stmt) @@ -340,21 +325,29 @@ async def save_advisory_elevation_stats(session: AsyncSession, advisory_elevatio session.add(advisory_elevation_stats) -async def get_zonal_elevation_stats(session: AsyncSession, - fire_zone_id: int, - run_type: RunType, - run_datetime: datetime, - for_date: date) -> List[Row]: +async def save_advisory_elevation_tpi_stats(session: AsyncSession, advisory_elevation_stats: List[AdvisoryTPIStats]): + session.add_all(advisory_elevation_stats) + + +async def get_zonal_elevation_stats(session: AsyncSession, fire_zone_id: int, run_type: RunType, run_datetime: datetime, for_date: date) -> List[Row]: run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date) stmt = select(Shape.id).where(Shape.source_identifier == str(fire_zone_id)) result = await session.execute(stmt) shape_id = result.scalar() - stmt = select(AdvisoryElevationStats.advisory_shape_id, AdvisoryElevationStats.minimum, - AdvisoryElevationStats.quartile_25, AdvisoryElevationStats.median, AdvisoryElevationStats.quartile_75, - AdvisoryElevationStats.maximum, AdvisoryElevationStats.threshold)\ - .where(AdvisoryElevationStats.advisory_shape_id == shape_id, AdvisoryElevationStats.run_parameters == run_parameters_id)\ + stmt = ( + select( + AdvisoryElevationStats.advisory_shape_id, + AdvisoryElevationStats.minimum, + AdvisoryElevationStats.quartile_25, + AdvisoryElevationStats.median, + AdvisoryElevationStats.quartile_75, + AdvisoryElevationStats.maximum, + AdvisoryElevationStats.threshold, + ) + .where(AdvisoryElevationStats.advisory_shape_id == shape_id, AdvisoryElevationStats.run_parameters == run_parameters_id) .order_by(AdvisoryElevationStats.threshold) + ) return await session.execute(stmt) @@ -378,4 +371,4 @@ async def get_provincial_rollup(session: AsyncSession, run_type: RunTypeEnum, ru .join(HighHfiArea, and_(HighHfiArea.advisory_shape_id == Shape.id, HighHfiArea.run_parameters == run_parameter_id), isouter=True) ) result = await session.execute(stmt) - return result.all() \ No newline at end of file + return result.all() diff --git a/api/app/db/models/auto_spatial_advisory.py b/api/app/db/models/auto_spatial_advisory.py index 940233699..ecee5e1e6 100644 --- a/api/app/db/models/auto_spatial_advisory.py +++ b/api/app/db/models/auto_spatial_advisory.py @@ -1,5 +1,5 @@ import enum -from sqlalchemy import (Integer, Date, String, Float, Column, Index, ForeignKey, Enum, UniqueConstraint) +from sqlalchemy import Integer, Date, String, Float, Column, Index, ForeignKey, Enum, UniqueConstraint from 
app.db.models.common import TZTimeStamp from geoalchemy2 import Geometry from app.db.models import Base @@ -7,40 +7,43 @@ from app.geospatial import NAD83_BC_ALBERS from sqlalchemy.dialects import postgresql + class ShapeTypeEnum(enum.Enum): - """ Define different shape types. e.g. "Zone", "Fire Centre" - later we may add - "Incident"/"Fire", "Custom" etc. etc. """ + """Define different shape types. e.g. "Zone", "Fire Centre" - later we may add + "Incident"/"Fire", "Custom" etc. etc.""" + fire_centre = 1 fire_zone = 2 fire_zone_unit = 3 class RunTypeEnum(enum.Enum): - """ Define different run types. e.g. "Forecast", "Actual" """ + """Define different run types. e.g. "Forecast", "Actual" """ + forecast = "forecast" actual = "actual" class ShapeType(Base): - """ Identify some kind of area type, e.g. "Zone", or "Fire" """ - __tablename__ = 'advisory_shape_types' - __table_args__ = ( - {'comment': 'Identify kind of advisory area (e.g. Zone, Fire etc.)'} - ) + """Identify some kind of area type, e.g. "Zone", or "Fire" """ + + __tablename__ = "advisory_shape_types" + __table_args__ = {"comment": "Identify kind of advisory area (e.g. Zone, Fire etc.)"} id = Column(Integer, primary_key=True) name = Column(Enum(ShapeTypeEnum), nullable=False, unique=True, index=True) class Shape(Base): - """ Identify some area of interest with respect to advisories. """ - __tablename__ = 'advisory_shapes' + """Identify some area of interest with respect to advisories.""" + + __tablename__ = "advisory_shapes" __table_args__ = ( # we may have to re-visit this constraint - but for the time being, the idea is # that for any given type of area, it has to be unique for the kind of thing that # it is. e.g. a zone has some id. - UniqueConstraint('source_identifier', 'shape_type'), - {'comment': 'Record identifying some area of interest with respect to advisories'} + UniqueConstraint("source_identifier", "shape_type"), + {"comment": "Record identifying some area of interest with respect to advisories"}, ) id = Column(Integer, primary_key=True) @@ -52,23 +55,22 @@ class Shape(Base): # Have to make this column nullable to start because the table already exists. Will be # modified in subsequent migration to nullable=False combustible_area = Column(Float, nullable=True) - geom = Column(Geometry('MULTIPOLYGON', spatial_index=False, srid=NAD83_BC_ALBERS), nullable=False) + geom = Column(Geometry("MULTIPOLYGON", spatial_index=False, srid=NAD83_BC_ALBERS), nullable=False) label = Column(String, nullable=True, index=False) fire_centre = Column(Integer, ForeignKey(FireCentre.id), nullable=True, index=True) -# Explict creation of index due to issue with alembic + geoalchemy. -Index('idx_advisory_shapes_geom', Shape.geom, postgresql_using='gist') +# Explicit creation of index due to issue with alembic + geoalchemy. +Index("idx_advisory_shapes_geom", Shape.geom, postgresql_using="gist") class HfiClassificationThreshold(Base): - __tablename__ = 'advisory_hfi_classification_threshold' - __table_args__ = ( - {'comment': 'The Operational Safe Works Standards specifies that an hfi of greater than ' - '4000 should result in an advisory. However in order for an FBAN to create ' - 'useful information, there are other thresholds of concern. E.g. > 10000' - } - ) + __tablename__ = "advisory_hfi_classification_threshold" + __table_args__ = { + "comment": "The Operational Safe Works Standards specifies that an hfi of greater than " + "4000 should result in an advisory. 
However in order for an FBAN to create " + "useful information, there are other thresholds of concern. E.g. > 10000" + } id = Column(Integer, primary_key=True, index=True) # e.g. '4000 < hfi < 10000' or 'hfi >= 10000' description = Column(String, nullable=False) @@ -77,45 +79,44 @@ class HfiClassificationThreshold(Base): class ClassifiedHfi(Base): - """ HFI classified into different groups. + """HFI classified into different groups. NOTE: In actual fact, forecasts and actuals can be run multiple times per day, - but we only care about the most recent one, so we only store the date, not the timesamp. + but we only care about the most recent one, so we only store the date, not the timestamp. """ - __tablename__ = 'advisory_classified_hfi' - __table_args__ = ( - {'comment': 'HFI classification for some forecast/advisory run on some day, for some date'} - ) + + __tablename__ = "advisory_classified_hfi" + __table_args__ = {"comment": "HFI classification for some forecast/advisory run on some day, for some date"} id = Column(Integer, primary_key=True, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False, index=True) run_type = Column(Enum(RunTypeEnum), nullable=False, index=True) run_datetime = Column(TZTimeStamp, nullable=False) for_date = Column(Date, nullable=False) - geom = Column(Geometry('POLYGON', spatial_index=False, srid=NAD83_BC_ALBERS)) + geom = Column(Geometry("POLYGON", spatial_index=False, srid=NAD83_BC_ALBERS)) -# Explict creation of index due to issue with alembic + geoalchemy. -Index('idx_advisory_classified_hfi_geom', ClassifiedHfi.geom, postgresql_using='gist') +# Explicit creation of index due to issue with alembic + geoalchemy. +Index("idx_advisory_classified_hfi_geom", ClassifiedHfi.geom, postgresql_using="gist") class FuelType(Base): - """ Identify some kind of fuel type. """ - __tablename__ = 'advisory_fuel_types' - __table_args__ = ( - {'comment': 'Identify some kind of fuel type'} - ) + """Identify some kind of fuel type.""" + + __tablename__ = "advisory_fuel_types" + __table_args__ = {"comment": "Identify some kind of fuel type"} id = Column(Integer, primary_key=True, index=True) fuel_type_id = Column(Integer, nullable=False, index=True) - geom = Column(Geometry('POLYGON', spatial_index=False, srid=NAD83_BC_ALBERS)) + geom = Column(Geometry("POLYGON", spatial_index=False, srid=NAD83_BC_ALBERS)) # Explict creation of index due to issue with alembic + geoalchemy. 
-Index('idx_advisory_fuel_types_geom', FuelType.geom, postgresql_using='gist') +Index("idx_advisory_fuel_types_geom", FuelType.geom, postgresql_using="gist") class SFMSFuelType(Base): - """ Fuel types used by SFMS system """ - __tablename__ = 'sfms_fuel_types' - __table_args__ = ({'comment': 'Fuel types used by SFMS to calculate HFI spatially'}) + """Fuel types used by SFMS system""" + + __tablename__ = "sfms_fuel_types" + __table_args__ = {"comment": "Fuel types used by SFMS to calculate HFI spatially"} id = Column(Integer, primary_key=True, index=True) fuel_type_id = Column(Integer, nullable=False, index=True) fuel_type_code = Column(String, nullable=False) @@ -123,27 +124,25 @@ class SFMSFuelType(Base): class RunParameters(Base): - """ Combination of type of run (actual vs forecast), run datetime and for date.""" - __tablename__ = 'run_parameters' - __table_args__ = ( - UniqueConstraint('run_type', 'run_datetime', 'for_date'), - {'comment': 'A combination of run type, run datetime and for date.'} - ) + """Combination of type of run (actual vs forecast), run datetime and for date.""" + + __tablename__ = "run_parameters" + __table_args__ = (UniqueConstraint("run_type", "run_datetime", "for_date"), {"comment": "A combination of run type, run datetime and for date."}) id = Column(Integer, primary_key=True, index=True) - run_type = Column(postgresql.ENUM('actual', 'forecast', name='runtypeenum', - create_type=False), nullable=False, index=True) + run_type = Column(postgresql.ENUM("actual", "forecast", name="runtypeenum", create_type=False), nullable=False, index=True) run_datetime = Column(TZTimeStamp, nullable=False, index=True) for_date = Column(Date, nullable=False, index=True) + class HighHfiArea(Base): - """ Area exceeding HFI thresholds per fire zone. """ - __tablename__ = 'high_hfi_area' - __table_args__ = ( - {'comment': 'Area under advisory/warning per fire zone. advisory_area refers to the total area ' - 'in a fire zone with HFI values between 4000 - 10000 and warn_area refers to the total ' - 'area in a fire zone with HFI values exceeding 10000.' - } - ) + """Area exceeding HFI thresholds per fire zone.""" + + __tablename__ = "high_hfi_area" + __table_args__ = { + "comment": "Area under advisory/warning per fire zone. advisory_area refers to the total area " + "in a fire zone with HFI values between 4000 - 10000 and warn_area refers to the total " + "area in a fire zone with HFI values exceeding 10000." + } id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False) @@ -153,16 +152,13 @@ class HighHfiArea(Base): class AdvisoryElevationStats(Base): - """ + """ Summary statistics about the elevation of area with high hfi (4k-10k and >10k) per fire shape based on the set run_type, for_date and run_datetime. 
""" - __tablename__ = 'advisory_elevation_stats' - __table_args__ = ( - { - 'comment': 'Elevation stats per fire shape by advisory threshold' - } - ) + + __tablename__ = "advisory_elevation_stats" + __table_args__ = {"comment": "Elevation stats per fire shape by advisory threshold"} id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) @@ -175,20 +171,35 @@ class AdvisoryElevationStats(Base): class AdvisoryFuelStats(Base): - """ + """ Summary statistics about the fuel type of area with high hfi (4k-10k and >10k) per fire shape based on the set run_type, for_date and run_datetime. """ - __tablename__ = 'advisory_fuel_stats' - __table_args__ = ( - UniqueConstraint('advisory_shape_id', 'threshold', 'run_parameters', 'fuel_type'), - { - 'comment': 'Fuel type stats per fire shape by advisory threshold' - } - ) + + __tablename__ = "advisory_fuel_stats" + __table_args__ = (UniqueConstraint("advisory_shape_id", "threshold", "run_parameters", "fuel_type"), {"comment": "Fuel type stats per fire shape by advisory threshold"}) id = Column(Integer, primary_key=True, index=True) advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) threshold = Column(Integer, ForeignKey(HfiClassificationThreshold.id), nullable=False) run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) fuel_type = Column(Integer, ForeignKey(SFMSFuelType.id), nullable=False, index=True) area = Column(Float, nullable=False) + + +class AdvisoryTPIStats(Base): + """ + Summary statistics about the elevation based on a HFI >4k classified Topographic Position Index (TPI). + Classification of the TPI are bucketed into 1 = valley bottom, 2 = mid slope, 3 = upper slope. + Each record includes the raster pixel count of the above classifications as well as the raster pixel size and resolution in metres + and an advisory shape the stats are related to. + """ + + __tablename__ = "advisory_tpi_stats" + __table_args__ = {"comment": "Elevation TPI stats per fire shape"} + id = Column(Integer, primary_key=True, index=True) + advisory_shape_id = Column(Integer, ForeignKey(Shape.id), nullable=False, index=True) + run_parameters = Column(Integer, ForeignKey(RunParameters.id), nullable=False, index=True) + valley_bottom = Column(Integer, nullable=False, index=False) + mid_slope = Column(Integer, nullable=False, index=False) + upper_slope = Column(Integer, nullable=False, index=False) + pixel_size_metres = Column(Integer, nullable=False, index=False) diff --git a/api/app/utils/geospatial.py b/api/app/utils/geospatial.py new file mode 100644 index 000000000..e54288235 --- /dev/null +++ b/api/app/utils/geospatial.py @@ -0,0 +1,87 @@ +from dataclasses import dataclass +import logging +from typing import Any, Optional +from osgeo import gdal + + +logger = logging.getLogger(__name__) + + +def warp_to_match_extent(source_raster: gdal.Dataset, raster_to_match: gdal.Dataset, output_path: str) -> gdal.Dataset: + """ + Warp the source_raster to match the extent and projection of the other raster. 
+ + :param source_raster: the raster to warp + :param raster_to_match: the reference raster to match the source against + :param output_path: output path of the resulting raster + :return: warped raster dataset + """ + source_geotransform = raster_to_match.GetGeoTransform() + x_res = source_geotransform[1] + y_res = -source_geotransform[5] + minx = source_geotransform[0] + maxy = source_geotransform[3] + maxx = minx + source_geotransform[1] * raster_to_match.RasterXSize + miny = maxy + source_geotransform[5] * raster_to_match.RasterYSize + extent = [minx, miny, maxx, maxy] + + # Warp to match input option parameters + return gdal.Warp(output_path, source_raster, dstSRS=raster_to_match.GetProjection(), outputBounds=extent, xRes=x_res, yRes=y_res, resampleAlg=gdal.GRA_NearestNeighbour) + + +def raster_mul(tpi_raster: gdal.Dataset, hfi_raster: gdal.Dataset, chunk_size=256) -> gdal.Dataset: + """ + Multiply rasters together by reading in chunks of pixels at a time to avoid loading + the rasters into memory all at once. + + :param tpi_raster: Classified TPI raster to multiply against the classified HFI raster + :param hfi_raster: Classified HFI raster to multiply against the classified TPI raster + :raises ValueError: Raised if the dimensions of the rasters don't match + :return: Multiplied raster result as a raster dataset + """ + # Get raster dimensions + x_size = tpi_raster.RasterXSize + y_size = tpi_raster.RasterYSize + + # Check if the dimensions of both rasters match + if x_size != hfi_raster.RasterXSize or y_size != hfi_raster.RasterYSize: + raise ValueError("The dimensions of the two rasters do not match.") + + # Get the geotransform and projection from the first raster + geotransform = tpi_raster.GetGeoTransform() + projection = tpi_raster.GetProjection() + + # Create the output raster + driver = gdal.GetDriverByName("MEM") + out_raster: gdal.Dataset = driver.Create("memory", x_size, y_size, 1, gdal.GDT_Byte) + + # Set the geotransform and projection + out_raster.SetGeoTransform(geotransform) + out_raster.SetProjection(projection) + + tpi_raster_band = tpi_raster.GetRasterBand(1) + hfi_raster_band = hfi_raster.GetRasterBand(1) + + # Process in chunks + for y in range(0, y_size, chunk_size): + y_chunk_size = min(chunk_size, y_size - y) + + for x in range(0, x_size, chunk_size): + x_chunk_size = min(chunk_size, x_size - x) + + # Read chunks from both rasters + tpi_chunk = tpi_raster_band.ReadAsArray(x, y, x_chunk_size, y_chunk_size) + hfi_chunk = hfi_raster_band.ReadAsArray(x, y, x_chunk_size, y_chunk_size) + + hfi_chunk[hfi_chunk >= 1] = 1 + hfi_chunk[hfi_chunk < 1] = 0 + + # Multiply the chunks + tpi_chunk *= hfi_chunk + + # Write the result to the output raster + out_raster.GetRasterBand(1).WriteArray(tpi_chunk, x, y) + tpi_chunk = None + hfi_chunk = None + + return out_raster diff --git a/openshift/scripts/oc_provision_nats.sh b/openshift/scripts/oc_provision_nats.sh index 58db2c6be..44ead304f 100755 --- a/openshift/scripts/oc_provision_nats.sh +++ b/openshift/scripts/oc_provision_nats.sh @@ -29,6 +29,8 @@ OC_PROCESS="oc -n ${PROJ_TARGET} process -f ${PATH_NATS} \ -p IMAGE_NAME=${APP_NAME}-api-${SUFFIX} \ -p IMAGE_TAG=${SUFFIX} \ -p POSTGRES_DATABASE=wps \ + ${MEMORY_REQUEST:+ "-p MEMORY_REQUEST=${MEMORY_REQUEST}"} \ + ${MEMORY_LIMIT:+ "-p MEMORY_LIMIT=${MEMORY_LIMIT}"} \ -p CRUNCHYDB_USER=wps-crunchydb-${SUFFIX}-pguser-wps-crunchydb-${SUFFIX} \ -p APP_NAME=${APP_NAME}" diff --git a/openshift/templates/nats.yaml b/openshift/templates/nats.yaml index b95f52720..48f0f2640 100644 --- 
a/openshift/templates/nats.yaml +++ b/openshift/templates/nats.yaml @@ -36,6 +36,13 @@ parameters: required: true - name: POSTGRES_DATABASE required: true + - name: MEMORY_REQUEST + required: true + value: 3800Mi + - name: MEMORY_LIMIT + required: true + value: 5000Mi + objects: - apiVersion: v1 kind: ConfigMap @@ -259,6 +266,13 @@ objects: - name: ${APP_NAME}-${SUFFIX}-nats-consumer image: ${IMAGE_REGISTRY}/${PROJ_TOOLS}/${IMAGE_NAME}:${IMAGE_TAG} imagePullPolicy: "Always" + resources: + requests: + cpu: "250m" + memory: ${MEMORY_REQUEST} + limits: + cpu: "500m" + memory: ${MEMORY_LIMIT} command: [ "poetry", @@ -332,6 +346,11 @@ objects: configMapKeyRef: name: ${GLOBAL_NAME} key: env.dem_name + - name: CLASSIFIED_TPI_DEM_NAME + valueFrom: + configMapKeyRef: + name: ${GLOBAL_NAME} + key: env.classified_tpi_dem_name ports: - containerPort: 4222 name: client
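
Reviewer note (illustrative only, not part of the patch): the new helpers in api/app/utils/geospatial.py compose into the TPI pipeline roughly as sketched below. Filenames are assumptions for a local run; the real job in elevation.py opens both rasters from object storage via /vsis3 and additionally cuts the masked result per fire zone with gdal.Warp cutlines.

    import numpy as np
    from osgeo import gdal

    from app.utils.geospatial import raster_mul, warp_to_match_extent

    tpi = gdal.Open("bc_dem_50m_tpi_win100_classified.tif", gdal.GA_ReadOnly)
    hfi = gdal.Open("hfi_snow_masked.tif", gdal.GA_ReadOnly)  # hypothetical local copy of the run's HFI raster

    # Resample the HFI raster onto the TPI grid so the two rasters align pixel for pixel.
    warped_path = "/vsimem/hfi_warped.tif"
    hfi_aligned = warp_to_match_extent(hfi, tpi, warped_path)

    # raster_mul binarises HFI (>= 1 -> 1) and multiplies, keeping TPI classes only where HFI is high.
    masked = raster_mul(tpi, hfi_aligned)
    hfi_aligned = None
    gdal.Unlink(warped_path)

    # Count pixels per TPI class; class 4 is the TPI raster's no data value and is dropped.
    classes, counts = np.unique(masked.GetRasterBand(1).ReadAsArray(), return_counts=True)
    freq = dict(zip(classes.tolist(), counts.tolist()))
    freq.pop(4, None)
    print({"valley_bottom": freq.get(1, 0), "mid_slope": freq.get(2, 0), "upper_slope": freq.get(3, 0)})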
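
Also illustrative: a tiny in-memory sanity check for raster_mul, the kind of unit test that could accompany this PR. The make_mem_raster helper is hypothetical, not part of the codebase.

    import numpy as np
    from osgeo import gdal

    from app.utils.geospatial import raster_mul

    def make_mem_raster(data: np.ndarray) -> gdal.Dataset:
        """Wrap a 2D uint8 array in an in-memory GDAL dataset with a trivial geotransform."""
        rows, cols = data.shape
        ds: gdal.Dataset = gdal.GetDriverByName("MEM").Create("", cols, rows, 1, gdal.GDT_Byte)
        ds.SetGeoTransform((0, 1, 0, 0, 0, -1))
        ds.GetRasterBand(1).WriteArray(data)
        return ds

    tpi = make_mem_raster(np.array([[1, 2], [3, 4]], dtype=np.uint8))  # TPI classes
    hfi = make_mem_raster(np.array([[0, 1], [2, 0]], dtype=np.uint8))  # classified HFI

    # HFI is binarised (>= 1 -> 1), so only cells with high HFI keep their TPI class.
    result = raster_mul(tpi, hfi).GetRasterBand(1).ReadAsArray()
    assert (result == np.array([[0, 2], [3, 0]], dtype=np.uint8)).all()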
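
Finally, a hedged sketch of a read path for the new advisory_tpi_stats table, mirroring the style of the existing get_zonal_elevation_stats; the PR itself does not add one, and the helper name is an assumption.

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db.models.auto_spatial_advisory import AdvisoryTPIStats

    async def get_zonal_tpi_stats(session: AsyncSession, shape_id: int, run_parameters_id: int):
        """Fetch per-class TPI pixel counts for one advisory shape and one run."""
        stmt = select(
            AdvisoryTPIStats.valley_bottom,
            AdvisoryTPIStats.mid_slope,
            AdvisoryTPIStats.upper_slope,
            AdvisoryTPIStats.pixel_size_metres,
        ).where(AdvisoryTPIStats.advisory_shape_id == shape_id, AdvisoryTPIStats.run_parameters == run_parameters_id)
        result = await session.execute(stmt)
        return result.all()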