Skip to content

Commit

Permalink
GEOMESA-3300 Partitioned PostGIS - delete _spill table on removeSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Sep 25, 2023
1 parent 2b0864f commit f1a535f
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@ package tables
*/
object PartitionTables extends SqlStatements {

private def tablesAndTypes(info: TypeInfo): Seq[(TableConfig, String)] = {
Seq(
info.tables.writeAheadPartitions -> "gist",
info.tables.mainPartitions -> "brin",
info.tables.spillPartitions -> "gist"
)
}

override protected def createStatements(info: TypeInfo): Seq[String] =
statements(info, info.tables.writeAheadPartitions, "gist") ++
statements(info, info.tables.mainPartitions, "brin") ++
statements(info, info.tables.spillPartitions, "gist")
tablesAndTypes(info).flatMap { case (table, indexType) => statements(info, table, indexType) }

private def statements(info: TypeInfo, table: TableConfig, indexType: String): Seq[String] = {
// note: don't include storage opts since these are parent partition tables
Expand Down Expand Up @@ -48,6 +54,5 @@ object PartitionTables extends SqlStatements {
}

override protected def dropStatements(info: TypeInfo): Seq[String] =
Seq(info.tables.writeAheadPartitions, info.tables.mainPartitions)
.map(table => s"DROP TABLE IF EXISTS ${table.name.qualified};")
tablesAndTypes(info).map { case (table, _) => s"DROP TABLE IF EXISTS ${table.name.qualified};" }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import org.geotools.referencing.CRS
import org.junit.runner.RunWith
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog}
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.UserDataTable
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
Expand All @@ -36,6 +36,7 @@ import java.sql.Connection
import java.util.concurrent.CopyOnWriteArrayList
import java.util.logging.{Handler, Level, LogRecord}
import java.util.{Collections, Locale}
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import scala.util.control.NonFatal

Expand Down Expand Up @@ -326,6 +327,107 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
}
}

"drop all associated tables on removeSchema" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
ds must beAnInstanceOf[JDBCDataStore]

foreach(Seq("dropme-test", "dropmetest")) { name =>
val sft = SimpleFeatureTypes.renameSft(this.sft, name)

ds.getTypeNames.toSeq must not(contain(sft.getTypeName))
ds.createSchema(sft)

val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null)
schema must not(beNull)
schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq)
logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}")

// write some data
WithClose(new DefaultTransaction()) { tx =>
WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer =>
features.foreach { feature =>
FeatureUtils.write(writer, feature, useProvidedFid = true)
}
}
tx.commit()
}

// get all the tables associated with the schema
def getTablesAndIndices: Seq[String] = {
val tables = ArrayBuffer.empty[String]
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
WithClose(cx.getMetaData.getTables(null, null, "dropme%", null)) { rs =>
while (rs.next()) {
tables += rs.getString(3)
}
}
}
tables.toSeq
}

// get all the procedures and functions associated with the schema
def getFunctions: Seq[String] = {
val fns = ArrayBuffer.empty[String]
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
WithClose(cx.getMetaData.getProcedures(null, null, "%dropme%")) { rs =>
while (rs.next()) {
fns += rs.getString(3)
}
}
WithClose(cx.getMetaData.getFunctions(null, null, "%dropme%")) { rs =>
while (rs.next()) {
fns += rs.getString(3)
}
}
}
fns.toSeq
}

// get all the user data and other associated metadata
def getMeta: Seq[String] = {
val meta = ArrayBuffer.empty[String]
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
Seq(
(UserDataTable.Name, "type_name", "key"),
(SequenceTable.Name, "type_name", "value"),
(PrimaryKeyTable.Name, "table_name", "pk_column"),
(PartitionTablespacesTable.Name, "type_name", "table_type")
).foreach { case (table, where, select) =>
WithClose(cx.prepareStatement(s"SELECT $select FROM ${table.quoted} WHERE $where like 'dropme%';")) { st =>
WithClose(st.executeQuery()) { rs =>
while (rs.next()) {
meta += rs.getString(1)
}
}
}
}
}
meta.toSeq
}

// _wa, _wa_partition, _partition, _spill tables + dtg, pk, geom indices for each
// _analyze_queue, _sort_queue, _wa_000, main view
getTablesAndIndices must haveLength(20)
// delete/insert/update/wa triggers
// analyze_partitions, compact, drop_age_off, merge_wa, part_maintenance, part_wa, roll_wa,
getFunctions must haveLength(11)
// 3 tablespaces, 4 user data, 1 seq count, 1 primary key
getMeta must haveLength(9)

ds.removeSchema(sft.getTypeName)

getTablesAndIndices must beEmpty
getFunctions must beEmpty
getMeta must beEmpty
}
} finally {
ds.dispose()
}
}

"remove whole-world filters" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)
Expand Down

0 comments on commit f1a535f

Please sign in to comment.