Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ability to create scaled tiles and tiles from SIDD #29

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions src/aws/osml/image_processing/gdal_tile_factory.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import base64
import logging
from secrets import token_hex
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from defusedxml import ElementTree
from osgeo import gdal, gdalconst

from aws.osml.gdal import GDALCompressionOptions, GDALImageFormats, NITFDESAccessor, RangeAdjustmentType, get_type_and_scales
from aws.osml.photogrammetry import ImageCoordinate, SensorModel

from .sicd_updater import SICDUpdater
from .sidd_updater import SIDDUpdater

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,8 +44,8 @@ def __init__(
self.raster_dataset = raster_dataset
self.sensor_model = sensor_model
self.des_accessor = None
self.sicd_updater = None
self.sicd_des_header = None
self.sar_updater = None
self.sar_des_header = None
self.range_adjustment = range_adjustment
self.output_type = output_type

Expand All @@ -56,22 +56,27 @@ def __init__(
xml_data_content_segments = self.des_accessor.get_segments_by_name("XML_DATA_CONTENT")
if xml_data_content_segments is not None and len(xml_data_content_segments) > 0:
# This appears to be SICD or SIDD data
# TODO: Check to make sure this is the right XML_DATA_CONTENT segment containing SICD
sicd_des = xml_data_content_segments[0]
sicd_bytes = self.des_accessor.parse_field_value(sicd_des, "DESDATA", base64.b64decode)
sicd_xml = sicd_bytes.decode("utf-8")
sicd_metadata = ElementTree.fromstring(sicd_xml)
self.sicd_des_header = self.des_accessor.extract_des_header(sicd_des)
self.sicd_updater = SICDUpdater(sicd_metadata)
xml_data_segment = xml_data_content_segments[0]
xml_bytes = self.des_accessor.parse_field_value(xml_data_segment, "DESDATA", base64.b64decode)
xml_str = xml_bytes.decode("utf-8")
if "SIDD" in xml_str:
self.sar_des_header = self.des_accessor.extract_des_header(xml_data_segment)
self.sar_updater = SIDDUpdater(xml_str)
elif "SICD" in xml_str:
self.sar_des_header = self.des_accessor.extract_des_header(xml_data_segment)
self.sar_updater = SICDUpdater(xml_str)

self.default_gdal_translate_kwargs = self._create_gdal_translate_kwargs()

def create_encoded_tile(self, src_window: List[int]) -> Optional[bytearray]:
def create_encoded_tile(
self, src_window: List[int], output_size: Optional[Tuple[int, int]] = None
) -> Optional[bytearray]:
"""
This method cuts a tile from the full image, updates the metadata as needed, and finally compresses/encodes
the result in the output format requested.

:param src_window: the [left_x, top_y, width, height] bounds of this tile
:param output_size: an optional size of the output tile (width, height)
:return: the encoded image tile or None if one could not be produced
"""
temp_ds_name = f"/vsimem/{token_hex(16)}.{self.tile_format}"
Expand All @@ -81,23 +86,27 @@ def create_encoded_tile(self, src_window: List[int]) -> Optional[bytearray]:
# create image tiles using the format, compression, etc. requested by the client.
gdal_translate_kwargs = self.default_gdal_translate_kwargs.copy()

if output_size is not None:
gdal_translate_kwargs["width"] = output_size[0]
gdal_translate_kwargs["height"] = output_size[1]

# Create a new IGEOLO value based on the corner points of this tile
if self.sensor_model is not None and self.tile_format == GDALImageFormats.NITF:
gdal_translate_kwargs["creationOptions"].append("ICORDS=G")
gdal_translate_kwargs["creationOptions"].append("IGEOLO=" + self.create_new_igeolo(src_window))

if self.sicd_updater is not None and self.tile_format == GDALImageFormats.NITF:
# If we're outputting a SICD tile we need to update the XML metadata to include the new chip
if self.sar_updater is not None and self.tile_format == GDALImageFormats.NITF:
# If we're outputting a SICD or SIDD tile we need to update the XML metadata to include the new chip
# origin and size. This will allow applications using the tile to correctly interpret the remaining
# image metadata.
self.sicd_updater.update_image_data_for_chip(src_window)
updated_sicd_des = self.sicd_des_header + self.sicd_updater.encode_current_xml()
self.sar_updater.update_image_data_for_chip(src_window, output_size)
updated_sar_des = self.sar_des_header + self.sar_updater.encode_current_xml()

gdal_translate_kwargs["creationOptions"].append("ICAT=SAR")
gdal_translate_kwargs["creationOptions"].append("IREP=NODISPLY")
gdal_translate_kwargs["creationOptions"].append("IREPBAND= , ")
gdal_translate_kwargs["creationOptions"].append("ISUBCAT=I,Q")
gdal_translate_kwargs["creationOptions"].append("DES=XML_DATA_CONTENT=" + updated_sicd_des)
gdal_translate_kwargs["creationOptions"].append("DES=XML_DATA_CONTENT=" + updated_sar_des)

# Use GDAL to create an encoded tile of the image region
# From GDAL documentation:
Expand Down
86 changes: 24 additions & 62 deletions src/aws/osml/image_processing/sicd_updater.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,36 @@
import logging
import re
from typing import Callable, List, TypeVar
from xml.etree import ElementTree as ET
from math import floor
from typing import List, Optional, Tuple

from defusedxml import ElementTree
from xsdata.formats.dataclass.parsers import XmlParser
from xsdata.formats.dataclass.serializers import XmlSerializer
from xsdata.formats.dataclass.serializers.config import SerializerConfig

logger = logging.getLogger(__name__)

# This is a type placeholder needed by the parse_element_text() type hints
T = TypeVar("T")


class SICDUpdater:
"""
This class provides a means to perform common updates to a SICD XML metadata document.
"""

def __init__(self, sicd_element: ET.Element):
def __init__(self, xml_str: str):
"""
Construct a new instance of this class to manage a given set of SICD metadata.

:param sicd_element: the SICD XML metadata to update
:param xml_str: the SICD XML metadata to update
"""
self.sicd_element = sicd_element

# Extract the XML namespace from the root SICD element and store it for later use in element queries
namespace_match = re.match(r"{.*}", self.sicd_element.tag)
self.namespace = namespace_match.group(0) if namespace_match else ""

# We don't currently have many examples of SICD data. An attempt has been made to make this code
# work so long as the portions of the XML schema we depend upon don't change. This warning is just
# an attempt to provide diagnostic information incase future datasets don't work.
if self.namespace != "{urn:SICD:1.2.1}":
logger.warning(f"Attempting to process SICD metadata with an untested namespace {self.namespace}")
self.xml_str = xml_str
if self.xml_str is not None and len(self.xml_str) > 0:
parser = XmlParser()
self.sicd = parser.from_string(self.xml_str)

# Here we're storing off the original first row/col to support the case where multiple chips are
# created from a SICD image that has already been chipped.
self.original_first_row = self.parse_element_text(".//{0}FirstRow".format(self.namespace), int)
self.original_first_col = self.parse_element_text(".//{0}FirstCol".format(self.namespace), int)
self.original_first_row = self.sicd.image_data.first_row
self.original_first_col = self.sicd.image_data.first_col

def update_image_data_for_chip(self, chip_bounds: List[int]) -> None:
def update_image_data_for_chip(self, chip_bounds: List[int], output_size: Optional[Tuple[int, int]]) -> None:
"""
This updates the SICD ImageData structure so that the FirstRow, FirstCol and NumRows, NumCols
elements match the new chip boundary.A sample of this XML structure is shown below::
Expand All @@ -57,52 +48,23 @@ def update_image_data_for_chip(self, chip_bounds: List[int]) -> None:


:param chip_bounds: the [col, row, width, height] of the chip boundary
:param output_size: the [width, height] of the output chip
"""
first_col_element = self.sicd_element.find(".//{0}FirstCol".format(self.namespace))
first_row_element = self.sicd_element.find(".//{0}FirstRow".format(self.namespace))
num_cols_element = self.sicd_element.find(".//{0}NumCols".format(self.namespace))
num_rows_element = self.sicd_element.find(".//{0}NumRows".format(self.namespace))
if first_row_element is None or first_col_element is None or num_cols_element is None or num_rows_element is None:
logger.warning("SICD ImageData structures were not found. No updates applied.")
return

first_col_element.text = str(self.original_first_col + chip_bounds[0])
first_row_element.text = str(self.original_first_row + chip_bounds[1])
num_cols_element.text = str(chip_bounds[2])
num_rows_element.text = str(chip_bounds[3])
if output_size is not None and (output_size[0] != chip_bounds[2] or output_size[1] != chip_bounds[3]):
raise ValueError("SICD chipping does not support scaling operations.")

if logger.isEnabledFor(logging.DEBUG):
image_data_element = self.sicd_element.find(".//{0}ImageData".format(self.namespace))
if image_data_element is not None:
logger.debug("Updated SICD ImageData element for chip:")
logger.debug(
ElementTree.tostring(
image_data_element,
encoding="unicode",
)
)
self.sicd.image_data.first_row = floor(float(self.original_first_row)) + int(chip_bounds[1])
self.sicd.image_data.first_col = floor(float(self.original_first_col)) + int(chip_bounds[0])
self.sicd.image_data.num_rows = int(chip_bounds[3])
self.sicd.image_data.num_cols = int(chip_bounds[2])

def encode_current_xml(self) -> str:
"""
Returns a copy of the current SICD metadata encoded in XML.

:return: xml encoded SICD metadata
"""
return ElementTree.tostring(self.sicd_element, encoding="unicode")

def parse_element_text(self, element_xpath: str, type_conversion: Callable[[str], T]) -> T:
"""
This function finds the first element matching the provided xPath and then runs the text of that element
through the provided conversion function.

:param element_xpath: the xPath of the element
:param type_conversion: the desired type of the output, must support construction from a string
:return: the element text converted to the requested type
"""
field_element = self.sicd_element.find(element_xpath)
if field_element is None:
raise ValueError(f"Unable to find element {element_xpath}")
str_value = field_element.text
if str_value is None:
raise ValueError(f"Element {element_xpath} does not have text.")
return type_conversion(str_value)
serializer = XmlSerializer(config=SerializerConfig(pretty_print=False))
updated_xml = serializer.render(self.sicd)
return updated_xml
Loading