Commit 3ad89bc

Merge branch 'databrickslabs:main' into main

a0x8o authored Mar 19, 2024
2 parents df3bda4 + a64da84
Showing 18 changed files with 638 additions and 19 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
## v0.4.1 [DBR 13.3 LTS]
- Fixed python bindings for MosaicAnalyzer functions.
- Added tiler functions, ST_AsGeoJSONTile and ST_AsMVTTile, for creating GeoJSON and MVT tiles as aggregations of geometries.
- Added filter and convolve functions for raster data.
- Raster tile schema changed to `tile: struct<index_id:bigint, tile:binary, metadata:map<string, string>>` (see the sketch following this diff).
- Raster tile metadata will contain driver, parentPath and path.
- Raster tile metadata will contain warnings and errors in case of failures.
- All raster functions ensure rasters are TILED and not STRIPED when appropriate.
- GDAL cache memory has been decreased to 512MB to reduce memory usage and competition with Spark.
- Added RST_MakeTiles, which allows for different raster creations.
- Rasters can now be passed as file pointers using the checkpoint location.
- Added logic to handle the Zarr format for raster data.
- Added RST_SeparateBands to separate bands from a raster for NetCDF and Zarr formats.

## v0.4.0 [DBR 13.3 LTS]
- First release for DBR 13.3 LTS which is Ubuntu Jammy and Spark 3.4.1. Not backwards compatible, meaning it will not run on prior DBRs; requires either a Photon DBR or a ML Runtime (__Standard, non-Photon DBR no longer allowed__).
- New `setup_fuse_install` function to meet various requirements arising with Unity Catalog + Shared Access clusters; removed the scala equivalent function, making artifact setup and install python-first for scala and Spark SQL.
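For orientation, a minimal PySpark sketch of what the new v0.4.1 tile schema implies for downstream code; the DataFrame `tiles` is a hypothetical stand-in, and the metadata keys are the ones listed in the changelog above:

    # Hypothetical DataFrame of raster tiles with the v0.4.1 schema:
    # tile: struct<index_id:bigint, tile:binary, metadata:map<string, string>>
    tiles.select(
        "tile.index_id",                       # grid index of the tile
        tiles["tile.metadata"]["driver"],      # GDAL driver used to read the raster
        tiles["tile.metadata"]["parentPath"],  # path of the source raster
        tiles["tile.metadata"]["path"],        # tile's own path (checkpointed file pointer)
    ).show()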
2 changes: 1 addition & 1 deletion R/sparkR-mosaic/sparkrMosaic/DESCRIPTION
@@ -1,6 +1,6 @@
Package: sparkrMosaic
Title: SparkR bindings for Databricks Mosaic
Version: 0.4.0
Version: 0.4.1
Authors@R:
person("Robert", "Whiffin", , "[email protected]", role = c("aut", "cre")
)
2 changes: 1 addition & 1 deletion R/sparklyr-mosaic/sparklyrMosaic/DESCRIPTION
@@ -1,6 +1,6 @@
Package: sparklyrMosaic
Title: sparklyr bindings for Databricks Mosaic
Version: 0.4.0
Version: 0.4.1
Authors@R:
person("Robert", "Whiffin", , "[email protected]", role = c("aut", "cre")
)
2 changes: 1 addition & 1 deletion R/sparklyr-mosaic/tests.R
@@ -9,7 +9,7 @@ library(sparklyr.nested)
spark_home <- Sys.getenv("SPARK_HOME")
spark_home_set(spark_home)

install.packages("sparklyrMosaic_0.4.0.tar.gz", repos = NULL)
install.packages("sparklyrMosaic_0.4.1.tar.gz", repos = NULL)
library(sparklyrMosaic)

# find the mosaic jar in staging
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -22,7 +22,7 @@
author = 'Stuart Lynn, Milos Colic, Erni Durdevic, Robert Whiffin, Timo Roest'

# The full version, including alpha/beta/rc tags
release = "v0.4.0"
release = "v0.4.1"


# -- General configuration ---------------------------------------------------
4 changes: 2 additions & 2 deletions pom.xml
@@ -130,7 +130,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<version>3.7.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
@@ -278,7 +278,7 @@
<scala.version>2.12.10</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<spark.version>3.4.0</spark.version>
<mosaic.version>0.4.0</mosaic.version>
<mosaic.version>0.4.1</mosaic.version>
</properties>
</profile>
</profiles>
2 changes: 1 addition & 1 deletion python/mosaic/__init__.py
@@ -4,4 +4,4 @@
from .models import SpatialKNN
from .readers import read

__version__ = "0.4.0"
__version__ = "0.4.1"
51 changes: 51 additions & 0 deletions python/mosaic/api/aggregators.py
@@ -9,6 +9,8 @@
#######################

__all__ = [
"st_asgeojsontile_agg",
"st_asmvttile_agg",
"st_union_agg",
"grid_cell_union_agg",
"grid_cell_intersection_agg",
@@ -45,6 +47,55 @@ def st_intersection_agg(leftIndex: ColumnOrName, rightIndex: ColumnOrName) -> Column:
)


def st_asgeojsontile_agg(geom: ColumnOrName, attributes: ColumnOrName) -> Column:
"""
Returns the aggregated GeoJSON tile.
Parameters
----------
geom : Column
The geometry column to aggregate.
attributes : Column
The attributes column to aggregate.
Returns
-------
Column
The aggregated GeoJSON tile.
"""
return config.mosaic_context.invoke_function(
"st_asgeojsontile_agg",
pyspark_to_java_column(geom),
pyspark_to_java_column(attributes)
)


def st_asmvttile_agg(geom: ColumnOrName, attributes: ColumnOrName, zxyID: ColumnOrName) -> Column:
"""
Returns the aggregated MVT tile.
Parameters
----------
geom : Column
The geometry column to aggregate.
attributes : Column
The attributes column to aggregate.
zxyID : Column
The zxyID column to aggregate.
Returns
-------
Column
The aggregated MVT tile.
"""
return config.mosaic_context.invoke_function(
"st_asmvttile_agg",
pyspark_to_java_column(geom),
pyspark_to_java_column(attributes),
pyspark_to_java_column(zxyID)
)


def st_intersects_agg(leftIndex: ColumnOrName, rightIndex: ColumnOrName) -> Column:
"""
Tests if any `leftIndex` : `rightIndex` pairs intersect.
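A minimal usage sketch for the two aggregators added above, assuming they are re-exported at the package root like the other mosaic API functions; the DataFrame `df` and the columns `geom`, `attrs`, `tile_id`, and `zxy_id` are hypothetical stand-ins:

    from pyspark.sql import functions as F

    from mosaic import st_asgeojsontile_agg, st_asmvttile_agg

    # One GeoJSON FeatureCollection per (hypothetical) tile id.
    geojson_tiles = df.groupBy("tile_id").agg(
        st_asgeojsontile_agg(F.col("geom"), F.col("attrs")).alias("geojson")
    )

    # One MVT tile per z/x/y tile id.
    mvt_tiles = df.groupBy("zxy_id").agg(
        st_asmvttile_agg(F.col("geom"), F.col("attrs"), F.col("zxy_id")).alias("mvt")
    )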
1 change: 1 addition & 0 deletions python/mosaic/models/analyzer/__init__.py
@@ -0,0 +1 @@
from .analyzer import MosaicAnalyzer
38 changes: 38 additions & 0 deletions python/mosaic/models/analyzer/analyzer.py
@@ -0,0 +1,38 @@
from typing import Optional

from pyspark.sql import DataFrame, SparkSession


class MosaicAnalyzer:
    """
    MosaicAnalyzer analyzes spatial data and provides insight into the
    optimal grid resolution for the given dataset. This only works for
    geometries that have area > 0.
    """

    def __init__(self, dataframe: DataFrame):
        """
        Initialize the MosaicAnalyzer with the DataFrame to analyze.
        """
        self.spark = SparkSession.builder.getOrCreate()
        self.model = getattr(
            self.spark._jvm.com.databricks.labs.mosaic.sql, "MosaicAnalyzer"
        )(dataframe._jdf)

    def get_optimal_resolution(
        self,
        geometry_column: str,
        nrows: Optional[int] = None,
        sample: Optional[float] = None,
    ):
        """
        Get the optimal resolution for the given dataset, optionally limited
        to the first `nrows` rows or to a random `sample` fraction of the data.
        """
        # The JVM MosaicAnalyzer overloads getOptimalResolution; Python cannot
        # overload by signature, so the variants are selected via optional arguments.
        if nrows is not None:
            return self.model.getOptimalResolution(geometry_column, nrows)
        if sample is not None:
            return self.model.getOptimalResolution(geometry_column, sample)
        return self.model.getOptimalResolution(geometry_column)
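A minimal usage sketch for the analyzer above; the DataFrame `df` and its `geometry` column are hypothetical stand-ins:

    from mosaic.models.analyzer import MosaicAnalyzer

    # df is assumed to be a Spark DataFrame whose `geometry` column holds
    # areal geometries (the analyzer requires area > 0).
    analyzer = MosaicAnalyzer(df)
    resolution = analyzer.get_optimal_resolution("geometry")
    # Or estimate from a 10% random sample of the rows.
    sampled = analyzer.get_optimal_resolution("geometry", sample=0.1)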
126 changes: 126 additions & 0 deletions src/main/scala/com/databricks/labs/mosaic/expressions/geometry/ST_AsGeojsonTileAgg.scala
@@ -0,0 +1,126 @@
package com.databricks.labs.mosaic.expressions.geometry

import com.databricks.labs.mosaic.core.geometry.api.GeometryAPI
import com.databricks.labs.mosaic.expressions.geometry.base.AsTileExpression
import com.databricks.labs.mosaic.functions.MosaicExpressionConfig
import com.databricks.labs.mosaic.utils.PathUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, TypedImperativeAggregate}
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.trees.BinaryLike
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.gdal.ogr._

import scala.collection.mutable

case class ST_AsGeojsonTileAgg(
geometryExpr: Expression,
attributesExpr: Expression,
expressionConfig: MosaicExpressionConfig,
mutableAggBufferOffset: Int,
inputAggBufferOffset: Int
) extends TypedImperativeAggregate[mutable.ArrayBuffer[Any]]
with BinaryLike[Expression]
with AsTileExpression {

val geometryAPI: GeometryAPI = GeometryAPI.apply(expressionConfig.getGeometryAPI)
override lazy val deterministic: Boolean = true
override val left: Expression = geometryExpr
override val right: Expression = attributesExpr
override val nullable: Boolean = false
override val dataType: DataType = StringType

override def prettyName: String = "st_asgeojsontile_agg"

private lazy val tupleType =
StructType(
StructField("geom", geometryExpr.dataType, nullable = false) ::
StructField("attrs", attributesExpr.dataType, nullable = false) :: Nil
)
private lazy val projection = UnsafeProjection.create(Array[DataType](ArrayType(elementType = tupleType, containsNull = false)))
private lazy val row = new UnsafeRow(2)

override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty

def update(buffer: mutable.ArrayBuffer[Any], input: InternalRow): mutable.ArrayBuffer[Any] = {
val geom = geometryExpr.eval(input)
val attrs = attributesExpr.eval(input)
val value = InternalRow.fromSeq(Seq(geom, attrs))
buffer += InternalRow.copyValue(value)
buffer
}

def merge(buffer: mutable.ArrayBuffer[Any], input: mutable.ArrayBuffer[Any]): mutable.ArrayBuffer[Any] = {
buffer ++= input
}

    override def eval(buffer: mutable.ArrayBuffer[Any]): Any = {
        ogr.RegisterAll()
        val driver = ogr.GetDriverByName("GeoJSON")
        val tmpName = PathUtils.createTmpFilePath("geojson")
        val ds: DataSource = driver.CreateDataSource(tmpName)

        // The spatial reference is derived from the first buffered geometry.
        val srs = getSRS(buffer.head, geometryExpr, geometryAPI)

        val layer = createLayer(ds, srs, attributesExpr.dataType.asInstanceOf[StructType])

        insertRows(buffer, layer, geometryExpr, geometryAPI, attributesExpr)

        // Flush and release the dataset so the GeoJSON is fully written to disk.
        ds.FlushCache()
        ds.delete()

        // Read the temporary file back; its contents are the aggregate result.
        val source = scala.io.Source.fromFile(tmpName)
        try UTF8String.fromString(source.getLines().mkString("\n"))
        finally source.close()
    }

    // The aggregation buffer is exchanged between executors as a single-column
    // UnsafeRow holding an array of (geom, attrs) tuples.
    override def serialize(obj: mutable.ArrayBuffer[Any]): Array[Byte] = {
        val array = new GenericArrayData(obj.toArray)
        projection.apply(InternalRow.apply(array)).getBytes
    }

    override def deserialize(bytes: Array[Byte]): mutable.ArrayBuffer[Any] = {
        val buffer = createAggregationBuffer()
        row.pointTo(bytes, bytes.length)
        row.getArray(0).foreach(tupleType, (_, x: Any) => buffer += x)
        buffer
    }

override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
copy(inputAggBufferOffset = newInputAggBufferOffset)

override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate =
copy(mutableAggBufferOffset = newMutableAggBufferOffset)

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): ST_AsGeojsonTileAgg =
copy(geometryExpr = newLeft, attributesExpr = newRight)

}

object ST_AsGeojsonTileAgg {

def registryExpressionInfo(db: Option[String]): ExpressionInfo =
new ExpressionInfo(
classOf[ST_AsGeojsonTileAgg].getCanonicalName,
db.orNull,
"st_asgeojsontile_agg",
"""
| _FUNC_(geom, attrs) - Aggregate function that returns a GeoJSON string from a set of geometries and attributes.
""".stripMargin,
"",
"""
| Examples:
| > SELECT _FUNC_(a, b) FROM table GROUP BY tile_id;
| {"type":"FeatureCollection","features":[{"type":"Feature","geometry":{"type":"Point","coordinates":[1.0,1.0]},"properties":{"name":"a"}},{"type":"Feature","geometry":{"type":"Point","coordinates":[2.0,2.0]},"properties":{"name":"b"}}]}
| {"type":"FeatureCollection","features":[{"type":"Feature","geometry":{"type":"Point","coordinates":[3.0,3.0]},"properties":{"name":"c"}},{"type":"Feature","geometry":{"type":"Point","coordinates":[4.0,4.0]},"properties":{"name":"d"}}]}
| """.stripMargin,
"",
"agg_funcs",
"1.0",
"",
"built-in"
)

}
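The ExpressionInfo above documents the SQL form of the aggregate. For completeness, a minimal PySpark sketch of the same call; the `features` table and its `tile_id`, `geom`, and `attrs` columns are hypothetical, and the function is assumed to have been registered through Mosaic's SQL extension:

    result = spark.sql(
        """
        SELECT tile_id, st_asgeojsontile_agg(geom, attrs) AS geojson
        FROM features
        GROUP BY tile_id
        """
    )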