diff --git a/website/docs/03-Usage Docs/05-flink-cdc-sync.md b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
index b5b460ef3..dc4d25b2c 100644
--- a/website/docs/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
@@ -175,29 +175,77 @@ For Postgresql configuration,please refer to https://ververica.github.io/flink-
   --flink.savepoint s3://bucket/lakesoul/flink/savepoints
```

-For mongoDB, the following additinal parameters need to be configured
+For MongoDB, the following additional parameters need to be configured:

| Parameter | Required | Meaning Description | Parameter Filling Format |
|-------------|----------|-----------------------|----------------------|
| --batchSize | optional | Number of records fetched in each batch | --batchSize 1024 |
+
+To support MongoDB Full Changelog (change events that carry complete pre- and post-images), the following configuration is required:
+1. MongoDB version must be 6.0 or above;
+2. Enable the preAndPostImages feature at the database level:
+
+```bash
+db.runCommand({
+  setClusterParameter: {
+    changeStreamOptions: {
+      preAndPostImages: {
+        expireAfterSeconds: 'off' // replace with custom image expiration time
+      }
+    }
+  }
+})
+```
+3. Enable the changeStreamPreAndPostImages feature for the collections to be monitored:
+
+```bash
+db.runCommand({
+  collMod: "<< collection name >>",
+  changeStreamPreAndPostImages: {
+    enabled: true
+  }
+})
+```
+For more MongoDB configuration, please refer to https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/
+
+A MongoDB sync job example:
```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
    lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
    --source_db.db_name "cdc" \
    --source_db.user "flinkuser" \
+   --source_db.password "flinkpw" \
    --source.parallelism 1 \
    --mongodb_database cdc \
    --server_time_zone Asia/Shanghai \
    --source_db.db_type "mongodb" \
-   --source_db.password "flinkpw" \
-   --source_db.host "172.18.0.2:27017" \
-   --source_db.schema_tables "cdc.bincoll" \
+   --source_db.host "localhost:27017" \
+   --source_db.schema_tables "cdc.test" \
    --sink.parallelism 1 \
    --job.checkpoint_interval 1000 \
-   --flink.checkpoint "file:/tmp/data/lakesoul/227/" \
-   --warehouse_path "file:/home/cyh/data/lakesoul/mongo"
+   --flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
+   --warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
```
+MongoDB CDC usage instructions:
+1. The MongoDB collection must contain the primary key field "_id".
+2. The data type of a given field must be consistent across documents, and field values are allowed to be null. The following situations are allowed:
+```bash
+[
+  { _id: 1, name: 'Bob', age: null },
+  { _id: 2, name: 'Tom', age: 18 },
+  { _id: 3, name: 'Sam' }
+]
+```
+The following situation is not allowed, because the same field has inconsistent data types across documents:
+```bash
+[
+  { _id: 1, col: 'word' },
+  { _id: 2, col: 12 }
+]
+```
## LakeSoul Flink CDC Sink job execution process
In the initialization phase after the LakeSoul Flink job starts, it will first read all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, it first determines whether it exists in LakeSoul. If it does not exist, a LakeSoul table is automatically created, and its schema is consistent with the corresponding table in MySQL.
@@ -286,7 +334,7 @@ Type mapping relationship between Oracle and LakeSoul
| CHAR(n)<br/>
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType |
| BLOB | org.apache.spark.sql.types.DataTypes.BinaryType |

-Type mapping relationship between Oracle and Lakesoul
+Type mapping relationship between MongoDB and LakeSoul

| MongoDB Data Type | LakeSoul Data Type |
|-------------------|--------------------|
| STRING | org.apache.spark.sql.types.DataTypes.StringType |

diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
index 23dc34b59..5d606d31c 100644
--- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
@@ -182,9 +182,58 @@ https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/
   --flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
   --flink.savepoint s3://bucket/lakesoul/flink/savepoints
```
+For MongoDB, the following additional parameters need to be configured:
+
+| Parameter | Required | Meaning Description | Parameter Filling Format |
+|-------------|----------|-----------------------|----------------------|
+| --batchSize | optional | Number of records fetched in each batch | --batchSize 1024 |
+
+To support MongoDB Full Changelog, the following configuration is required:
+1. MongoDB version must be 6.0 or above;
+2. Enable the preAndPostImages feature at the database level:
+
+```bash
+db.runCommand({
+  setClusterParameter: {
+    changeStreamOptions: {
+      preAndPostImages: {
+        expireAfterSeconds: 'off' // replace with custom image expiration time
+      }
+    }
+  }
+})
+```
+3. Enable the changeStreamPreAndPostImages feature for the collections to be monitored:
+
+```bash
+db.runCommand({
+  collMod: "<< collection name >>",
+  changeStreamPreAndPostImages: {
+    enabled: true
+  }
+})
+```
+For more MongoDB configuration, please refer to: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/
+
+A MongoDB sync job example:
+```bash
+./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
+    lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
+    --source_db.db_name "cdc" \
+    --source_db.user "flinkuser" \
+    --source_db.password "flinkpw" \
+    --source.parallelism 1 \
+    --mongodb_database cdc \
+    --server_time_zone Asia/Shanghai \
+    --source_db.db_type "mongodb" \
+    --source_db.host "localhost:27017" \
+    --source_db.schema_tables "cdc.test" \
+    --sink.parallelism 1 \
+    --job.checkpoint_interval 1000 \
+    --flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
+    --warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
+```
## LakeSoul Flink CDC Sink job execution process
In the initialization phase after the LakeSoul Flink job starts, it will first read all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, it first determines whether it exists in LakeSoul. If it does not exist, a LakeSoul table is automatically created, and its schema is consistent with the corresponding table in MySQL.

After initialization, the CDC streams of all tables are read and written to the corresponding LakeSoul tables as upserts.
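+
+To sanity-check a running MongoDB sync job, you can issue a few writes in mongosh and watch them arrive in the LakeSoul table. The snippet below is only an illustrative sketch: it assumes the job example above is running against the cdc.test collection with Full Changelog enabled, and the sample field names (name, age) are reused from the usage instructions:
+
+```bash
+// run in mongosh against the "cdc" database used by the job example
+use cdc
+
+// insert: synced to LakeSoul as a new row keyed by _id
+db.test.insertOne({ _id: 1, name: 'Bob', age: null })
+
+// update: with preAndPostImages enabled, the complete post-image is
+// captured, so the change is applied as an upsert on the same _id
+db.test.updateOne({ _id: 1 }, { $set: { age: 20 } })
+
+// delete: the pre-image identifies the LakeSoul row to remove
+db.test.deleteOne({ _id: 1 })
+```
+
+Because "_id" is the primary key of the LakeSoul table, each change event maps to exactly one row, which is what allows the stream to be applied in upsert mode.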