Fix STAC writing and add AWS writing functions #53

Merged 49 commits from add-s3-writer into main on Sep 25, 2024

Commits
7ba0e72
Refactor STAC writing and add S3 writer
alexgleith Feb 21, 2024
b26887e
Add some genericism
alexgleith Feb 22, 2024
f452c27
Fix test data location
alexgleith Feb 22, 2024
52e2dfd
If bucket is a required arg, there's no need to check it
alexgleith Feb 22, 2024
71e6399
Make default landsat collection the AWS one
alexgleith Feb 22, 2024
24c756c
Fix up STAC datetime assignment
alexgleith Feb 23, 2024
f85b1dc
Refactor loader to support loading using a geobox, for precise defini…
alexgleith Feb 23, 2024
fac7d42
Fix a couple of bugs
alexgleith Mar 11, 2024
47789f2
Bug fix increased number of items returned
alexgleith Mar 11, 2024
3ae6a29
Fix EPSG code fixing so it applies only to MSPC Landsat
alexgleith Mar 12, 2024
3a807ee
Fix antimeridian bbox maybe/hopefully for the last time
alexgleith Mar 13, 2024
9b6bf9a
Remove silly comment
alexgleith Mar 14, 2024
c60bb75
Fix incorrectly fixed test
alexgleith Mar 29, 2024
228e476
added test from main branch
jessjaco May 10, 2024
c0abacd
xr/data name compromise
jessjaco May 10, 2024
cc057c4
Add grid definitions and allow geobox as area input
alexgleith Jun 11, 2024
f517f08
working on cleaning up PR
jessjaco Jun 13, 2024
b6d81e9
Merge branch 'add-s3-writer' into add-s3-writer-jja
jessjaco Jun 25, 2024
8301663
fix tests
jessjaco Jun 25, 2024
5b28ce5
added proof of need for mspc modifier for stac search
jessjaco Jul 29, 2024
3ed3b72
fix test
jessjaco Jul 30, 2024
b988bee
cleanup of azure-related code
jessjaco Aug 1, 2024
b6cd62d
added enhanced task to reduce writer complexity
jessjaco Aug 12, 2024
cb01758
fix return from StacTask
jessjaco Aug 14, 2024
9edb76c
added geom fix
jessjaco Aug 14, 2024
96b3dbd
switched from rio.clip to odc mask in odcloader
jessjaco Aug 14, 2024
41dc489
remove duplicates from cross antimeridian search
jessjaco Aug 15, 2024
0234854
big writer cleanup
jessjaco Aug 16, 2024
ad85786
minor updates and additions
jessjaco Aug 17, 2024
a181d0b
more cleanup
jessjaco Aug 19, 2024
caba6e5
re-add import
jessjaco Aug 19, 2024
919c1e7
combine queries rather than overwrite
jessjaco Aug 23, 2024
314199b
update time formatting in stac creation to allow a variable number…
jessjaco Aug 23, 2024
e9fa886
tweaked date parsing in stac utils
jessjaco Aug 27, 2024
b634b41
another breaking antimeridian test
jessjaco Aug 30, 2024
b58de99
added more specificity to test
jessjaco Aug 30, 2024
fef2321
fix for failing test
jessjaco Aug 30, 2024
c154012
more antimeridian fixes
jessjaco Aug 31, 2024
7378f9f
added auth helper script
jessjaco Sep 6, 2024
a9d45db
updated auth
jessjaco Sep 6, 2024
df2eede
incorporated grid code from dep-grid
jessjaco Sep 18, 2024
a74f566
adjusted kwarg parsing
jessjaco Sep 20, 2024
7042dca
remove auth function
jessjaco Sep 24, 2024
7a24604
added dependency
jessjaco Sep 24, 2024
e9a6e42
Merge pull request #61 from digitalearthpacific/add-s3-writer-jja
alexgleith Sep 24, 2024
22d5a1c
update workflow to try and fix failures
jessjaco Sep 25, 2024
3ea40e3
changed build backend
jessjaco Sep 25, 2024
deceae6
Merge branch 'main' into add-s3-writer
jessjaco Sep 25, 2024
e092260
reinsert merge fixes
jessjaco Sep 25, 2024
5 changes: 3 additions & 2 deletions .github/workflows/run-tests.yml
@@ -23,10 +23,11 @@ jobs:
          fetch-depth: 0

      - name: Check linting
        uses: rickstaa/action-black@v1
        uses: psf/black@stable
        id: action_black
        with:
          black_args: "--check dep_tools"
          options: "--check"
          src: "./dep_tools"

  testing:
    runs-on: ubuntu-latest
111 changes: 111 additions & 0 deletions dep_tools/aws.py
@@ -0,0 +1,111 @@
import json
from io import BytesIO
from pathlib import Path
from typing import IO, Union

import boto3
from botocore.client import BaseClient
from fiona.io import MemoryFile
from geopandas import GeoDataFrame
from odc.geo.xr import to_cog
from xarray import DataArray, Dataset

from pystac import Item


def object_exists(bucket: str, key: str, client: BaseClient | None = None) -> bool:
    """Check if a key exists in a bucket."""
    if client is None:
        client = boto3.client("s3")

    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except client.exceptions.ClientError:
        return False


def s3_dump(
    data: Union[bytes, str, IO], bucket: str, key: str, client: BaseClient, **kwargs
) -> bool:
    """Write data to an S3 object, returning True if the write succeeded."""

    r = client.put_object(Bucket=bucket, Key=key, Body=data, **kwargs)
    code = r["ResponseMetadata"]["HTTPStatusCode"]
    return 200 <= code < 300


def write_to_s3(
    d: Union[DataArray, Dataset, GeoDataFrame, str],
    path: Union[str, Path],
    bucket: str,
    overwrite: bool = True,
    use_odc_writer: bool = True,
    client: BaseClient | None = None,
    s3_dump_kwargs=dict(),
    **kwargs,
):
    if client is None:
        client = boto3.client("s3")

    key = str(path).lstrip("/")

    if not overwrite and object_exists(bucket, key, client):
        return

    if isinstance(d, (DataArray, Dataset)):
        if use_odc_writer:
            if "driver" in kwargs:
                del kwargs["driver"]
            binary_data = to_cog(d, **kwargs)
            s3_dump(
                binary_data,
                bucket,
                key,
                client,
                ContentType="image/tiff",
                **s3_dump_kwargs,
            )
        else:
            with BytesIO() as binary_data:
                d.rio.to_raster(binary_data, driver="COG", **kwargs)
                binary_data.seek(0)
                s3_dump(
                    binary_data,
                    bucket,
                    key,
                    client,
                    ContentType="image/tiff",
                    **s3_dump_kwargs,
                )

    elif isinstance(d, GeoDataFrame):
        with MemoryFile() as buffer:
            d.to_file(buffer, **kwargs)
            buffer.seek(0)
            s3_dump(buffer.read(), bucket, key, client, **s3_dump_kwargs)
    elif isinstance(d, Item):
        s3_dump(
            json.dumps(d.to_dict(), indent=4), bucket, key, client, **s3_dump_kwargs
        )
    elif isinstance(d, str):
        s3_dump(d, bucket, key, client, **s3_dump_kwargs)
    else:
        raise ValueError(
            "You can only write an Xarray DataArray or Dataset, Geopandas GeoDataFrame, Pystac Item, or string"
        )


def write_stac_s3(
    item: Item,
    stac_path: str,
    bucket: str,
    **kwargs,
) -> str:
    item_string = json.dumps(item.to_dict(), indent=4)
    write_to_s3(
        item_string, stac_path, bucket=bucket, ContentType="application/json", **kwargs
    )

    return stac_path
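
Aside (not part of the diff): a minimal sketch of how these new S3 helpers might be called. The bucket name, key, and data are hypothetical, and it assumes boto3, numpy, rioxarray, and odc-geo are installed with AWS credentials configured.

import boto3
import numpy as np
import rioxarray  # noqa: F401 — registers the .rio accessor used below
import xarray as xr

from dep_tools.aws import object_exists, write_to_s3

# A small georeferenced array standing in for real raster output
da = xr.DataArray(
    np.zeros((64, 64), dtype="uint8"),
    coords={"y": np.linspace(-9.0, -10.0, 64), "x": np.linspace(170.0, 171.0, 64)},
    dims=("y", "x"),
).rio.write_crs("EPSG:3832")

client = boto3.client("s3")

# Hypothetical bucket and key; with overwrite=False the write is
# skipped when the object already exists.
write_to_s3(
    da,
    "testing/example.tif",
    bucket="my-example-bucket",
    overwrite=False,
    client=client,
)
assert object_exists("my-example-bucket", "testing/example.tif", client)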
97 changes: 92 additions & 5 deletions dep_tools/azure.py
@@ -1,11 +1,18 @@
from functools import partial
import io
import json
from multiprocessing.dummy import Pool as ThreadPool
import os
from pathlib import Path
from typing import Union, Generator

import azure.storage.blob
from azure.storage.blob import ContainerClient
from functools import partial
from multiprocessing.dummy import Pool as ThreadPool
from azure.storage.blob import ContainerClient, ContentSettings
import fiona
from geopandas import GeoDataFrame
from odc.geo.xr import to_cog
from osgeo import gdal
from pystac import Item
from xarray import DataArray, Dataset


def get_container_client(
@@ -23,7 +30,7 @@ def get_container_client(
"'None' is not a valid value for 'credential'. Pass a valid name or set the 'AZURE_STORAGE_SAS_TOKEN' environment variable"
)

return azure.storage.blob.ContainerClient(
return ContainerClient(
f"https://{storage_account}.blob.core.windows.net",
container_name=container_name,
credential=credential,
@@ -71,3 +78,83 @@ def list_blob_container(
        blob_name = blob_record["name"]
        if blob_name.endswith(suffix):
            yield blob_name


def build_vrt(
    bounds: list,
    prefix: str = "",
    suffix: str = "",
) -> Path:
    blobs = [
        f"/vsiaz/output/{blob.name}"
        for blob in get_container_client().list_blobs(name_starts_with=prefix)
        if blob.name.endswith(suffix)
    ]

    local_prefix = Path(prefix).stem
    vrt_file = f"data/{local_prefix}.vrt"
    gdal.BuildVRT(vrt_file, blobs, outputBounds=bounds)
    return Path(vrt_file)


def write_stac_blob_storage(
    item: Item,
    stac_path: str,
    **kwargs,
) -> str | None:
    item_json = json.dumps(item.to_dict(), indent=4)
    write_to_blob_storage(
        item_json,
        stac_path,
        content_settings=ContentSettings(content_type="application/json"),
        **kwargs,
    )
    return stac_path


def write_to_blob_storage(
    d: Union[DataArray, Dataset, GeoDataFrame, str],
    path: Union[str, Path],
    overwrite: bool = True,
    use_odc_writer: bool = False,
    client: ContainerClient | None = None,
    **kwargs,
) -> None:
    # Allowing for a shared container client, which might be
    # more efficient. If not provided, get one.
    if client is None:
        client = get_container_client()
    blob_client = client.get_blob_client(str(path))
    if not overwrite and blob_client.exists():
        return

    if isinstance(d, (DataArray, Dataset)):
        if use_odc_writer:
            if "driver" in kwargs:
                del kwargs["driver"]
            binary_data = to_cog(d, **kwargs)
            blob_client.upload_blob(
                binary_data, overwrite=overwrite, connection_timeout=TIMEOUT_SECONDS
            )
        else:
            with io.BytesIO() as buffer:
                # This is needed or rioxarray doesn't know what type it is writing
                if "driver" not in kwargs:
                    kwargs["driver"] = "COG"
                d.rio.to_raster(buffer, **kwargs)
                buffer.seek(0)
                blob_client.upload_blob(
                    buffer, overwrite=overwrite, connection_timeout=TIMEOUT_SECONDS
                )

    elif isinstance(d, GeoDataFrame):
        with fiona.io.MemoryFile() as buffer:
            d.to_file(buffer, **kwargs)
            buffer.seek(0)
            blob_client.upload_blob(buffer, overwrite=overwrite)
    elif isinstance(d, str):
        blob_client.upload_blob(d, overwrite=overwrite, **kwargs)
    else:
        raise ValueError(
            "You can only write an Xarray DataArray or Dataset, a Geopandas GeoDataFrame, or a string"
        )
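
Aside (not part of the diff): a similar sketch for the blob storage side. The container paths and local file are hypothetical, and it assumes AZURE_STORAGE_SAS_TOKEN is set so get_container_client can authenticate.

from pystac import Item

from dep_tools.azure import write_stac_blob_storage, write_to_blob_storage

# Plain strings are uploaded directly (hypothetical path)
write_to_blob_storage("hello", "testing/hello.txt")

# STAC items get an application/json content type via ContentSettings
item = Item.from_file("example-item.json")  # hypothetical local item
write_stac_blob_storage(item, "testing/example-item.json")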
91 changes: 91 additions & 0 deletions dep_tools/grids.py
@@ -0,0 +1,91 @@
from typing import Literal

import antimeridian
import geopandas as gpd
from geopandas import GeoDataFrame, GeoSeries
from odc.geo import XY, BoundingBox
from odc.geo.gridspec import GridSpec
from shapely.geometry import shape

# This EPSG code is what we're using for now,
# but it's not ideal, as it's not an equal-area projection...
PACIFIC_EPSG = "EPSG:3832"


def grid(
    resolution: int | float = 30,
    crs=PACIFIC_EPSG,
    return_type: Literal["GridSpec", "GeoSeries", "GeoDataFrame"] = "GridSpec",
    intersect_with: GeoDataFrame | None = None,
) -> GridSpec | GeoSeries | GeoDataFrame:
    """Returns a GridSpec, GeoSeries, or GeoDataFrame representing the Pacific
    grid, optionally intersected with an area of interest.

    Args:
        resolution: The resolution, in meters, of the output. As tiles are
            defined to be 96,000 meters on each side, it should divide 96,000
            evenly.
        crs: The desired crs of the output.
        return_type: The return type. If intersect_with (see below) is not None,
            this is ignored.
        intersect_with: The output is intersected with the supplied GeoDataFrame
            before returning, returning only tiles which overlap with those
            features. Forces the output to be a GeoDataFrame.
    """

    if intersect_with is not None:
        full_grid = _geoseries(resolution, crs)
        return _intersect_grid(full_grid, intersect_with)

    return {
        "GridSpec": _gridspec,
        "GeoSeries": _geoseries,
        "GeoDataFrame": _geodataframe,
    }[return_type](resolution, crs)


def _intersect_grid(grid: GeoSeries, areas_of_interest):
    return gpd.sjoin(
        gpd.GeoDataFrame(geometry=grid), areas_of_interest.to_crs(grid.crs)
    ).drop(columns=["index_right"])


def _gridspec(resolution, crs=PACIFIC_EPSG):
    gridspec_origin = XY(-3000000.0, -4000000.0)

    side_in_meters = 96_000
    shape = (side_in_meters / resolution, side_in_meters / resolution)

    return GridSpec(
        crs=crs,
        tile_shape=shape,
        resolution=resolution,
        origin=gridspec_origin,
    )


def _geodataframe(resolution, crs=PACIFIC_EPSG):
    return GeoDataFrame(geometry=_geoseries(resolution, crs), crs=crs)


def _geoseries(resolution, crs) -> GeoSeries:
    bounds = BoundingBox(120, -30, 280, 30, crs="EPSG:4326").to_crs(crs)
    tiles = _gridspec(resolution, crs).tiles(bounds)
    geometry, index = zip(
        *[(a_tile[1].boundingbox.polygon.geom, a_tile[0]) for a_tile in tiles]
    )

    gs = gpd.GeoSeries(geometry, index, crs=PACIFIC_EPSG)
    if crs != PACIFIC_EPSG:
        gs = gs.to_crs(crs)
    if crs == 4326:
        gs = gs.apply(lambda geom: shape(antimeridian.fix_shape(geom)))

    return gs


# The origin is in the projected CRS. This works for Landsat.
PACIFIC_GRID_30 = grid()

# This grid is for Sentinel-2 and has the same footprint
PACIFIC_GRID_10 = grid(resolution=10)
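
Aside (not part of the diff): a quick sketch of how the new grid helpers might be used. The tile index and AOI file below are hypothetical.

import geopandas as gpd

from dep_tools.grids import grid

# The default GridSpec: 96 km tiles at 30 m resolution
gs = grid()
geobox = gs.tile_geobox((63, 20))  # hypothetical tile index
print(geobox.shape)  # 3200 x 3200 pixels

# Only the tiles overlapping an area of interest, as a GeoDataFrame
aoi = gpd.read_file("fiji_boundary.gpkg")  # hypothetical file
tiles = grid(intersect_with=aoi)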