GPM MERGIR Kerchunk Recipe #260

Closed
wants to merge 1 commit into from
20 changes: 20 additions & 0 deletions recipes/gpm_mergir/meta.yaml
@@ -0,0 +1,20 @@
title: 'NCEP/CPC L3 Half Hourly 4km Global (60S - 60N) Merged IR V1 (GPM_MERGIR) at GES DISC'
description: 'These data originate from NOAA/NCEP. The NOAA Climate Prediction Center (NCEP/NWS) makes the data available, originally in binary format, in a weekly rotating archive. The NASA GES DISC acquires the binary files as they become available and converts them into CF (Climate and Forecast) convention...'
pangeo_forge_version: '0.9.2'
recipes:
  - id: 'GPM_MERGIR'
    object: 'recipe:recipe'
provenance:
  providers:
    - name: 'NASA/GSFC/SED/ESD/GCDC/GESDISC'
      description: 'NASA/GSFC/SED/ESD/GCDC/GESDISC'
      roles:
        - producer
        - licensor
      url: 'https://disc.gsfc.nasa.gov/datacollection/GPM_MERGIR_1.html'
  license: 'Open Data'
maintainers:
  - name: 'Development Seed'
    github: developmentseed
bakery:
  id: 'pangeo-ldeo-nsf-earthcube'
221 changes: 221 additions & 0 deletions recipes/gpm_mergir/recipe.py
@@ -0,0 +1,221 @@
import base64
import json
import os
from dataclasses import dataclass, field
from typing import Dict, Union

import apache_beam as beam
import requests
from cmr import GranuleQuery
from kerchunk.combine import MultiZarrToZarr
from xarray import Dataset

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    RequiredAtRuntimeDefault,
    WriteCombinedReference,
)
from pangeo_forge_recipes.writers import ZarrWriterMixin

HTTP_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#'
S3_REL = 'http://esipfed.org/ns/fedsearch/1.1/s3#'
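# CMR link "rel" types: `data#` marks plain HTTPS download links in a granule's
# metadata, while `s3#` marks direct-access S3 URIs.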

# This recipe requires the following environment variables from Earthdata
ED_TOKEN = os.environ['EARTHDATA_TOKEN']
ED_USERNAME = os.environ['EARTHDATA_USERNAME']
ED_PASSWORD = os.environ['EARTHDATA_PASSWORD']

CREDENTIALS_API = 'https://data.gesdisc.earthdata.nasa.gov/s3credentials'
SHORT_NAME = 'GPM_MERGIR'
CONCAT_DIM = 'time'
IDENTICAL_DIMS = ['lat', 'lon']
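# Granules are concatenated along `time`; `lat`/`lon` are declared identical
# across files (the 4 km grid is fixed), so kerchunk stores them only once.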

# use HTTP_REL if S3 access is not possible. S3_REL is faster.
selected_rel = S3_REL


def earthdata_auth(username: str, password: str):
    """Exchange Earthdata Login credentials for temporary GES DISC S3 credentials."""
    # The first request redirects to the Earthdata Login (URS) authorization endpoint.
    login_resp = requests.get(CREDENTIALS_API, allow_redirects=False)
    login_resp.raise_for_status()

    # Post base64-encoded credentials to URS, which redirects back toward the API.
    encoded_auth = base64.b64encode(f'{username}:{password}'.encode('ascii'))
    auth_redirect = requests.post(
        login_resp.headers['location'],
        data={'credentials': encoded_auth},
        headers={'Origin': CREDENTIALS_API},
        allow_redirects=False,
    )
    auth_redirect.raise_for_status()

    # Follow the redirect to obtain the accessToken cookie ...
    final = requests.get(auth_redirect.headers['location'], allow_redirects=False)

    # ... and present it to the credentials API to receive temporary AWS credentials.
    results = requests.get(CREDENTIALS_API, cookies={'accessToken': final.cookies['accessToken']})
    results.raise_for_status()

    creds = json.loads(results.content)
    return {
        'aws_access_key_id': creds['accessKeyId'],
        'aws_secret_access_key': creds['secretAccessKey'],
        'aws_session_token': creds['sessionToken'],
    }
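
# The credentials returned above are passed below as fsspec `client_kwargs`,
# which s3fs forwards to its underlying botocore client. Note that these
# temporary tokens typically expire after about an hour.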


def filter_data_links(links, rel):
    return filter(
        lambda link: link['rel'] == rel
        and (link['href'].endswith('.nc') or link['href'].endswith('.nc4')),
        links,
    )
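
# Each CMR granule carries a list of link dicts; an illustrative (not real) entry:
#   {'rel': 'http://esipfed.org/ns/fedsearch/1.1/s3#',
#    'href': 's3://gesdisc-cumulus-prod-protected/.../some_granule.nc4'}
# filter_data_links keeps only netCDF links of the requested rel.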


def gen_data_links(rel):
    granules = GranuleQuery().short_name(SHORT_NAME).downloadable(True).get_all()
    for granule in granules:
        data_links = filter_data_links(granule['links'], rel)
        first = next(data_links, None)
        # raise if CMR does not have exactly one link of the requested rel for a granule
        if not first or next(data_links, None) is not None:
            raise ValueError(f"Expected 1 link of type {rel} on {granule['title']}")
        yield first['href']


@dataclass
class ConsolidateMetadata(beam.PTransform):
    """Consolidate metadata into a single .zmetadata file.

    See zarr.consolidate_metadata() for details.
    """

    storage_options: Dict = field(default_factory=dict)

    @staticmethod
    def _consolidate(mzz: MultiZarrToZarr, storage_options: Dict) -> Dict:
        import fsspec
        import zarr
        from kerchunk.utils import consolidate

        out = mzz.translate()
        # Open the combined references as a filesystem so zarr can write
        # consolidated metadata directly into the reference set.
        fs = fsspec.filesystem(
            'reference',
            fo=out,
            remote_options=storage_options,
        )
        mapper = fs.get_mapper()
        metadata_key = '.zmetadata'
        zarr.consolidate_metadata(mapper, metadata_key=metadata_key)
        # Inline the freshly written .zmetadata entry and merge it back into
        # the reference dict so it ships with the output file.
        double_consolidated = consolidate({metadata_key: mapper[metadata_key]})
        out['refs'] = out['refs'] | double_consolidated['refs']
        return out

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._consolidate, storage_options=self.storage_options)
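
# Consolidation matters for remote access: readers fetch a single `.zmetadata`
# object up front instead of issuing one request per array's metadata.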


@dataclass
class WriteReferences(beam.PTransform, ZarrWriterMixin):
    """Write a singleton PCollection of kerchunk reference dicts to the target
    and return the dataset opened from the written file.

    :param store_name: Zarr store will be created with this name under ``target_root``.
    :param output_file_name: Name to give the output references file
        (``.json`` suffix; this implementation writes JSON only).
    """

    store_name: str
    output_file_name: str = 'reference.json'
    target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
        default_factory=RequiredAtRuntimeDefault
    )
    storage_options: Dict = field(default_factory=dict)

    @staticmethod
    def _write(
        refs: Dict, full_target: FSSpecTarget, output_file_name: str, storage_options: Dict
    ) -> Dataset:
        import fsspec
        import ujson
        import xarray as xr

        # Serialize the reference dict to JSON at the target location.
        outpath = full_target._full_path(output_file_name)
        with full_target.fs.open(outpath, 'wb') as f:
            f.write(ujson.dumps(refs).encode())

        # Re-open the written references so downstream validation sees the
        # dataset exactly as an end user would.
        fs = fsspec.filesystem(
            'reference', fo=full_target._full_path(output_file_name), remote_options=storage_options
        )
        return xr.open_dataset(
            fs.get_mapper(), engine='zarr', backend_kwargs={'consolidated': True}
        )

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(
            self._write,
            full_target=self.get_full_target(),
            output_file_name=self.output_file_name,
            storage_options=self.storage_options,
        )


@dataclass
class ValidateDatasetDimensions(beam.PTransform):
    """Open the reference.json in xarray and validate dimensions."""

    expected_dims: Dict = field(default_factory=dict)

    @staticmethod
    def _validate(ds: Dataset, expected_dims: Dict) -> Dataset:
        if set(ds.dims) != expected_dims.keys():
            raise ValueError(f'Expected dimensions {expected_dims.keys()}, got {ds.dims}')
        for dim, bounds in expected_dims.items():
            if bounds is None:
                continue
            lo, hi = bounds
            actual_lo, actual_hi = round(ds[dim].data.min()), round(ds[dim].data.max())
            if actual_lo != lo or actual_hi != hi:
                raise ValueError(f'Expected {dim} range [{lo}, {hi}], got [{actual_lo}, {actual_hi}]')
        return ds

    def expand(
        self,
        pcoll: beam.PCollection,
    ) -> beam.PCollection:
        return pcoll | beam.Map(self._validate, expected_dims=self.expected_dims)


fsspec_auth_kwargs = (
    {'headers': {'Authorization': f'Bearer {ED_TOKEN}'}}
    if selected_rel == HTTP_REL
    else {'client_kwargs': earthdata_auth(ED_USERNAME, ED_PASSWORD)}
)
pattern = pattern_from_file_sequence(
    list(gen_data_links(selected_rel)), CONCAT_DIM, fsspec_open_kwargs=fsspec_auth_kwargs
)
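
# pattern.items() yields (index, url) pairs ordered along the `time` concat
# dim; these are the elements fed into the pipeline below.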

# target_root is injected only into certain transforms in pangeo-forge-recipes;
# this is a hacky way to pull it out of the WriteCombinedReference transform
hacky_way_to_pull = WriteCombinedReference(
    store_name=SHORT_NAME,
    concat_dims=pattern.concat_dims,
    identical_dims=IDENTICAL_DIMS,
)
recipe = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(
        remote_protocol='s3' if selected_rel == S3_REL else 'https',
        file_type=pattern.file_type,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | CombineReferences(
        concat_dims=pattern.concat_dims,
        identical_dims=IDENTICAL_DIMS,
    )
    | ConsolidateMetadata(storage_options=pattern.fsspec_open_kwargs)
    | WriteReferences(
        store_name=SHORT_NAME,
        target_root=hacky_way_to_pull.target_root,
        storage_options=pattern.fsspec_open_kwargs,
    )
    | ValidateDatasetDimensions(expected_dims={'time': None, 'lat': (-60, 60), 'lon': (-180, 180)})
)
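
# A minimal local-run sketch (an assumption, not part of the recipe; in
# production a pangeo-forge bakery executes the pipeline and injects
# `target_root` at runtime):
#
#   with beam.Pipeline() as p:
#       p | recipe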
3 changes: 3 additions & 0 deletions recipes/gpm_mergir/requirements.txt
@@ -0,0 +1,3 @@
s3fs==2023.10.0
boto3==1.28.71
python-cmr==0.9.0