Showing 3 changed files with 244 additions and 0 deletions.
@@ -0,0 +1,20 @@
title: 'NCEP/CPC L3 Half Hourly 4km Global (60S - 60N) Merged IR V1 (GPM_MERGIR) at GES DISC'
description: 'These data originate from NOAA/NCEP. The NOAA Climate Prediction Center/NCEP/NWS is making the data available originally in binary format, in a weekly rotating archive. The NASA GES DISC is acquiring the binary files as they become available, converts them into CF (Climate and Forecast) -convention...'
pangeo_forge_version: '0.9.2'
recipes:
  - id: 'GPM_MERGIR'
    object: 'recipe:recipe'
provenance:
  providers:
    - name: 'NASA/GSFC/SED/ESD/GCDC/GESDISC'
      description: 'NASA/GSFC/SED/ESD/GCDC/GESDISC'
      roles:
        - producer
        - licensor
      url: 'https://disc.gsfc.nasa.gov/datacollection/GPM_MERGIR_1.html'
  license: 'Open Data'
maintainers:
  - name: 'Development Seed'
    github: developmentseed
bakery:
  id: 'pangeo-ldeo-nsf-earthcube'
@@ -0,0 +1,221 @@
import base64
import json
import os
from dataclasses import dataclass, field
from typing import Dict, Union

import apache_beam as beam
import requests
from cmr import GranuleQuery
from kerchunk.combine import MultiZarrToZarr
from xarray import Dataset

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    RequiredAtRuntimeDefault,
    WriteCombinedReference,
)
from pangeo_forge_recipes.writers import ZarrWriterMixin

HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#'
S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#'

# This recipe requires the following environment variables from Earthdata:
ED_TOKEN = os.environ['EARTHDATA_TOKEN']
ED_USERNAME = os.environ['EARTHDATA_USERNAME']
ED_PASSWORD = os.environ['EARTHDATA_PASSWORD']
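# ED_TOKEN is used as a Bearer token when files are accessed over HTTPS, while
# ED_USERNAME/ED_PASSWORD are exchanged for temporary S3 credentials by
# earthdata_auth() below.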

CREDENTIALS_API = 'https://data.gesdisc.earthdata.nasa.gov/s3credentials'
SHORT_NAME = 'GPM_MERGIR'
CONCAT_DIM = 'time'
IDENTICAL_DIMS = ['lat', 'lon']

# use HTTP_REL if S3 access is not possible. S3_REL is faster.
selected_rel = S3_REL


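# earthdata_auth() follows the redirect flow of the GES DISC s3credentials endpoint:
# an unauthenticated GET to CREDENTIALS_API returns a redirect, the base64-encoded
# username:password is POSTed to the redirect target (Earthdata Login), and the
# accessToken cookie obtained from the final redirect is used to fetch temporary
# S3 credentials.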
def earthdata_auth(username: str, password: str):
    login_resp = requests.get(CREDENTIALS_API, allow_redirects=False)
    login_resp.raise_for_status()

    encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii'))
    auth_redirect = requests.post(
        login_resp.headers['location'],
        data={'credentials': encoded_auth},
        headers={'Origin': CREDENTIALS_API},
        allow_redirects=False,
    )
    auth_redirect.raise_for_status()

    final = requests.get(auth_redirect.headers['location'], allow_redirects=False)

    results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']})
    results.raise_for_status()

    creds = json.loads(results.content)
    return {
        'aws_access_key_id': creds['accessKeyId'],
        'aws_secret_access_key': creds['secretAccessKey'],
        'aws_session_token': creds['sessionToken'],
    }


def filter_data_links(links, rel):
    return filter(
        lambda link: link['rel'] == rel
        and (link['href'].endswith('.nc') or link['href'].endswith('.nc4')),
        links,
    )


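# gen_data_links() queries CMR for all downloadable GPM_MERGIR granules and yields the
# single data link of the selected type (S3 or HTTPS) for each granule.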
def gen_data_links(rel):
    granules = GranuleQuery().short_name(SHORT_NAME).downloadable(True).get_all()
    for granule in granules:
        s3_links = filter_data_links(granule['links'], rel)
        first = next(s3_links, None)
        # raise if CMR does not have exactly one link of the selected type for an item
        if not first or next(s3_links, None) is not None:
            raise ValueError(f"Expected 1 link of type {rel} on {granule['title']}")
        yield first['href']


@dataclass
class ConsolidateMetadata(beam.PTransform):
    """Consolidate metadata into a single .zmetadata file.

    See zarr.consolidate_metadata() for details.
    """

    storage_options: Dict = field(default_factory=dict)

    @staticmethod
    def _consolidate(mzz: MultiZarrToZarr, storage_options: Dict) -> Dict:
        import fsspec
        import zarr
        from kerchunk.utils import consolidate

        out = mzz.translate()
        fs = fsspec.filesystem(
            'reference',
            fo=out,
            remote_options=storage_options,
        )
        mapper = fs.get_mapper()
        metadata_key = '.zmetadata'
        zarr.consolidate_metadata(mapper, metadata_key=metadata_key)
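        # Merge the newly written .zmetadata entry back into the reference set so that
        # downstream consumers can open the store with ``consolidated=True``.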
        double_consolidated = consolidate(dict([(metadata_key, mapper[metadata_key])]))
        out['refs'] = out['refs'] | double_consolidated['refs']
        return out

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._consolidate, storage_options=self.storage_options)


@dataclass
class WriteReferences(beam.PTransform, ZarrWriterMixin):
    """Store a singleton PCollection of consolidated kerchunk references.

    :param store_name: Zarr store will be created with this name under ``target_root``.
    :param output_file_name: Name to give the output references file
        (``.json`` or ``.parquet`` suffix).
    """

    store_name: str
    output_file_name: str = 'reference.json'
    target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
        default_factory=RequiredAtRuntimeDefault
    )
    storage_options: Dict = field(default_factory=dict)

    @staticmethod
    def _write(
        refs: Dict, full_target: FSSpecTarget, output_file_name: str, storage_options: Dict
    ) -> Dataset:
        import fsspec
        import ujson
        import xarray as xr

        outpath = full_target._full_path(output_file_name)
        with full_target.fs.open(outpath, 'wb') as f:
            f.write(ujson.dumps(refs).encode())

        fs = fsspec.filesystem(
            'reference', fo=full_target._full_path(output_file_name), remote_options=storage_options
        )
        return xr.open_dataset(
            fs.get_mapper(), engine='zarr', backend_kwargs={'consolidated': True}
        )

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(
            self._write,
            full_target=self.get_full_target(),
            output_file_name=self.output_file_name,
            storage_options=self.storage_options,
        )


@dataclass
class ValidateDatasetDimensions(beam.PTransform):
    """Open the reference.json in xarray and validate dimensions."""

    expected_dims: Dict = field(default_factory=dict)

    @staticmethod
    def _validate(ds: Dataset, expected_dims: Dict) -> Dataset:
        if set(ds.dims) != expected_dims.keys():
            raise ValueError(f'Expected dimensions {expected_dims.keys()}, got {ds.dims}')
        for dim, bounds in expected_dims.items():
            if bounds is None:
                continue
            lo, hi = bounds
            actual_lo, actual_hi = round(ds[dim].data.min()), round(ds[dim].data.max())
            if actual_lo != lo or actual_hi != hi:
                raise ValueError(f'Expected {dim} range [{lo}, {hi}], got {(actual_lo, actual_hi)}')
        return ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._validate, expected_dims=self.expected_dims)


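# Build fsspec auth kwargs to match the selected access protocol: a Bearer-token header
# for HTTPS access, or temporary S3 credentials from earthdata_auth() for direct S3 access.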
fsspec_auth_kwargs = (
    {'headers': {'Authorization': f'Bearer {ED_TOKEN}'}}
    if selected_rel == HTTP_REL
    else {'client_kwargs': earthdata_auth(ED_USERNAME, ED_PASSWORD)}
)
pattern = pattern_from_file_sequence(
    list(gen_data_links(selected_rel)), CONCAT_DIM, fsspec_open_kwargs=fsspec_auth_kwargs
)

# target_root is injected only into certain transforms in pangeo-forge-recipes;
# this is a hacky way to pull it out of the WriteCombinedReference transform.
hacky_way_to_pull = WriteCombinedReference(
    store_name=SHORT_NAME,
    concat_dims=pattern.concat_dims,
    identical_dims=IDENTICAL_DIMS,
)
recipe = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(
        remote_protocol='s3' if selected_rel == S3_REL else 'https',
        file_type=pattern.file_type,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | CombineReferences(
        concat_dims=pattern.concat_dims,
        identical_dims=IDENTICAL_DIMS,
    )
    | ConsolidateMetadata(storage_options=pattern.fsspec_open_kwargs)
    | WriteReferences(
        store_name=SHORT_NAME,
        target_root=hacky_way_to_pull.target_root,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-60, 60), 'lon': (-180, 180)})
)
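For context, a consumer of the references produced by this recipe could open them the same way ``WriteReferences._write`` does above. This is a minimal sketch, not part of the commit: the local ``reference.json`` path and the empty ``remote_options`` are assumptions for illustration (in practice the remote options would carry credentials for the NetCDF files the references point at).

import fsspec
import xarray as xr

# Hypothetical local path to the references written by the recipe.
fs = fsspec.filesystem(
    'reference',
    fo='reference.json',
    remote_options={},  # assumed; e.g. temporary Earthdata S3 credentials for S3 links
)
ds = xr.open_dataset(fs.get_mapper(), engine='zarr', backend_kwargs={'consolidated': True})
print(ds.sizes)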
@@ -0,0 +1,3 @@
s3fs==2023.10.0
boto3==1.28.71
python-cmr==0.9.0