Supplementary mongodb lake documentation
Signed-off-by: ChenYunHey <[email protected]>
ChenYunHey committed Apr 9, 2024
1 parent 058301c commit 269d8a9
Showing 2 changed files with 104 additions and 7 deletions.
62 changes: 55 additions & 7 deletions website/docs/03-Usage Docs/05-flink-cdc-sync.md
@@ -175,29 +175,77 @@ For PostgreSQL configuration, please refer to https://ververica.github.io/flink-
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
```

For MongoDB, the following additional parameters need to be configured:

| Parameter | Required | Meaning Description | Parameter Filling Format |
|-------------|----------|-----------------------|----------------------|
| --batchSize | optional | Number of documents fetched per batch | --batchSize 1024 |


To support MongoDB full changelog, the following configuration is required:
1. The MongoDB version must be 6.0 or above;
2. Enable the preAndPostImages feature at the database level:

```bash
db.runCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: {
        expireAfterSeconds: 'off' // replace with custom image expiration time
      }
    }
  }
})
```
3. Enable the changeStreamPreAndPostImages feature for the collections to be monitored:

```bash
db.runCommand({
  collMod: "<< collection name >>",
  changeStreamPreAndPostImages: {
    enabled: true
  }
})
```
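
As a quick sanity check (not part of the original instructions, just a hedged sketch using standard MongoDB 6.0+ shell commands), both settings can be verified from mongosh:

```bash
// Verify the cluster-level pre/post images option (run in mongosh against the admin database)
db.adminCommand({ getClusterParameter: "changeStreamOptions" })

// Verify that the monitored collection has pre/post images enabled (replace the placeholder)
db.getCollectionInfos({ name: "<< collection name >>" })
```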
For more MongoDB configuration, please refer to https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/

```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
--source_db.db_name "cdc" \
--source_db.user "flinkuser" \
--source_db.password "flinkpw" \
--source.parallelism 1 \
--mongodb_database cdc \
--server_time_zone Asia/Shanghai \
--source_db.db_type "mongodb" \
--source_db.host "localhost:27017" \
--source_db.schema_tables "cdc.test" \
--sink.parallelism 1 \
--job.checkpoint_interval 1000 \
--flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
--warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
```
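
After submission, the running job can be confirmed with the standard Flink CLI (a convenience note, not part of the original example):

```bash
# List running Flink jobs; the LakeSoul CDC sink job should appear here
./bin/flink list -r
```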
MongoDB CDC usage instructions:
1. The MongoDB collection must contain the primary key field "_id".
2. The data type of a given field must be consistent across documents; a field value may be null or absent. The following situations are allowed:
````bash
[
{ _id: 1, name: 'Bob', age: null },
{ _id: 2, name: 'Tom', age: 18 },
{ _id: 3, name: 'Sam'}
]

````
The following situation is not allowed: the same field has inconsistent data types across documents.
````bash
[
{ _id: 1, col: 'word' },
{ _id: 2, col: 12 }
]
````
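
If mixed types are a concern, MongoDB's schema validation can help keep a field's type consistent before syncing. A minimal sketch, not part of the LakeSoul docs; the collection name "test" and field "col" follow the example above:

```bash
// Run in mongosh: reject inserts and updates where "col" is present but not a string
db.runCommand({
  collMod: "test",
  validator: { $jsonSchema: { properties: { col: { bsonType: "string" } } } },
  validationLevel: "moderate"
})
```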
## LakeSoul Flink CDC Sink job execution process

In the initialization phase after the LakeSoul Flink job starts, it will first read all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, first determine whether it exists in LakeSoul. If it does not exist, a LakeSoul table is automatically created, and its schema is consistent with the corresponding table in MySQL.
@@ -286,7 +334,7 @@ Type mapping relationship between Oracle and LakeSoul
| CHAR(n) <br/>NCHAR(n) <br/>NVARCHAR2(n) <br/>VARCHAR(n) <br/>VARCHAR2(n) <br/>CLOB <br/>NCLOB <br/>XMLType <br/>SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType |
| BLOB | org.apache.spark.sql.types.DataTypes.BinaryType |

Type mapping relationship between MongoDB and LakeSoul

| STRING | org.apache.spark.sql.types.DataTypes.StringType |
|-------------|----------------------------------------|
@@ -182,9 +182,58 @@ https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
```
For MongoDB, the following additional parameters need to be configured:

| Parameter | Required | Meaning Description | Parameter Filling Format |
|-------------|----------|-----------------------|----------------------|
| --batchSize | optional | Number of documents fetched per batch | --batchSize 1024 |
Example of a MongoDB sync job (the full command is shown below). To support MongoDB full changelog, the following configuration is required:
1. The MongoDB version must be 6.0 or above;
2. Enable the preAndPostImages feature at the database level:
```bash
db.runCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: {
        expireAfterSeconds: 'off' // replace with custom image expiration time
      }
    }
  }
})
```
3. Enable the changeStreamPreAndPostImages feature for the collections to be monitored:

```bash
db.runCommand({
  collMod: "<< collection name >>",
  changeStreamPreAndPostImages: {
    enabled: true
  }
})
```

For more MongoDB configuration, please refer to: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/
## LakeSoul Flink CDC Sink job execution process

```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
--source_db.db_name "cdc" \
--source_db.user "flinkuser" \
--source_db.password "flinkpw" \
--source.parallelism 1 \
--mongodb_database cdc \
--server_time_zone Asia/Shanghai \
--source_db.db_type "mongodb" \
--source_db.host "localhost:27017" \
--source_db.schema_tables "cdc.test" \
--sink.parallelism 1 \
--job.checkpoint_interval 1000 \
--flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
--warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
```
In the initialization phase after the LakeSoul Flink job starts, it first reads all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, it first checks whether the table exists in LakeSoul; if not, a LakeSoul table is automatically created with a schema consistent with the corresponding MySQL table.

After initialization, the CDC streams of all tables are read and written to the corresponding LakeSoul tables as upserts.
