GEOMESA-3300 Partitioned PostGIS - delete _spill table on removeSchema #3000

Merged: 1 commit, Sep 25, 2023
@@ -14,10 +14,16 @@ package tables
   */
 object PartitionTables extends SqlStatements {
 
+  private def tablesAndTypes(info: TypeInfo): Seq[(TableConfig, String)] = {
+    Seq(
+      info.tables.writeAheadPartitions -> "gist",
+      info.tables.mainPartitions -> "brin",
+      info.tables.spillPartitions -> "gist"
+    )
+  }
+
   override protected def createStatements(info: TypeInfo): Seq[String] =
-    statements(info, info.tables.writeAheadPartitions, "gist") ++
-        statements(info, info.tables.mainPartitions, "brin") ++
-        statements(info, info.tables.spillPartitions, "gist")
+    tablesAndTypes(info).flatMap { case (table, indexType) => statements(info, table, indexType) }
 
   private def statements(info: TypeInfo, table: TableConfig, indexType: String): Seq[String] = {
     // note: don't include storage opts since these are parent partition tables
@@ -48,6 +54,5 @@ object PartitionTables extends SqlStatements {
   }
 
   override protected def dropStatements(info: TypeInfo): Seq[String] =
-    Seq(info.tables.writeAheadPartitions, info.tables.mainPartitions)
-        .map(table => s"DROP TABLE IF EXISTS ${table.name.qualified};")
+    tablesAndTypes(info).map { case (table, _) => s"DROP TABLE IF EXISTS ${table.name.qualified};" }
 }
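
This refactor is the actual fix: dropStatements previously listed only the write-ahead and main partition parent tables, so the _spill parent table survived removeSchema. Deriving both createStatements and dropStatements from the shared tablesAndTypes list keeps the two paths in sync. Below is a minimal, self-contained sketch of that pattern; the object and table names are hypothetical, not taken from the PR.

object DropStatementsSketch extends App {

  // stand-in for the dialect's TableConfig
  final case class Table(qualified: String)

  // one list drives both create and drop, so a table added here cannot be
  // missed by either path ("gist"/"brin" are the index types used above)
  val tablesAndTypes: Seq[(Table, String)] = Seq(
    Table("\"geomesa\".\"example_wa_partition\"") -> "gist",
    Table("\"geomesa\".\"example_partition\"") -> "brin",
    Table("\"geomesa\".\"example_spill\"") -> "gist" // formerly absent from the drop path
  )

  tablesAndTypes.foreach { case (table, _) =>
    println(s"DROP TABLE IF EXISTS ${table.qualified};")
  }
  // prints:
  // DROP TABLE IF EXISTS "geomesa"."example_wa_partition";
  // DROP TABLE IF EXISTS "geomesa"."example_partition";
  // DROP TABLE IF EXISTS "geomesa"."example_spill";
}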
@@ -17,7 +17,7 @@ import org.geotools.referencing.CRS
 import org.junit.runner.RunWith
 import org.locationtech.geomesa.filter.FilterHelper
 import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog}
-import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.UserDataTable
+import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
 import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
 import org.locationtech.geomesa.utils.collection.SelfClosingIterator
 import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
@@ -36,6 +36,7 @@ import java.sql.Connection
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.logging.{Handler, Level, LogRecord}
 import java.util.{Collections, Locale}
+import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal
 
@@ -326,6 +327,107 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
       }
     }
 
+    "drop all associated tables on removeSchema" in {
+      val ds = DataStoreFinder.getDataStore(params.asJava)
+      ds must not(beNull)
+
+      try {
+        ds must beAnInstanceOf[JDBCDataStore]
+
+        foreach(Seq("dropme-test", "dropmetest")) { name =>
+          val sft = SimpleFeatureTypes.renameSft(this.sft, name)
+
+          ds.getTypeNames.toSeq must not(contain(sft.getTypeName))
+          ds.createSchema(sft)
+
+          val schema = Try(ds.getSchema(sft.getTypeName)).getOrElse(null)
+          schema must not(beNull)
+          schema.getUserData.asScala must containAllOf(sft.getUserData.asScala.toSeq)
+          logger.debug(s"Schema: ${SimpleFeatureTypes.encodeType(schema)}")
+
+          // write some data
+          WithClose(new DefaultTransaction()) { tx =>
+            WithClose(ds.getFeatureWriterAppend(sft.getTypeName, tx)) { writer =>
+              features.foreach { feature =>
+                FeatureUtils.write(writer, feature, useProvidedFid = true)
+              }
+            }
+            tx.commit()
+          }
+
+          // get all the tables associated with the schema
+          def getTablesAndIndices: Seq[String] = {
+            val tables = ArrayBuffer.empty[String]
+            WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
+              WithClose(cx.getMetaData.getTables(null, null, "dropme%", null)) { rs =>
+                while (rs.next()) {
+                  tables += rs.getString(3)
+                }
+              }
+            }
+            tables.toSeq
+          }
+
+          // get all the procedures and functions associated with the schema
+          def getFunctions: Seq[String] = {
+            val fns = ArrayBuffer.empty[String]
+            WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
+              WithClose(cx.getMetaData.getProcedures(null, null, "%dropme%")) { rs =>
+                while (rs.next()) {
+                  fns += rs.getString(3)
+                }
+              }
+              WithClose(cx.getMetaData.getFunctions(null, null, "%dropme%")) { rs =>
+                while (rs.next()) {
+                  fns += rs.getString(3)
+                }
+              }
+            }
+            fns.toSeq
+          }
+
+          // get all the user data and other associated metadata
+          def getMeta: Seq[String] = {
+            val meta = ArrayBuffer.empty[String]
+            WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
+              Seq(
+                (UserDataTable.Name, "type_name", "key"),
+                (SequenceTable.Name, "type_name", "value"),
+                (PrimaryKeyTable.Name, "table_name", "pk_column"),
+                (PartitionTablespacesTable.Name, "type_name", "table_type")
+              ).foreach { case (table, where, select) =>
+                WithClose(cx.prepareStatement(s"SELECT $select FROM ${table.quoted} WHERE $where like 'dropme%';")) { st =>
+                  WithClose(st.executeQuery()) { rs =>
+                    while (rs.next()) {
+                      meta += rs.getString(1)
+                    }
+                  }
+                }
+              }
+            }
+            meta.toSeq
+          }
+
+          // _wa, _wa_partition, _partition, _spill tables + dtg, pk, geom indices for each
+          // _analyze_queue, _sort_queue, _wa_000, main view
+          getTablesAndIndices must haveLength(20)
+          // delete/insert/update/wa triggers
+          // analyze_partitions, compact, drop_age_off, merge_wa, part_maintenance, part_wa, roll_wa
+          getFunctions must haveLength(11)
+          // 3 tablespaces, 4 user data, 1 seq count, 1 primary key
+          getMeta must haveLength(9)
+
+          ds.removeSchema(sft.getTypeName)
+
+          getTablesAndIndices must beEmpty
+          getFunctions must beEmpty
+          getMeta must beEmpty
+        }
+      } finally {
+        ds.dispose()
+      }
+    }
+
     "remove whole-world filters" in {
       val ds = DataStoreFinder.getDataStore(params.asJava)
       ds must not(beNull)
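
The getTablesAndIndices and getFunctions helpers in the new test rely on standard java.sql.DatabaseMetaData calls (getTables, getProcedures, getFunctions), where column 3 of each result set holds the object name. Below is a stand-alone sketch of the same leftover-object check, assuming a local PostGIS instance; the JDBC URL and credentials are placeholders, not values from the PR.

import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

object LeftoverObjectCheck extends App {
  // placeholder connection details, not values from the PR
  val cx = DriverManager.getConnection("jdbc:postgresql://localhost:5432/geomesa", "user", "secret")
  try {
    val leftovers = ArrayBuffer.empty[String]
    // match any table whose name starts with "dropme", as the test does
    val rs = cx.getMetaData.getTables(null, null, "dropme%", null)
    try {
      while (rs.next()) {
        leftovers += rs.getString(3) // column 3 is TABLE_NAME per the JDBC spec
      }
    } finally {
      rs.close()
    }
    // after removeSchema, nothing should match, including the _spill parent table
    println(if (leftovers.isEmpty) "clean" else leftovers.mkString(", "))
  } finally {
    cx.close()
  }
}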