Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optionally skip regenerating geoparquet assets #24

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/stactools/goes_glm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,4 +174,4 @@ class OrbitalSlot(str, enum.Enum):
[-156.06, 66.56],
]
],
}
}
78 changes: 43 additions & 35 deletions src/stactools/goes_glm/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from geopandas import GeoDataFrame, GeoSeries
from netCDF4 import Dataset
from shapely.geometry import Point
import pyarrow.parquet

from . import constants

logger = logging.getLogger(__name__)


def convert(dataset: Dataset, dest_folder: str) -> Dict[str, Dict[str, Any]]:
def convert(dataset: Dataset, dest_folder: str, create_parquet=True) -> Dict[str, Dict[str, Any]]:
"""
Converts a netCDF dataset to three geoparquet files (for events, flashes
and groups) in the given folder.
Expand All @@ -28,13 +29,13 @@ def convert(dataset: Dataset, dest_folder: str) -> Dict[str, Dict[str, Any]]:
dict: Asset Objects
"""
assets: Dict[str, Dict[str, Any]] = {}
assets[constants.PARQUET_KEY_EVENTS] = create_event(dataset, dest_folder)
assets[constants.PARQUET_KEY_FLASHES] = create_flashes(dataset, dest_folder)
assets[constants.PARQUET_KEY_GROUPS] = create_groups(dataset, dest_folder)
assets[constants.PARQUET_KEY_EVENTS] = create_event(dataset, dest_folder, create_parquet=create_parquet)
assets[constants.PARQUET_KEY_FLASHES] = create_flashes(dataset, dest_folder, create_parquet=create_parquet)
assets[constants.PARQUET_KEY_GROUPS] = create_groups(dataset, dest_folder, create_parquet=create_parquet)
return assets


def create_event(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
def create_event(dataset: Dataset, dest_folder: str, create_parquet=True) -> Dict[str, Any]:
"""
Creates geoparquet file from a netCDF Dataset for the events.

Expand All @@ -46,10 +47,10 @@ def create_event(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
"""
file = os.path.join(dest_folder, "events.parquet")
cols = ["lat", "lon", "id", "time_offset", "energy", "parent_group_id"]
return create_asset(dataset, file, "event", cols, constants.PARQUET_TITLE_EVENTS)
return create_asset(dataset, file, "event", cols, constants.PARQUET_TITLE_EVENTS, create_parquet=create_parquet)


def create_flashes(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
def create_flashes(dataset: Dataset, dest_folder: str, create_parquet=True) -> Dict[str, Any]:
"""
Creates geoparquet file from a netCDF Dataset for the flashes.

Expand All @@ -72,10 +73,10 @@ def create_flashes(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
"energy",
"quality_flag",
]
return create_asset(dataset, file, "flash", cols, constants.PARQUET_TITLE_FLASHES)
return create_asset(dataset, file, "flash", cols, constants.PARQUET_TITLE_FLASHES, create_parquet=create_parquet)


def create_groups(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
def create_groups(dataset: Dataset, dest_folder: str, create_parquet=True) -> Dict[str, Any]:
"""
Creates geoparquet file from a netCDF Dataset for the groups.

Expand All @@ -97,11 +98,11 @@ def create_groups(dataset: Dataset, dest_folder: str) -> Dict[str, Any]:
"quality_flag",
"parent_flash_id",
]
return create_asset(dataset, file, "group", cols, constants.PARQUET_TITLE_GROUPS)
return create_asset(dataset, file, "group", cols, constants.PARQUET_TITLE_GROUPS, create_parquet=create_parquet)


def create_asset(
dataset: Dataset, file: str, type: str, cols: List[str], title: str
dataset: Dataset, file: str, type: str, cols: List[str], title: str, create_parquet=True,
) -> Dict[str, Any]:
"""
Creates an asset object for a netCDF Dataset with some additional properties.
Expand Down Expand Up @@ -154,15 +155,17 @@ def create_asset(
"length of {lat_col} ({lat_var_len}) != length of {lon_col} ({lon_var_len})"
)

for i in range(0, count):
lat = lat_var[i]
lon = lon_var[i]
geometries.append(Point(lon, lat))
if create_parquet:
for i in range(0, count):
lat = lat_var[i]
lon = lon_var[i]
geometries.append(Point(lon, lat))

# fill dict with all data in a columnar way
table_data = {
constants.PARQUET_GEOMETRY_COL: GeoSeries(geometries, crs=constants.SOURCE_CRS)
}
if create_parquet:
table_data = {
constants.PARQUET_GEOMETRY_COL: GeoSeries(geometries, crs=constants.SOURCE_CRS)
}
table_cols = [{"name": constants.PARQUET_GEOMETRY_COL, "type": dataset.featureType}]
for col in cols:
if col == "lat" or col == "lon":
Expand All @@ -179,7 +182,8 @@ def create_asset(

variable = dataset.variables[var_name]
attrs = variable.ncattrs()
data = variable[...].tolist()
if create_parquet:
data = variable[...].tolist()
table_col = {
"name": col,
"type": str(variable.datatype), # todo: check data type #11
Expand All @@ -205,32 +209,36 @@ def create_asset(
)

new_data: List[Optional[datetime]] = []
for val in data:
try:
if milliseconds:
delta = timedelta(milliseconds=val)
else:
delta = timedelta(seconds=val)
new_data.append(base + delta)
except TypeError:
raise Exception(
f"An invalid value '{val}' found in variable '{var_name}'"
)

table_data[new_col] = new_data
if create_parquet:
for val in data:
try:
if milliseconds:
delta = timedelta(milliseconds=val)
else:
delta = timedelta(seconds=val)
new_data.append(base + delta)
except TypeError:
raise Exception(
f"An invalid value '{val}' found in variable '{var_name}'"
)

if create_parquet:
table_data[new_col] = new_data
col_info = {
"name": new_col,
"type": constants.PARQUET_DATETIME_COL_TYPE,
}

table_cols.append(col_info)

table_data[col] = data
if create_parquet:
table_data[col] = data
table_cols.append(table_col)

# Create a geodataframe and store it as geoparquet file
dataframe = GeoDataFrame(table_data)
dataframe.to_parquet(file, version="2.6")
if create_parquet:
dataframe = GeoDataFrame(table_data)
dataframe.to_parquet(file, version="2.6")

# Create asset dict
return create_asset_metadata(title, file, table_cols, count)
Expand Down
12 changes: 9 additions & 3 deletions src/stactools/goes_glm/stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import os
import re
from datetime import datetime, timezone
from typing import Optional
from typing import Dict, Optional

from dateutil.parser import isoparse
from netCDF4 import Dataset
import pyarrow.parquet
from pystac import (
Asset,
CatalogType,
Expand Down Expand Up @@ -165,6 +166,7 @@ def create_item(
nonetcdf: bool = False,
fixnetcdf: bool = False,
appendctime: bool = False,
geoparquet_hrefs: Optional[Dict[str, str]] = None,
) -> Item:
"""Create a STAC Item

Expand Down Expand Up @@ -308,8 +310,12 @@ def create_item(
proj.centroid = centroid

if not nogeoparquet:
target_folder = os.path.dirname(asset_href)
assets = parquet.convert(dataset, target_folder)
if geoparquet_hrefs:
target_folder = os.path.dirname(asset_href)
assets = parquet.convert(dataset, target_folder, create_parquet=False)
else:
target_folder = os.path.dirname(asset_href)
assets = parquet.convert(dataset, target_folder)
for key, asset_dict in assets.items():
asset = Asset.from_dict(asset_dict)
item.add_asset(key, asset)
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
17 changes: 17 additions & 0 deletions tests/test_stac.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,20 @@ def test_create_item(self) -> None:
self.assertTrue("dimensions" in var)
self.assertTrue("type" in var)
self.assertTrue("description" in var)


def test_geoparquet_hrefs() -> None:
    """An item built from precomputed geoparquet hrefs matches a regenerated one.

    Builds one item with ``geoparquet_hrefs`` supplied (so parquet
    regeneration is skipped) and one item without (so the parquet assets are
    regenerated from the netCDF source).  Apart from the asset hrefs — which
    necessarily differ and are normalized before comparison — the two items'
    metadata must be identical.
    """
    asset_href = "./tests/data-files/OR_GLM-L2-LCFA_G16_s20181591447400_e20181591448000_c20181591448028.nc"
    geoparquet_hrefs = {
        "geoparquet_flashes": "./tests/data-files/OR_GLM-L2-LCFA_G16_s20181591447400_e20181591448000_c20181591448028-flashes.parquet",
        "geoparquet_groups": "./tests/data-files/OR_GLM-L2-LCFA_G16_s20181591447400_e20181591448000_c20181591448028-groups.parquet",
        "geoparquet_events": "./tests/data-files/OR_GLM-L2-LCFA_G16_s20181591447400_e20181591448000_c20181591448028-events.parquet",
    }

    # Item that reuses the pre-generated parquet files.
    result = stac.create_item(asset_href, geoparquet_hrefs=geoparquet_hrefs)
    # Item that regenerates the parquet files from scratch.  The original
    # test built this with the same geoparquet_hrefs argument as `result`,
    # which made the final assertion vacuously true.
    expected = stac.create_item(asset_href)

    # Hrefs point at different files by construction; align them so the
    # comparison exercises the remaining metadata only.
    for key, asset in expected.assets.items():
        asset.href = result.assets[key].href

    assert result.to_dict() == expected.to_dict()