
C2C via Kafka


CockroachDB to CockroachDB via Kafka

NOTE: This feature is in preview

Replicator may be configured to receive CockroachDB changefeed events via a Kafka broker.

A thorough understanding of CockroachDB CDC features and Kafka is a prerequisite for any operators deploying Replicator as a consumer of Kafka topics.

Overview

  • A source CRDB cluster emits changes in near-real time via the enterprise CHANGEFEED feature to a Kafka topic.
  • Replicator registers a consumer group to the topic. The partitions of all the topics are divided among the consumers in the group, which may run in separate Replicator processes. When Replicator processes are started/stopped, the partitions are re-assigned so that each process receives a proportional share of the partitions.
  • Replicator applies the changes to the target in immediate mode (additional modes will be available).
+------------------------+
|     source CRDB        |
|                        |
| CDC {source_table(s)}  |
+------------------------+
          |
          V
   kafka://broker:9092
+---------------------+
|     Kafka cluster   |
|                     |
+---------------------+
          ^
          |
+---------------------+
|      Replicator     |
|    consumer-group   |
+---------------------+
          |
          V
     sql://target.db
+------------------------+
|       target           |
| {destination_table(s)} |
+------------------------+
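
For reference, with the updated and diff options a changefeed publishes JSON messages on the topic that look roughly like the following (the table, column names, and values are purely illustrative). Replicator decodes these envelopes, and uses the periodic resolved messages to decide when a timestamp is complete across all partitions:

    message key (primary key of the changed row, as a JSON array):
      [101]
    message value (row after the change, prior version with diff, and MVCC timestamp):
      {"after": {"id": 101, "name": "alice"}, "before": {"id": 101, "name": "al"}, "updated": "1714770174.737022000,0"}
    periodic resolved-timestamp message (one per partition):
      {"resolved": "1714770174.737022000,0"}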

Instructions

  1. In the source cluster, choose the table(s) that you would like to replicate.

  2. In the destination cluster, re-create those tables within a single SQL database schema, matching the source table definitions exactly (a complete worked example follows this list):

    • Don't create any new constraints on any destination table(s).
    • The columns in each destination table must have exactly the same names as in the source table.
  3. Create the staging database _replicator in the target cluster.

    • It is recommended that you reduce the default GC time to five minutes, since these tables have a high volume of deletes.
    • ALTER DATABASE _replicator CONFIGURE ZONE USING gc.ttlseconds=300
  4. Start replicator

    ./replicator kafka --topic [source table] --broker broker:9092 --group [kafka group name] --targetSchema destination.public --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'

    You can start multiple Replicator processes either on:

    • all nodes of the destination cluster
    • one or more servers with a low-latency connection to the destination cluster

    Note that the Kafka topics must have multiple partitions configured so that all of the Replicator processes can consume events.

  5. Set the cluster setting of the source cluster to enable range feeds: SET CLUSTER SETTING kv.rangefeed.enabled = true

  6. Once Replicator has started, create a changefeed on the source cluster

    • CREATE CHANGEFEED FOR TABLE [source_table] INTO 'kafka://broker:9092' WITH updated, diff, resolved='1s', min_checkpoint_frequency='1s', kafka_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}'
    • By default, the table name is used as the topic name.
    • Always use the updated, resolved, and min_checkpoint_frequency options; they are required for timely replication.
    • The protect_data_from_gc_on_pause option is not required, but can be used in situations where a Kafka broker may be unavailable for periods longer than the source's garbage-collection window (25 hours by default).
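
Putting the steps above together, a minimal end-to-end sketch might look like the following. The movr.rides table, the destination database name, the consumer group name, and the connection URLs are placeholders; adjust them for your environment.

    # source cluster: enable rangefeeds and capture the table definition
    cockroach sql --url "$SOURCE_URL" -e "SET CLUSTER SETTING kv.rangefeed.enabled = true"
    cockroach sql --url "$SOURCE_URL" -e "SHOW CREATE TABLE movr.rides"

    # target cluster: re-create the table from the SHOW CREATE output
    # (same column names, no additional constraints), then create the
    # staging database and shorten its GC window
    cockroach sql --url "$TARGET_URL" -e "CREATE DATABASE IF NOT EXISTS destination"
    cockroach sql --url "$TARGET_URL" -e "CREATE DATABASE IF NOT EXISTS _replicator"
    cockroach sql --url "$TARGET_URL" -e "ALTER DATABASE _replicator CONFIGURE ZONE USING gc.ttlseconds=300"

    # start the consumer; the topic name defaults to the table name
    ./replicator kafka \
      --topic rides \
      --broker broker:9092 \
      --group replicator-rides \
      --targetSchema destination.public \
      --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'

    # source cluster: create the changefeed once the consumer is running
    cockroach sql --url "$SOURCE_URL" -e "CREATE CHANGEFEED FOR TABLE movr.rides INTO 'kafka://broker:9092' WITH updated, diff, resolved='1s', min_checkpoint_frequency='1s'"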

Using Kafka events to recover a database

In addition to streaming events from one CockroachDB cluster to another, the events stored in a Kafka topic can be used together with a backup to recover a database. Replaying the events held by the Kafka broker significantly reduces the RPO (the amount of data lost in an outage). This assumes that full and incremental backups are taken periodically, and that changefeeds are configured to send events to a Kafka cluster.

The recovery process consists of the following steps:

  • Restore the available backups.
  • Run the Replicator Kafka consumer, specifying --minTimestamp as the timestamp of the latest backup used in the restore above. The timestamp should be the closest resolved timestamp before the start of the backup; it can be retrieved by looking at the status of the changefeed job and recording it before starting the backup:
select job_type, description, running_status from [show jobs] where status='running' and job_type='CHANGEFEED';

Sample output

-[ RECORD 1 ]
job_type       | CHANGEFEED
description    | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH OPTIONS (diff, min_checkpoint_frequency = '1s', resolved = '1s', updated)
running_status | running: resolved=1714770174.737022000,0

Replicator will connect to the Kafka broker, determine the offset of that resolved timestamp event across all partitions, and start reading the mutations to be applied to the destination database.
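
As a sketch, assuming the restored backup was taken shortly after the resolved timestamp recorded above, and assuming --minTimestamp accepts the HLC-formatted resolved timestamp shown in the job status, the recovery consumer could be started like this (broker, topic, group, and connection details are placeholders). --resolvedInterval should match the resolved option used in the CREATE CHANGEFEED statement:

    ./replicator kafka \
      --topic mytable \
      --broker localhost:9092 \
      --group recovery \
      --targetSchema destination.public \
      --targetConn 'postgresql://root@localhost:26257/?sslmode=disable' \
      --minTimestamp 1714770174.737022000,0 \
      --resolvedInterval 1s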

Starting a Kafka Consumer for replication

Usage:
  replicator kafka [flags]

Flags:
      --applyTimeout duration             the maximum amount of time to wait for an update to be applied (default 30s)
      --assumeIdempotent                  disable the extra staging table queries that debounce non-idempotent redelivery in changefeeds
      --batchSize int                     messages to accumulate before committing to the target (default 100)
      --bestEffortOnly                    eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs
      --bestEffortWindow duration         use an eventually-consistent mode for initial backfill or when replication is behind; 0 to disable (default 1h0m0s)
      --broker stringArray                address of Kafka broker(s)
      --disableCheckpointStream           disable cross-Replicator checkpoint notifications and rely only on polling
      --dlqTableName ident                the name of a table in the target schema for storing dead-letter entries (default replicator_dlq)
      --flushPeriod duration              flush queued mutations after this duration (default 1s)
      --flushSize int                     ideal batch size to determine when to flush mutations (default 1000)
      --group string                      the Kafka consumer group id
      --immediate                         bypass staging tables and write directly to target; recommended only for KV-style workloads with no FKs
      --insecureSkipVerify                If true, disable client-side validation of responses
      --limitLookahead int                limit number of checkpoints to be considered when computing the resolving range; may cause replication to stall completely if older mutations cannot be applied
      --maxTimestamp string               only accept messages older than this timestamp; this is an exclusive upper limit
      --metricsAddr string                a host:port on which to serve metrics and diagnostics
      --minTimestamp string               only accept unprocessed messages at or newer than this timestamp; this is an inclusive lower limit
      --parallelism int                   the number of concurrent database transactions to use (default 16)
      --quiescentPeriod duration          how often to retry deferred mutations (default 10s)
      --resolvedInterval duration         interval between two resolved timestamps.
                                          Only used when minTimestamp is specified.
                                          It serves as a hint to seek the offset of a resolved timestamp message
                                          that is strictly less than the minTimestamp in the Kafka feed.
                                          Note:
                                          The optimal value for resolvedInterval is the same as the resolved
                                          interval specified in the CREATE CHANGEFEED command.
                                          The resolved messages will not be emitted more frequently than
                                          the configured min_checkpoint_frequency specified in CREATE CHANGEFEED
                                          command (but may be emitted less frequently).
                                          Please see the CREATE CHANGEFEED documentation for details.
                                           (default 5s)
      --retireOffset duration             delay removal of applied mutations (default 24h0m0s)
      --saslClientID string               client ID for OAuth authentication from a third-party provider
      --saslClientSecret string           Client secret for OAuth authentication from a third-party provider
      --saslGrantType string              Override the default OAuth client credentials grant type for other implementations
      --saslMechanism string              Can be set to OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN
      --saslPassword string               SASL password
      --saslScope stringArray             Scopes that the OAuth token should have access for.
      --saslTokenURL string               Client token URL for OAuth authentication from a third-party provider
      --saslUser string                   SASL username
      --scanSize int                      the number of rows to retrieve from staging (default 10000)
      --schemaRefresh duration            controls how often a watcher will refresh its schema. If this value is zero or negative, refresh behavior will be disabled. (default 1m0s)
      --stageMarkAppliedLimit int         limit the number of mutations to be marked applied in a single statement (default 100000)
      --stageSanityCheckPeriod duration   how often to validate staging table apply order (-1 to disable) (default 10m0s)
      --stageSanityCheckWindow duration   how far back to look when validating staging table apply order (default 1h0m0s)
      --stageUnappliedPeriod duration     how often to report the number of unapplied mutations in staging tables (-1 to disable) (default 1m0s)
      --stagingConn string                the staging database's connection string
      --stagingCreateSchema               automatically create the staging schema if it does not exist
      --stagingIdleTime duration          maximum lifetime of an idle connection (default 1m0s)
      --stagingJitterTime duration        the time over which to jitter database pool disconnections (default 15s)
      --stagingMaxLifetime duration       the maximum lifetime of a database connection (default 5m0s)
      --stagingMaxPoolSize int            the maximum number of staging database connections (default 128)
      --stagingSchema atom                a SQL database schema to store metadata in (default _replicator.public)
      --strategy string                   Kafka consumer group re-balance strategy (default "sticky")
      --targetConn string                 the target database's connection string
      --targetIdleTime duration           maximum lifetime of an idle connection (default 1m0s)
      --targetJitterTime duration         the time over which to jitter database pool disconnections (default 15s)
      --targetMaxLifetime duration        the maximum lifetime of a database connection (default 5m0s)
      --targetMaxPoolSize int             the maximum number of target database connections (default 128)
      --targetSchema atom                 the SQL database schema in the target cluster to update
      --targetStatementCacheSize int      the maximum number of prepared statements to retain (default 128)
      --taskGracePeriod duration          how long to allow for task cleanup when recovering from errors (default 1m0s)
      --timestampLimit int                the maximum number of source timestamps to coalesce into a target transaction (default 1000)
      --tlsCACertificate string           the path of the base64-encoded CA file
      --tlsCertificate string             the path of the base64-encoded client certificate file
      --tlsPrivateKey string              the path of the base64-encoded client private key
      --topic stringArray                 the topic(s) that the consumer should use
      --userscript string                 the path to a configuration script, see userscript subcommand

Global Flags:
      --gracePeriod duration    allow background processes to exit (default 30s)
      --logDestination string   write logs to a file, instead of stdout
      --logFormat string        choose log output format [ fluent, text ] (default "text")
  -v, --verbose count           increase logging verbosity to debug; repeat for trace
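
For brokers that require authentication, the SASL and TLS flags above can be combined with the basic invocation. A sketch using SCRAM-SHA-512 follows (broker address, credentials, and certificate path are placeholders; whether the TLS flags are needed depends on the broker's listener configuration):

    ./replicator kafka \
      --topic rides \
      --broker secure-broker:9093 \
      --group replicator-rides \
      --targetSchema destination.public \
      --targetConn 'postgresql://root@localhost:26257/?sslmode=disable' \
      --saslMechanism SCRAM-SHA-512 \
      --saslUser replicator \
      --saslPassword 'changeme' \
      --tlsCACertificate ./ca.crt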