PGLogical

PostgreSQL Logical Replication

An alternate application of Replicator is to connect to a PostgreSQL-compatible database instance and consume a logical replication feed. This is primarily intended for migration use cases, where a minimal- or zero-downtime migration from PostgreSQL to CockroachDB is desired.

Usage:
  replicator pglogical [flags]

Flags:
      --applyTimeout duration             the maximum amount of time to wait for an update to be applied (default 30s)
      --dlqTableName ident                the name of a table in the target schema for storing dead-letter entries (default replicator_dlq)
      --flushPeriod duration              flush queued mutations after this duration (default 1s)
      --flushSize int                     ideal batch size to determine when to flush mutations (default 1000)
      --metricsAddr string                a host:port on which to serve metrics and diagnostics
      --parallelism int                   the number of concurrent database transactions to use (default 16)
      --publicationName string            the publication within the source database to replicate
      --quiescentPeriod duration          how often to retry deferred mutations (default 10s)
      --retireOffset duration             delay removal of applied mutations (default 24h0m0s)
      --scanSize int                      the number of rows to retrieve from staging (default 10000)
      --schemaRefresh duration            controls how often a watcher will refresh its schema. If this value is zero or negative, refresh behavior will be disabled. (default 1m0s)
      --slotName string                   the replication slot in the source database (default "replicator")
      --sourceConn string                 the source database's connection string
      --stageMarkAppliedLimit int         limit the number of mutations to be marked applied in a single statement (default 100000)
      --stageSanityCheckPeriod duration   how often to validate staging table apply order (-1 to disable) (default 10m0s)
      --stageSanityCheckWindow duration   how far back to look when validating staging table apply order (default 1h0m0s)
      --stageUnappliedPeriod duration     how often to report the number of unapplied mutations in staging tables (-1 to disable) (default 1m0s)
      --stagingConn string                the staging database's connection string
      --stagingCreateSchema               automatically create the staging schema if it does not exist
      --stagingIdleTime duration          maximum lifetime of an idle connection (default 1m0s)
      --stagingJitterTime duration        the time over which to jitter database pool disconnections (default 15s)
      --stagingMaxLifetime duration       the maximum lifetime of a database connection (default 5m0s)
      --stagingMaxPoolSize int            the maximum number of staging database connections (default 128)
      --stagingSchema atom                a SQL database schema to store metadata in (default _replicator.public)
      --standbyTimeout duration           how often to report WAL progress to the source server (default 5s)
      --targetConn string                 the target database's connection string
      --targetIdleTime duration           maximum lifetime of an idle connection (default 1m0s)
      --targetJitterTime duration         the time over which to jitter database pool disconnections (default 15s)
      --targetMaxLifetime duration        the maximum lifetime of a database connection (default 5m0s)
      --targetMaxPoolSize int             the maximum number of target database connections (default 128)
      --targetSchema atom                 the SQL database schema in the target cluster to update
      --targetStatementCacheSize int      the maximum number of prepared statements to retain (default 128)
      --taskGracePeriod duration          how long to allow for task cleanup when recovering from errors (default 1m0s)
      --timestampLimit int                the maximum number of source timestamps to coalesce into a target transaction (default 1000)
      --userscript string                 the path to a configuration script, see userscript subcommand

Global Flags:
      --gracePeriod duration    allow background processes to exit (default 30s)
      --logDestination string   write logs to a file, instead of stdout
      --logFormat string        choose log output format [ fluent, text ] (default "text")
  -v, --verbose count           increase logging verbosity to debug; repeat for trace

The theory of operation is similar to the standard use case; the only difference is that Replicator connects to the source database to receive a replication feed, rather than acting as the target for a webhook.

Postgres Replication Setup

  • A review of the PostgreSQL logical replication documentation is useful for understanding its limitations.
  • Run CREATE PUBLICATION my_pub FOR ALL TABLES; in the source database. The name my_pub can be changed as desired and must be provided to the --publicationName flag. It is also possible to publish only a subset of the tables in the source database. (The sketch following this list walks through the full sequence.)
  • Run SELECT pg_create_logical_replication_slot('replicator', 'pgoutput'); in the source database. The value replicator may be changed and should be passed to the --slotName flag.
  • Run SELECT pg_export_snapshot(); to create a consistent point for the bulk data export. Leave the source PostgreSQL session open; snapshot ids are valid only for the lifetime of the session that created them. The value returned by this function can be passed to pg_dump --snapshot <snapshot_id>, whose output is subsequently IMPORTed into CockroachDB, or used with SET TRANSACTION SNAPSHOT 'snapshot_id' when migrating data with a SQL client.
  • Complete the bulk data migration before continuing.
  • Run CREATE DATABASE IF NOT EXISTS _replicator; in the target cluster to create a staging arena.
  • Run replicator pglogical with at least the --publicationName, --sourceConn, --targetConn, and --targetSchema flags once the bulk data migration has completed, as shown in the sketch below. Replicator will then catch up on all mutations that have occurred since the replication slot was created.
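
A minimal end-to-end sketch of the steps above. The host names, user, snapshot id, and target schema are placeholders for illustration, not values prescribed by Replicator:

  # On the source database: create the publication and the replication slot.
  psql 'postgres://migration_user@pg.example.com/src' \
    -c 'CREATE PUBLICATION my_pub FOR ALL TABLES;' \
    -c "SELECT pg_create_logical_replication_slot('replicator', 'pgoutput');"

  # In a psql session that stays open for the duration of the dump, run
  # SELECT pg_export_snapshot(); and note the returned id. Then dump the
  # source data as of that snapshot (placeholder id shown) and import it
  # into CockroachDB:
  pg_dump --snapshot '00000003-000001BF-1' \
    'postgres://migration_user@pg.example.com/src' > dump.sql

  # On the target cluster: create the staging arena.
  cockroach sql --url 'postgresql://root@crdb.example.com:26257/?sslmode=verify-full' \
    -e 'CREATE DATABASE IF NOT EXISTS _replicator;'

  # Start replication; this catches up from the slot's creation point.
  replicator pglogical \
    --publicationName my_pub \
    --slotName replicator \
    --sourceConn 'postgres://migration_user@pg.example.com/src' \
    --targetConn 'postgresql://root@crdb.example.com:26257/?sslmode=verify-full' \
    --targetSchema defaultdb.public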

If you pass --metricsAddr 127.0.0.1:13013, a Prometheus-compatible HTTP endpoint will be available at /_/varz. A trivial health-check endpoint is also available at /_/healthz.
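
For example, with the flag above, both endpoints can be checked with curl:

  # Trivial health check; returns success while the process is serving.
  curl http://127.0.0.1:13013/_/healthz

  # Prometheus-compatible metrics scrape.
  curl http://127.0.0.1:13013/_/varz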

To clean up from the above:

SELECT pg_drop_replication_slot('replicator');
DROP PUBLICATION my_pub;
