Merge pull request #184 from ASFHyP3/develop

Release v0.5.8

jhkennedy authored Nov 19, 2024
2 parents 1f05ddd + fa8bcb2 commit 196bddb
Showing 8 changed files with 328 additions and 118 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.5.8]
### Changed
- As an incremental improvement to deduplication performance, its-live-monitoring now:
- searches the `s3://its-live-data` bucket directly for already published (succeeded) pairs.
- searches HyP3 ITS_LIVE via the API for pairs still pending or running, instead of searching for all previously submitted pairs.
- Upgrade numpy from 1.26.4 to 2.1.3
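
An illustrative sketch of the resulting two-stage flow (editorial; it mirrors the process_scene changes in the main.py diff below):

    pairs = deduplicate_hyp3_pairs(pairs)  # drops pairs with a PENDING or RUNNING HyP3 job
    pairs = deduplicate_s3_pairs(pairs)    # drops pairs already published to s3://its-live-data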

## [0.5.7]
### Fixed
108 changes: 103 additions & 5 deletions its_live_monitoring/src/main.py
@@ -5,9 +5,13 @@
import logging
import os
import sys
from typing import Iterable

import boto3
import botocore.config
import geopandas as gpd
import hyp3_sdk as sdk
import numpy as np
import pandas as pd

from landsat import (
@@ -35,11 +39,88 @@
log = logging.getLogger('its_live_monitoring')
log.setLevel(os.environ.get('LOGGING_LEVEL', 'INFO'))

s3 = boto3.client(
    's3',
    config=botocore.config.Config(signature_version=botocore.UNSIGNED),
)

-def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
-    """Ensure we don't submit duplicate jobs to HyP3.

-    Search HyP3 jobs since the reference scene's acquisition date and remove already processed pairs
def point_to_region(lat: float, lon: float) -> str:
    """Returns a string (for example, N78W124) of a region name based on granule center point lat,lon."""
    nw_hemisphere = 'S' if np.signbit(lat) else 'N'
    ew_hemisphere = 'W' if np.signbit(lon) else 'E'

    region_lat = int(np.abs(np.fix(lat / 10) * 10))
    if region_lat == 90:  # if you are exactly at a pole, put in lat = 80 bin
        region_lat = 80

    region_lon = int(np.abs(np.fix(lon / 10) * 10))
    if region_lon >= 180:  # if you are at the dateline, back off to the 170 bin
        region_lon = 170

    return f'{nw_hemisphere}{region_lat:02d}{ew_hemisphere}{region_lon:03d}'
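
# Illustrative usage (editorial sketch, not part of the commit): the function bins a
# granule center point into a 10-by-10 degree tile name, for example:
#   point_to_region(63.2, -145.9)  # -> 'N60W140'
#   point_to_region(-0.5, 179.9)   # -> 'S00E170'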


def regions_from_bounds(min_lon: float, min_lat: float, max_lon: float, max_lat: float) -> set[str]:
    """Returns a set of all region names within a bounding box."""
    lats, lons = np.mgrid[min_lat : max_lat + 10 : 10, min_lon : max_lon + 10 : 10]
    return {point_to_region(lat, lon) for lat, lon in zip(lats.ravel(), lons.ravel())}
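
# Editorial sketch: for a box over interior Alaska, regions_from_bounds(-146, 61, -140, 64)
# yields {'N60W140', 'N60W130', 'N70W140', 'N70W130'}; the mgrid steps past the box's max
# edges, so tiles bordering the bounds are searched as well.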


def get_key(tile_prefixes: Iterable[str], reference: str, secondary: str) -> str | None:
    """Search S3 for the key of a processed pair.

    Args:
        tile_prefixes: s3 tile path prefixes
        reference: reference scene name
        secondary: secondary scene name

    Returns:
        The key or None if one wasn't found.
    """
    # NOTE: hyp3-autorift enforces earliest scene as the reference scene and will write files accordingly,
    # but its-live-monitoring uses the latest scene as the reference scene, so enforce autorift convention
    reference, secondary = sorted([reference, secondary])

    for tile_prefix in tile_prefixes:
        prefix = f'{tile_prefix}/{reference}_X_{secondary}'
        response = s3.list_objects_v2(
            Bucket='its-live-data',
            Prefix=prefix,
        )
        for item in response.get('Contents', []):
            if item['Key'].endswith('.nc'):
                return item['Key']
    return None
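
# For example (editorial sketch; the exact key suffix is assumed, not shown in this commit),
# a published Landsat pair would be found under a key shaped like:
#   velocity_image_pair/landsatOLI/v02/N60W140/<reference>_X_<secondary>...nc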


def deduplicate_s3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Ensures that pairs aren't submitted if they already have a product in S3.

    Args:
        pairs: A GeoDataFrame containing *at least* these columns: `reference`, `reference_acquisition`, and
            `secondary`.

    Returns:
        The pairs GeoDataFrame with any already submitted pairs removed.
    """
    s2_prefix = 'velocity_image_pair/sentinel2/v02'
    landsat_prefix = 'velocity_image_pair/landsatOLI/v02'
    prefix = s2_prefix if pairs['reference'][0].startswith('S2') else landsat_prefix

    regions = regions_from_bounds(*pairs['geometry'].total_bounds)
    tile_prefixes = [f'{prefix}/{region}' for region in regions]

    drop_indexes = []
    for idx, reference, secondary in pairs[['reference', 'secondary']].itertuples():
        if get_key(tile_prefixes=tile_prefixes, reference=reference, secondary=secondary):
            drop_indexes.append(idx)

    return pairs.drop(index=drop_indexes)
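
# Editorial note: GeoSeries.total_bounds returns (minx, miny, maxx, maxy), i.e.
# (min_lon, min_lat, max_lon, max_lat) here, matching the regions_from_bounds signature.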


def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Search HyP3 jobs since the reference scene's acquisition date and remove already submitted (in PENDING or RUNNING state) pairs.

    Args:
        pairs: A GeoDataFrame containing *at least* these columns: `reference`, `reference_acquisition`, and
@@ -48,13 +129,24 @@ def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    Returns:
        The pairs GeoDataFrame with any already submitted pairs removed.
    """
-    jobs = HYP3.find_jobs(
+    pending_jobs = HYP3.find_jobs(
        job_type='AUTORIFT',
        start=pairs.iloc[0].reference_acquisition,
        name=pairs.iloc[0].reference,
        user_id=EARTHDATA_USERNAME,
+        status_code='PENDING',
    )

+    running_jobs = HYP3.find_jobs(
+        job_type='AUTORIFT',
+        start=pairs.iloc[0].reference_acquisition,
+        name=pairs.iloc[0].reference,
+        user_id=EARTHDATA_USERNAME,
+        status_code='RUNNING',
+    )

+    jobs = pending_jobs + running_jobs

    df = pd.DataFrame([job.job_parameters['granules'] for job in jobs], columns=['reference', 'secondary'])
    df = df.set_index(['reference', 'secondary'])
    pairs = pairs.set_index(['reference', 'secondary'])
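
# Editorial note (assumption): HYP3.find_jobs returns a hyp3_sdk Batch, and Batch objects
# support concatenation with `+`, so `jobs` combines the PENDING and RUNNING results.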
@@ -123,7 +215,13 @@ def process_scene(
    if len(pairs) > 0:
        pairs = deduplicate_hyp3_pairs(pairs)

-        log.info(f'Deduplicated pairs; {len(pairs)} remaining')
+        log.info(f'Deduplicated HyP3 running/pending pairs; {len(pairs)} remaining')
        with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
            log.debug(pairs.sort_values(by=['secondary'], ascending=False).loc[:, ['reference', 'secondary']])

+        pairs = deduplicate_s3_pairs(pairs)

+        log.info(f'Deduplicated already published pairs; {len(pairs)} remaining')
+        with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
+            log.debug(pairs.sort_values(by=['secondary'], ascending=False).loc[:, ['reference', 'secondary']])

4 changes: 2 additions & 2 deletions requirements-all.txt
@@ -1,6 +1,6 @@
-r requirements-its_live_monitoring.txt
-r requirements-status-messages.txt
-cfn-lint==1.18.0
-ruff==0.6.9
+cfn-lint==1.19.0
+ruff==0.7.4
pytest==8.3.3
responses==0.25.3
3 changes: 2 additions & 1 deletion requirements-its_live_monitoring.txt
@@ -1,6 +1,7 @@
geopandas==1.0.1
hyp3-sdk==7.0.1
pandas==2.2.3
-pystac-client==0.8.4
+pystac-client==0.8.5
requests==2.32.3
shapely==2.0.6
+numpy==2.1.3
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -36,6 +36,18 @@ def create_pystac_item(
    return create_pystac_item


@pytest.fixture
def stac_search_factory():
    class MockItemSearch:
        def __init__(self, items: list[pystac.item.Item]):
            self.items = items

        def pages(self):
            return [self.items]

    return MockItemSearch
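
# Editorial note: tests pass instances of this factory as a mock's side_effect to stand in
# for a pystac-client ItemSearch, whose pages() yields pages of STAC items.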


@pytest.fixture
def hyp3_job_factory():
    def create_hyp3_job(granules: list) -> sdk.Job:
Expand Down
29 changes: 15 additions & 14 deletions tests/its_live_monitoring/test_landsat.py
@@ -1,11 +1,12 @@
from copy import deepcopy
from datetime import datetime
-from unittest.mock import MagicMock, patch
+from unittest.mock import patch

import landsat


-def test_get_landsat_stac_item(pystac_item_factory):
+@patch('landsat.LANDSAT_COLLECTION.get_item')
+def test_get_landsat_stac_item(mock_landsat_get_item, pystac_item_factory):
    scene = 'LC08_L1TP_138041_20240128_20240207_02_T1'
    properties = {
        'instruments': ['OLI'],
@@ -18,10 +19,8 @@ def test_get_landsat_stac_item(pystac_item_factory):
    collection = 'landsat-c2l1'
    expected_item = pystac_item_factory(id=scene, datetime=datetime.now(), properties=properties, collection=collection)

-    with patch('landsat.LANDSAT_COLLECTION', MagicMock()):
-        landsat.LANDSAT_COLLECTION.get_item.return_value = expected_item
-        item = landsat.get_landsat_stac_item(scene)
-
+    mock_landsat_get_item.side_effect = [expected_item]
+    item = landsat.get_landsat_stac_item(scene)
    assert item.collection_id == collection
    assert item.properties == properties

@@ -99,7 +98,8 @@ def test_qualifies_for_processing(pystac_item_factory):
    assert landsat.qualifies_for_landsat_processing(item)


-def test_get_landsat_pairs_for_reference_scene(pystac_item_factory):
+@patch('landsat.LANDSAT_CATALOG.search')
+def test_get_landsat_pairs_for_reference_scene(mock_landsat_get_item, pystac_item_factory, stac_search_factory):
    properties = {
        'instruments': ['OLI'],
        'landsat:collection_category': 'T1',
@@ -129,9 +129,8 @@ def test_get_landsat_pairs_for_reference_scene(pystac_item_factory):
            pystac_item_factory(id=scene, datetime=date_time, properties=properties, collection=collection)
        )

-    with patch('landsat.LANDSAT_CATALOG', MagicMock()):
-        landsat.LANDSAT_CATALOG.search().pages.return_value = (sec_items,)
-        df = landsat.get_landsat_pairs_for_reference_scene(ref_item)
+    mock_landsat_get_item.side_effect = [stac_search_factory(sec_items)]
+    df = landsat.get_landsat_pairs_for_reference_scene(ref_item)

    assert (df['landsat:wrs_path'] == ref_item.properties['landsat:wrs_path']).all()
    assert (df['landsat:wrs_row'] == ref_item.properties['landsat:wrs_row']).all()
@@ -140,7 +139,10 @@ def test_get_landsat_pairs_for_reference_scene(pystac_item_factory):
    assert (df['reference'] == ref_item.id).all()


-def test_get_landsat_pairs_for_off_nadir_reference_scene(pystac_item_factory):
+@patch('landsat.LANDSAT_CATALOG.search')
+def test_get_landsat_pairs_for_off_nadir_reference_scene(
+    mock_landsat_get_item, pystac_item_factory, stac_search_factory
+):
    properties = {
        'instruments': ['OLI'],
        'landsat:collection_category': 'T1',
@@ -171,9 +173,8 @@ def test_get_landsat_pairs_for_off_nadir_reference_scene(pystac_item_factory):
        props['view:off_nadir'] = off_nadir
        sec_items.append(pystac_item_factory(id=scene, datetime=date_time, properties=props, collection=collection))

-    with patch('landsat.LANDSAT_CATALOG', MagicMock()):
-        landsat.LANDSAT_CATALOG.search().pages.return_value = (sec_items,)
-        df = landsat.get_landsat_pairs_for_reference_scene(ref_item)
+    mock_landsat_get_item.side_effect = [stac_search_factory(sec_items)]
+    df = landsat.get_landsat_pairs_for_reference_scene(ref_item)

    assert (df['view:off_nadir'] > 0).all()
