diff --git a/docs/user/postgis/commandline.rst b/docs/user/postgis/commandline.rst index 745d8ddf6a75..81670061b5aa 100644 --- a/docs/user/postgis/commandline.rst +++ b/docs/user/postgis/commandline.rst @@ -4,7 +4,6 @@ Partitioned PostGIS Command-Line Tools The partitioned PostGIS data store is bundled with the ``geomesa-gt`` command-line tools. See :ref:`gt_tools` for additional details. - Commands -------- @@ -21,3 +20,23 @@ Argument Description ======================== ========================================================= ``-f, --feature-name *`` The name of the schema ======================== ========================================================= + +.. _postgis_cli_update_schema: + +``update-schema`` +----------------- + +Alter an existing ``SimpleFeatureType``. For PostGIS, this command can only be used to modify configuration +values. See :ref:`postgis_index_config` for available configuration values. + +This command will also re-write the partition procedures as necessary to apply the configuration changes. + +======================== ============================================================== +Argument Description +======================== ============================================================== +``-f, --feature-name *`` The name of the schema to operate on +``--add-user-data`` Add or update an entry in the feature type user data +======================== ============================================================== + +The ``--add-user-data`` parameter can be used to add or update any user data key. See :ref:`postgis_index_config` for +some examples of configurable values. Entries can be specified as ``:``. diff --git a/docs/user/postgis/index_config.rst b/docs/user/postgis/index_config.rst index b97f8ccde826..4fe6de747512 100644 --- a/docs/user/postgis/index_config.rst +++ b/docs/user/postgis/index_config.rst @@ -1,3 +1,5 @@ +.. _postgis_index_config: + Partitioned PostGIS Index Configuration ======================================= @@ -5,18 +7,28 @@ GeoMesa exposes a variety of configuration options that can be used to customize See :ref:`set_sft_options` for details on setting configuration parameters. Note that most of the general options for GeoMesa stores are not supported by the partitioned PostGIS store, except as specified below. +.. note:: + + Most configurations can be updated after a schema has been created. See below for details + specific to each configuration. + Configuring the Default Date Attribute -------------------------------------- The default date attribute is the attribute that will be used for sorting data into partitions. See :ref:`set_date_attribute` for details on how to specify it. +The default date cannot be changed after the schema has been created. + Configuring Indices ------------------- Attributes in the feature type may be marked for indexing, which will create a B-tree index on the associated table column. See :ref:`attribute_indices` for details on how to specify indices. +After the schema has been created, additional indices can be added through ``CREATE INDEX`` statements on the +parent partition tables. See :ref:`pg_partition_table_design` for a description of the partition tables. + Configuring Partition Size -------------------------- @@ -34,6 +46,11 @@ Partition size is configured with the key ``pg.partitions.interval.hours``. SimpleFeatureType sft = ....; sft.getUserData().put("pg.partitions.interval.hours", "12"); +After the schema has been created, changes to the partition size can be made through the +:ref:`postgis_cli_update_schema` command. Changes will not be applied to any existing partitions. If the partition +size is **increased**, any recent partitions that would overlap with the new partition size will need to be +manually dropped and the data re-inserted in the write-ahead table in order to prevent partition range conflict errors. + Configuring Index Resolution ---------------------------- @@ -51,6 +68,8 @@ The number of pages is configured with the key ``pg.partitions.pages-per-range`` SimpleFeatureType sft = ....; sft.getUserData().put("pg.partitions.pages-per-range", "64"); +The index resolution cannot be changed after the schema has been created. + Configuring Data Age-Off ------------------------ @@ -68,6 +87,10 @@ Age-off is configured with the key ``pg.partitions.max``. SimpleFeatureType sft = ....; sft.getUserData().put("pg.partitions.max", "14"); +After the schema has been created, changes to the age-off can be made through the +:ref:`postgis_cli_update_schema` command, or by directly updating the ``geomesa_userdata`` table in Postgres. +Changes will take effect within the next 10 minutes. + .. _postgis_filter_world: Configuring Filter Optimizations @@ -86,6 +109,10 @@ which will ignore whole world filters. // enable filtering on "whole world" queries sft.getUserData().put("pg.partitions.filter.world", "true"); +After the schema has been created, changes to the filter optimization can be made through the +:ref:`postgis_cli_update_schema` command, or by directly updating the ``geomesa_userdata`` table in Postgres. +Clients must be restarted in order to pick up the change. + Configuring Tablespaces ----------------------- @@ -104,8 +131,9 @@ and ``pg.partitions.tablespace.main``. See :ref:`pg_partition_table_design` for SimpleFeatureType sft = ....; sft.getUserData().put("pg.partitions.tablespace.wa", "fasttablespace"); -Once the schema has been created, the tablespaces are stored in the ``partition_tablespaces`` table. This table -can be modified manually to change the location used for new partitions. +After the schema has been created, changes to the configured tablespaces can be made through the +:ref:`postgis_cli_update_schema` command, or by directly updating the ``partition_tablespaces`` table in Postgres. +Changes will not be applied to any existing partitions. Configuring the Maintenance Schedule ------------------------------------ @@ -125,3 +153,6 @@ for each query, moving data out of it faster may improve performance. SimpleFeatureType sft = ....; sft.getUserData().put("pg.partitions.cron.minute", "0"); + +After the schema has been created, changes to the schedule can be made through the +:ref:`postgis_cli_update_schema` command. diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala index da9594b380fc..f97e055c58c1 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/PartitionedPostgisDialect.scala @@ -165,12 +165,15 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto metadata: DatabaseMetaData, schemaName: String, cx: Connection): Unit = { + + import PartitionedPostgisDialect.Config._ + // normally views get set to read-only, override that here since we use triggers to delegate writes sft.getUserData.remove(JDBCDataStore.JDBC_READ_ONLY) // populate user data - val sql = s"select key, value from ${escape(schemaName)}.${UserDataTable.Name.quoted} where type_name = ?" - WithClose(cx.prepareStatement(sql)) { statement => + val userDataSql = s"select key, value from ${escape(schemaName)}.${UserDataTable.Name.quoted} where type_name = ?" + WithClose(cx.prepareStatement(userDataSql)) { statement => statement.setString(1, sft.getTypeName) WithClose(statement.executeQuery()) { rs => while (rs.next()) { @@ -178,6 +181,27 @@ class PartitionedPostgisDialect(store: JDBCDataStore) extends PostGISDialect(sto } } } + + // populate tablespaces + val tablespaceSql = + s"select table_space, table_type from " + + s"${escape(schemaName)}.${PartitionTablespacesTable.Name.quoted} where type_name = ?" + WithClose(cx.prepareStatement(tablespaceSql)) { statement => + statement.setString(1, sft.getTypeName) + WithClose(statement.executeQuery()) { rs => + while (rs.next()) { + val ts = rs.getString(1) + if (ts != null && ts.nonEmpty) { + rs.getString(2) match { + case WriteAheadTableSuffix.raw => sft.getUserData.put(WriteAheadTableSpace, ts) + case PartitionedWriteAheadTableSuffix.raw => sft.getUserData.put(WriteAheadPartitionsTableSpace, ts) + case PartitionedTableSuffix.raw => sft.getUserData.put(MainTableSpace, ts) + case s => logger.warn(s"Ignoring unexpected tablespace table: $s") + } + } + } + } + } } override def preDropTable(schemaName: String, sft: SimpleFeatureType, cx: Connection): Unit = { @@ -359,14 +383,23 @@ object PartitionedPostgisDialect { object Config extends Conversions { - val IntervalHours = "pg.partitions.interval.hours" - val PagesPerRange = "pg.partitions.pages-per-range" - val MaxPartitions = "pg.partitions.max" - val WriteAheadTableSpace = "pg.partitions.tablespace.wa" + // size of each partition - can be updated after schema is created, but requires + // running PartitionedPostgisDialect.upgrade in order to be applied + val IntervalHours = "pg.partitions.interval.hours" + // pages_per_range on the BRIN index - can't be updated after schema is created + val PagesPerRange = "pg.partitions.pages-per-range" + // max partitions to keep, i.e. age-off - can be updated freely after schema is created + val MaxPartitions = "pg.partitions.max" + // minute of each 10 minute block to execute the partition jobs - can be updated after schema is created, + // but requires running PartitionedPostgisDialect.upgrade in order to be applied + val CronMinute = "pg.partitions.cron.minute" + // remove 'whole world' filters - can be updated freely after schema is created + val FilterWholeWorld = "pg.partitions.filter.world" + + // tablespace configurations - can be updated freely after the schema is created + val WriteAheadTableSpace = "pg.partitions.tablespace.wa" val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions" - val MainTableSpace = "pg.partitions.tablespace.main" - val CronMinute = "pg.partitions.cron.minute" - val FilterWholeWorld = "pg.partitions.filter.world" + val MainTableSpace = "pg.partitions.tablespace.main" implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal { def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6) diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala index 1b085b9d83c1..985870593bee 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/package.scala @@ -620,9 +620,8 @@ package object dialect { val create = s"""DO $$$$ |BEGIN - | IF NOT EXISTS (SELECT FROM cron.job WHERE jobname = $jName) THEN - | PERFORM cron.schedule($jName, ${schedule(info).quoted}, ${invocation(info).quoted}); - | END IF; + |${unscheduleSql(jName)} + | PERFORM cron.schedule($jName, ${schedule(info).quoted}, ${invocation(info).quoted}); |END$$$$;""".stripMargin Seq(create) } @@ -632,12 +631,15 @@ package object dialect { val drop = s"""DO $$$$ |BEGIN - | IF EXISTS (SELECT FROM cron.job WHERE jobname = $jName) THEN - | PERFORM cron.unschedule($jName); - | END IF; + |${unscheduleSql(jName)} |END$$$$;""".stripMargin Seq(drop) ++ super.dropStatements(info) } + + protected def unscheduleSql(quotedName: String): String = + s""" IF EXISTS (SELECT FROM cron.job WHERE jobname = $quotedName) THEN + | PERFORM cron.unschedule($quotedName); + | END IF;""".stripMargin } /** diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTablespacesTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTablespacesTable.scala index 1a65e720fed6..5d4ec23eff27 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTablespacesTable.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/PartitionTablespacesTable.scala @@ -21,17 +21,27 @@ class PartitionTablespacesTable extends Sql { val Name: TableName = TableName("partition_tablespaces") override def create(info: TypeInfo)(implicit ex: ExecutionContext): Unit = { - val table = s"${info.schema.quoted}.${Name.quoted}" + val table = TableIdentifier(info.schema.raw, Name.raw) + val cName = TableName(Name.raw + "_pkey") val create = - s"""CREATE TABLE IF NOT EXISTS $table ( + s"""CREATE TABLE IF NOT EXISTS ${table.quoted} ( | type_name text not null, | table_type text not null, | table_space text |);""".stripMargin - ex.execute(create) + val constraint = + s"""DO $$$$ + |BEGIN + | IF NOT EXISTS (SELECT FROM pg_constraint WHERE conname = ${cName.asLiteral} AND conrelid = ${table.asRegclass}) THEN + | ALTER TABLE ${table.quoted} ADD CONSTRAINT ${cName.quoted} PRIMARY KEY (type_name, table_type); + | END IF; + |END$$$$;""".stripMargin + + Seq(create, constraint).foreach(ex.execute) val insertSql = - s"INSERT INTO $table (type_name, table_type, table_space) VALUES (?, ?, ?) ON CONFLICT DO NOTHING;" + s"INSERT INTO ${table.quoted} (type_name, table_type, table_space) VALUES (?, ?, ?) " + + "ON CONFLICT (type_name, table_type) DO UPDATE SET table_space = EXCLUDED.table_space;" def insert(suffix: String, table: TableConfig): Unit = ex.executeUpdate(insertSql, Seq(info.typeName, suffix, table.tablespace.map(_.raw).orNull)) diff --git a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala index 366eded8387f..4d780ea5e136 100644 --- a/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala +++ b/geomesa-gt/geomesa-gt-partitioning/src/main/scala/org/locationtech/geomesa/gt/partition/postgis/dialect/tables/UserDataTable.scala @@ -54,9 +54,6 @@ class UserDataTable extends Sql { insert(Config.IntervalHours, Some(Integer.toString(info.partitions.hoursPerPartition))) insert(Config.PagesPerRange, Some(Integer.toString(info.partitions.pagesPerRange))) insert(Config.MaxPartitions, info.partitions.maxPartitions.map(Integer.toString)) - insert(Config.WriteAheadTableSpace, info.tables.writeAhead.tablespace.map(_.raw)) - insert(Config.WriteAheadPartitionsTableSpace, info.tables.writeAheadPartitions.tablespace.map(_.raw)) - insert(Config.MainTableSpace, info.tables.mainPartitions.tablespace.map(_.raw)) insert(Config.CronMinute, info.partitions.cronMinute.map(Integer.toString)) insert(Config.FilterWholeWorld, info.userData.get(Config.FilterWholeWorld)) } diff --git a/geomesa-gt/geomesa-gt-tools/pom.xml b/geomesa-gt/geomesa-gt-tools/pom.xml index 558d5061c7f3..0750f7c88ef6 100644 --- a/geomesa-gt/geomesa-gt-tools/pom.xml +++ b/geomesa-gt/geomesa-gt-tools/pom.xml @@ -92,6 +92,10 @@ org.specs2 specs2-junit_${scala.binary.version} + + org.testcontainers + testcontainers + diff --git a/geomesa-gt/geomesa-gt-tools/src/main/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommand.scala b/geomesa-gt/geomesa-gt-tools/src/main/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommand.scala index 139f4eddad02..9508ea1d0a6c 100644 --- a/geomesa-gt/geomesa-gt-tools/src/main/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommand.scala +++ b/geomesa-gt/geomesa-gt-tools/src/main/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommand.scala @@ -8,19 +8,115 @@ package org.locationtech.geomesa.geotools.tools.data -import com.beust.jcommander.Parameters -import org.geotools.data.DataStore +import com.beust.jcommander.{Parameter, ParameterException, Parameters} +import org.geotools.data.{DataStore, DefaultTransaction} +import org.geotools.feature.simple.SimpleFeatureTypeBuilder +import org.geotools.jdbc.JDBCDataStore import org.locationtech.geomesa.geotools.tools.GeoToolsDataStoreCommand import org.locationtech.geomesa.geotools.tools.GeoToolsDataStoreCommand.GeoToolsDataStoreParams import org.locationtech.geomesa.geotools.tools.data.GeoToolsUpdateSchemaCommand.GeoToolsUpdateSchemaParams -import org.locationtech.geomesa.tools.data.UpdateSchemaCommand -import org.locationtech.geomesa.tools.data.UpdateSchemaCommand.UpdateSchemaParams +import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect} +import org.locationtech.geomesa.tools.utils.{NoopParameterSplitter, Prompt} +import org.locationtech.geomesa.tools.{Command, DataStoreCommand, OptionalForceParam, RequiredTypeNameParam} +import org.locationtech.geomesa.utils.io.WithClose -class GeoToolsUpdateSchemaCommand extends UpdateSchemaCommand[DataStore] with GeoToolsDataStoreCommand { +import java.io.IOException +import java.util.Collections + +class GeoToolsUpdateSchemaCommand extends DataStoreCommand[DataStore] with GeoToolsDataStoreCommand { + + import scala.collection.JavaConverters._ + + override val name = "update-schema" override val params = new GeoToolsUpdateSchemaParams() + + override def execute(): Unit = withDataStore(update) + + protected def update(ds: DataStore): Unit = { + + // ensure we have an operation + params.validate().foreach(e => throw e) + + val sft = try { ds.getSchema(params.featureName) } catch { case _: IOException => null } + if (sft == null) { + throw new ParameterException(s"Schema '${params.featureName}' does not exist in the data store") + } + + var n = 0 + // numbering for our prompts + def number: Int = { n += 1; n } + val prompts = new StringBuilder() + + val builder = new SimpleFeatureTypeBuilder() + builder.init(sft) + + val updated = builder.buildFeatureType() + updated.getUserData.putAll(sft.getUserData) + + if (!params.userData.isEmpty) { + params.userData.asScala.foreach { ud => + ud.split(":", 2) match { + case Array(k, v) => + updated.getUserData.put(k, v) match { + case null => prompts.append(s"\n $number: Adding user data: '$k=$v'") + case old => prompts.append(s"\n $number: Updating user data: '$k=$v' (was '$old')") + } + case _ => throw new ParameterException(s"Invalid user data entry - expected 'key:value': $ud") + } + } + } + + Command.user.info(s"Preparing to update schema '${sft.getTypeName}':$prompts") + if (params.force || Prompt.confirm("Continue (y/n)? ")) { + Command.user.info("Updating, please wait...") + ds match { + case jdbc: JDBCDataStore => + // update schema is not implemented for JDBC stores + val partitioning = + Option(jdbc.dialect).collect { + case d: PartitionedPostgisDialect => d + case d: PartitionedPostgisPsDialect => d + } + + val dialect = partitioning.getOrElse { + throw new RuntimeException( + "JDBCDataStore does not support schema updates unless using 'dbtype=postgis-partitioned'") + } + + WithClose(new DefaultTransaction()) { tx => + WithClose(jdbc.getConnection(tx)) { cx => + dialect.postCreateTable(jdbc.getDatabaseSchema, updated, cx) + } + } + + case _ => + try { ds.updateSchema(sft.getTypeName, updated) } catch { + case _: UnsupportedOperationException => + throw new RuntimeException(s"${ds.getClass.getSimpleName} does not support schema updates") + } + } + + Command.user.info("Update complete") + } + } } object GeoToolsUpdateSchemaCommand { + @Parameters(commandDescription = "Update a feature type") - class GeoToolsUpdateSchemaParams extends UpdateSchemaParams with GeoToolsDataStoreParams + class GeoToolsUpdateSchemaParams extends RequiredTypeNameParam with OptionalForceParam with GeoToolsDataStoreParams { + + @Parameter(names = Array("--add-user-data"), + description = "Add a new entry or update an existing entry in the feature type user data, delineated with a colon (:)", + splitter = classOf[NoopParameterSplitter]) + var userData: java.util.List[String] = Collections.emptyList() + + def validate(): Option[ParameterException] = { + if (userData.isEmpty) { + Some(new ParameterException("Please specify an update operation")) + } else { + None + } + } + } } diff --git a/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/Dockerfile b/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/Dockerfile new file mode 100644 index 000000000000..0a70eee6bfd6 --- /dev/null +++ b/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/Dockerfile @@ -0,0 +1,10 @@ +ARG FROM_TAG=15-3.3 +FROM postgis/postgis:${FROM_TAG} + +# install pg_cron +RUN echo | sh /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh && \ + apt-get -y install postgresql-15-cron && \ + echo "\nshared_preload_libraries='pg_cron,pg_stat_statements'\n" >> /usr/share/postgresql/postgresql.conf.sample + +# configure pg_cron +COPY initdb.sh /docker-entrypoint-initdb.d/initdb.sh diff --git a/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/initdb.sh b/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/initdb.sh new file mode 100644 index 000000000000..779a67e0d70d --- /dev/null +++ b/geomesa-gt/geomesa-gt-tools/src/test/resources/testcontainers/initdb.sh @@ -0,0 +1,22 @@ +#!/bin/sh + +set -e + +# Perform all actions as $POSTGRES_USER +export PGUSER="$POSTGRES_USER" + +# Create the extension +echo "Loading pg_cron extensions into $POSTGRES_DB" +"${psql[@]}" --dbname="$POSTGRES_DB" <<- EOSQL + ALTER SYSTEM SET shared_preload_libraries = pg_cron,pg_stat_statements; + ALTER SYSTEM SET cron.database_name = $POSTGRES_DB; + CREATE EXTENSION IF NOT EXISTS pg_cron; + create extension if not exists pg_stat_statements; +EOSQL + +# in postgres 12 JIT was changed from default off to default on +# disable JIT optimizations, as they can cause extreme slowness with postgis queries above a certain size +echo "Configuring postgis defaults" +"${psql[@]}" --dbname="$POSTGRES_DB" <<- EOSQL + ALTER SYSTEM SET jit_optimize_above_cost = '-1'; +EOSQL diff --git a/geomesa-gt/geomesa-gt-tools/src/test/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommandTest.scala b/geomesa-gt/geomesa-gt-tools/src/test/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommandTest.scala new file mode 100644 index 000000000000..106b3689db15 --- /dev/null +++ b/geomesa-gt/geomesa-gt-tools/src/test/scala/org/locationtech/geomesa/geotools/tools/data/GeoToolsUpdateSchemaCommandTest.scala @@ -0,0 +1,207 @@ +/*********************************************************************** + * Copyright (c) 2013-2023 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.geotools.tools.data + +import com.typesafe.scalalogging.LazyLogging +import org.geotools.data._ +import org.geotools.jdbc.JDBCDataStore +import org.geotools.util.factory.Hints +import org.junit.runner.RunWith +import org.locationtech.geomesa.features.ScalaSimpleFeature +import org.locationtech.geomesa.gt.partition.postgis.PartitionedPostgisDataStoreParams +import org.locationtech.geomesa.gt.partition.postgis.dialect.TypeInfo +import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{PartitionMaintenance, RollWriteAheadLog} +import org.locationtech.geomesa.utils.collection.SelfClosingIterator +import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.io.WithClose +import org.opengis.feature.simple.SimpleFeature +import org.specs2.mutable.Specification +import org.specs2.runner.JUnitRunner +import org.specs2.specification.BeforeAfterAll +import org.testcontainers.containers.GenericContainer +import org.testcontainers.containers.output.Slf4jLogConsumer +import org.testcontainers.images.builder.ImageFromDockerfile + +import scala.collection.mutable.ArrayBuffer + +@RunWith(classOf[JUnitRunner]) +class GeoToolsUpdateSchemaCommandTest extends Specification with BeforeAfterAll with LazyLogging { + + import scala.collection.JavaConverters._ + + val sft = + SimpleFeatureTypes.createType( + "tools", + "name:String,dtg:Date,*geom:Point:srid=4326;" + + "pg.partitions.interval.hours=6," + + "pg.partitions.cron.minute=1") + val features = List.tabulate(24) { i => + val dtg = f"2023-10-30T0$i%02d:00:00.000Z" + val sf = ScalaSimpleFeature.create(sft, f"${sft.getTypeName}.id$i%02d", "name" + i, dtg, s"POINT(0 $i)") + sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE) + sf.getUserData.put(Hints.PROVIDED_FID, f"$i%02d") + sf + } + + val schema = "public" + + lazy val now = System.currentTimeMillis() + + lazy val dsParams = Map( + "dbtype" -> PartitionedPostgisDataStoreParams.DbType.sample.toString, + "host" -> host, + "port" -> port, + "database" -> "postgres", + "user" -> "postgres", + "passwd" -> "postgres", + "Batch insert size" -> "10", + "preparedStatements" -> "true" + ) + + var container: GenericContainer[_] = _ + + lazy val host = Option(container).map(_.getHost).getOrElse("localhost") + lazy val port = Option(container).map(_.getFirstMappedPort).getOrElse(5432).toString + + override def beforeAll(): Unit = { + val image = + new ImageFromDockerfile("testcontainers/postgis_cron", false) + .withFileFromClasspath(".", "testcontainers") + .withBuildArg("FROM_TAG", sys.props.getOrElse("postgis.docker.tag", "15-3.3")) + container = new GenericContainer(image) + container.addEnv("POSTGRES_HOST_AUTH_METHOD", "trust") + container.addExposedPort(5432) + container.start() + container.followOutput(new Slf4jLogConsumer(logger.underlying)) + } + + override def afterAll(): Unit = { + if (container != null) { + container.stop() + } + } + + "GeoToolsUpdateSchemaCommand" should { + "support schema updates" in { + WithClose(DataStoreFinder.getDataStore(dsParams.asJava)) { case ds: JDBCDataStore => + ds must not(beNull) + ds.getSchema(sft.getTypeName) must throwAn[Exception] + ds.createSchema(sft) + + val typeInfo = TypeInfo(this.schema, sft) + + def write(features: Seq[SimpleFeature]): Unit = { + WithClose(ds.getFeatureWriterAppend(sft.getTypeName, Transaction.AUTO_COMMIT)) { writer => + features.foreach(FeatureUtils.write(writer, _)) + } + // manually run the partition jobs + WithClose(ds.getConnection(Transaction.AUTO_COMMIT)) { cx => + WithClose(cx.prepareCall(s"call ${RollWriteAheadLog.name(typeInfo).quoted}();"))(_.execute()) + WithClose(cx.prepareCall(s"call ${PartitionMaintenance.name(typeInfo).quoted}();"))(_.execute()) + } + } + + def getFeatures: Seq[SimpleFeature] = { + WithClose(ds.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)) { reader => + SelfClosingIterator(reader).map(ScalaSimpleFeature.copy).toList.sortBy(_.getID) + } + } + + // get all the tables associated with the schema + def getTables: Seq[String] = { + val tables = ArrayBuffer.empty[String] + WithClose(ds.getConnection(Transaction.AUTO_COMMIT)) { cx => + WithClose(cx.getMetaData.getTables(null, null, sft.getTypeName + "_partition%", Array("TABLE"))) { rs => + while (rs.next()) { + tables += rs.getString(3) + } + } + } + tables.toSeq.sorted + } + + // get the cron schedule for the partition maintenance job + def getCron: String = { + var cron: String = null + val sql = s"select schedule from cron.job where command like 'CALL ${PartitionMaintenance.name(typeInfo).quoted}()'" + WithClose(ds.getConnection(Transaction.AUTO_COMMIT)) { cx => + WithClose(cx.prepareStatement(sql)) { st => + WithClose(st.executeQuery()) { rs => + rs.next() must beTrue + cron = rs.getString(1).trim + rs.next() must beFalse + } + } + } + cron + } + + getTables must beEmpty + + // write the first 12 features + write(features.take(12)) + + // verify 6 hour partitions + getTables mustEqual Seq("tools_partition_2023_10_30_00", "tools_partition_2023_10_30_06") + + // verify features come back ok - note we have to compare backwards due to Date vs Timestamp equality + features.take(12) mustEqual getFeatures + + // verify cron schedule that we configured above + getCron mustEqual "1,11,21,31,41,51 * * * *" + + val command = new GeoToolsUpdateSchemaCommand() { + override lazy val connection: Map[String, String] = dsParams + } + command.params.featureName = sft.getTypeName + command.params.force = true + + // run the schema update command + command.params.userData = java.util.Arrays.asList("pg.partitions.interval.hours:2", "pg.partitions.cron.minute:2") + command.execute() + + // verify cron schedule was updated + getCron mustEqual "2,12,22,32,42,52 * * * *" + + // write the next 12 features + write(features.drop(12)) + + // verify partitions changed to 2 hours + getTables mustEqual + Seq( + "tools_partition_2023_10_30_00", + "tools_partition_2023_10_30_06", + "tools_partition_2023_10_30_12", + "tools_partition_2023_10_30_14", + "tools_partition_2023_10_30_16", + "tools_partition_2023_10_30_18", + "tools_partition_2023_10_30_20", + "tools_partition_2023_10_30_22" + ) + + // verify features still come back ok + features mustEqual getFeatures + + // add an age-off + // note: since the data is from the past, this will age-off the entire dataset + command.params.userData = java.util.Arrays.asList("pg.partitions.max:4") + command.execute() + + // run the age-off job + write(Seq.empty) + + // verify partitions were dropped + getTables must beEmpty + + // verify features were dropped + getFeatures must beEmpty + } + } + } +} diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala index 2a404babbea7..74ea227d3c98 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/GeoMesaDataStore.scala @@ -25,6 +25,7 @@ import org.locationtech.geomesa.utils.concurrent.CachedThreadPool import org.locationtech.geomesa.utils.conf.SemanticVersion.MinorOrdering import org.locationtech.geomesa.utils.conf.{GeoMesaProperties, IndexId, SemanticVersion} import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeComparator.TypeComparison import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.{AttributeOptions, Configs, InternalConfigs} import org.locationtech.geomesa.utils.geotools.converters.FastConverter @@ -461,14 +462,18 @@ abstract class GeoMesaDataStore[DS <: GeoMesaDataStore[DS]](val config: GeoMesaD if (previous == null) { new SchemaCompatibility.DoesNotExist(this, sft) } else { - Try(validateSchemaUpdate(previous, sft)).failed.map(SchemaCompatibility.Incompatible).getOrElse { - val copy = SimpleFeatureTypes.copy(sft) - updateSchemaUserData(copy, previous) - if (SimpleFeatureTypes.compare(copy, previous) == 0 && copy.getUserData == previous.getUserData) { - SchemaCompatibility.Unchanged - } else { - new SchemaCompatibility.Compatible(this, typeName, sft) - } + validateSchemaUpdate(previous, sft) match { + case Right(TypeComparison.Compatible(extension, renamed, _)) => + val copy = SimpleFeatureTypes.copy(sft) + updateSchemaUserData(copy, previous) + if (!extension && !renamed && copy.getUserData == previous.getUserData) { + SchemaCompatibility.Unchanged + } else { + new SchemaCompatibility.Compatible(this, typeName, sft) + } + + case Left(e) => + SchemaCompatibility.Incompatible(e) } } } diff --git a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala index 6d38f4e41ee0..1f323b6eade5 100644 --- a/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala +++ b/geomesa-index-api/src/main/scala/org/locationtech/geomesa/index/geotools/MetadataBackedDataStore.scala @@ -21,10 +21,11 @@ import org.locationtech.geomesa.index.metadata.HasGeoMesaMetadata import org.locationtech.geomesa.index.planning.QueryInterceptor.QueryInterceptorFactory import org.locationtech.geomesa.index.utils.{DistributedLocking, Releasable} import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeComparator.TypeComparison import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.Configs import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes.InternalConfigs.TableSharingPrefix import org.locationtech.geomesa.utils.geotools.converters.FastConverter -import org.locationtech.geomesa.utils.geotools.{FeatureUtils, GeoToolsDateFormat, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.geotools.{FeatureUtils, GeoToolsDateFormat, SimpleFeatureTypeComparator, SimpleFeatureTypes} import org.locationtech.geomesa.utils.index.{GeoMesaSchemaValidator, ReservedWordCheck} import org.locationtech.geomesa.utils.io.CloseWithLogging import org.opengis.feature.`type`.Name @@ -244,7 +245,7 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor GeoMesaSchemaValidator.validate(schema) - validateSchemaUpdate(previousSft, schema) + validateSchemaUpdate(previousSft, schema).left.foreach(e => throw e) val sft = SimpleFeatureTypes.mutable(schema) @@ -415,43 +416,50 @@ abstract class MetadataBackedDataStore(config: NamespaceConfig) extends DataStor * * @param existing existing schema * @param schema updated sft + * @return validation result */ - protected def validateSchemaUpdate(existing: SimpleFeatureType, schema: SimpleFeatureType): Unit = { + protected def validateSchemaUpdate( + existing: SimpleFeatureType, + schema: SimpleFeatureType): Either[UnsupportedOperationException, TypeComparison.Compatible] = { // validate that default geometry and date have not changed (rename is ok) if (schema.getGeomIndex != existing.getGeomIndex) { - throw new UnsupportedOperationException("Changing the default geometry attribute is not supported") - } else if (schema.getDtgIndex != existing.getDtgIndex) { - throw new UnsupportedOperationException("Changing the default date attribute is not supported") + return Left(new UnsupportedOperationException("Changing the default geometry attribute is not supported")) + } + if (schema.getDtgIndex != existing.getDtgIndex) { + return Left(new UnsupportedOperationException("Changing the default date attribute is not supported")) } // check that unmodifiable user data has not changed - MetadataBackedDataStore.UnmodifiableUserDataKeys.foreach { key => - if (schema.userData[Any](key) != existing.userData[Any](key)) { - throw new UnsupportedOperationException(s"Updating '$key' is not supported") + val userDataChanges = MetadataBackedDataStore.UnmodifiableUserDataKeys.flatMap { key => + if (schema.userData[Any](key) == existing.userData[Any](key)) { None } else { + Some(s"'$key'") } } - - // validate that attributes weren't removed - if (existing.getAttributeCount > schema.getAttributeCount) { - throw new UnsupportedOperationException("Removing attributes from the schema is not supported") + if (userDataChanges.nonEmpty) { + val msg = s"${if (userDataChanges.size == 1) { "" } else { "s" }} ${userDataChanges.mkString(", ")}" + return Left(new UnsupportedOperationException(s"Updating user data key$msg is not supported")) } - // check for column type changes - existing.getAttributeDescriptors.asScala.zipWithIndex.foreach { case (prev, i) => - val binding = schema.getDescriptor(i).getType.getBinding - if (!binding.isAssignableFrom(prev.getType.getBinding)) { - throw new UnsupportedOperationException( - s"Incompatible schema column type change: ${schema.getDescriptor(i).getLocalName} " + - s"from ${prev.getType.getBinding.getName} to ${binding.getName}") - } - } + SimpleFeatureTypeComparator.compare(existing, schema) match { + case TypeComparison.AttributeRemoved => + Left(new UnsupportedOperationException("Removing attributes from the schema is not supported")) - // check for reserved words - only check for new/renamed attributes - val reserved = schema.getAttributeDescriptors.asScala.map(_.getLocalName).exists { name => - existing.getDescriptor(name) == null && FeatureUtils.ReservedWords.contains(name.toUpperCase(Locale.US)) - } - if (reserved) { - ReservedWordCheck.validateAttributeNames(schema) + case TypeComparison.AttributeTypeChanged(changes) => + val msg = changes.map { case (name, (from, to)) => s"$name from ${from.getName} to ${to.getName}" } + Left(new UnsupportedOperationException(s"Incompatible schema column type changes: ${msg.mkString(", ")}")) + + case c: TypeComparison.Compatible => + // check for reserved words - only check for new/renamed attributes + val reserved = schema.getAttributeDescriptors.asScala.map(_.getLocalName).exists { name => + existing.getDescriptor(name) == null && FeatureUtils.ReservedWords.contains(name.toUpperCase(Locale.US)) + } + if (reserved) { + try { ReservedWordCheck.validateAttributeNames(schema) } catch { + case NonFatal(e) => + return Left(new UnsupportedOperationException(e.getMessage)) + } + } + Right(c) } } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/ingest/IngestCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/ingest/IngestCommand.scala index dfbfc1a0b154..74e95582831f 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/ingest/IngestCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/ingest/IngestCommand.scala @@ -14,7 +14,7 @@ import com.typesafe.scalalogging.LazyLogging import org.apache.commons.io.{FilenameUtils, IOUtils} import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.geotools.data.{DataStore, DataStoreFinder, DataUtilities} +import org.geotools.data.DataStore import org.locationtech.geomesa.convert.ConverterConfigLoader import org.locationtech.geomesa.convert.all.TypeAwareInference import org.locationtech.geomesa.convert2.SimpleFeatureConverter @@ -30,7 +30,8 @@ import org.locationtech.geomesa.tools.ingest.IngestCommand.{IngestCounters, Inge import org.locationtech.geomesa.tools.utils.{CLArgResolver, Prompt, TerminalCallback} import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty -import org.locationtech.geomesa.utils.geotools.{ConfigSftParsing, SimpleFeatureTypes} +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypeComparator.TypeComparison +import org.locationtech.geomesa.utils.geotools.{ConfigSftParsing, SimpleFeatureTypeComparator, SimpleFeatureTypes} import org.locationtech.geomesa.utils.io.fs.FileSystemDelegate.FileHandle import org.locationtech.geomesa.utils.io.fs.LocalDelegate.StdInHandle import org.locationtech.geomesa.utils.io.{CloseWithLogging, PathUtils, WithClose} @@ -101,27 +102,19 @@ trait IngestCommand[DS <: DataStore] throw new ParameterException("--split-max-size can only be used with --combine-inputs") } - // use .get to re-throw the exception if we fail - IngestCommand.getSftAndConverter(params, inputs.paths, format, Some(this)).get.foreach { case (sft, converter) => - val start = System.currentTimeMillis() - // create schema for the feature prior to ingest - val ds = DataStoreFinder.getDataStore(connection.asJava).asInstanceOf[DS] - if (ds == null) { - throw new ParameterException("Could not create data store from parameters") - } - try { + withDataStore { ds => + // use .get to re-throw the exception if we fail + IngestCommand.getSftAndConverter(params, inputs.paths, format, Some(ds)).get.foreach { case (sft, converter) => + val start = System.currentTimeMillis() + // create schema for the feature prior to ingest val existing = Try(ds.getSchema(sft.getTypeName)).getOrElse(null) if (existing == null) { Command.user.info(s"Creating schema '${sft.getTypeName}'") setBackendSpecificOptions(sft) ds.createSchema(sft) } else { + // note: sft will have been loaded from the datastore if it already exists, so will match existing Command.user.info(s"Schema '${sft.getTypeName}' exists") - if (DataUtilities.compare(sft, existing) != 0) { - throw new ParameterException("Existing simple feature type does not match expected type" + - s"\n existing: '${SimpleFeatureTypes.encodeType(existing)}'" + - s"\n expected: '${SimpleFeatureTypes.encodeType(sft)}'") - } } val result = startIngest(mode, ds, sft, converter, inputs) if (params.waitForCompletion) { @@ -140,8 +133,6 @@ trait IngestCommand[DS <: DataStore] } else { Command.user.info("Job submitted, check tracking for progress and result") } - } finally { - CloseWithLogging(ds) } } } @@ -232,18 +223,18 @@ object IngestCommand extends LazyLogging { * @param params params * @param inputs input files * @param format input format - * @param command hook to data store for loading schemas by name + * @param ds data store for loading schemas by name * @return None if user declines inferred result, otherwise the loaded/inferred result */ def getSftAndConverter( params: TypeNameParam with FeatureSpecParam with ConverterConfigParam with OptionalForceParam, inputs: Seq[String], format: Option[String], - command: Option[DataStoreCommand[_ <: DataStore]]): Try[Option[(SimpleFeatureType, Config)]] = Try { + ds: Option[_ <: DataStore]): Try[Option[(SimpleFeatureType, Config)]] = Try { import org.locationtech.geomesa.utils.conversions.ScalaImplicits.RichIterator // try to load the sft, first check for an existing schema, then load from the params/environment - var sft: SimpleFeatureType = loadSft(params, command).orNull + var sft: SimpleFeatureType = loadSft(params, ds).orNull var converter: Config = Option(params.config).map(CLArgResolver.getConfig).orNull @@ -277,7 +268,7 @@ object IngestCommand extends LazyLogging { if (sft == null) { val typeName = Option(params.featureName).getOrElse { - val existing = command.toSet[DataStoreCommand[_ <: DataStore]].flatMap(_.withDataStore(_.getTypeNames)) + val existing = ds.toSet[DataStore].flatMap(_.getTypeNames) val fileName = Option(FilenameUtils.getBaseName(file.path)) val base = fileName.map(_.trim.replaceAll("[^A-Za-z0-9]+", "_")).filterNot(_.isEmpty).getOrElse("geomesa") var name = base @@ -382,23 +373,23 @@ object IngestCommand extends LazyLogging { } object Inputs { - val StdInInputs = Seq("-") + val StdInInputs: Seq[String] = Seq("-") } /** * Tries to load a feature type, first from the data store then from the params/environment * * @param params params - * @param command command with data store access + * @param ds data store access * @return */ private def loadSft( params: TypeNameParam with FeatureSpecParam, - command: Option[DataStoreCommand[_ <: DataStore]]): Option[SimpleFeatureType] = { + ds: Option[_ <: DataStore]): Option[SimpleFeatureType] = { val fromStore = for { - cmd <- command + d <- ds name <- Option(params.featureName) - sft <- cmd.withDataStore(ds => Try(ds.getSchema(name)).filter(_ != null).toOption) + sft <- Try(d.getSchema(name)).filter(_ != null).toOption } yield { sft } @@ -408,11 +399,13 @@ object IngestCommand extends LazyLogging { if (logger.underlying.isWarnEnabled()) { for { fs <- fromStore; fe <- fromEnv } { - if (fs.getTypeName != fe.getTypeName || SimpleFeatureTypes.compare(fs, fe) != 0) { - logger.warn( - "Schema from data store does not match schema from environment." + - s"\n From data store: ${fs.getTypeName} identified ${DataUtilities.encodeType(fs)}" + - s"\n From environment: ${fe.getTypeName} identified ${DataUtilities.encodeType(fe)}") + SimpleFeatureTypeComparator.compare(fs, fe) match { + case TypeComparison.Compatible(false, false, _) if fs.getTypeName == fe.getTypeName => // ok + case _ => + logger.warn( + "Schema from data store does not match schema from environment." + + s"\n From data store: ${fs.getTypeName} identified ${SimpleFeatureTypes.encodeType(fs)}" + + s"\n From environment: ${fe.getTypeName} identified ${SimpleFeatureTypes.encodeType(fe)}") } } } diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypeComparator.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypeComparator.scala new file mode 100644 index 000000000000..a6d3ddebcff0 --- /dev/null +++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/geotools/SimpleFeatureTypeComparator.scala @@ -0,0 +1,82 @@ +/*********************************************************************** + * Copyright (c) 2013-2023 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.utils.geotools + +import org.opengis.feature.simple.SimpleFeatureType + +object SimpleFeatureTypeComparator { + + import TypeComparison._ + + /** + * Compares an existing feature type with an update to the feature type. This comparison + * is specific to the types of updates that GeoMesa supports. + * + * @param existing existing type + * @param update updated type + * @return + */ + def compare(existing: SimpleFeatureType, update: SimpleFeatureType): TypeComparison = { + if (existing.getAttributeCount > update.getAttributeCount) { + return AttributeRemoved + } + + var renamed = false + var superclass = false + val attributeChangeBuilder = Map.newBuilder[String, (Class[_], Class[_])] + + // check for column type changes + var i = 0 + while (i < existing.getAttributeCount) { + val e = existing.getDescriptor(i) + val u = update.getDescriptor(i) + if (u.getType.getBinding != e.getType.getBinding) { + if (u.getType.getBinding.isAssignableFrom(e.getType.getBinding)) { + superclass = true + } else { + attributeChangeBuilder += e.getLocalName -> (e.getType.getBinding, u.getType.getBinding) + } + } + renamed = renamed || u.getLocalName != e.getLocalName + i += 1 + } + + val attributeChanges = attributeChangeBuilder.result() + if (attributeChanges.nonEmpty) { + AttributeTypeChanged(attributeChanges) + } else { + val extension = i < update.getAttributeCount + Compatible(extension, renamed, superclass) + } + } + + sealed trait TypeComparison + + object TypeComparison { + + /** + * Attributes have been removed from the schema + */ + case object AttributeRemoved extends TypeComparison + + /** + * Attribute types have changed in an incompatible manner + */ + case class AttributeTypeChanged(changes: Map[String, (Class[_], Class[_])]) extends TypeComparison + + /** + * Types are compatible, in that GeoMesa can support migrating from one to the other + * + * @param extension attributes were added at the end + * @param renamed attributes were renamed but have the same bindings + * @param superclass attributes are superclasses of the original binding + */ + case class Compatible(extension: Boolean, renamed: Boolean, superclass: Boolean) extends TypeComparison + } +}