Enable reading of PBFs with node locations for ways #111
base: master
Changes from 1 commit
CoordUtils.scala (new file):
@@ -0,0 +1,68 @@
/*
 * The MIT License (MIT)
 *
 * Copyright (c) 2023 Ángel Cervera Claudio
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
package com.acervera.osm4scala.utilities

import scala.math.BigDecimal.RoundingMode

/**
 * Utility to manage coordinates.
 */
object CoordUtils {

  private val COORDINATE_PRECISION = 7

  /**
   * Calculate a coordinate applying the offset, granularity and delta.
   * The floating-point precision is 7, with HALF_EVEN rounding mode.
   *
   * @param offSet       block-level lat/lon offset, in nanodegrees
   * @param delta        delta-encoded coordinate value
   * @param currentValue previously decoded coordinate, in degrees
   * @param granularity  block granularity, in nanodegrees
   * @return the decoded coordinate, in degrees
   */
  def decompressCoord(offSet: Long, delta: Long, currentValue: Double, granularity: Int): Double = {
Review comment: Maybe these changes are a good idea to increase precision, but they are out of the scope of adding the new fields. BTW, I'm not sure, but I think that the compiler is not going to optimize the code enough, so what are in fact constants (like …) would be recomputed on every call. So what do you think about removing these changes from this PR and creating a new one to talk about it?
Reply: I have removed them.
    BigDecimal.valueOf(offSet + (granularity * delta))
      .*(BigDecimal.valueOf(1E-9))
      .+(BigDecimal.valueOf(currentValue))
      .setScale(COORDINATE_PRECISION, RoundingMode.HALF_EVEN)
      .doubleValue();
  }

  /**
   * Convert a coordinate value by scaling it with 1E-7.
   * The floating-point precision is 7, with HALF_EVEN rounding mode.
   *
   * @param coordValue coordinate value to scale
   * @return the scaled coordinate
   */
  def convertToMicroDegrees(coordValue: Double): Double = {
    BigDecimal.valueOf(coordValue)
      .*(1E-7)
      .setScale(COORDINATE_PRECISION, RoundingMode.HALF_EVEN)
      .doubleValue();
  }
}
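As an aside, a minimal sketch of how decompressCoord behaves (not part of the PR; the numbers are made up, and offset 0 with granularity 100 nanodegrees are the usual PBF defaults):

import com.acervera.osm4scala.utilities.CoordUtils

object CoordUtilsSketch extends App {
  // offset = 0 and granularity = 100 nanodegrees, the usual PBF defaults:
  // (0 + 100 * 520000000) * 1E-9 = 52.0 degrees.
  val first = CoordUtils.decompressCoord(0L, 520000000L, 0.0, 100)  // 52.0
  // The previously decoded coordinate is passed back in, so deltas accumulate.
  val second = CoordUtils.decompressCoord(0L, 150L, first, 100)     // 52.000015
  println(s"$first -> $second")
}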
OsmPbfFormat.scala:
@@ -122,7 +122,12 @@ class OsmPbfFormat extends FileFormat with DataSourceRegister {

  override def inferSchema(sparkSession: SparkSession,
                           options: Map[String, String],
                           files: Seq[FileStatus]): Option[StructType] = Some(OsmSqlEntity.schema)
                           files: Seq[FileStatus]): Option[StructType] =
    if (options.getOrElse("wayWithGeometry","false").toLowerCase().equals("true")) {
Review comment: I would keep only one schema.
Reply: Just wanted to keep it backward compatible. I can keep only one if you think it's better.
      Some(OsmSqlEntity.schemaWithGeo)
    } else {
      Some(OsmSqlEntity.schema)
    }

  override def prepareWrite(sparkSession: SparkSession,
                            job: Job,
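For reference, roughly how the new option would be used from Spark (a sketch, not taken from the PR: "osm.pbf" is, as far as I recall, the connector's registered short name, and the input path is a placeholder):

import org.apache.spark.sql.SparkSession

object WayGeometrySketch extends App {
  val spark = SparkSession.builder().appName("osm4scala-sketch").getOrCreate()

  // wayWithGeometry = "true" selects OsmSqlEntity.schemaWithGeo;
  // omitting it (or passing "false") keeps the original schema.
  val entities = spark.read
    .format("osm.pbf")
    .option("wayWithGeometry", "true")
    .load("/path/to/extract.osm.pbf")

  entities.printSchema()
  spark.stop()
}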
OsmPbfRowIterator.scala:
@@ -31,7 +31,7 @@ import com.acervera.osm4scala.spark.OsmSqlEntity._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class OsmPbfRowIterator(osmEntityIterator: Iterator[OSMEntity], requiredSchema: StructType)
@@ -88,18 +88,57 @@ object OsmPbfRowIterator {
      case fieldName => throw new Exception(s"Field $fieldName not valid for Info.")
    })

  private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fieldNames.map {
    case FIELD_ID => entity.id
    case FIELD_TYPE => ENTITY_TYPE_WAY
    case FIELD_LATITUDE => null
    case FIELD_LONGITUDE => null
    case FIELD_NODES => UnsafeArrayData.fromPrimitiveArray(entity.nodes.toArray)
    case FIELD_RELATIONS => new GenericArrayData(Seq.empty)
    case FIELD_TAGS => calculateTags(entity.tags)
    case FIELD_INFO => entity.info.map(populateInfo).orNull
    case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.")
  private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fields.map(f =>
Review comment: Sorry, I don't understand this change. Maybe I'm missing something. Why do you prefer structType.fields over structType.fieldNames?
Reply: It's the same as for Relations; the type of the field is needed.
    f.name match {
      case FIELD_ID => entity.id
      case FIELD_TYPE => ENTITY_TYPE_WAY
      case FIELD_LATITUDE => null
      case FIELD_LONGITUDE => null
      case FIELD_NODES => calculateWayNodes(entity.nodes, entity.lat, entity.lgn, f)
      case FIELD_RELATIONS => new GenericArrayData(Seq.empty)
      case FIELD_TAGS => calculateTags(entity.tags)
      case FIELD_INFO => entity.info.map(populateInfo).orNull
      case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.")
    }
  )

  private def calculateWayNodes(nodeIds: Seq[Long], lat: Seq[Double], lgn: Seq[Double], structField: StructField)
    : ArrayData = {
    if (structField.dataType.asInstanceOf[ArrayType].elementType == LongType) {
      // way nodes without geometry
      UnsafeArrayData.fromPrimitiveArray(nodeIds.toArray)
    } else {
      // way nodes with geometry
      calculateWayNodeWithGeometry(nodeIds, lat, lgn, structField)
    }
  }

  private def calculateWayNodeWithGeometry(nodeIds: Seq[Long], lat: Seq[Double], lgn: Seq[Double], structField: StructField) = {
    val nodes: Seq[(Long, (Double, Double))] = (nodeIds zip (lat, lgn).zipped.toList)
    new GenericArrayData(
      structField.dataType match {
        case ArrayType(elementType, _) =>
          elementType match {
            case s: StructType => nodes.map(r => InternalRow.fromSeq(calculateWayNode(r, s)))
            case s =>
              throw new UnsupportedOperationException(
                s"Schema ${s} isn't supported. Only arrays of StructType are allowed for way nodes.")
          }
        case s =>
          throw new UnsupportedOperationException(
            s"Schema ${s} isn't supported. Only arrays of StructType are allowed for way nodes.")
      }
    )
  }

  private def calculateWayNode(wayNode: (Long, (Double, Double)), structType: StructType): Seq[Any] =
    structType.fieldNames.map {
      case FIELD_ID => wayNode._1
      case FIELD_LATITUDE => wayNode._2._1
      case FIELD_LONGITUDE => wayNode._2._2
      case fieldName => throw new Exception(s"Field $fieldName not valid for a way node.")
    }

  private def populateRelation(entity: RelationEntity, structType: StructType): Seq[Any] =
    structType.fields.map(f =>
      f.name match {
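For orientation, the nested way-node struct filled by calculateWayNode presumably looks something like the sketch below; this is a guess based on the field names above, since schemaWithGeo itself is not shown in this diff and the literal field names are assumptions:

import org.apache.spark.sql.types._

// Hypothetical shape of the "nodes" field when wayWithGeometry is enabled:
// an array of (id, latitude, longitude) structs instead of an array of node ids.
val wayNodeType: StructType = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("latitude", DoubleType, nullable = true),
  StructField("longitude", DoubleType, nullable = true)
))

val nodesField: StructField = StructField("nodes", ArrayType(wayNodeType), nullable = true)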
Review comment: I would keep this as before, but adding the two new nullable fields. My opinion about the schema is in the other comment.

Review comment: What do you think about adding default values of Seq.empty to simplify migration to the new version? That will make other users happier than if they need to deal with compilation errors. ;)
Reply: Added.
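To illustrate the default-values suggestion, a hypothetical sketch only; the real WayEntity constructor is not part of this diff, so the name WayEntitySketch and the other fields are placeholders:

// Hypothetical: defaulting the new coordinate sequences keeps existing call
// sites compiling, while new code can pass the decoded node locations.
case class WayEntitySketch(
  id: Long,
  nodes: Seq[Long],
  tags: Map[String, String],
  lat: Seq[Double] = Seq.empty, // new field, defaulted for backward compatibility
  lgn: Seq[Double] = Seq.empty  // new field, defaulted for backward compatibility
)

val legacy = WayEntitySketch(1L, Seq(10L, 11L), Map("highway" -> "residential")) // still compiles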