From 53eb9f53eb0a7a55a1ff952479ed44df28ca2f61 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Mon, 15 Jan 2024 11:21:40 -0800 Subject: [PATCH 01/11] processed_snow model --- .../versions/403586c146ae_processed_snow.py | 43 +++++++++++++++++++ api/app/db/crud/snow.py | 21 +++++++++ api/app/db/models/__init__.py | 1 + api/app/db/models/snow.py | 24 +++++++++++ api/app/db/models/weather_models.py | 2 +- 5 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 api/alembic/versions/403586c146ae_processed_snow.py create mode 100644 api/app/db/crud/snow.py create mode 100644 api/app/db/models/snow.py diff --git a/api/alembic/versions/403586c146ae_processed_snow.py b/api/alembic/versions/403586c146ae_processed_snow.py new file mode 100644 index 000000000..73eac44bf --- /dev/null +++ b/api/alembic/versions/403586c146ae_processed_snow.py @@ -0,0 +1,43 @@ +"""Processed snow + +Revision ID: 403586c146ae +Revises: 7cd069b79aaa +Create Date: 2024-01-15 11:05:24.330674 + +""" +from alembic import op +import sqlalchemy as sa +from app.db.models.common import TZTimeStamp + +# revision identifiers, used by Alembic. +revision = '403586c146ae' +down_revision = '7cd069b79aaa' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic ### + op.create_table('processed_snow', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('for_date', TZTimeStamp(), nullable=False), + sa.Column('processed_date', TZTimeStamp(), nullable=False), + sa.Column('snow_source', sa.Enum('viirs', name='snowsourceenum'), nullable=False), + sa.PrimaryKeyConstraint('id'), + comment='Record containing information about processed snow coverage data.' 
+ ) + op.create_index(op.f('ix_processed_snow_for_date'), 'processed_snow', ['for_date'], unique=False) + op.create_index(op.f('ix_processed_snow_id'), 'processed_snow', ['id'], unique=False) + op.create_index(op.f('ix_processed_snow_processed_date'), 'processed_snow', ['processed_date'], unique=False) + op.create_index(op.f('ix_processed_snow_snow_source'), 'processed_snow', ['snow_source'], unique=False) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic ### + op.drop_index(op.f('ix_processed_snow_snow_source'), table_name='processed_snow') + op.drop_index(op.f('ix_processed_snow_processed_date'), table_name='processed_snow') + op.drop_index(op.f('ix_processed_snow_id'), table_name='processed_snow') + op.drop_index(op.f('ix_processed_snow_for_date'), table_name='processed_snow') + op.drop_table('processed_snow') + # ### end Alembic commands ### diff --git a/api/app/db/crud/snow.py b/api/app/db/crud/snow.py new file mode 100644 index 000000000..2a5f1be30 --- /dev/null +++ b/api/app/db/crud/snow.py @@ -0,0 +1,21 @@ +""" CRUD operations relating to processing snow coverage +""" +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession +from app.db.models.snow import ProcessedSnow, SnowSourceEnum + + +async def save_processed_snow(session: AsyncSession, processed_snow: ProcessedSnow): + """ Add a new ProcessedSnow record. + """ + session.add(processed_snow) + + +async def get_last_processed_snow_by_source(session: AsyncSession, snow_source: SnowSourceEnum): + """ Retrieve the record with the most recent for_date of the specified snow source. 
+ """ + stmt = select(ProcessedSnow)\ + .where(ProcessedSnow.snow_source == snow_source)\ + .order_by(ProcessedSnow.for_date.desc()) + result = await session.execute(stmt) + return result.first() \ No newline at end of file diff --git a/api/app/db/models/__init__.py b/api/app/db/models/__init__.py index 43d304288..7e0721833 100644 --- a/api/app/db/models/__init__.py +++ b/api/app/db/models/__init__.py @@ -16,3 +16,4 @@ from app.db.models.auto_spatial_advisory import (Shape, ShapeType, HfiClassificationThreshold, ClassifiedHfi, RunTypeEnum, ShapeTypeEnum, FuelType, HighHfiArea, RunParameters) from app.db.models.morecast_v2 import MorecastForecastRecord +from app.db.models.snow import ProcessedSnow, SnowSourceEnum diff --git a/api/app/db/models/snow.py b/api/app/db/models/snow.py new file mode 100644 index 000000000..f61812b9f --- /dev/null +++ b/api/app/db/models/snow.py @@ -0,0 +1,24 @@ +import enum +from app.db.models import Base +from sqlalchemy import (Column, Enum, Integer, Sequence) +from app.db.models.common import TZTimeStamp + + +class SnowSourceEnum(enum.Enum): + """ Define different sensors from which snow data is processed. eg. VIIRS """ + viirs = "viirs" + + +class ProcessedSnow(Base): + """ Keeps track of snow coverage data that has been processed. 
""" + __tablename__ = 'processed_snow' + __table_args__ = ( + {'comment': 'Record containing information about processed snow coverage data.'} + ) + + id = Column(Integer, Sequence('processed_snow_id_seq'), + primary_key=True, nullable=False, index=True) + for_date = Column(TZTimeStamp, nullable=False, index=True) + processed_date = Column(TZTimeStamp, nullable=False, index=True) + snow_source = Column(Enum(SnowSourceEnum), nullable=False, index=True) + \ No newline at end of file diff --git a/api/app/db/models/weather_models.py b/api/app/db/models/weather_models.py index f8687c5c6..0c9ff4928 100644 --- a/api/app/db/models/weather_models.py +++ b/api/app/db/models/weather_models.py @@ -184,7 +184,7 @@ class ModelRunPrediction(Base): # The date and time to which the prediction applies. prediction_timestamp = Column(TZTimeStamp, nullable=False, index=True) # The station code representing the location (aka weather station). - station_code = Column(Integer, nullable=True) + station_code = Column(Integer, nullable=False) # Temperature 2m above model layer. tmp_tgl_2 = Column(Float, nullable=True) # Relative humidity 2m above model layer. From cad7fbcd72e5f5a7e22a9abaaf13c72f8f2bbb0b Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Fri, 19 Jan 2024 11:02:23 -0800 Subject: [PATCH 02/11] Process VIIRS snow data --- api/app/db/crud/snow.py | 10 +- api/app/jobs/viirs_snow.py | 217 ++++++++++++++++++++++++++++++++++++- 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/api/app/db/crud/snow.py b/api/app/db/crud/snow.py index 2a5f1be30..fed5075ca 100644 --- a/api/app/db/crud/snow.py +++ b/api/app/db/crud/snow.py @@ -7,12 +7,20 @@ async def save_processed_snow(session: AsyncSession, processed_snow: ProcessedSnow): """ Add a new ProcessedSnow record. 
+ + :param processed_snow: The record to be saved. + :type processed_snow: ProcessedSnow """ session.add(processed_snow) -async def get_last_processed_snow_by_source(session: AsyncSession, snow_source: SnowSourceEnum): +async def get_last_processed_snow_by_source(session: AsyncSession, snow_source: SnowSourceEnum) -> ProcessedSnow: """ Retrieve the record with the most recent for_date of the specified snow source. + + :param snow_source: The source of snow data of interest. + :type snow_source: SnowSourceEnum + :return: A record containing the last date for which snow data from the specified source was successfully processed. + :rtype: ProcessedSnow """ stmt = select(ProcessedSnow)\ .where(ProcessedSnow.snow_source == snow_source)\ diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index 08c5733da..5de21f4b1 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -1,11 +1,222 @@ +from datetime import date, timedelta +from osgeo import gdal +import asyncio +import glob import logging - -from app import configure_logging +import os +import requests +import shutil +import sys +import tempfile +from app import config, configure_logging +from app.db.crud.snow import get_last_processed_snow_by_source, save_processed_snow +from app.db.database import get_async_read_session_scope, get_async_write_session_scope +from app.db.models.snow import ProcessedSnow, SnowSourceEnum +from app.rocketchat_notifications import send_rocketchat_notification +from app.utils.s3 import get_client logger = logging.getLogger(__name__) +BC_BOUNDING_BOX = "-139.06,48.3,-114.03,60" +NSIDC_URL = "https://n5eil02u.ecs.nsidc.org/egi/request" +PRODUCT_VERSION = 2 +SHORT_NAME = "VNP10A1F" +LAYER_VARIABLE = "/VIIRS_Grid_IMG_2D/CGF_NDSI_Snow_Cover" +RAW_SNOW_COVERAGE_NAME = 'raw_snow_coverage.tif' +RAW_SNOW_COVERAGE_CLIPPED_NAME = 'raw_snow_coverage_clipped.tif' + + +class ViirsSnowJob(): + """ Job that downloads and processes VIIRS snow 
coverage data from the NSIDC (https://nsidc.org). + """ + + async def _get_last_processed_date(self) -> date: + """ Return the date of the most recent successful processing of VIIRS snow data. + + :return: The date of the most recent successful processing of VIIRS snow data. + :rtype: date + """ + async with get_async_read_session_scope() as session: + last_processed = await get_last_processed_snow_by_source(session, SnowSourceEnum.viirs) + return None if last_processed is None else last_processed[0].for_date.date() + + + def _download_viirs_granules_by_date(self, for_date: date, path: str, file_name: str): + """ Download VIIRS snow data for the specified date. + + :param for_date: The date of interest. + :type for_date: date + :param path: The path to a temporary directory to download the data to. + :type path: str + :param file_name: The name to assign to the downloaded file/archive. + :type: file_name: str + """ + # Interesting flow required by the NSIDC API. We need to issue a request without credentials which will fail + # with a 401. The URL in the response object is the authentication endpoint, so we issue a request to it with basic auth. + # Now we are authenticated with a session cookie and can make additional requests to the API. + # Furthermore, it looks like we're stuck with synchronous requests. 
+ logger.info(f"Downloading VIIRS snow coverage data for date: {for_date.strftime('%Y-%m-%d')}") + session = requests.session() + resp = session.get(f"https://n5eil02u.ecs.nsidc.org/egi/capabilities/{SHORT_NAME}.{PRODUCT_VERSION}.xml") + session.get(resp.url, auth=(config.get("NASA_EARTHDATA_USER"),config.get("NASA_EARTHDATA_PWD"))) + # Check if request was successful + param_dict = {'short_name': SHORT_NAME, + 'version': PRODUCT_VERSION, + 'temporal': f"{for_date},{for_date}", + 'bounding_box': BC_BOUNDING_BOX, + 'bbox': BC_BOUNDING_BOX, + 'format': "GeoTIFF", + 'projection': "GEOGRAPHIC", + 'Coverage': LAYER_VARIABLE, + 'page_size': 100, + 'request_mode': "stream" + } + request = session.get(NSIDC_URL, params = param_dict) + request.raise_for_status() + file_path = os.path.join(path, file_name) + with open(file_path, 'wb') as file: + file.write(request.content) + # unzip the snow coverage data + shutil.unpack_archive(file_path, path) + + + def _create_snow_coverage_mosaic(self, path: str): + """ Use GDAL to create a mosaic from multiple tifs of VIIRS snow data. + + :param path: The path to a temporary directory where the tifs are located. Also where the mosaic will be saved. + :type path: str + """ + output = os.path.join(path, RAW_SNOW_COVERAGE_NAME) + files = glob.glob(os.path.join(path, "**/*.tif"), recursive=True) + options = gdal.WarpOptions(format='GTiff', srcNodata=255) + gdal.Warp(output, files, options=options) + + + async def _clip_snow_coverage_mosaic(self, sub_dir: str, temp_dir: str): + """ Clip the boundary of the snow data mosaic to the boundary of BC. + + :param sub_dir: The path to the location of the mosaic. Also the output path for the clipped image. + :type sub_dir: str + :param temp_dir: The path to the location of the geojson file used to clip the mosaic. 
+ :type temp_dir: str + """ + input_path = os.path.join(sub_dir, RAW_SNOW_COVERAGE_NAME) + output_path = os.path.join(sub_dir, RAW_SNOW_COVERAGE_CLIPPED_NAME) + cut_line_path = os.path.join(temp_dir, "bc_boundary.geojson") + gdal.Warp(output_path, input_path, format='GTiff', cutlineDSName=cut_line_path) + + + async def _get_bc_boundary_from_s3(self, path: str): + """ Fetch the bc_boundary.geojson file from S3 and write a copy to the local temporary directory. + The file will be used to clip the snow coverage mosaic to match the boundary of BC. + + :param path: The path to which to save the bc_boundary.geojson file. + :type path: str + """ + async with get_client() as (client, bucket): + bc_boundary = await client.get_object(Bucket=bucket, Key="bc_boundary/bc_boundary.geojson") + data = await bc_boundary['Body'].read() + bc_boundary_geojson_path = os.path.join(path, "bc_boundary.geojson") + with open(bc_boundary_geojson_path, "wb") as file: + file.write(data) + + + async def _save_clipped_snow_coverage_moasic_to_s3(self, for_date: date, path: str): + """ Save the clipped mosaic to S3 storage. + + :param for_date: The date of interest. + :type for_date: date + :param path: The path to the directory containing the mosaic. + :type path: str + """ + async with get_client() as (client, bucket): + key = f"snow_coverage/{for_date.strftime('%Y-%m-%d')}/clipped_snow_coverage_{for_date.strftime('%Y-%m-%d')}_epsg4326.tif" + file_path = f"{path}/{RAW_SNOW_COVERAGE_CLIPPED_NAME}" + with open(file_path, "rb") as file: + await client.put_object(Bucket=bucket, + Key=key, + Body=file) + + + async def _process_viirs_snow(self, for_date: date, path: str): + """ Process VIIRS snow data. + + :param for_date: The date of interest. + :type for_date: date + :param path: A temporary file location for intermediate files. 
+ :type path: str + """ + with tempfile.TemporaryDirectory() as sub_dir: + file_name = f"{for_date.strftime('%Y-%m-%d')}.zip" + self._download_viirs_granules_by_date(for_date, sub_dir, file_name) + # Create a mosaic from the snow coverage imagery, clip it to the boundary of BC and save to S3 + self._create_snow_coverage_mosaic(sub_dir) + await self._clip_snow_coverage_mosaic(sub_dir, path) + await self._save_clipped_snow_coverage_moasic_to_s3(for_date, sub_dir) + + + async def _run_viirs_snow(self): + """ Entry point for running the job. + """ + # Grab the date from our database of the last successful processing of VIIRS snow data. + last_processed_date = await self._get_last_processed_date() + today = date.today() + if last_processed_date is None: + # Case to cover the initial run of VIIRS snow processing (ie. start processing yesterday) + next_date = today - timedelta(days=1) + else: + # Start processing the day after the last record of a successful job. + next_date = last_processed_date + timedelta(days=1) + if next_date == today: + logger.info("Processing of VIIRS snow data is up to date.") + return + with tempfile.TemporaryDirectory() as temp_dir: + # Get the bc_boundary.geojson in a temp_dir. This is expensive so we only want to do this once. 
+ logger.info("Downloading bc_boundary.geojson from S3.") + await self._get_bc_boundary_from_s3(temp_dir) + while next_date < today: + date_string = next_date.strftime('%Y-%m-%d') + logger.info(f"Processing snow coverage data for date: {date_string}") + try: + await self._process_viirs_snow(next_date, temp_dir) + async with get_async_write_session_scope() as session: + processed_snow = ProcessedSnow(for_date=next_date, processed_date=today, snow_source=SnowSourceEnum.viirs) + await save_processed_snow(session, processed_snow) + logger.info(f"Successfully processed VIIRS snow coverage data for date: {date_string}") + except requests.exceptions.HTTPError as http_error: + # An HTTPError with status code of 501 means the VIIRS imagery for the date in question is not yet + # available. Stop processing at the current date and exit the job. We'll try again later. This is + # expected to occur and there is no need to send a notification to RocketChat. + if http_error.response.status_code == 501: + logger.info(f"VIIRS snow data is unavailable for date: {date_string}. Exiting job.") + break + else: + # If a different HTTPError occurred re-raise and let the next exception handler send a notification to RocketChat. + raise http_error + next_date = next_date + timedelta(days=1) + + def main(): - logger.info("***************Cron job started**************") + """ Kicks off asyncronous processing of VIIRS snow coverage data. + """ + try: + # We don't want gdal to silently swallow errors. + gdal.UseExceptions() + logger.debug('Begin processing VIIRS snow coverage data.') + + bot = ViirsSnowJob() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(bot._run_viirs_snow()) + + # Exit with 0 - success. + sys.exit(os.EX_OK) + except Exception as exception: + # Exit non 0 - failure. 
+ logger.error("An error occurred while processing VIIRS snow coverage data.", exc_info=exception) + rc_message = ':scream: Encountered an error while processing VIIRS snow data.' + send_rocketchat_notification(rc_message, exception) + sys.exit(os.EX_SOFTWARE) if __name__ == '__main__': configure_logging() From 753340d89e6f70771e130d40900cd3ceaa14fbf6 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Fri, 19 Jan 2024 15:13:34 -0800 Subject: [PATCH 03/11] tests --- api/app/jobs/viirs_snow.py | 2 +- api/app/tests/jobs/test_viirs_snow.py | 122 ++++++++++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 api/app/tests/jobs/test_viirs_snow.py diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index 5de21f4b1..835e51dd4 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -167,7 +167,7 @@ async def _run_viirs_snow(self): else: # Start processing the day after the last record of a successful job. next_date = last_processed_date + timedelta(days=1) - if next_date == today: + if next_date >= today: logger.info("Processing of VIIRS snow data is up to date.") return with tempfile.TemporaryDirectory() as temp_dir: diff --git a/api/app/tests/jobs/test_viirs_snow.py b/api/app/tests/jobs/test_viirs_snow.py new file mode 100644 index 000000000..e87c9a4aa --- /dev/null +++ b/api/app/tests/jobs/test_viirs_snow.py @@ -0,0 +1,122 @@ +""" Unit testing for VIIRS snow data processing """ + +import os +import pytest +from datetime import date, timedelta +from pytest_mock import MockerFixture +from requests import Response +from requests.exceptions import HTTPError +from app.jobs import viirs_snow +from app.jobs.viirs_snow import ViirsSnowJob + + +async def mock__get_bc_boundary_from_s3(self, temp_dir): + return + + +def test_viirs_snow_job_fail(mocker: MockerFixture, + monkeypatch): + """ + Test that when the bot fails, a message is sent to rocket-chat, and our exit code is 1. 
+ """ + + async def mock__get_last_processed_date(self): + raise Exception("Error") + + monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) + rocket_chat_spy = mocker.spy(viirs_snow, 'send_rocketchat_notification') + + with pytest.raises(SystemExit) as excinfo: + viirs_snow.main() + # Assert that we exited with an error code. + assert excinfo.value.code == os.EX_SOFTWARE + # Assert that rocket chat was called. + assert rocket_chat_spy.call_count == 1 + + +def test_viirs_snow_job_exits_without_error_when_no_work_required(monkeypatch): + """ Test that viirs_snow_job exits without error when no data needs to be processed. + """ + async def mock__get_last_processed_date(self): + return date.today() - timedelta(days=1) + + monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) + + with pytest.raises(SystemExit) as excinfo: + viirs_snow.main() + # Assert that we exited without an error. + assert excinfo.value.code == os.EX_OK + + +def test_viirs_snow_job_exits_cleanly_when_no_viirs_data(monkeypatch): + """ Test that viirs_snow_job exits cleanly when an attempt to download data that doesn't exist + throws an HTTPError with status code of 501. + """ + async def mock__get_last_processed_date(self): + return date.today() - timedelta(days=2) + + + def mock__download_viirs_granules_by_date(self, for_date: date, path: str, file_name: str): + error = HTTPError(response=Response()) + error.response.status_code = 501 + raise error + + monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) + monkeypatch.setattr(ViirsSnowJob, '_get_bc_boundary_from_s3', mock__get_bc_boundary_from_s3) + monkeypatch.setattr(ViirsSnowJob, '_download_viirs_granules_by_date', mock__download_viirs_granules_by_date) + + + with pytest.raises(SystemExit) as excinfo: + viirs_snow.main() + # Assert that we exited without an error. 
+ assert excinfo.value.code == os.EX_OK + + +def test_viirs_snow_job_exits_cleanly_when_no_viirs_data(monkeypatch): + """ Test that viirs_snow_job exits cleanly when attempt to download data that doesn't exist + throws a HTTPError with status code of 501. + """ + async def mock__get_last_processed_date(self): + return date.today() - timedelta(days=2) + + def mock__download_viirs_granules_by_date(self, for_date: date, path: str, file_name: str): + error = HTTPError(response=Response()) + error.response.status_code = 501 + raise error + + monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) + monkeypatch.setattr(ViirsSnowJob, '_get_bc_boundary_from_s3', mock__get_bc_boundary_from_s3) + monkeypatch.setattr(ViirsSnowJob, '_download_viirs_granules_by_date', mock__download_viirs_granules_by_date) + + + with pytest.raises(SystemExit) as excinfo: + viirs_snow.main() + # Assert that we exited with an error code. + assert excinfo.value.code == os.EX_OK + + +def test_viirs_snow_job_fails_on_nsidc_auth_failure(mocker: MockerFixture, monkeypatch): + """ + Test that when authentication with the NSIDC fails a message is sent to rocket-chat and our exit code is 1. + """ + + async def mock__get_last_processed_date(self): + return date.today() - timedelta(days=2) + + def mock__download_viirs_granules_by_date(self, for_date: date, path: str, file_name: str): + error = HTTPError(response=Response()) + error.response.status_code = 401 + raise error + + monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) + monkeypatch.setattr(ViirsSnowJob, '_get_bc_boundary_from_s3', mock__get_bc_boundary_from_s3) + monkeypatch.setattr(ViirsSnowJob, '_download_viirs_granules_by_date', mock__download_viirs_granules_by_date) + + rocket_chat_spy = mocker.spy(viirs_snow, 'send_rocketchat_notification') + + with pytest.raises(SystemExit) as excinfo: + viirs_snow.main() + # Assert that we exited with an error code. 
+ assert excinfo.value.code == os.EX_SOFTWARE + # Assert that rocket chat was called. + assert rocket_chat_spy.call_count == 1 \ No newline at end of file From 019a7bf90345fe9561f80f588864cc36aa45cc10 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Fri, 19 Jan 2024 15:23:52 -0800 Subject: [PATCH 04/11] Remove duplicate test --- api/app/tests/jobs/test_viirs_snow.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/api/app/tests/jobs/test_viirs_snow.py b/api/app/tests/jobs/test_viirs_snow.py index e87c9a4aa..aabb0efc6 100644 --- a/api/app/tests/jobs/test_viirs_snow.py +++ b/api/app/tests/jobs/test_viirs_snow.py @@ -72,29 +72,6 @@ def mock__download_viirs_granules_by_date(self, for_date: date, path: str, file_ assert excinfo.value.code == os.EX_OK -def test_viirs_snow_job_exits_cleanly_when_no_viirs_data(monkeypatch): - """ Test that viirs_snow_job exits cleanly when attempt to download data that doesn't exist - throws a HTTPError with status code of 501. - """ - async def mock__get_last_processed_date(self): - return date.today() - timedelta(days=2) - - def mock__download_viirs_granules_by_date(self, for_date: date, path: str, file_name: str): - error = HTTPError(response=Response()) - error.response.status_code = 501 - raise error - - monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) - monkeypatch.setattr(ViirsSnowJob, '_get_bc_boundary_from_s3', mock__get_bc_boundary_from_s3) - monkeypatch.setattr(ViirsSnowJob, '_download_viirs_granules_by_date', mock__download_viirs_granules_by_date) - - - with pytest.raises(SystemExit) as excinfo: - viirs_snow.main() - # Assert that we exited with an error code. - assert excinfo.value.code == os.EX_OK - - def test_viirs_snow_job_fails_on_nsidc_auth_failure(mocker: MockerFixture, monkeypatch): """ Test that when authentication with the NSIDC fails a message is sent to rocket-chat and our exit code is 1. 
From 93cdb366f4dd078290bbdd8e86dfa476d5b45680 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Fri, 19 Jan 2024 15:28:20 -0800 Subject: [PATCH 05/11] Replace generic Exception --- api/app/tests/jobs/test_viirs_snow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/tests/jobs/test_viirs_snow.py b/api/app/tests/jobs/test_viirs_snow.py index aabb0efc6..9a333b788 100644 --- a/api/app/tests/jobs/test_viirs_snow.py +++ b/api/app/tests/jobs/test_viirs_snow.py @@ -21,7 +21,7 @@ def test_viirs_snow_job_fail(mocker: MockerFixture, """ async def mock__get_last_processed_date(self): - raise Exception("Error") + raise OSError("Error") monkeypatch.setattr(ViirsSnowJob, '_get_last_processed_date', mock__get_last_processed_date) rocket_chat_spy = mocker.spy(viirs_snow, 'send_rocketchat_notification') From 121484046e68e925e8569133f281bc0a487fc423 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Mon, 22 Jan 2024 11:37:49 -0800 Subject: [PATCH 06/11] Add object store env lookup to job --- openshift/templates/viirs_snow.cronjob.yaml | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/openshift/templates/viirs_snow.cronjob.yaml b/openshift/templates/viirs_snow.cronjob.yaml index 20b20dc4c..0125e0e3b 100644 --- a/openshift/templates/viirs_snow.cronjob.yaml +++ b/openshift/templates/viirs_snow.cronjob.yaml @@ -114,6 +114,26 @@ objects: secretKeyRef: name: ${GLOBAL_NAME} key: nasa-earthdata-pwd + - name: OBJECT_STORE_SERVER + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-server + - name: OBJECT_STORE_USER_ID + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-user-id + - name: OBJECT_STORE_SECRET + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-secret + - name: OBJECT_STORE_BUCKET + valueFrom: + secretKeyRef: + name: ${GLOBAL_NAME} + key: object-store-bucket resources: limits: cpu: "1" From 725cfa17bb21dcf8333e479f9e88926e75c14ff5 Mon Sep 17 00:00:00 2001 From: Darren Boss 
Date: Mon, 22 Jan 2024 11:58:20 -0800 Subject: [PATCH 07/11] Expand initial processing range --- api/app/jobs/viirs_snow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index 835e51dd4..21209ee33 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -162,8 +162,8 @@ async def _run_viirs_snow(self): last_processed_date = await self._get_last_processed_date() today = date.today() if last_processed_date is None: - # Case to cover the initial run of VIIRS snow processing (ie. start processing yesterday) - next_date = today - timedelta(days=1) + # Case to cover the initial run of VIIRS snow processing (ie. start processing one week ago) + next_date = today - timedelta(days=7) else: # Start processing the day after the last record of a successful job. next_date = last_processed_date + timedelta(days=1) From 368127ba1d00ddd6d2055fdbae2055b5be8a3388 Mon Sep 17 00:00:00 2001 From: Darren Boss Date: Mon, 22 Jan 2024 14:53:22 -0800 Subject: [PATCH 08/11] Deploy VIIRS snow cronjob in prod --- openshift/scripts/oc_deploy_to_production.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/openshift/scripts/oc_deploy_to_production.sh b/openshift/scripts/oc_deploy_to_production.sh index 85e3d622a..ca2e7cbdf 100755 --- a/openshift/scripts/oc_deploy_to_production.sh +++ b/openshift/scripts/oc_deploy_to_production.sh @@ -51,6 +51,8 @@ PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_noaa_gfs_cronjob.sh PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_noaa_nam_cronjob.sh prod ${RUN_TYPE} echo C-Haines PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_c_haines_cronjob.sh prod ${RUN_TYPE} +echo VIIRS Snow +PROJ_TARGET=${PROJ_TARGET} bash $(dirname ${0})/oc_provision_viirs_snow_cronjob.sh prod ${RUN_TYPE} echo BC FireWeather cronjobs echo "Run forecast at 8h30 PDT and 16h30 PDT (so before and after noon)" PROJ_TARGET=${PROJ_TARGET} SCHEDULE="30 * * * 
*" bash $(dirname ${0})/oc_provision_wfwx_noon_forecasts_cronjob.sh prod ${RUN_TYPE} From a941af76deafa9bf6a387575b0ea33ee944566ef Mon Sep 17 00:00:00 2001 From: dgboss Date: Tue, 23 Jan 2024 09:32:54 -0800 Subject: [PATCH 09/11] Update api/app/jobs/viirs_snow.py Co-authored-by: Conor Brady --- api/app/jobs/viirs_snow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index 21209ee33..4fada6b33 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -121,7 +121,7 @@ async def _get_bc_boundary_from_s3(self, path: str): file.write(data) - async def _save_clipped_snow_coverage_moasic_to_s3(self, for_date: date, path: str): + async def _save_clipped_snow_coverage_mosaic_to_s3(self, for_date: date, path: str): """ Save the clipped mosaic to S3 storage. :param for_date: The date of interest. From 1f0b737a6bbc4262ad16aa138992244cde1b4ff0 Mon Sep 17 00:00:00 2001 From: dgboss Date: Tue, 23 Jan 2024 09:33:06 -0800 Subject: [PATCH 10/11] Update api/app/jobs/viirs_snow.py Co-authored-by: Conor Brady --- api/app/jobs/viirs_snow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index 4fada6b33..afae79998 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -197,7 +197,7 @@ async def _run_viirs_snow(self): def main(): - """ Kicks off asyncronous processing of VIIRS snow coverage data. + """ Kicks off asynchronous processing of VIIRS snow coverage data. """ try: # We don't want gdal to silently swallow errors. 
From f0c2a28aee2a058e49f0ecc8946ac67814d0fbf7 Mon Sep 17 00:00:00 2001 From: dgboss Date: Tue, 23 Jan 2024 09:34:59 -0800 Subject: [PATCH 11/11] Update api/app/jobs/viirs_snow.py --- api/app/jobs/viirs_snow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/api/app/jobs/viirs_snow.py b/api/app/jobs/viirs_snow.py index afae79998..7c623fed0 100644 --- a/api/app/jobs/viirs_snow.py +++ b/api/app/jobs/viirs_snow.py @@ -59,7 +59,6 @@ def _download_viirs_granules_by_date(self, for_date: date, path: str, file_name: session = requests.session() resp = session.get(f"https://n5eil02u.ecs.nsidc.org/egi/capabilities/{SHORT_NAME}.{PRODUCT_VERSION}.xml") session.get(resp.url, auth=(config.get("NASA_EARTHDATA_USER"),config.get("NASA_EARTHDATA_PWD"))) - # Check if request was successful param_dict = {'short_name': SHORT_NAME, 'version': PRODUCT_VERSION, 'temporal': f"{for_date},{for_date}",