[Flink]support mongodb cdc NULL type #468

Closed
@@ -175,7 +175,7 @@ public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
stringFieldTypes[i] = "DATETIME";
} else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType ) {
stringFieldTypes[i] = "TIMESTAMP";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
@@ -224,7 +224,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
throw new IOException(equalOrCanCast);
}
for (LakeSoulMultiTableSinkCommittable committable : lakeSoulMultiTableSinkCommittable) {
if (committable.getTsMs() > schemaLastChangeTime) {
if (committable.getTsMs() > schemaLastChangeTime && !dbType.equals("mongodb")) {
LOG.error("incompatible cast data created and delayThreshold time: {}, dml create time: {}", schemaLastChangeTime, committable.getTsMs());
throw new IOException(equalOrCanCast);
}
@@ -29,16 +29,19 @@ private static Schema buildSchema(Document bsonDocument, SchemaBuilder structSchemaBuilder) {
for (Map.Entry<String, Object> entry : bsonDocument.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
if (value instanceof Document) {
SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
structSchemaBuilder.field(fieldName, buildSchema((Document) value, nestedStructSchemaBuilder));
} else if (value instanceof List) {
List<?> arrayList = (List<?>) value;
Schema arraySchema = getSchemaForArrayList(arrayList);
structSchemaBuilder.field(fieldName, arraySchema);
} else {
structSchemaBuilder.field(fieldName, getSchemaForValue(value));
if (value != null){
if (value instanceof Document) {
SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
structSchemaBuilder.field(fieldName, buildSchema((Document) value, nestedStructSchemaBuilder));
} else if (value instanceof List) {
List<?> arrayList = (List<?>) value;
Schema arraySchema = getSchemaForArrayList(arrayList);
structSchemaBuilder.field(fieldName, arraySchema);
} else {
structSchemaBuilder.field(fieldName, getSchemaForValue(value));
}
}

}
return structSchemaBuilder.build();
}
@@ -56,24 +59,26 @@ private static void fillStructValues(Document bsonDocument, Struct struct) {
for (Map.Entry<String, Object> entry : bsonDocument.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
if (value instanceof Document) {
Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
fillStructValues((Document) value, nestedStruct);
struct.put(fieldName, nestedStruct);
} else if (value instanceof List) {
List<?> arrayList = (List<?>) value;
struct.put(fieldName, arrayList);
} else if (value instanceof Decimal128) {
BigDecimal decimalValue = new BigDecimal(value.toString());
struct.put(fieldName, decimalValue);
} else if (value instanceof Binary) {
Binary binaryData = (Binary) value;
struct.put(fieldName,binaryData.getData());
} else if (value instanceof BsonTimestamp) {
BsonTimestamp bsonTimestamp = (BsonTimestamp) value;
struct.put(fieldName,bsonTimestamp.getValue());
} else {
struct.put(fieldName,value);
if (value != null){
if (value instanceof Document) {
Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
fillStructValues((Document) value, nestedStruct);
struct.put(fieldName, nestedStruct);
} else if (value instanceof List) {
List<?> arrayList = (List<?>) value;
struct.put(fieldName, arrayList);
} else if (value instanceof Decimal128) {
BigDecimal decimalValue = new BigDecimal(value.toString());
struct.put(fieldName, decimalValue);
} else if (value instanceof Binary) {
Binary binaryData = (Binary) value;
struct.put(fieldName,binaryData.getData());
} else if (value instanceof BsonTimestamp) {
BsonTimestamp bsonTimestamp = (BsonTimestamp) value;
struct.put(fieldName,bsonTimestamp.getValue());
} else {
struct.put(fieldName,value);
}
}
}
}
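To illustrate the effect of the null-value guard added above, here is a minimal standalone sketch (not the PR's actual classes; the class and helper names are illustrative only) showing that null-valued BSON fields are simply skipped during Connect schema inference, so documents with null or missing fields no longer break the conversion:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.bson.Document;

import java.util.Map;

public class NullFieldSketch {

    // Infer a Connect schema from a BSON document, skipping null-valued fields
    // (the same idea as the `if (value != null)` guard added in this PR).
    static Schema inferSchema(Document doc) {
        SchemaBuilder builder = SchemaBuilder.struct();
        for (Map.Entry<String, Object> entry : doc.entrySet()) {
            Object value = entry.getValue();
            if (value == null) {
                continue; // null fields contribute nothing to the schema
            }
            if (value instanceof Integer) {
                builder.field(entry.getKey(), Schema.OPTIONAL_INT32_SCHEMA);
            } else {
                builder.field(entry.getKey(), Schema.OPTIONAL_STRING_SCHEMA);
            }
        }
        return builder.build();
    }

    public static void main(String[] args) {
        Document doc = new Document("name", "Bob").append("age", null);
        Schema schema = inferSchema(doc);
        System.out.println(schema.fields()); // only "name"; the null-valued "age" is skipped

        Struct struct = new Struct(schema);
        if (doc.get("name") != null) {
            struct.put("name", doc.getString("name"));
        }
        System.out.println(struct);
    }
}
```

Running the sketch prints a schema containing only the `name` field, which matches the usage rule documented below: field values may be null or missing, as long as the non-null values of a field share one type.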
62 changes: 55 additions & 7 deletions website/docs/03-Usage Docs/05-flink-cdc-sync.md
@@ -175,29 +175,77 @@ For Postgresql configuration,please refer to https://ververica.github.io/flink-
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
```

For mongoDB, the following additinal parameters need to be configured
For MongoDB, the following additional parameters need to be configured

| Parameter   | Required | Description                              | Format           |
|-------------|----------|------------------------------------------|------------------|
| --batchSize | optional | Number of records fetched in each batch  | --batchSize 1024 |


To support MongoDB full changelog, the following configuration is required:
1 The MongoDB version must be 6.0 or above;
2 Enable the preAndPostImages feature at the database level:

```bash
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' // replace with custom image expiration time
}
}
}
})
```
3 Enable the changeStreamPreAndPostImages feature for the collections to be monitored:

```bash
db.runCommand({
collMod: "<< collection name >>",
changeStreamPreAndPostImages: {
enabled: true
}
})
```
For more MongoDB configuration, please refer to https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/

```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
--source_db.db_name "cdc" \
--source_db.user "flinkuser" \
--source_db.password "flinkpw" \
--source.parallelism 1 \
--mongodb_database cdc \
--server_time_zone Asia/Shanghai \
--source_db.db_type "mongodb" \
--source_db.password "flinkpw" \
--source_db.host "172.18.0.2:27017" \
--source_db.schema_tables "cdc.bincoll" \
--source_db.host "localhost:27017" \
--source_db.schema_tables "cdc.test" \
--sink.parallelism 1 \
--job.checkpoint_interval 1000 \
--flink.checkpoint "file:/tmp/data/lakesoul/227/" \
--warehouse_path "file:/home/cyh/data/lakesoul/mongo"
--flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
--warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
```
MongoDB CDC usage instructions:
1 The MongoDB collection should contain the primary key field "_id"
2 The data type of a given field should be consistent across documents, and field values are allowed to be null or missing. The following situations are allowed:
````bash
[
{ _id: 1, name: 'Bob', age: null },
{ _id: 2, name: 'Tom', age: 18 },
{ _id: 3, name: 'Sam'}
]

````
The following situation is not allowed: the same field having inconsistent data types across documents.
````bash
[
{ _id: 1, col: 'word' },
{ _id: 2, col: 12 }
]
````
## LakeSoul Flink CDC Sink job execution process

During the initialization phase after the LakeSoul Flink job starts, it first reads all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, it checks whether the table already exists in LakeSoul; if it does not, a LakeSoul table is automatically created with a schema consistent with the corresponding MySQL table.
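As a rough sketch of this initialization pass (the catalog interface and helper names below are hypothetical, not LakeSoul APIs), the per-table logic amounts to:

```java
import java.util.List;

public class InitSketch {
    // Hypothetical catalog interface; method names are placeholders for illustration only.
    interface Catalog {
        List<String> listSourceTables(String db);        // source tables, exclusions already applied
        boolean lakeSoulTableExists(String table);
        String sourceSchemaOf(String table);
        void createLakeSoulTable(String table, String schema);
    }

    static void initialize(Catalog catalog, String db) {
        for (String table : catalog.listSourceTables(db)) {
            // Create the LakeSoul table on first sight, mirroring the source schema.
            if (!catalog.lakeSoulTableExists(table)) {
                catalog.createLakeSoulTable(table, catalog.sourceSchemaOf(table));
            }
        }
    }
}
```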
@@ -286,7 +334,7 @@ Type mapping relationship between Oracle and LakeSoul
| CHAR(n) <br/>NCHAR(n) <br/>NVARCHAR2(n) <br/>VARCHAR(n) <br/>VARCHAR2(n) <br/>CLOB <br/>NCLOB <br/>XMLType <br/>SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType |
| BLOB | org.apache.spark.sql.types.DataTypes.BinaryType |

Type mapping relationship between Oracle and Lakesoul
Type mapping relationship between MongoDB and Lakesoul

| STRING | org.apache.spark.sql.types.DataTypes.StringType |
|-------------|----------------------------------------|
@@ -182,9 +182,58 @@ https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
```
For MongoDB, the following additional parameters need to be configured:

| Parameter   | Required | Description                              | Format           |
|-------------|----------|------------------------------------------|------------------|
| --batchSize | optional | Number of records fetched in each batch  | --batchSize 1024 |
MongoDB sync job example
To support MongoDB full changelog, the following configuration is required:
1 The MongoDB version must be 6.0 or above
2 Enable the preAndPostImages feature at the database level:
```bash
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' // replace with custom image expiration time
}
}
}
})
```
3 Enable the changeStreamPreAndPostImages feature for the collections to be monitored:

```bash
db.runCommand({
collMod: "<< collection name >>",
changeStreamPreAndPostImages: {
enabled: true
}
})
```

For more MongoDB configuration, please refer to: https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/cdc-connectors/mongodb-cdc/
## LakeSoul Flink CDC Sink job execution process

```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
--source_db.db_name "cdc" \
--source_db.user "flinkuser" \
--source_db.password "flinkpw" \
--source.parallelism 1 \
--mongodb_database cdc \
--server_time_zone Asia/Shanghai \
--source_db.db_type "mongodb" \
--source_db.host "localhost:27017" \
--source_db.schema_tables "cdc.test" \
--sink.parallelism 1 \
--job.checkpoint_interval 1000 \
--flink.checkpoint "file:/tmp/data/lakesoul/mongodb" \
--warehouse_path "file:/home/cyh/data/lakesoul/mongodb"
```
After the LakeSoul Flink job starts, during the initialization phase it first reads all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, it checks whether the table already exists in LakeSoul; if it does not, a LakeSoul table is automatically created with a schema consistent with the corresponding MySQL table.

After initialization completes, the CDC streams of all tables are read and written to the corresponding LakeSoul tables as upserts.