
C2C via Kafka

Wiki Maintenance Bot edited this page Aug 29, 2024 · 19 revisions

CockroachDB to CockroachDB via Kafka

NOTE: This feature is in preview

Replicator may be configured to receive CockroachDB changefeed events via a Kafka broker.

A thorough understanding of CockroachDB CDC features and Kafka is a prerequisite for operators deploying Replicator as a consumer of Kafka topics.

Overview

  • A source CRDB cluster emits changes in near-real time to a Kafka topic via the enterprise CHANGEFEED feature.
  • Replicator joins a Kafka consumer group that subscribes to the topic(s). The partitions of all the topics are divided among the consumers in the group, which may run in separate Replicator processes. When Replicator processes are started or stopped, the partitions are reassigned so that each process receives a proportional share of the partitions.
  • Replicator applies the changes to the target in immediate mode (additional modes will be available).
+------------------------+
|     source CRDB        |
|                        |
| CDC {source_table(s)}  |
+------------------------+
          |
          V
   kafka://broker:9092
+---------------------+
|     Kafka cluster   |
|                     |
+---------------------+
          Ʌ
          |
+---------------------+
|      Replicator     |
|    consumer-group   |
+---------------------+
          |
          V
     sql://target.db
+------------------------+
|       target           |
| {destination_table(s)} |
+------------------------+
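To make the partition sharing described in the overview concrete, here is a minimal sketch of round-robin assignment of topic partitions to the consumers in a group. The topic and consumer names are hypothetical, and real Kafka assignors (such as the "sticky" strategy that --strategy selects by default) also try to keep partitions on their current owner during a rebalance, which this example does not model.

```python
# Hypothetical sketch of round-robin partition assignment in a consumer group.
# Real Kafka assignors (such as the "sticky" strategy that --strategy selects by
# default) also try to keep partitions on their current owner during a
# rebalance; this example ignores that and simply deals partitions out evenly.


def assign_partitions(partitions, consumers):
    """Map each consumer id to the list of (topic, partition) pairs it owns."""
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    ordered = sorted(consumers)
    assignment = {c: [] for c in ordered}
    for i, tp in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment


if __name__ == "__main__":
    # Two hypothetical topics with three partitions each.
    parts = [(topic, p) for topic in ("orders", "customers") for p in range(3)]
    # Three Replicator processes: each one owns two partitions.
    print(assign_partitions(parts, ["replicator-a", "replicator-b", "replicator-c"]))
    # replicator-c stops: a rebalance spreads all six partitions over two owners.
    print(assign_partitions(parts, ["replicator-a", "replicator-b"]))
```

With more consumers than partitions, the extra consumers receive an empty list, which is why the topics need enough partitions for every process to do work.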

Instructions

  1. In the source cluster, choose the table(s) that you would like to replicate.

  2. In the destination cluster, re-create those tables within a single SQL database schema and match the table definitions exactly:

    • Don't create any new constraints on any destination table(s).
    • The columns in each destination table must be named exactly the same as in the source table.
  3. Create the staging database _replicator in the target cluster.

    • It is recommended that you reduce the default GC TTL to five minutes (300 seconds), since the staging tables have a high volume of deletes.
    • ALTER DATABASE _replicator CONFIGURE ZONE USING gc.ttlseconds = 300;
  4. Start Replicator:

    ./replicator kafka --topic [source table] --broker broker:9092 --group [kafka group name] --targetSchema destination.public --targetConn 'postgresql://root@localhost:26257/?sslmode=disable'

    You can run multiple Replicator processes on either:

    • all nodes of the destination cluster
    • one or more servers with a low-latency connection to the destination cluster

    Note that Kafka assigns each partition to only one consumer in a group at a time, so the Kafka topics must have at least as many partitions as there are Replicator processes for every process to consume events.
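When several Replicator processes are launched from scripts, it can help to build the command from step 4 programmatically. The sketch below only uses flags from the "replicator kafka" help output on this page; the topic, broker, group, and connection values are placeholders. --topic and --broker appear as stringArray in that help output, which (following the pflag convention) means each value is passed by repeating the flag rather than as a comma-separated list.

```python
# Sketch: build the argument list for a Replicator Kafka consumer.
# All values passed in are placeholders; only flags shown in the
# "replicator kafka" help output are used.
import shlex


def replicator_kafka_argv(topics, brokers, group, target_schema, target_conn,
                          binary="./replicator"):
    argv = [binary, "kafka"]
    # stringArray flags: repeat the flag once per value.
    for topic in topics:
        argv += ["--topic", topic]
    for broker in brokers:
        argv += ["--broker", broker]
    argv += ["--group", group,
             "--targetSchema", target_schema,
             "--targetConn", target_conn]
    return argv


if __name__ == "__main__":
    argv = replicator_kafka_argv(
        topics=["mytable"],
        brokers=["broker:9092"],
        group="replicator-group",
        target_schema="destination.public",
        target_conn="postgresql://root@localhost:26257/?sslmode=disable",
    )
    # Every process started with the same --group joins the same consumer
    # group and shares the topic partitions with the others.
    print(shlex.join(argv))
```

The same argument list can be handed to subprocess.run on each host; using an identical --group value everywhere is what makes the processes share the partitions.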

  5. In the source cluster, enable rangefeeds: SET CLUSTER SETTING kv.rangefeed.enabled = true;

  6. Once Replicator has started, create a changefeed in the source cluster:

    • CREATE CHANGEFEED FOR TABLE [source_table] INTO 'kafka://broker:9092' WITH updated, diff, resolved='1s', min_checkpoint_frequency='1s', kafka_sink_config='{"Flush":{"Bytes":1048576,"Frequency":"1s"}}'
    • By default, the table name is used as the topic name.
    • Always use the updated, resolved, and min_checkpoint_frequency options, since they are required for timely replication.
    • The protect_data_from_gc_on_pause option is not required, but can be used when a Kafka broker may be unavailable for longer than the source's garbage-collection window (gc.ttlseconds: 25 hours by default before CockroachDB v23.1, and 4 hours by default since v23.1).
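The changefeed statement in step 6 embeds a JSON document inside a SQL string literal, which is easy to mistype by hand. The sketch below composes the same statement with the options listed above and generates kafka_sink_config with json.dumps. The function name changefeed_sql is hypothetical, and the inputs are not escaped, so it assumes table names and the sink URI contain no single quotes.

```python
# Sketch: compose the CREATE CHANGEFEED statement from step 6.
# Inputs are not escaped; table names and sink_uri must not contain quotes.
import json


def changefeed_sql(tables, sink_uri, resolved="1s", min_checkpoint_frequency="1s",
                   flush_bytes=1048576, flush_frequency="1s"):
    # Compact separators reproduce the JSON exactly as written in step 6.
    sink_config = json.dumps(
        {"Flush": {"Bytes": flush_bytes, "Frequency": flush_frequency}},
        separators=(",", ":"),
    )
    options = [
        "updated",  # required for timely replication
        "diff",
        f"resolved='{resolved}'",  # required
        f"min_checkpoint_frequency='{min_checkpoint_frequency}'",  # required
        f"kafka_sink_config='{sink_config}'",
    ]
    return (f"CREATE CHANGEFEED FOR TABLE {', '.join(tables)} "
            f"INTO '{sink_uri}' WITH {', '.join(options)};")


if __name__ == "__main__":
    print(changefeed_sql(["mytable"], "kafka://broker:9092"))
```

With the defaults, the output matches the statement in step 6, with a terminating semicolon added.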

Using Kafka events to recover a database

In addition to using Kafka events to stream changes from one CockroachDB cluster to another, you can use the events stored in a Kafka topic, together with a backup, to recover a database. Replaying the events stored in a Kafka broker can significantly reduce the RPO (the amount of data lost in an outage). This assumes that full and incremental backups are taken periodically and that changefeeds are configured to send events to a Kafka cluster.

The recovery process consists of the following steps:

  • Restore the available backups.
  • Run the Replicator Kafka consumer, setting --minTimestamp to the timestamp of the latest backup used in the restore above. The timestamp should be the closest resolved timestamp before the start of that backup. You can retrieve it from the status of the changefeed job; record it before starting the backup:
select job_type, description, running_status from [show jobs] where status='running' and job_type='CHANGEFEED';

Sample output

-[ RECORD 1 ]
job_type       | CHANGEFEED
description    | CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://localhost:9092' WITH OPTIONS (diff, min_checkpoint_frequency = '1s', resolved = '1s', updated)
running_status | running: resolved=1714770174.737022000,0
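Recording the resolved timestamp before each backup is easy to automate. This sketch extracts it from a running_status string shaped like the sample output above ("resolved=<seconds>.<nanoseconds>,<logical>") and also converts it to UTC for logging. The function name parse_resolved is hypothetical, and the exact string format that --minTimestamp accepts should be checked against the Replicator documentation rather than inferred from this example.

```python
# Sketch: extract the resolved timestamp from a changefeed job's running_status.
# Assumes the "resolved=<seconds>.<9-digit nanoseconds>,<logical>" shape shown
# in the sample SHOW JOBS output.
import re
from datetime import datetime, timedelta, timezone

_RESOLVED = re.compile(r"resolved=(\d+)\.(\d{9}),(\d+)")


def parse_resolved(running_status):
    """Return (wall_time_nanos, logical, utc_datetime) from a status string."""
    match = _RESOLVED.search(running_status)
    if match is None:
        raise ValueError(f"no resolved timestamp in {running_status!r}")
    seconds, nanos, logical = (int(g) for g in match.groups())
    wall_nanos = seconds * 1_000_000_000 + nanos
    # datetime has microsecond precision, so sub-microsecond digits are dropped.
    as_utc = (datetime.fromtimestamp(seconds, tz=timezone.utc)
              + timedelta(microseconds=nanos // 1000))
    return wall_nanos, logical, as_utc


if __name__ == "__main__":
    print(parse_resolved("running: resolved=1714770174.737022000,0"))
```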

Replicator connects to the Kafka broker, determines the offset of the resolved timestamp event in each partition, and starts reading from there the mutations to be applied to the destination database.
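The per-partition seek can be pictured as a search over each partition's resolved-timestamp messages. In this hypothetical sketch, each partition's resolved messages are already known as (offset, resolved_nanos) pairs sorted by offset, and the starting point is the last resolved message strictly older than min_timestamp, matching the "strictly less than the minTimestamp" wording in the --resolvedInterval help text. How Replicator actually locates these messages (using --resolvedInterval as a hint) is not modeled here, and the function name start_offsets is illustrative.

```python
# Hypothetical sketch: choose a starting offset per partition from the
# resolved-timestamp messages seen in that partition.
from bisect import bisect_left


def start_offsets(resolved_markers, min_timestamp):
    """resolved_markers: {partition: [(offset, resolved_nanos), ...]} sorted by offset.

    Returns {partition: offset of the latest resolved message strictly older
    than min_timestamp}, or None when no such message exists (read from the
    earliest retained message instead).
    """
    starts = {}
    for partition, markers in resolved_markers.items():
        stamps = [ts for _, ts in markers]
        # First index with ts >= min_timestamp; the marker before it is the
        # latest one strictly less than min_timestamp.
        i = bisect_left(stamps, min_timestamp)
        starts[partition] = markers[i - 1][0] if i > 0 else None
    return starts


if __name__ == "__main__":
    markers = {0: [(10, 100), (20, 200), (30, 300)], 1: [(5, 150), (9, 250)]}
    print(start_offsets(markers, 250))
```

Messages read after the chosen offset may still be older than the lower bound; those are filtered out by --minTimestamp, which is an inclusive limit.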

Starting a Kafka Consumer for replication

Usage:
  replicator kafka [flags]

Flags:
      --applyTimeout duration          the maximum amount of time to wait for an update to be applied (default 30s)
      --assumeIdempotent               disable the extra staging table queries that debounce non-idempotent redelivery in changefeeds
      --batchSize int                  messages to accumulate before committing to the target (default 100)
      --bestEffortOnly                 eventually-consistent mode; useful for high throughput, skew-tolerant schemas with FKs
      --bestEffortWindow duration      use an eventually-consistent mode for initial backfill or when replication is behind; 0 to disable (default 1h0m0s)
      --broker stringArray             address of Kafka broker(s)
      --dlqTableName ident             the name of a table in the target schema for storing dead-letter entries (default replicator_dlq)
      --flushPeriod duration           flush queued mutations after this duration (default 1s)
      --flushSize int                  ideal batch size to determine when to flush mutations (default 1000)
      --group string                   the Kafka consumer group id
      --immediate                      bypass staging tables and write directly to target; recommended only for KV-style workloads with no FKs
      --insecureSkipVerify             If true, disable client-side validation of responses
      --maxTimestamp string            only accept messages older than this timestamp; this is an exclusive upper limit
      --metricsAddr string             a host:port on which to serve metrics and diagnostics
      --minTimestamp string            only accept unprocessed messages at or newer than this timestamp; this is an inclusive lower limit
      --parallelism int                the number of concurrent database transactions to use (default 16)
      --quiescentPeriod duration       how often to retry deferred mutations (default 10s)
      --resolvedInterval duration      interval between two resolved timestamps.
                                       Only used when minTimestamp is specified.
                                       It serves as a hint to seek the offset of a resolved timestamp message
                                       that is strictly less than the minTimestamp in the Kafka feed.
                                       Note:
                                       The optimal value for resolvedInterval is the same as the resolved
                                       interval specified in the CREATE CHANGEFEED command.
                                       The resolved messages will not be emitted more frequently than
                                       the configured min_checkpoint_frequency specified in CREATE CHANGEFEED
                                       command (but may be emitted less frequently).
                                       Please see the CREATE CHANGEFEED documentation for details.
                                        (default 5s)
      --retireOffset duration          delay removal of applied mutations (default 24h0m0s)
      --saslClientID string            client ID for OAuth authentication from a third-party provider
      --saslClientSecret string        Client secret for OAuth authentication from a third-party provider
      --saslGrantType string           Override the default OAuth client credentials grant type for other implementations
      --saslMechanism string           Can be set to OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512, or PLAIN
      --saslPassword string            SASL password
      --saslScope stringArray          Scopes that the OAuth token should have access for.
      --saslTokenURL string            Client token URL for OAuth authentication from a third-party provider
      --saslUser string                SASL username
      --scanSize int                   the number of rows to retrieve from staging (default 10000)
      --stagingConn string             the staging database's connection string
      --stagingCreateSchema            automatically create the staging schema if it does not exist
      --stagingIdleTime duration       maximum lifetime of an idle connection (default 1m0s)
      --stagingJitterTime duration     the time over which to jitter database pool disconnections (default 15s)
      --stagingMaxLifetime duration    the maximum lifetime of a database connection (default 5m0s)
      --stagingMaxPoolSize int         the maximum number of staging database connections (default 128)
      --stagingSchema atom             a SQL database schema to store metadata in (default _replicator.public)
      --strategy string                Kafka consumer group re-balance strategy (default "sticky")
      --targetConn string              the target database's connection string
      --targetIdleTime duration        maximum lifetime of an idle connection (default 1m0s)
      --targetJitterTime duration      the time over which to jitter database pool disconnections (default 15s)
      --targetMaxLifetime duration     the maximum lifetime of a database connection (default 5m0s)
      --targetMaxPoolSize int          the maximum number of target database connections (default 128)
      --targetSchema atom              the SQL database schema in the target cluster to update
      --targetStatementCacheSize int   the maximum number of prepared statements to retain (default 128)
      --taskGracePeriod duration       how long to allow for task cleanup when recovering from errors (default 1m0s)
      --timestampLimit int             the maximum number of source timestamps to coalesce into a target transaction (default 1000)
      --tlsCACertificate string        the path of the base64-encoded CA file
      --tlsCertificate string          the path of the base64-encoded client certificate file
      --tlsPrivateKey string           the path of the base64-encoded client private key
      --topic stringArray              the topic(s) that the consumer should use
      --userscript string              the path to a configuration script, see userscript subcommand

Global Flags:
      --gracePeriod duration    allow background processes to exit (default 30s)
      --logDestination string   write logs to a file, instead of stdout
      --logFormat string        choose log output format [ fluent, text ] (default "text")
  -v, --verbose count           increase logging verbosity to debug; repeat for trace
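The --flushSize and --flushPeriod descriptions suggest a size-or-time flush rule: flush once enough mutations are queued, or once they have waited long enough. The sketch below illustrates that rule under the assumption that the period is measured from the first queued mutation. It is not Replicator's implementation, and the class name FlushBuffer is hypothetical; time is passed in explicitly so the behavior is deterministic.

```python
# Hypothetical sketch of a size-or-time flush policy, in the spirit of the
# --flushSize and --flushPeriod flags. Not Replicator's actual code.


class FlushBuffer:
    def __init__(self, flush_size=1000, flush_period=1.0):
        self.flush_size = flush_size      # like --flushSize (default 1000)
        self.flush_period = flush_period  # like --flushPeriod (default 1s)
        self.pending = []
        self.first_queued_at = None
        self.flushed_batches = []

    def add(self, mutation, now):
        """Queue a mutation; flush immediately if the size threshold is reached."""
        if not self.pending:
            self.first_queued_at = now
        self.pending.append(mutation)
        if len(self.pending) >= self.flush_size:
            self.flush()

    def tick(self, now):
        """Call periodically; flush if the oldest queued mutation has waited long enough."""
        if self.pending and now - self.first_queued_at >= self.flush_period:
            self.flush()

    def flush(self):
        """Hand the queued mutations off as one batch (recorded here for inspection)."""
        if self.pending:
            self.flushed_batches.append(list(self.pending))
            self.pending.clear()
            self.first_queued_at = None
```

For example, with flush_size=3, adding four mutations at the same instant flushes the first three at once, and the fourth is flushed by the first tick at least one flush period later.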