# CDC via S3

NOTE: This feature is in preview.
Replicator may be configured to receive CockroachDB changefeed events via an object store, such as S3.
A thorough understanding of CockroachDB CDC features is a prerequisite for any operator deploying Replicator as a consumer of an S3 bucket.

- A source CockroachDB cluster emits changes in near-real time to an S3 cloud storage sink via the enterprise CHANGEFEED feature.
- Replicator connects to the S3 bucket and polls for new objects created in the bucket.
- Replicator applies the changes to the target.
```
+------------------------+
| source CRDB            |
|                        |
| CDC {source_table(s)}  |
+------------------------+
            |
            V
       s3://bucket
 +---------------------+
 |      S3 Bucket      |
 |                     |
 +---------------------+
            ^
            |
 +---------------------+
 |     Replicator      |
 |     S3 Consumer     |
 +---------------------+
            |
            V
     sql://target.db
+------------------------+
| target                 |
| {destination_table(s)} |
+------------------------+
```
- In the source cluster, choose the table(s) that you would like to replicate.
- In the destination cluster, re-create those tables within a single SQL database schema, matching the table definitions exactly:
  - Don't create any new constraints on any destination table(s).
  - It's imperative that the columns in the destination table are named exactly the same as in the source table.
- Create the staging database `_replicator` in the target cluster. It is recommended that you reduce the default GC time to five minutes, since the staging tables have a high volume of deletes:

  ```sql
  ALTER DATABASE _replicator CONFIGURE ZONE USING gc.ttlseconds = 300;
  ```
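As a sketch, the staging database itself can be created ahead of time with a statement like the one below (alternatively, the `--stagingCreateSchema` flag asks Replicator to create the staging schema automatically on startup):

```sql
-- Create the staging database that Replicator uses for its metadata and
-- staged mutations; the default staging schema is _replicator.public.
CREATE DATABASE IF NOT EXISTS _replicator;
```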
- Start `replicator`:

  ```shell
  ./replicator objstore \
    --storageURL "s3://bucket?AWS_ACCESS_KEY_ID=...&AWS_SECRET_ACCESS_KEY=..." \
    --targetSchema destination.public \
    --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'
  ```
The credentials may be specified in different ways:
- In the URL, as in the example above.
- In the environment, as exported variables.
- In a credentials file (e.g. `$HOME/.aws/credentials`).
- If running on an EC2 instance, in the instance metadata.
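As a sketch of the environment-variable option, the standard AWS SDK variables can be exported before starting Replicator, so the bucket URL does not need to carry credentials (the values below are placeholders):

```shell
# Standard AWS SDK environment variables; the S3 client picks these up
# when the storage URL omits embedded credentials.
export AWS_ACCESS_KEY_ID="AKIA..."         # placeholder
export AWS_SECRET_ACCESS_KEY="..."         # placeholder

./replicator objstore \
  --storageURL "s3://bucket" \
  --targetSchema destination.public \
  --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'
```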
In the initial version, if multiple Replicator processes are started, only one will actively retrieve objects from cloud storage. If that process fails, retrieval of objects will fail over to another running Replicator automatically.
- On the source cluster, set the cluster setting that enables rangefeeds:

  ```sql
  SET CLUSTER SETTING kv.rangefeed.enabled = true;
  ```
- Create a changefeed on the source cluster:

  ```sql
  CREATE CHANGEFEED FOR TABLE [source_table]
  INTO 's3://bucketname?AWS_ACCESS_KEY_ID=...&AWS_SECRET_ACCESS_KEY=...'
  WITH updated, diff, resolved='5s', min_checkpoint_frequency='5s';
  ```

  - Always use the options `updated`, `resolved`, and `min_checkpoint_frequency`, as these are required for timely replication.
  - The `protect_data_from_gc_on_pause` option is not required, but can be used in situations where the cloud storage is unavailable for periods longer than the source's garbage-collection window (25 hours by default).
In addition to using an S3 bucket to replicate changefeeds from one CockroachDB cluster to another, the events stored in the bucket can be leveraged to recover a database in conjunction with a backup. Using the events stored in a bucket can significantly reduce the RPO (the amount of data loss that occurs in an outage). This assumes that full and incremental backups are taken periodically, and that changefeeds are configured to send events to a cloud storage sink.
The recovery process consists of the following steps:
- Restore the available backups.
- Run the Replicator object store consumer, specifying `--minTimestamp` as the timestamp of the latest backup used in the restore process above. The timestamp should be the closest resolved timestamp before the start of the backup. This can be retrieved by running the statement below before the backup is started:

  ```sql
  SELECT cluster_logical_timestamp();
  ```

  Sample output:

  ```
       cluster_logical_timestamp
  ----------------------------------
     1721914049902597000.0000000000
  ```
Replicator will connect to the S3 bucket, find all the objects that were written to the bucket after the specified timestamp, and apply their events to the target.
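As a sketch, a recovery run might look like the following, substituting the resolved timestamp captured before the backup (the sample HLC value shown earlier is used here as a placeholder):

```shell
# Resume applying changefeed events from the bucket, starting at the
# resolved timestamp captured before the restored backup was taken.
./replicator objstore \
  --storageURL "s3://bucket?AWS_ACCESS_KEY_ID=...&AWS_SECRET_ACCESS_KEY=..." \
  --minTimestamp '1721914049902597000.0000000000' \
  --targetSchema destination.public \
  --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'
```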
```
Usage:
  replicator objstore [flags]

Flags:
      --applyTimeout duration        the maximum amount of time to wait for an update to be applied (default 30s)
      --bestEffortOnly               eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs
      --bestEffortWindow duration    use an eventually-consistent mode for initial backfill or when replication is behind; 0 to disable (default 1h0m0s)
      --bufferSize int               buffer size for the ndjson parser (default 65536)
      --disableCheckpointStream      disable cross-Replicator checkpoint notifications and rely only on polling
      --dlqTableName ident           the name of a table in the target schema for storing dead-letter entries (default replicator_dlq)
      --fetchDelay duration          time to wait between fetching the list of entries in a bucket (default 100ms)
      --flushPeriod duration         flush queued mutations after this duration (default 1s)
      --flushSize int                ideal batch size to determine when to flush mutations (default 1000)
      --immediate                    bypass staging tables and write directly to target; recommended only for KV-style workloads with no FKs
      --insecureSkipVerify           If true, disable client-side validation of responses
      --limitLookahead int           limit number of checkpoints to be considered when computing the resolving range; may cause replication to stall completely if older mutations cannot be applied
      --maxTimestamp hlc.Time        only accept messages older than this timestamp; this is an exclusive upper limit.
                                     The timestamp must be provided in the HLC timestamp format, as returned by
                                     cluster_logical_timestamp(). (default 0.0000000000)
      --metricsAddr string           a host:port on which to serve metrics and diagnostics
      --minTimestamp hlc.Time        only accept unprocessed messages at or newer than this timestamp; this is an inclusive lower limit.
                                     The timestamp must be provided in the HLC timestamp format, as returned by
                                     cluster_logical_timestamp(). (default 0.0000000000)
      --parallelism int              the number of concurrent database transactions to use (default 16)
      --partitionFormat objstore.PartitionFormat
                                     how changefeed file paths are partitioned: Daily, Flat, Hourly (default Daily)
      --quiescentPeriod duration     how often to retry deferred mutations (default 10s)
      --retireOffset duration        delay removal of applied mutations (default 24h0m0s)
      --retryInitial duration        initial time to wait before retrying an operation that failed because of a transient error (default 10ms)
      --retryMax duration            maximum time allowed for retrying an operation that failed because of a transient error (default 10s)
      --scanSize int                 the number of rows to retrieve from staging (default 10000)
      --schemaRefresh duration       controls how often a watcher will refresh its schema. If this value is zero or negative, refresh behavior will be disabled. (default 1m0s)
      --stageMarkAppliedLimit int    limit the number of mutations to be marked applied in a single statement (default 100000)
      --stageSanityCheckPeriod duration
                                     how often to validate staging table apply order (-1 to disable) (default 10m0s)
      --stageSanityCheckWindow duration
                                     how far back to look when validating staging table apply order (default 1h0m0s)
      --stageUnappliedPeriod duration
                                     how often to report the number of unapplied mutations in staging tables (-1 to disable) (default 1m0s)
      --stagingConn string           the staging database's connection string
      --stagingCreateSchema          automatically create the staging schema if it does not exist
      --stagingIdleTime duration     maximum lifetime of an idle connection (default 1m0s)
      --stagingJitterTime duration   the time over which to jitter database pool disconnections (default 15s)
      --stagingMaxLifetime duration  the maximum lifetime of a database connection (default 5m0s)
      --stagingMaxPoolSize int       the maximum number of staging database connections (default 128)
      --stagingSchema atom           a SQL database schema to store metadata in (default _replicator.public)
      --storageURL string            the URL to access the storage
      --targetConn string            the target database's connection string
      --targetIdleTime duration      maximum lifetime of an idle connection (default 1m0s)
      --targetJitterTime duration    the time over which to jitter database pool disconnections (default 15s)
      --targetMaxLifetime duration   the maximum lifetime of a database connection (default 5m0s)
      --targetMaxPoolSize int        the maximum number of target database connections (default 128)
      --targetSchema atom            the SQL database schema in the target cluster to update
      --targetStatementCacheSize int the maximum number of prepared statements to retain (default 128)
      --taskGracePeriod duration     how long to allow for task cleanup when recovering from errors (default 1m0s)
      --timestampLimit int           the maximum number of source timestamps to coalesce into a target transaction (default 1000)
      --tlsCACertificate string      the path of the base64-encoded CA file
      --tlsCertificate string        the path of the base64-encoded client certificate file
      --tlsPrivateKey string         the path of the base64-encoded client private key
      --userscript string            the path to a configuration script, see userscript subcommand
      --workers int                  maximum number of workers to process mutation files (default 4)

Global Flags:
      --gracePeriod duration         allow background processes to exit (default 30s)
      --logDestination string        write logs to a file, instead of stdout
      --logFormat string             choose log output format [ fluent, text ] (default "text")
  -v, --verbose count                increase logging verbosity to debug; repeat for trace
```