Skip to content

Commit

Permalink
GEOMESA-3423 - add unlogged user data option
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Moorhead committed Dec 10, 2024
1 parent b4baebe commit 5ed290f
Show file tree
Hide file tree
Showing 15 changed files with 82 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ object PartitionedPostgisDialect {
val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions"
val MainTableSpace = "pg.partitions.tablespace.main"

// user data key that toggles postgres write-ahead logging (WAL) for tables created from a feature type:
// the value is parsed as a boolean, is treated as "true" when the key is absent, and a false value
// causes the CREATE TABLE statements that consult it to use the UNLOGGED modifier
val WalLogEnabled = "pg.wal.log.enabled"

implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal {
def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6)
def getMaxPartitions: Option[Int] = Option(sft.getUserData.get(MaxPartitions)).map(int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ package object dialect {
tables: Tables,
cols: Columns,
partitions: PartitionInfo,
userData: Map[String, String] = Map.empty)
userData: Map[String, String] = Map.empty) {
// SQL fragment placed between CREATE and TABLE by the table/procedure definitions:
// empty when WAL logging is enabled (also the case when the user data key is absent),
// otherwise the UNLOGGED modifier padded with spaces on both sides
val walLogSQL: String = {
  // an absent key counts as enabled; a present value is parsed strictly with toBoolean
  val walEnabled = userData.get(Config.WalLogEnabled).forall(_.toBoolean)
  if (walEnabled) "" else " UNLOGGED "
}
}

object TypeInfo {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ object CompactPartitions extends SqlProcedure {
| -- lock the child table to prevent any inserts that would be lost
| EXECUTE 'LOCK TABLE ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ' IN SHARE ROW EXCLUSIVE MODE';
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM' ||
| ' (SELECT * FROM ${info.schema.quoted}.' || quote_ident(partition_name) ||
| ' UNION ALL SELECT * FROM ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ') results' ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
| ELSE
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(partition_name) ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object MergeWriteAheadPartitions extends SqlProcedure {
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| -- we need a "select distinct" to avoid primary key conflicts - this should be fairly cheap since
| -- we're already sorting and there should be few or no conflicts
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT DISTINCT ON' ||
| ' (_st_sortablehash($geomCol), fid, ${info.cols.dtg.quoted}) * FROM ' ||
| quote_ident(partition_name || '_tmp_migrate') || ' ORDER BY _st_sortablehash($geomCol)';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object PartitionWriteAheadLog extends SqlProcedure {
| RAISE INFO '% Creating partition with insert % (unattached)', timeofday()::timestamp, partition_name;
| -- upper bounds are exclusive
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(write_ahead.name) ||
| ' WHERE $dtgCol >= ' || quote_literal(partition_start) ||
| ' AND $dtgCol < ' || quote_literal(partition_end) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object RollWriteAheadLog extends SqlProcedure with CronSchedule {
| END IF;
|
| -- requires SHARE UPDATE EXCLUSIVE
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(next_partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})' || index_space || ')' ||
| ' INHERITS (${table.name.qualified})${table.storage.opts}' || partition_tablespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object AnalyzeQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
| partition_name text,
| enqueued timestamp without time zone
|);""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object PartitionTables extends SqlStatements {
case Some(ts) => (s" TABLESPACE ${ts.quoted}", s" USING INDEX TABLESPACE ${ts.quoted}")
}
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.name.qualified} (
| LIKE ${info.tables.writeAhead.name.qualified} INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
| CONSTRAINT ${escape(table.name.raw, "pkey")} PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs
|) PARTITION BY RANGE(${info.cols.dtg.quoted})$tableTs;""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class PartitionTablespacesTable extends Sql {
val table = TableIdentifier(info.schema.raw, Name.raw)
val cName = TableName(Name.raw + "_pkey")
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.quoted} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.quoted} (
| type_name text not null,
| table_type text not null,
| table_space text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class PrimaryKeyTable extends Sql {
// we need to define the primary key separately since the main view can't have any primary key columns
val table = s"${info.schema.quoted}.${Name.quoted}"
val create =
s"""CREATE TABLE IF NOT EXISTS $table (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS $table (
| table_schema character varying,
| table_name character varying,
| pk_column_idx integer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SequenceTable extends Sql {
override def create(info: TypeInfo)(implicit ex: ExecutionContext): Unit = {
val table = TableIdentifier(info.schema.raw, Name.raw)
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.qualified} (
| type_name text PRIMARY KEY,
| value smallint NOT NULL CHECK (value >= 0 AND value <= 999)
|);""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object SortQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
| partition_name text,
| unsorted_count bigint,
| enqueued timestamp without time zone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class UserDataTable extends Sql {
val table = TableIdentifier(info.schema.raw, Name.raw)
val cName = TableName(Name.raw + "_pkey")
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.quoted} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.quoted} (
| type_name text not null,
| key text not null,
| value text not null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object WriteAheadTable extends SqlStatements {
| WHERE type_name = ${literal(info.typeName)} INTO seq_val;
| partition := ${literal(table.name.raw + "_")} || lpad(seq_val::text, 3, '0');
|
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs ' ||
| ') INHERITS (${table.name.qualified})${table.storage.opts}$tableTs';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.geotools.jdbc.JDBCDataStore
import org.geotools.referencing.CRS
import org.junit.runner.RunWith
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.Config.WalLogEnabled
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog}
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
Expand Down Expand Up @@ -88,6 +89,27 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
"preparedStatements" -> "true"
)

/**
 * Builds a catalog query that reports the persistence of a single table.
 *
 * The query joins pg_class to pg_namespace and returns the columns schema_name, table_name and
 * table_type, where table_type is one of 'unlogged', 'permanent', 'temporary' or 'unknown'
 * depending on the table's relpersistence code.
 *
 * Both names are spliced into the SQL as plain single-quoted literals without escaping.
 *
 * @param tableName name of the table to look up (matched against pg_class.relname)
 * @param schemaName name of the schema containing the table (matched against pg_namespace.nspname)
 * @return the SQL text of the query
 */
def isTableLoggedQuery(tableName: String, schemaName: String): String = {
  // the two %s placeholders are filled in order: table name first, then schema name
  val template =
    """
      |SELECT
      | n.nspname AS schema_name,
      | c.relname AS table_name,
      | CASE c.relpersistence
      | WHEN 'u' THEN 'unlogged'
      | WHEN 'p' THEN 'permanent'
      | WHEN 't' THEN 'temporary'
      | ELSE 'unknown'
      | END AS table_type
      |FROM
      | pg_class c
      |JOIN
      | pg_namespace n ON n.oid = c.relnamespace
      |WHERE
      | c.relname = '%s'
      | AND n.nspname = '%s';
      |
      |""".stripMargin
  template.format(tableName, schemaName)
}

var container: GenericContainer[_] = _

lazy val host = Option(container).map(_.getHost).getOrElse("localhost")
Expand Down Expand Up @@ -133,6 +155,43 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
ok
}

// verifies that disabling WAL logging through the feature type's user data results in the
// partition and queue tables being created as UNLOGGED in postgres
"create unlogged tables" in {
  val ds = DataStoreFinder.getDataStore(params.asJava)
  ds must not(beNull)

  try {
    // register a renamed copy of the shared test schema with WAL logging turned off
    val sft = SimpleFeatureTypes.renameSft(this.sft, "unlogged_test")
    sft.getUserData.put(WalLogEnabled, "false")

    ds.createSchema(sft)

    val typeInfo: TypeInfo = TypeInfo(this.schema, sft)

    Seq(
      typeInfo.tables.mainPartitions.name.raw,
      typeInfo.tables.writeAheadPartitions.name.raw,
      // typeInfo.tables.writeAhead.name.raw is intentionally not checked: the write ahead table is created with
      // PartitionedPostgisDialect#encodePostCreateTable, which doesn't have access to the user data - this should
      // be ok because the write ahead main table doesn't have any data
      typeInfo.tables.spillPartitions.name.raw,
      typeInfo.tables.analyzeQueue.name.raw,
      typeInfo.tables.sortQueue.name.raw).forall { tableName =>
      // NOTE(review): the schema is hard-coded to "public" here while TypeInfo is built from this.schema -
      // confirm the two always agree
      val sql = isTableLoggedQuery(tableName, "public")
      // verify that the table is unlogged
      WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
        WithClose(cx.createStatement()) { st =>
          WithClose(st.executeQuery(sql)) { rs =>
            // the catalog query must return a row, otherwise the table was never created
            rs.next() must beTrue
            // log the persistence type (previously the table name was logged twice)
            logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_type")}")
            rs.getString("table_type") mustEqual "unlogged"
          }
        }
      }
    }
  } finally {
    ds.dispose()
  }
}

"work" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)
Expand Down

0 comments on commit 5ed290f

Please sign in to comment.