Commit 7e959f2

changes to the RasterAsGridReader and ReTileOnRead processes

sllynn committed Nov 8, 2024
1 parent a38dc21 commit 7e959f2
Showing 12 changed files with 136 additions and 42 deletions.

MosaicRasterGDAL.scala

@@ -175,11 +175,15 @@ case class MosaicRasterGDAL(
     def isEmpty: Boolean = {
         val bands = getBands
         if (bands.isEmpty) {
-            subdatasets.values
-                .filter(_.toLowerCase(Locale.ROOT).startsWith(getDriversShortName.toLowerCase(Locale.ROOT)))
-                .flatMap(bp => readRaster(createInfo + ("path" -> bp)).getBands)
-                .takeWhile(_.isEmpty)
-                .nonEmpty
+            if (subdatasets.values.isEmpty) {
+                true
+            } else {
+                subdatasets.values
+                    .filter(_.toLowerCase(Locale.ROOT).startsWith(getDriversShortName.toLowerCase(Locale.ROOT)))
+                    .flatMap(bp => readRaster(createInfo + ("path" -> bp)).getBands)
+                    .takeWhile(_.isEmpty)
+                    .nonEmpty
+            }
         } else {
             bands.takeWhile(_.isEmpty).nonEmpty
         }
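
Note: the new guard matters because `takeWhile(_.isEmpty).nonEmpty` evaluates to
false on an empty collection, so a raster with no bands and no subdatasets was
previously reported as non-empty. A minimal sketch in plain Scala (stand-in
values, not the Mosaic API):

    // With no subdatasets there is nothing to flatMap over, so the old
    // expression returned false and isEmpty wrongly answered "not empty".
    val subdatasetPaths = Seq.empty[String]
    val oldIsEmpty = subdatasetPaths
        .flatMap(_ => Seq.empty[Int]) // no bands to read
        .takeWhile(_ => true)
        .nonEmpty                     // false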

@@ -530,9 +534,10 @@ case class MosaicRasterGDAL(
       */
     def cleanUp(): Unit = {
         // 0.4.4 - don't delete any checkpointing or fuse locations.
-        if (PathUtils.isTmpLocation(path)) {
-            Try(gdal.GetDriverByName(getDriversShortName).Delete(path))
-            PathUtils.cleanUpPath(path)
+        {
+            if (PathUtils.isTmpLocation(path)) {
+                PathUtils.cleanUpPath(path)
+            }
         }
     }


@@ -544,6 +549,9 @@ case class MosaicRasterGDAL(
         val raster = getRaster
         if (raster != null) {
             raster.FlushCache()
+            for (band <- getBands) {
+                band.band.delete()
+            }
             raster.delete()
         }
     }

@@ -730,8 +738,10 @@ object MosaicRasterGDAL extends RasterReader
             case Some(driverShortName) =>
                 val drivers = new JVector[String]()
                 drivers.add(driverShortName)
-                gdal.OpenEx(path, GA_ReadOnly, drivers)
-            case None => gdal.Open(path, GA_ReadOnly)
+                gdal.OpenEx(path, OF_READONLY, drivers)
+            case None => gdal.Open(path, OF_READONLY)
+            // gdal.OpenEx(path, OF_READONLY | OF_SHARED, drivers)
+            // case None => gdal.Open(path, OF_READONLY | OF_SHARED)
         }
     }
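
Note on the flag change: in the GDAL Java bindings, `gdal.Open` takes a
`GDALAccess` value while `gdal.OpenEx` takes a bitmask of `OF_*` open flags.
`GA_ReadOnly` and `OF_READONLY` are both zero, which is why either constant
compiles in both calls; the commented-out lines show how `OF_SHARED` would be
OR-ed in to request shared dataset handles. A hedged sketch (the path is a
hypothetical example):

    import org.gdal.gdal.gdal
    import org.gdal.gdalconst.gdalconstConstants.{GA_ReadOnly, OF_READONLY, OF_SHARED}

    val classic  = gdal.Open("/tmp/example.tif", GA_ReadOnly)               // access enum
    val extended = gdal.OpenEx("/tmp/example.tif", OF_READONLY | OF_SHARED) // open-flag bitmask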


RasterClipByVector.scala

@@ -65,6 +65,7 @@ object RasterClipByVector {
         )

         VectorClipper.cleanUpClipper(shapeFileName)
+        PathUtils.cleanUpPath(shapeFileName)

         result
     }

GDALTranslate.scala

@@ -4,6 +4,7 @@ import com.databricks.labs.mosaic.core.raster.gdal.{MosaicRasterGDAL, MosaicRast
 import org.gdal.gdal.{TranslateOptions, gdal}

 import java.nio.file.{Files, Paths}
+import scala.util.Try

 /** GDALTranslate is a wrapper for the GDAL Translate command. */
 object GDALTranslate {

@@ -32,7 +33,14 @@ object GDALTranslate {
         val translateOptions = new TranslateOptions(translateOptionsVec)
         val result = gdal.Translate(outputPath, raster.getRaster, translateOptions)
         val errorMsg = gdal.GetLastErrorMsg
-        val size = Files.size(Paths.get(outputPath))
+        val size = Try(Files.size(Paths.get(outputPath))).getOrElse(
+            {
+                val msg = "Error during GDAL translate operation: " +
+                    s"file ${raster.getPath} could not be translated to $outputPath " +
+                    s"with command '$effectiveCommand'. GDAL returned error: $errorMsg"
+                throw new Exception(msg)
+            }
+        )
         val createInfo = Map(
             "path" -> outputPath,
             "parentPath" -> raster.getParentPath,
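
Note: when `gdal.Translate` fails it can return null without writing an output
file, so the old `Files.size` call died with a bare `NoSuchFileException`.
Wrapping it in `Try` converts the missing file into a descriptive error. The
same pattern in isolation, as a hedged sketch:

    import java.nio.file.{Files, Paths}
    import scala.util.Try

    // Fail fast, with context, when a GDAL operation produced no output file.
    def sizeOrFail(outputPath: String, detail: => String): Long =
        Try(Files.size(Paths.get(outputPath))).getOrElse {
            throw new Exception(s"No output was written to $outputPath. $detail")
        }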

GDALWarp.scala

@@ -1,9 +1,13 @@
 package com.databricks.labs.mosaic.core.raster.operator.gdal

 import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL
-import org.gdal.gdal.{WarpOptions, gdal}
+import org.gdal.gdal.{Dataset, WarpOptions, gdal}
+import org.gdal.gdalconst.gdalconstConstants

 import java.nio.file.{Files, Paths}
+import scala.sys.process._
+import scala.util.Try
+

 /** GDALWarp is a wrapper for the GDAL Warp command. */
 object GDALWarp {

@@ -22,14 +26,21 @@ object GDALWarp {
       */
     def executeWarp(outputPath: String, rasters: Seq[MosaicRasterGDAL], command: String): MosaicRasterGDAL = {
         require(command.startsWith("gdalwarp"), "Not a valid GDAL Warp command.")
+        // Test: gdal.ParseCommandLine(command)

         val effectiveCommand = OperatorOptions.appendOptions(command, rasters.head.getWriteOptions)
         val warpOptionsVec = OperatorOptions.parseOptions(effectiveCommand)
         val warpOptions = new WarpOptions(warpOptionsVec)
         val result = gdal.Warp(outputPath, rasters.map(_.getRaster).toArray, warpOptions)
         // Format will always be the same as the first raster
         val errorMsg = gdal.GetLastErrorMsg
-        val size = Files.size(Paths.get(outputPath))
+        val size = Try(Files.size(Paths.get(outputPath))).getOrElse(
+            {
+                val msg = "Error during GDAL warp operation: " +
+                    s"file(s) \n '${rasters.map(_.getPath).mkString("\n")}' could not be reprojected to $outputPath " +
+                    s"with command '$effectiveCommand'. GDAL returned error: $errorMsg"
+                throw new Exception(msg)
+            }
+        )
         val createInfo = Map(
             "path" -> outputPath,
             "parentPath" -> rasters.head.getParentPath,

@@ -41,4 +52,21 @@ object GDALWarp {
         rasters.head.copy(raster = result, createInfo = createInfo).flushCache()
     }

+    /**
+      * Executes the GDAL Warp command on a single raster in a thread-safe manner.
+      * @param inPath
+      *   The path of the input raster.
+      * @param outputPath
+      *   The output path of the warped file.
+      * @param effectiveCommand
+      *   The effective GDAL Warp command, e.g. `gdalwarp -t_srs EPSG:4326`.
+      * @return
+      *   A Dataset object.
+      */
+    private def warpSingleRasterWithGDALApplication(inPath: String, outputPath: String, effectiveCommand: String): Dataset = this.synchronized {
+        val executedCommand = s"$effectiveCommand $inPath $outputPath"
+        val commandOutput = executedCommand.!!
+        gdal.Open(outputPath, gdalconstConstants.GA_ReadOnly)
+    }
+

}
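
Note on the new private helper: `!!` from `scala.sys.process` runs the command,
returns its stdout, and throws a `RuntimeException` on a non-zero exit code, so
a failed `gdalwarp` surfaces before `gdal.Open` is ever attempted, and
`this.synchronized` serializes the shell-outs across threads. A hedged usage
sketch with hypothetical paths:

    import scala.sys.process._
    import org.gdal.gdal.gdal
    import org.gdal.gdalconst.gdalconstConstants

    val cmd    = "gdalwarp -t_srs EPSG:4326 /tmp/in.tif /tmp/out.tif"
    val stdout = cmd.!! // throws if gdalwarp exits with a non-zero status
    val warped = gdal.Open("/tmp/out.tif", gdalconstConstants.GA_ReadOnly)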

ReTileOnRead.scala

@@ -1,6 +1,7 @@
 package com.databricks.labs.mosaic.datasource.gdal

 import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
+import com.databricks.labs.mosaic.core.raster.api.GDAL
 import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL
 import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
 import com.databricks.labs.mosaic.core.raster.operator.retile.BalancedSubdivision

@@ -89,11 +90,34 @@ object ReTileOnRead extends ReadStrategy {
         options: Map[String, String],
         indexSystem: IndexSystem
     ): Iterator[InternalRow] = {
-        val inPath = status.getPath.toString
         val uuid = getUUID(status)
         val sizeInMB = options.getOrElse("sizeInMB", "16").toInt

-        var tmpPath = PathUtils.copyToTmpWithRetry(inPath, 5)
+        val inPath = status.getPath.toString
+        val tmpPath = options.getOrElse("readSubdataset", "false").toBoolean match {
+            case true =>
+                val readRaster = GDAL.raster(status.getPath.toString, status.getPath.toString)
+                val subDatasets = readRaster.subdatasets
+                if (subDatasets.isEmpty) {
+                    throw new RuntimeException(s"Option 'readSubdataset' was set to 'true' but no subdatasets were found in ${status.getPath.toString}")
+                }
+                if (options.contains("subdatasetName")) {
+                    val subdatasetName = options("subdatasetName")
+                    if (!subDatasets.contains(subdatasetName)) {
+                        throw new RuntimeException(s"Subdataset $subdatasetName not found in ${status.getPath.toString}")
+                    }
+
+                    val subdatasetPath = PathUtils.createTmpFilePath(readRaster.getRasterFileExtension)
+                    readRaster.getSubdataset(subdatasetName).writeToPath(subdatasetPath)
+                    readRaster.destroy()
+                    subdatasetPath
+                } else {
+                    throw new RuntimeException(s"Option 'readSubdataset' was set to 'true' but 'subdatasetName' was not provided for ${status.getPath.toString}")
+                }
+            case false => PathUtils.copyToTmpWithRetry(inPath, 5)
+        }

         val tiles = localSubdivide(tmpPath, inPath, sizeInMB)

         val rows = tiles.map(tile => {
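
Note: with this change the reader can extract a named subdataset to a temporary
file before subdivision; `readSubdataset` without a `subdatasetName`, or a file
with no subdatasets at all, now fails fast with a RuntimeException. A hedged
sketch of how the options combine on the gdal reader (the literal string value
of MOSAIC_RASTER_READ_STRATEGY and the paths are assumptions, not confirmed by
this diff):

    val df = spark.read
        .format("gdal")
        .option("raster.read.strategy", "retile_on_read") // assumed key for MOSAIC_RASTER_READ_STRATEGY
        .option("readSubdataset", "true")
        .option("subdatasetName", "bleaching_alert_area")
        .option("sizeInMB", "16")
        .load("/path/to/netcdf-coral/")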

RasterAsGridReader.scala

@@ -1,7 +1,7 @@
 package com.databricks.labs.mosaic.datasource.multiread

-import com.databricks.labs.mosaic.MOSAIC_RASTER_READ_STRATEGY
 import com.databricks.labs.mosaic.functions.MosaicContext
+import com.databricks.labs.mosaic.{MOSAIC_RASTER_READ_STRATEGY, MOSAIC_RASTER_RE_TILE_ON_READ}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._

@@ -39,31 +39,27 @@ class RasterAsGridReader(sparkSession: SparkSession) extends MosaicDataFrameRead
         val config = getConfig
         val resolution = config("resolution").toInt
         val nPartitions = getNPartitions(config)
-        val readStrategy = config("retile") match {
-            case "true" => "retile_on_read"
-            case _ => "in_memory"
-        }
-        val tileSize = config("sizeInMB").toInt
+        val tileSizeInMB = config("sizeInMB").toInt

         val nCores = nWorkers * workerNCores
         val stageCoefficient = math.ceil(math.log(nCores) / math.log(4))

-        val firstStageSize = (tileSize * math.pow(4, stageCoefficient)).toInt
+        val firstStageSize = (tileSizeInMB * math.pow(4, stageCoefficient)).toInt

         val pathsDf = sparkSession.read
             .format("gdal")
             .option("extensions", config("extensions"))
-            .option(MOSAIC_RASTER_READ_STRATEGY, readStrategy)
+            .option(MOSAIC_RASTER_READ_STRATEGY, MOSAIC_RASTER_RE_TILE_ON_READ)
             .option("vsizip", config("vsizip"))
             .option("sizeInMB", firstStageSize)
             .options(extraOptions)
             .load(paths: _*)
             .repartition(nPartitions)

-        val rasterToGridCombiner = getRasterToGridFunc(config("combiner"))
+        val retiledDf = retileRaster(pathsDf, config)

-        val rasterDf = resolveRaster(pathsDf, config)
-
-        val retiledDf = retileRaster(rasterDf, config)
+        val rasterToGridCombiner = getRasterToGridFunc(config("combiner"))

         val loadedDf = retiledDf
             .withColumn(
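
Note: the first-stage sizing is easiest to see with numbers. A worked example
with assumed cluster values (not taken from this diff):

    val nWorkers     = 8
    val workerNCores = 4
    val tileSizeInMB = 16

    val nCores           = nWorkers * workerNCores                              // 32
    val stageCoefficient = math.ceil(math.log(nCores) / math.log(4))            // ceil(2.5) = 3.0
    val firstStageSize   = (tileSizeInMB * math.pow(4, stageCoefficient)).toInt // 16 * 4^3 = 1024 (MB)

So the initial pass reads fewer, larger tiles, which the subsequent retiling
stages break back down toward the requested sizeInMB.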

RST_Avg.scala

@@ -31,7 +31,7 @@ case class RST_Avg(tileExpr: Expression, expressionConfig: MosaicExpressionConfi
         // parse json from gdalinfo
         val json = parse(gdalInfo).extract[Map[String, Any]]
         val meanValues = json("bands").asInstanceOf[List[Map[String, Any]]].map { band =>
-            band("mean").asInstanceOf[Double]
+            band.getOrElse("mean", Double.NaN).asInstanceOf[Double]
         }
         ArrayData.toArrayData(meanValues.toArray)
     }
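
Note: gdalinfo only reports per-band statistics such as "mean" when they are
available, so indexing the parsed map directly threw a NoSuchElementException
for bands without statistics. A minimal illustration of the new lookup:

    val bandWithStats: Map[String, Any] = Map("band" -> 1, "mean" -> 42.0)
    val bandWithout: Map[String, Any]   = Map("band" -> 2) // no "mean" reported

    bandWithStats.getOrElse("mean", Double.NaN).asInstanceOf[Double] // 42.0
    bandWithout.getOrElse("mean", Double.NaN).asInstanceOf[Double]   // NaN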

RasterExpression.scala

@@ -1,7 +1,6 @@
 package com.databricks.labs.mosaic.expressions.raster.base

 import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
-import com.databricks.labs.mosaic.core.raster.api.GDAL
 import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
 import com.databricks.labs.mosaic.core.types.RasterTileType
 import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile

@@ -63,7 +62,6 @@ abstract class RasterExpression[T <: Expression: ClassTag](
      * The result of the expression.
      */
     override def nullSafeEval(input: Any): Any = {
-        GDAL.enable(expressionConfig)
         val rasterType = RasterTileType(rasterExpr, expressionConfig.isRasterUseCheckpoint).rasterType
         val tile = MosaicRasterTile.deserialize(
             input.asInstanceOf[InternalRow],

RasterTessellateGeneratorExpression.scala

@@ -2,7 +2,6 @@ package com.databricks.labs.mosaic.expressions.raster.base

 import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
 import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
-import com.databricks.labs.mosaic.core.raster.api.GDAL
 import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
 import com.databricks.labs.mosaic.core.types.RasterTileType
 import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile

@@ -78,7 +77,6 @@ abstract class RasterTessellateGeneratorExpression[T <: Expression: ClassTag](
     def rasterGenerator(raster: MosaicRasterTile, resolution: Int): Seq[MosaicRasterTile]

     override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
-        GDAL.enable(expressionConfig)
         val rasterType = RasterTileType(tileExpr, expressionConfig.isRasterUseCheckpoint).rasterType
         val tile = MosaicRasterTile
             .deserialize(tileExpr.eval(input).asInstanceOf[InternalRow], indexSystem.getCellIdDataType, rasterType)

src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala
28 changes: 27 additions & 1 deletion

@@ -3,7 +3,9 @@ package com.databricks.labs.mosaic.utils
 import com.databricks.labs.mosaic.MOSAIC_RASTER_TMP_PREFIX_DEFAULT
 import com.databricks.labs.mosaic.functions.MosaicContext

+import java.nio.file.attribute.BasicFileAttributes
 import java.nio.file.{Files, Path, Paths}
+import java.time.Clock
 import scala.jdk.CollectionConverters._
 import scala.util.Try

@@ -40,11 +42,33 @@ object PathUtils {
             Try(Files.deleteIfExists(Paths.get(zipPath.replace(".zip", ""))))
         }
         Try(Files.deleteIfExists(Paths.get(zipPath)))
+        collectEmptyTmpDirs()
     }

+    private def collectEmptyTmpDirs(): Unit = this.synchronized {
+        // Iterate over the directories in the temp location and delete any that
+        // are empty and older than 10 seconds. This needs to be thread-safe so
+        // we don't probe a directory that has been deleted by another thread.
+        val tmpDir = Paths.get(MosaicContext.tmpDir(null)).getParent
+        if (Files.exists(tmpDir)) {
+            tmpDir.toFile
+                .listFiles
+                .filter(_.isDirectory)
+                .filter({ f =>
+                    val attrs = Files.readAttributes(Paths.get(f.getAbsolutePath), classOf[BasicFileAttributes])
+                    val lastModifiedTime = attrs.lastModifiedTime().toInstant
+                    Clock.systemDefaultZone().instant().minusSeconds(10).isAfter(lastModifiedTime)
+                })
+                .filter(_.listFiles.isEmpty)
+                .foreach({ f => Try(f.delete()) })
+        }
+    }
+

     /**
       * Copy provided path to tmp.
-      * @param inPath
+      *
+      * @param inPath
       *   Path to copy from.
       * @return
       *   The copied path.

@@ -152,6 +176,8 @@ object PathUtils {
         s"$format:$vsiPrefix$filePath:$subdataset"
     }

+
+
     /**
      * Clean path.
      * - handles fuse paths.
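
Note: one caveat on collectEmptyTmpDirs, even with `this.synchronized`:
`File.listFiles` returns null if the directory disappears (for example, deleted
by an external process) between the `isDirectory` check and the listing. A
null-safe emptiness test could look like this hedged sketch:

    import java.io.File

    // Treat a concurrently deleted directory as non-empty so it is skipped.
    def isEmptyDir(dir: File): Boolean =
        Option(dir.listFiles).exists(_.isEmpty)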

RasterAsGridReaderTest.scala

@@ -17,22 +17,27 @@ class RasterAsGridReaderTest extends MosaicSpatialQueryTest with SharedSparkSess

     test("Read netcdf with Raster As Grid Reader") {
         assume(System.getProperty("os.name") == "Linux")
-        MosaicContext.build(H3IndexSystem, JTS)
+        // assume(checkpointingEnabled)
+        val mc = MosaicContext.build(H3IndexSystem, JTS)
+        mc.register(spark)
+

         val netcdf = "/binary/netcdf-coral/"
-        val filePath = getClass.getResource(netcdf).getPath
+        val filePath = this.getClass.getResource(netcdf).getPath

         noException should be thrownBy MosaicContext.read
             .format("raster_to_grid")
-            .option("retile", "true")
-            .option("tileSize", "10")
+            .option("sizeInMB", "16")
             .option("resolution", "0")
-            .option("subdataset", "1")
+            .option("readSubdataset", "true")
+            .option("subdatasetName", "bleaching_alert_area")
+            .option("retile", "true")
+            .option("tileSize", "600")
             .option("kRingInterpolate", "3")
             .option("combiner", "avg")
             .load(filePath)
+            .select("measure")
+            .queryExecution
+            .executedPlan
+            .take(1)

     }


(test session setup, package org.apache.spark.sql.test)

@@ -1,8 +1,8 @@
 package org.apache.spark.sql.test

-import com.databricks.labs.mosaic._
 import com.databricks.labs.mosaic.gdal.MosaicGDAL
 import com.databricks.labs.mosaic.utils.FileUtils
+import com.databricks.labs.mosaic._
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.scalatest.{Args, CompositeStatus, Status}
