Skip to content

Commit

Permalink
GEOMESA-3423 Postgis - Add unlogged table option (#3244)
Browse files Browse the repository at this point in the history
  • Loading branch information
autodidacticon authored Dec 10, 2024
1 parent b4baebe commit f8b6dc7
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 10 deletions.
11 changes: 11 additions & 0 deletions docs/user/postgis/index_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,14 @@ for each query, moving data out of it faster may improve performance.
After the schema has been created, changes to the schedule can be made through the
:ref:`postgis_cli_update_schema` command.

Configuring WAL logging
-----------------------

PostgreSQL uses a write-ahead log (WAL) to ensure data consistency and durability. By default, the WAL is written
for all changes to the database, including the partitioned tables. Disabling the WAL for the partitioned tables
can significantly improve write performance, but at the cost of data durability. If increased performance is desired,
the WAL can be disabled for the partitioned tables by setting the key ``pg.wal.enabled`` to ``false``.

See the PostgreSQL `documentation <https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED>`_
for more information on the implications of disabling the WAL.
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,9 @@ object PartitionedPostgisDialect {
val WriteAheadPartitionsTableSpace = "pg.partitions.tablespace.wa-partitions"
val MainTableSpace = "pg.partitions.tablespace.main"

// set postgres table wal logging
val WalLogEnabled = "pg.wal.enabled"

implicit class ConfigConversions(val sft: SimpleFeatureType) extends AnyVal {
def getIntervalHours: Int = Option(sft.getUserData.get(IntervalHours)).map(int).getOrElse(6)
def getMaxPartitions: Option[Int] = Option(sft.getUserData.get(MaxPartitions)).map(int)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ package object dialect {
tables: Tables,
cols: Columns,
partitions: PartitionInfo,
userData: Map[String, String] = Map.empty)
userData: Map[String, String] = Map.empty) {
val walLogSQL: String = if (userData.getOrElse(Config.WalLogEnabled, "true").toBoolean) {
""
} else {
" UNLOGGED "
}
}

object TypeInfo {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ object CompactPartitions extends SqlProcedure {
| -- lock the child table to prevent any inserts that would be lost
| EXECUTE 'LOCK TABLE ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ' IN SHARE ROW EXCLUSIVE MODE';
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM' ||
| ' (SELECT * FROM ${info.schema.quoted}.' || quote_ident(partition_name) ||
| ' UNION ALL SELECT * FROM ${info.schema.quoted}.' || quote_ident(spill_partition) ||
| ') results' ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
| ELSE
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name || '_tmp_sort') ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(partition_name) ||
| ' ORDER BY _st_sortablehash($geomCol)';
| GET DIAGNOSTICS unsorted_count := ROW_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object MergeWriteAheadPartitions extends SqlProcedure {
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| -- we need a "select distinct" to avoid primary key conflicts - this should be fairly cheap since
| -- we're already sorting and there should be few or no conflicts
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT DISTINCT ON' ||
| ' (_st_sortablehash($geomCol), fid, ${info.cols.dtg.quoted}) * FROM ' ||
| quote_ident(partition_name || '_tmp_migrate') || ' ORDER BY _st_sortablehash($geomCol)';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ object PartitionWriteAheadLog extends SqlProcedure {
| RAISE INFO '% Creating partition with insert % (unattached)', timeofday()::timestamp, partition_name;
| -- upper bounds are exclusive
| -- use "create table as" (vs create then insert) for performance benefits related to WAL skipping
| EXECUTE 'CREATE TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE ${info.schema.quoted}.' || quote_ident(partition_name) ||
| partition_tablespace || ' AS SELECT * FROM ' || quote_ident(write_ahead.name) ||
| ' WHERE $dtgCol >= ' || quote_literal(partition_start) ||
| ' AND $dtgCol < ' || quote_literal(partition_end) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ object RollWriteAheadLog extends SqlProcedure with CronSchedule {
| END IF;
|
| -- requires SHARE UPDATE EXCLUSIVE
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(next_partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(next_partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})' || index_space || ')' ||
| ' INHERITS (${table.name.qualified})${table.storage.opts}' || partition_tablespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object AnalyzeQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.analyzeQueue.name.qualified} (
| partition_name text,
| enqueued timestamp without time zone
|);""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object PartitionTables extends SqlStatements {
case Some(ts) => (s" TABLESPACE ${ts.quoted}", s" USING INDEX TABLESPACE ${ts.quoted}")
}
val create =
s"""CREATE TABLE IF NOT EXISTS ${table.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${table.name.qualified} (
| LIKE ${info.tables.writeAhead.name.qualified} INCLUDING DEFAULTS INCLUDING CONSTRAINTS,
| CONSTRAINT ${escape(table.name.raw, "pkey")} PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs
|) PARTITION BY RANGE(${info.cols.dtg.quoted})$tableTs;""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object SortQueueTable extends SqlStatements {

override protected def createStatements(info: TypeInfo): Seq[String] = {
val create =
s"""CREATE TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
s"""CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.tables.sortQueue.name.qualified} (
| partition_name text,
| unsorted_count bigint,
| enqueued timestamp without time zone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object WriteAheadTable extends SqlStatements {
| WHERE type_name = ${literal(info.typeName)} INTO seq_val;
| partition := ${literal(table.name.raw + "_")} || lpad(seq_val::text, 3, '0');
|
| EXECUTE 'CREATE TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| EXECUTE 'CREATE ${info.walLogSQL} TABLE IF NOT EXISTS ${info.schema.quoted}.' || quote_ident(partition) || '(' ||
| 'CONSTRAINT ' || quote_ident(partition || '_pkey') ||
| ' PRIMARY KEY (fid, ${info.cols.dtg.quoted})$indexTs ' ||
| ') INHERITS (${table.name.qualified})${table.storage.opts}$tableTs';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.geotools.jdbc.JDBCDataStore
import org.geotools.referencing.CRS
import org.junit.runner.RunWith
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.gt.partition.postgis.dialect.PartitionedPostgisDialect.Config.WalLogEnabled
import org.locationtech.geomesa.gt.partition.postgis.dialect.procedures.{DropAgedOffPartitions, PartitionMaintenance, RollWriteAheadLog}
import org.locationtech.geomesa.gt.partition.postgis.dialect.tables.{PartitionTablespacesTable, PrimaryKeyTable, SequenceTable, UserDataTable}
import org.locationtech.geomesa.gt.partition.postgis.dialect.{PartitionedPostgisDialect, PartitionedPostgisPsDialect, TableConfig, TypeInfo}
Expand Down Expand Up @@ -88,6 +89,27 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
"preparedStatements" -> "true"
)

def isTableLoggedQuery(tableName: String, schemaName: String): String =
s"""
|SELECT
| n.nspname AS schema_name,
| c.relname AS table_name,
| CASE c.relpersistence
| WHEN 'u' THEN 'unlogged'
| WHEN 'p' THEN 'permanent'
| WHEN 't' THEN 'temporary'
| ELSE 'unknown'
| END AS table_type
|FROM
| pg_class c
|JOIN
| pg_namespace n ON n.oid = c.relnamespace
|WHERE
| c.relname = '$tableName'
| AND n.nspname = '$schemaName';
|
|""".stripMargin

var container: GenericContainer[_] = _

lazy val host = Option(container).map(_.getHost).getOrElse("localhost")
Expand Down Expand Up @@ -133,6 +155,77 @@ class PartitionedPostgisDataStoreTest extends Specification with BeforeAfterAll
ok
}

"create logged tables" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
val sft = SimpleFeatureTypes.renameSft(this.sft, "logged_test")

ds.createSchema(sft)

val typeInfo: TypeInfo = TypeInfo(this.schema, sft)

Seq(
typeInfo.tables.mainPartitions.name.raw,
typeInfo.tables.writeAheadPartitions.name.raw,
typeInfo.tables.spillPartitions.name.raw,
typeInfo.tables.analyzeQueue.name.raw,
typeInfo.tables.sortQueue.name.raw).forall { tableName =>
val sql = isTableLoggedQuery(tableName, "public")
// verify that the table is logged
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
WithClose(cx.createStatement()) { st =>
WithClose(st.executeQuery(sql)) { rs =>
rs.next() must beTrue
logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_name")}")
rs.getString("table_type") mustEqual "permanent"
}
}
}
}
} finally {
ds.dispose()
}
}

"create unlogged tables" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)

try {
val sft = SimpleFeatureTypes.renameSft(this.sft, "unlogged_test")
sft.getUserData.put(WalLogEnabled, "false")

ds.createSchema(sft)

val typeInfo: TypeInfo = TypeInfo(this.schema, sft)

Seq(
typeInfo.tables.mainPartitions.name.raw,
typeInfo.tables.writeAheadPartitions.name.raw,
// typeInfo.tables.writeAhead.name.raw, write ahead table is created with PartitionedPostgisDialect#encodePostCreateTable
// which doesnt have access to the user data, should be ok because the write ahead main table doesnt have any data
typeInfo.tables.spillPartitions.name.raw,
typeInfo.tables.analyzeQueue.name.raw,
typeInfo.tables.sortQueue.name.raw).forall { tableName =>
val sql = isTableLoggedQuery(tableName, "public")
// verify that the table is unlogged
WithClose(ds.asInstanceOf[JDBCDataStore].getConnection(Transaction.AUTO_COMMIT)) { cx =>
WithClose(cx.createStatement()) { st =>
WithClose(st.executeQuery(sql)) { rs =>
rs.next() must beTrue
logger.info(s"Table ${rs.getString("table_name")} is ${rs.getString("table_name")}")
rs.getString("table_type") mustEqual "unlogged"
}
}
}
}
} finally {
ds.dispose()
}
}

"work" in {
val ds = DataStoreFinder.getDataStore(params.asJava)
ds must not(beNull)
Expand Down

0 comments on commit f8b6dc7

Please sign in to comment.