Skip to content

Commit

Permalink
Fixes to the raster as grid reader (#593)
Browse files Browse the repository at this point in the history
* changes to the RasterAsGridReader and ReTileOnRead processes

* removed some cruft

* switch runners

* fixed failing zarr tests

* disabled the test for netcdf with checkpointing disabled
  • Loading branch information
sllynn authored Nov 12, 2024
1 parent a9b17e4 commit 27fd3ec
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 51 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
- "**"
jobs:
build:
runs-on: ubuntu-22.04
runs-on: larger
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:

jobs:
build:
runs-on: ubuntu-22.04
runs-on: larger
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build_r.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
build:
runs-on: ubuntu-22.04
runs-on: larger
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build_scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:

jobs:
build:
runs-on: ubuntu-22.04
runs-on: larger
env:
GITHUB_PAT: ${{ secrets.GITHUB_TOKEN }}
strategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,15 @@ case class MosaicRasterGDAL(
def isEmpty: Boolean = {
val bands = getBands
if (bands.isEmpty) {
subdatasets.values
.filter(_.toLowerCase(Locale.ROOT).startsWith(getDriversShortName.toLowerCase(Locale.ROOT)))
.flatMap(bp => readRaster(createInfo + ("path" -> bp)).getBands)
.takeWhile(_.isEmpty)
.nonEmpty
if (subdatasets.values.isEmpty) {
true
} else {
subdatasets.values
.filter(_.toLowerCase(Locale.ROOT).startsWith(getDriversShortName.toLowerCase(Locale.ROOT)))
.flatMap(bp => readRaster(createInfo + ("path" -> bp)).getBands)
.takeWhile(_.isEmpty)
.nonEmpty
}
} else {
bands.takeWhile(_.isEmpty).nonEmpty
}
Expand Down Expand Up @@ -530,9 +534,10 @@ case class MosaicRasterGDAL(
*/
def cleanUp(): Unit = {
// 0.4.4 - don't delete any checkpointing or fuse locations.
if (PathUtils.isTmpLocation(path)) {
Try(gdal.GetDriverByName(getDriversShortName).Delete(path))
PathUtils.cleanUpPath(path)
{
if (PathUtils.isTmpLocation(path)) {
PathUtils.cleanUpPath(path)
}
}
}

Expand All @@ -544,6 +549,9 @@ case class MosaicRasterGDAL(
val raster = getRaster
if (raster != null) {
raster.FlushCache()
for (band <- getBands) {
band.band.delete()
}
raster.delete()
}
}
Expand Down Expand Up @@ -730,8 +738,8 @@ object MosaicRasterGDAL extends RasterReader {
case Some(driverShortName) =>
val drivers = new JVector[String]()
drivers.add(driverShortName)
gdal.OpenEx(path, GA_ReadOnly, drivers)
case None => gdal.Open(path, GA_ReadOnly)
gdal.OpenEx(path, OF_READONLY, drivers)
case None => gdal.Open(path, OF_READONLY)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object RasterClipByVector {
)

VectorClipper.cleanUpClipper(shapeFileName)
PathUtils.cleanUpPath(shapeFileName)

result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.databricks.labs.mosaic.core.raster.gdal.{MosaicRasterGDAL, MosaicRast
import org.gdal.gdal.{TranslateOptions, gdal}

import java.nio.file.{Files, Paths}
import scala.util.Try

/** GDALTranslate is a wrapper for the GDAL Translate command. */
object GDALTranslate {
Expand Down Expand Up @@ -32,7 +33,14 @@ object GDALTranslate {
val translateOptions = new TranslateOptions(translateOptionsVec)
val result = gdal.Translate(outputPath, raster.getRaster, translateOptions)
val errorMsg = gdal.GetLastErrorMsg
val size = Files.size(Paths.get(outputPath))
val size = Try(Files.size(Paths.get(outputPath))).getOrElse(
{
val msg = "Error during GDAL translate operation: " +
s"file ${raster.getPath} could not be translated to $outputPath " +
s"with command '$effectiveCommand'. GDAL returned error: $errorMsg"
throw new Exception(msg)
}
)
val createInfo = Map(
"path" -> outputPath,
"parentPath" -> raster.getParentPath,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.databricks.labs.mosaic.core.raster.operator.gdal

import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL
import org.gdal.gdal.{WarpOptions, gdal}
import org.gdal.gdal.{Dataset, WarpOptions, gdal}
import org.gdal.gdalconst.gdalconstConstants

import java.nio.file.{Files, Paths}
import scala.sys.process._
import scala.util.Try


/** GDALWarp is a wrapper for the GDAL Warp command. */
object GDALWarp {
Expand All @@ -22,14 +26,21 @@ object GDALWarp {
*/
def executeWarp(outputPath: String, rasters: Seq[MosaicRasterGDAL], command: String): MosaicRasterGDAL = {
require(command.startsWith("gdalwarp"), "Not a valid GDAL Warp command.")
// Test: gdal.ParseCommandLine(command)

val effectiveCommand = OperatorOptions.appendOptions(command, rasters.head.getWriteOptions)
val warpOptionsVec = OperatorOptions.parseOptions(effectiveCommand)
val warpOptions = new WarpOptions(warpOptionsVec)
val result = gdal.Warp(outputPath, rasters.map(_.getRaster).toArray, warpOptions)
// Format will always be the same as the first raster
val errorMsg = gdal.GetLastErrorMsg
val size = Files.size(Paths.get(outputPath))
val size = Try(Files.size(Paths.get(outputPath))).getOrElse(
{
val msg = "Error during GDAL warp operation: " +
s"file(s) \\n '${rasters.map(_.getPath).mkString("\\n")}' could not be reprojected to $outputPath " +
s"with command '$effectiveCommand'. GDAL returned error: $errorMsg"
throw new Exception(msg)
}
)
val createInfo = Map(
"path" -> outputPath,
"parentPath" -> rasters.head.getParentPath,
Expand All @@ -41,4 +52,21 @@ object GDALWarp {
rasters.head.copy(raster = result, createInfo = createInfo).flushCache()
}

/**
 * Executes the GDAL Warp command-line application on a single raster and
 * opens the resulting file.
 *
 * The whole call is synchronized on the enclosing object, so invocations
 * made through this method do not overlap.
 *
 * @param inPath
 *   The path of the input raster.
 * @param outputPath
 *   The output path of the warped file.
 * @param effectiveCommand
 *   The effective GDAL Warp command, e.g. `gdalwarp -t_srs EPSG:4326`.
 * @return
 *   The Dataset obtained by opening `outputPath` read-only.
 */
private def warpSingleRasterWithGDALApplication(inPath: String, outputPath: String, effectiveCommand: String): Dataset = this.synchronized {
    // The input and output paths are appended as the final two arguments.
    // NOTE(review): the command is launched from a single string, so unquoted
    // whitespace in either path would be split into separate arguments -
    // confirm callers only pass whitespace-free paths.
    val executedCommand = s"$effectiveCommand $inPath $outputPath"
    // `!!` blocks until the process exits and throws on a non-zero exit code;
    // the captured standard output is not needed here.
    executedCommand.!!
    gdal.Open(outputPath, gdalconstConstants.GA_ReadOnly)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ object BalancedSubdivision {
tile: MosaicRasterTile,
sizeInMb: Int
): Seq[MosaicRasterTile] = {
if (sizeInMb <= 0) return Seq(tile)
val numSplits = getNumSplits(tile.getRaster, sizeInMb)
val (x, y) = tile.getRaster.getDimensions
val (tileX, tileY) = getTileSize(x, y, numSplits)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.labs.mosaic.datasource.gdal

import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.gdal.MosaicRasterGDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.raster.operator.retile.BalancedSubdivision
Expand All @@ -14,8 +15,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

import java.nio.file.{Files, Paths}

/** An object defining the retiling read strategy for the GDAL file format. */
object ReTileOnRead extends ReadStrategy {

Expand Down Expand Up @@ -89,11 +88,34 @@ object ReTileOnRead extends ReadStrategy {
options: Map[String, String],
indexSystem: IndexSystem
): Iterator[InternalRow] = {
val inPath = status.getPath.toString
val uuid = getUUID(status)
val sizeInMB = options.getOrElse("sizeInMB", "16").toInt

var tmpPath = PathUtils.copyToTmpWithRetry(inPath, 5)
val inPath = status.getPath.toString
val tmpPath = options.getOrElse("readSubdataset", "false").toBoolean match {
case true =>
val readRaster = GDAL.raster(status.getPath.toString, status.getPath.toString)
val subDatasets = readRaster.subdatasets
if (subDatasets.isEmpty) {
throw new RuntimeException(s"Option 'readSubdataset' was set to 'true' but no subdatasets were found in ${status.getPath.toString}")
}
if (options.contains("subdatasetName")) {
val subdatasetName = options("subdatasetName")
if (!subDatasets.contains(subdatasetName)) {
throw new RuntimeException(s"Subdataset $subdatasetName not found in ${status.getPath.toString}")
}

val subdatasetPath = PathUtils.createTmpFilePath(readRaster.getRasterFileExtension)
readRaster.getSubdataset(subdatasetName).writeToPath(subdatasetPath)
readRaster.destroy()
subdatasetPath
}
else {
throw new RuntimeException(s"Option 'readSubdataset' was set to 'true' but 'subdatasetName' was not provided for ${status.getPath.toString}")
}
case false => PathUtils.copyToTmpWithRetry(inPath, 5)
}

val tiles = localSubdivide(tmpPath, inPath, sizeInMB)

val rows = tiles.map(tile => {
Expand All @@ -117,7 +139,8 @@ object ReTileOnRead extends ReadStrategy {
row
})

Files.deleteIfExists(Paths.get(tmpPath))
PathUtils.cleanUpPath(tmpPath)
// Try(Files.deleteIfExists(Paths.get(tmpPath)))

rows.iterator
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.databricks.labs.mosaic.datasource.multiread

import com.databricks.labs.mosaic.MOSAIC_RASTER_READ_STRATEGY
import com.databricks.labs.mosaic.functions.MosaicContext
import com.databricks.labs.mosaic.{MOSAIC_RASTER_READ_STRATEGY, MOSAIC_RASTER_RE_TILE_ON_READ}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

Expand Down Expand Up @@ -39,31 +39,27 @@ class RasterAsGridReader(sparkSession: SparkSession) extends MosaicDataFrameRead
val config = getConfig
val resolution = config("resolution").toInt
val nPartitions = getNPartitions(config)
val readStrategy = config("retile") match {
case "true" => "retile_on_read"
case _ => "in_memory"
}
val tileSize = config("sizeInMB").toInt

val tileSizeInMB = config("sizeInMB").toInt

val nCores = nWorkers * workerNCores
val stageCoefficient = math.ceil(math.log(nCores) / math.log(4))

val firstStageSize = (tileSize * math.pow(4, stageCoefficient)).toInt
val firstStageSize = (tileSizeInMB * math.pow(4, stageCoefficient)).toInt

val pathsDf = sparkSession.read
.format("gdal")
.option("extensions", config("extensions"))
.option(MOSAIC_RASTER_READ_STRATEGY, readStrategy)
.option(MOSAIC_RASTER_READ_STRATEGY, MOSAIC_RASTER_RE_TILE_ON_READ)
.option("vsizip", config("vsizip"))
.option("sizeInMB", firstStageSize)
.options(extraOptions)
.load(paths: _*)
.repartition(nPartitions)

val rasterToGridCombiner = getRasterToGridFunc(config("combiner"))
val retiledDf = retileRaster(pathsDf, config)

val rasterDf = resolveRaster(pathsDf, config)

val retiledDf = retileRaster(rasterDf, config)
val rasterToGridCombiner = getRasterToGridFunc(config("combiner"))

val loadedDf = retiledDf
.withColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ case class RST_Avg(tileExpr: Expression, expressionConfig: MosaicExpressionConfi
// parse json from gdalinfo
val json = parse(gdalInfo).extract[Map[String, Any]]
val meanValues = json("bands").asInstanceOf[List[Map[String, Any]]].map { band =>
band("mean").asInstanceOf[Double]
band.getOrElse("mean", Double.NaN).asInstanceOf[Double]
}
ArrayData.toArrayData(meanValues.toArray)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.types.RasterTileType
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
Expand Down Expand Up @@ -63,7 +62,6 @@ abstract class RasterExpression[T <: Expression: ClassTag](
* The result of the expression.
*/
override def nullSafeEval(input: Any): Any = {
GDAL.enable(expressionConfig)
val rasterType = RasterTileType(rasterExpr, expressionConfig.isRasterUseCheckpoint).rasterType
val tile = MosaicRasterTile.deserialize(
input.asInstanceOf[InternalRow],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.databricks.labs.mosaic.expressions.raster.base

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.core.index.{IndexSystem, IndexSystemFactory}
import com.databricks.labs.mosaic.core.raster.api.GDAL
import com.databricks.labs.mosaic.core.raster.io.RasterCleaner
import com.databricks.labs.mosaic.core.types.RasterTileType
import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile
Expand Down Expand Up @@ -78,7 +77,6 @@ abstract class RasterTessellateGeneratorExpression[T <: Expression: ClassTag](
def rasterGenerator(raster: MosaicRasterTile, resolution: Int): Seq[MosaicRasterTile]

override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
GDAL.enable(expressionConfig)
val rasterType = RasterTileType(tileExpr, expressionConfig.isRasterUseCheckpoint).rasterType
val tile = MosaicRasterTile
.deserialize(tileExpr.eval(input).asInstanceOf[InternalRow], indexSystem.getCellIdDataType, rasterType)
Expand Down
28 changes: 27 additions & 1 deletion src/main/scala/com/databricks/labs/mosaic/utils/PathUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package com.databricks.labs.mosaic.utils
import com.databricks.labs.mosaic.MOSAIC_RASTER_TMP_PREFIX_DEFAULT
import com.databricks.labs.mosaic.functions.MosaicContext

import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{Files, Path, Paths}
import java.time.Clock
import scala.jdk.CollectionConverters._
import scala.util.Try

Expand Down Expand Up @@ -40,11 +42,33 @@ object PathUtils {
Try(Files.deleteIfExists(Paths.get(zipPath.replace(".zip", ""))))
}
Try(Files.deleteIfExists(Paths.get(zipPath)))
collectEmptyTmpDirs()
}

/**
 * Deletes stale, empty directories found next to the Mosaic tmp directory.
 *
 * Scans the parent of `MosaicContext.tmpDir(null)` and removes every child
 * directory that was last modified more than 10 seconds ago and contains no
 * entries. The scan is synchronized on the enclosing object and is strictly
 * best-effort: a directory that cannot be inspected or deleted is skipped
 * rather than failing the caller.
 */
private def collectEmptyTmpDirs(): Unit = this.synchronized {
    // getParent returns null for a path without a parent component.
    Option(Paths.get(MosaicContext.tmpDir(null)).getParent)
        .filter(parent => Files.exists(parent))
        .foreach { parent =>
            // Anything last modified before this instant counts as stale.
            val cutoff = Clock.systemDefaultZone().instant().minusSeconds(10)
            // File.listFiles returns null on an I/O error or when the path is
            // no longer a directory, so it is wrapped instead of dereferenced.
            Option(parent.toFile.listFiles).foreach { children =>
                children
                    .filter(_.isDirectory)
                    .filter { dir =>
                        // The directory may vanish between listing and probing
                        // (the synchronized block does not cover deletions made
                        // outside it); treat a failed probe as "not stale".
                        Try {
                            val attrs = Files.readAttributes(Paths.get(dir.getAbsolutePath), classOf[BasicFileAttributes])
                            cutoff.isAfter(attrs.lastModifiedTime().toInstant)
                        }.getOrElse(false)
                    }
                    // A null listing means the contents are unknown, so keep the directory.
                    .filter(dir => Option(dir.listFiles).exists(_.isEmpty))
                    .foreach(dir => Try(dir.delete()))
            }
        }
}

/**
* Copy provided path to tmp.
* @param inPath
*
* @param inPath
* Path to copy from.
* @return
* The copied path.
Expand Down Expand Up @@ -152,6 +176,8 @@ object PathUtils {
s"$format:$vsiPrefix$filePath:$subdataset"
}



/**
* Clean path.
* - handles fuse paths.
Expand Down
Loading

0 comments on commit 27fd3ec

Please sign in to comment.