From 899a4034754206ea5e5562d8328a62a2b63492bc Mon Sep 17 00:00:00 2001 From: "milos.colic" Date: Thu, 14 Dec 2023 16:17:19 +0000 Subject: [PATCH 1/2] Fix the GDAL max cache value. Fix GDAL config passing from spark.conf. Revert logic for base RST_ expressions. --- .../labs/mosaic/core/raster/api/GDAL.scala | 8 -------- .../core/raster/gdal/MosaicRasterGDAL.scala | 1 - .../raster/base/Raster1ArgExpression.scala | 15 ++++++++------- .../raster/base/Raster2ArgExpression.scala | 13 ++++++------- .../raster/base/RasterArray1ArgExpression.scala | 12 ++++++------ .../raster/base/RasterArray2ArgExpression.scala | 12 ++++++------ .../raster/base/RasterArrayExpression.scala | 11 +++++------ .../raster/base/RasterExpression.scala | 11 +++++------ .../base/RasterExpressionSerialization.scala | 5 +++-- .../raster/base/RasterGeneratorExpression.scala | 6 +++--- .../RasterTessellateGeneratorExpression.scala | 17 ++++++++--------- .../raster/base/RasterToGridExpression.scala | 4 +--- .../functions/MosaicExpressionConfig.scala | 9 ++++----- .../labs/mosaic/gdal/MosaicGDAL.scala | 2 -- 14 files changed, 55 insertions(+), 71 deletions(-) diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala index d6e40906f..66bde39a3 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/api/GDAL.scala @@ -19,14 +19,6 @@ import java.util.UUID */ object GDAL { - def dropDrivers(): Unit = { - val n = gdal.GetDriverCount() - for (i <- 0 until n) { - val driver = gdal.GetDriver(i) - driver.delete() - } - } - /** * Returns the no data value for the given GDAL data type. For non-numeric * data types, it returns 0.0. For numeric data types, it returns the diff --git a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala index 91cbf0041..4f51749dc 100644 --- a/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/core/raster/gdal/MosaicRasterGDAL.scala @@ -510,7 +510,6 @@ object MosaicRasterGDAL extends RasterReader { case Some(driverShortName) => val drivers = new JVector[String]() drivers.add(driverShortName) - gdal.GetDriverByName(driverShortName).Register() gdal.OpenEx(path, GA_ReadOnly, drivers) case None => gdal.Open(path, GA_ReadOnly) } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster1ArgExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster1ArgExpression.scala index 30ce7d2ec..f01027ff1 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster1ArgExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster1ArgExpression.scala @@ -1,6 +1,7 @@ package com.databricks.labs.mosaic.expressions.raster.base import com.databricks.labs.mosaic.core.raster.api.GDAL +import com.databricks.labs.mosaic.core.raster.io.RasterCleaner import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory import com.databricks.labs.mosaic.functions.MosaicExpressionConfig @@ -74,13 +75,13 @@ abstract class Raster1ArgExpression[T <: Expression: ClassTag]( // noinspection DuplicatedCode override def nullSafeEval(input: Any, arg1: Any): Any = { GDAL.enable(expressionConfig) - val row = input.asInstanceOf[InternalRow] - serialize( - rasterTransform(MosaicRasterTile.deserialize(row, expressionConfig.getCellIdType), arg1), - returnsRaster, - outputType, - expressionConfig - ) + val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], expressionConfig.getCellIdType) + val raster = tile.getRaster + val result = rasterTransform(tile, arg1) + val serialized = serialize(result, returnsRaster, outputType, expressionConfig) + RasterCleaner.dispose(raster) + RasterCleaner.dispose(result) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 2, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster2ArgExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster2ArgExpression.scala index 02f2707f7..ccdc7d5b3 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster2ArgExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/Raster2ArgExpression.scala @@ -83,13 +83,12 @@ abstract class Raster2ArgExpression[T <: Expression: ClassTag]( // noinspection DuplicatedCode override def nullSafeEval(input: Any, arg1: Any, arg2: Any): Any = { GDAL.enable(expressionConfig) - val row = input.asInstanceOf[InternalRow] - serialize( - rasterTransform(MosaicRasterTile.deserialize(row, expressionConfig.getCellIdType), arg1, arg2), - returnsRaster, - outputType, - expressionConfig - ) + val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], expressionConfig.getCellIdType) + val result = rasterTransform(tile, arg1, arg2) + val serialized = serialize(result, returnsRaster, outputType, expressionConfig) + // passed by name makes things re-evaluated + RasterCleaner.dispose(tile) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 3, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray1ArgExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray1ArgExpression.scala index 1113367c0..d21f96c2d 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray1ArgExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray1ArgExpression.scala @@ -1,6 +1,7 @@ package com.databricks.labs.mosaic.expressions.raster.base import com.databricks.labs.mosaic.core.raster.api.GDAL +import com.databricks.labs.mosaic.core.raster.io.RasterCleaner import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory import com.databricks.labs.mosaic.functions.MosaicExpressionConfig @@ -69,12 +70,11 @@ abstract class RasterArray1ArgExpression[T <: Expression: ClassTag]( */ override def nullSafeEval(input: Any, arg1: Any): Any = { GDAL.enable(expressionConfig) - serialize( - rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig), arg1), - returnsRaster, - dataType, - expressionConfig - ) + val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig) + val result = rasterTransform(tiles, arg1) + val serialized = serialize(result, returnsRaster, dataType, expressionConfig) + tiles.foreach(t => RasterCleaner.dispose(t)) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 2, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray2ArgExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray2ArgExpression.scala index f73899d1c..a26082f2d 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray2ArgExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArray2ArgExpression.scala @@ -1,6 +1,7 @@ package com.databricks.labs.mosaic.expressions.raster.base import com.databricks.labs.mosaic.core.raster.api.GDAL +import com.databricks.labs.mosaic.core.raster.io.RasterCleaner import com.databricks.labs.mosaic.core.types.model.MosaicRasterTile import com.databricks.labs.mosaic.expressions.base.GenericExpressionFactory import com.databricks.labs.mosaic.functions.MosaicExpressionConfig @@ -74,12 +75,11 @@ abstract class RasterArray2ArgExpression[T <: Expression: ClassTag]( */ override def nullSafeEval(input: Any, arg1: Any, arg2: Any): Any = { GDAL.enable(expressionConfig) - serialize( - rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig), arg1, arg2), - returnsRaster, - dataType, - expressionConfig - ) + val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig) + val result = rasterTransform(tiles, arg1, arg2) + val serialized = serialize(result, returnsRaster, dataType, expressionConfig) + tiles.foreach(t => RasterCleaner.dispose(t)) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 3, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArrayExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArrayExpression.scala index 9d12da2b6..b8ad9fc12 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArrayExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterArrayExpression.scala @@ -65,12 +65,11 @@ abstract class RasterArrayExpression[T <: Expression: ClassTag]( */ override def nullSafeEval(input: Any): Any = { GDAL.enable(expressionConfig) - serialize( - rasterTransform(RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig)), - returnsRaster, - dataType, - expressionConfig - ) + val tiles = RasterArrayUtils.getTiles(input, rastersExpr, expressionConfig) + val result = rasterTransform(tiles) + val serialized = serialize(result, returnsRaster, dataType, expressionConfig) + tiles.foreach(t => RasterCleaner.dispose(t)) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 1, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpression.scala index 307e3b1df..462d3204b 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpression.scala @@ -69,12 +69,11 @@ abstract class RasterExpression[T <: Expression: ClassTag]( */ override def nullSafeEval(input: Any): Any = { GDAL.enable(expressionConfig) - serialize( - rasterTransform(MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], cellIdDataType)), - returnsRaster, - dataType, - expressionConfig - ) + val tile = MosaicRasterTile.deserialize(input.asInstanceOf[InternalRow], cellIdDataType) + val result = rasterTransform(tile) + val serialized = serialize(result, returnsRaster, dataType, expressionConfig) + RasterCleaner.dispose(tile) + serialized } override def makeCopy(newArgs: Array[AnyRef]): Expression = GenericExpressionFactory.makeCopyImpl[T](this, newArgs, 1, expressionConfig) diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala index a0af1e043..a9bf17917 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterExpressionSerialization.scala @@ -34,12 +34,13 @@ trait RasterExpressionSerialization { expressionConfig: MosaicExpressionConfig ): Any = { if (returnsRaster) { + val tile = data.asInstanceOf[MosaicRasterTile] val checkpoint = expressionConfig.getRasterCheckpoint val rasterType = outputDataType.asInstanceOf[StructType].fields(1).dataType - val result = data - .asInstanceOf[MosaicRasterTile] + val result = tile .formatCellId(IndexSystemFactory.getIndexSystem(expressionConfig.getIndexSystem)) .serialize(rasterType, checkpoint) + RasterCleaner.dispose(tile) result } else { data diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala index 548e9e6fd..29c714788 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterGeneratorExpression.scala @@ -72,13 +72,13 @@ abstract class RasterGeneratorExpression[T <: Expression: ClassTag]( override def eval(input: InternalRow): TraversableOnce[InternalRow] = { GDAL.enable(expressionConfig) - val generatedRasters = - rasterGenerator(MosaicRasterTile.deserialize(rasterExpr.eval(input).asInstanceOf[InternalRow], cellIdDataType)) + val tile = MosaicRasterTile.deserialize(rasterExpr.eval(input).asInstanceOf[InternalRow], cellIdDataType) + val generatedRasters = rasterGenerator(tile) // Writing rasters disposes of the written raster val rows = generatedRasters.map(_.formatCellId(indexSystem).serialize()) generatedRasters.foreach(gr => RasterCleaner.dispose(gr)) - GDAL.dropDrivers() + RasterCleaner.dispose(tile) rows.map(row => InternalRow.fromSeq(Seq(row))) } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterTessellateGeneratorExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterTessellateGeneratorExpression.scala index 8f094b354..f2545942b 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterTessellateGeneratorExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterTessellateGeneratorExpression.scala @@ -71,23 +71,22 @@ abstract class RasterTessellateGeneratorExpression[T <: Expression: ClassTag]( override def eval(input: InternalRow): TraversableOnce[InternalRow] = { GDAL.enable(expressionConfig) + val tile = MosaicRasterTile + .deserialize( + rasterExpr.eval(input).asInstanceOf[InternalRow], + indexSystem.getCellIdDataType + ) val inResolution: Int = indexSystem.getResolution(resolutionExpr.eval(input)) - val generatedChips = rasterGenerator( - MosaicRasterTile - .deserialize( - rasterExpr.eval(input).asInstanceOf[InternalRow], - indexSystem.getCellIdDataType - ), - inResolution - ) + val generatedChips = rasterGenerator(tile, inResolution) .map(chip => chip.formatCellId(indexSystem)) val rows = generatedChips .map(chip => InternalRow.fromSeq(Seq(chip.formatCellId(indexSystem).serialize()))) + RasterCleaner.dispose(tile) generatedChips.foreach(chip => RasterCleaner.dispose(chip)) generatedChips.foreach(chip => RasterCleaner.dispose(chip.getRaster)) - GDAL.dropDrivers() + rows.iterator } diff --git a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterToGridExpression.scala b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterToGridExpression.scala index 7ea6377d0..743f9cbd6 100644 --- a/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterToGridExpression.scala +++ b/src/main/scala/com/databricks/labs/mosaic/expressions/raster/base/RasterToGridExpression.scala @@ -62,9 +62,7 @@ abstract class RasterToGridExpression[T <: Expression: ClassTag, P]( val transformed = griddedPixels(tile.getRaster, indexSystem, resolution) val results = transformed.map(_.mapValues(valuesCombiner)) RasterCleaner.dispose(tile) - val res = serialize(results) - GDAL.dropDrivers() - res + serialize(results) } /** diff --git a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicExpressionConfig.scala b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicExpressionConfig.scala index ac24e3937..f306d4e9c 100644 --- a/src/main/scala/com/databricks/labs/mosaic/functions/MosaicExpressionConfig.scala +++ b/src/main/scala/com/databricks/labs/mosaic/functions/MosaicExpressionConfig.scala @@ -2,9 +2,8 @@ package com.databricks.labs.mosaic.functions import com.databricks.labs.mosaic._ import com.databricks.labs.mosaic.core.index.IndexSystemFactory -import org.apache.spark.SparkConf -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.{RuntimeConfig, SparkSession} /** * Mosaic Expression Config is a class that contains the configuration for the @@ -35,8 +34,8 @@ case class MosaicExpressionConfig(configs: Map[String, String]) { def getCellIdType: DataType = IndexSystemFactory.getIndexSystem(getIndexSystem).cellIdType - def setGDALConf(conf: SparkConf): MosaicExpressionConfig = { - val toAdd = conf.getAllWithPrefix(MOSAIC_GDAL_PREFIX) + def setGDALConf(conf: RuntimeConfig): MosaicExpressionConfig = { + val toAdd = conf.getAll.filter(_._1.startsWith(MOSAIC_GDAL_PREFIX)) MosaicExpressionConfig(configs ++ toAdd) } @@ -74,7 +73,7 @@ object MosaicExpressionConfig { .setGeometryAPI(spark.conf.get(MOSAIC_GEOMETRY_API, JTS.name)) .setIndexSystem(spark.conf.get(MOSAIC_INDEX_SYSTEM, H3.name)) .setRasterCheckpoint(spark.conf.get(MOSAIC_RASTER_CHECKPOINT, MOSAIC_RASTER_CHECKPOINT_DEFAULT)) - .setGDALConf(spark.sparkContext.getConf) + .setGDALConf(spark.conf) } diff --git a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala index a3834e11a..f0b225f63 100644 --- a/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala +++ b/src/main/scala/com/databricks/labs/mosaic/gdal/MosaicGDAL.scala @@ -51,9 +51,7 @@ object MosaicGDAL extends Logging { gdal.SetConfigOption("GDAL_PAM_PROXY_DIR", GDAL_PAM_PROXY_DIR) gdal.SetConfigOption("GDAL_PAM_ENABLED", "NO") gdal.SetConfigOption("CPL_VSIL_USE_TEMP_FILE_FOR_RANDOM_WRITE", "NO") - gdal.SetConfigOption("GDAL_CACHEMAX", "64") gdal.SetConfigOption("CPL_LOG", s"$CPL_TMPDIR/gdal.log") - gdal.SetConfigOption("CPL_DEBUG", "ON") mosaicConfig.getGDALConf.foreach { case (k, v) => gdal.SetConfigOption(k.split("\\.").last, v) } } From 0fa9c4995d857f3eba39b12bc3c01baa5d1af52e Mon Sep 17 00:00:00 2001 From: "milos.colic" Date: Thu, 14 Dec 2023 16:42:17 +0000 Subject: [PATCH 2/2] Update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 672204ffd..eff0a47fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ ## v0.3.14 - Fixes for Warning and Error messages on mosaic_enable call. +- Performance improvements for raster functions. +- Fix support for GDAL configuration via spark config (use 'spark.databricks.labs.mosaic.gdal.' prefix). ## v0.3.13 - R bindings generation fixed and improved.