[Flink] Support MongoDB CDC Import/Export #460

Merged · 15 commits · Apr 1, 2024
32 changes: 27 additions & 5 deletions lakesoul-flink/pom.xml
@@ -24,11 +24,17 @@ SPDX-License-Identifier: Apache-2.0
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.10</scala.version>
<log4j.version>2.17.2</log4j.version>
-<cdc.version>2.4.2</cdc.version>
+<cdc.version>3.0.0</cdc.version>
<local.scope>provided</local.scope>
</properties>

<dependencies>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<version>2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-common</artifactId>
@@ -122,26 +128,42 @@ SPDX-License-Identifier: Apache-2.0
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
-<artifactId>flink-sql-connector-postgres-cdc</artifactId>
+<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
-<artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
+<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
-<artifactId>flink-sql-connector-mysql-cdc</artifactId>
+<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
-<artifactId>flink-sql-connector-oracle-cdc</artifactId>
+<artifactId>flink-sql-connector-postgres-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>1.0.1-1.17</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.17</artifactId>
JdbcCDC.java
@@ -11,6 +11,8 @@
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
@@ -52,6 +54,7 @@ public class JdbcCDC {
private static String[] tableList;
private static String serverTimezone;
private static String pluginName;
private static int batchSize;

public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
@@ -62,7 +65,7 @@ public static void main(String[] args) throws Exception {
host = parameter.get(SOURCE_DB_HOST.key());
port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
// Postgres / Oracle: table list is schema-qualified
if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres")) {
schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String[] tables = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
tableList = new String[tables.length];
@@ -71,7 +74,11 @@
}
splitSize = parameter.getInt(SOURCE_DB_SPLIT_SIZE.key(), SOURCE_DB_SPLIT_SIZE.defaultValue());
}
if (dbType.equalsIgnoreCase("sqlserver")){
if (dbType.equalsIgnoreCase("sqlserver") ){
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
if ( dbType.equalsIgnoreCase("mongodb")){
batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
pluginName = parameter.get(PLUGIN_NAME.key(), PLUGIN_NAME.defaultValue());
@@ -92,6 +99,7 @@ public static void main(String[] args) throws Exception {
conf.set(SOURCE_DB_PORT, port);
conf.set(WAREHOUSE_PATH, databasePrefixPath);
conf.set(SERVER_TIME_ZONE, serverTimezone);
conf.set(SOURCE_DB_TYPE, dbType);

// parameters for multi-table DML sink
conf.set(LakeSoulSinkOptions.USE_CDC, true);
@@ -136,9 +144,12 @@
if (dbType.equalsIgnoreCase("oracle")) {
oracleCdc(lakeSoulRecordConvert, conf, env);
}
if (dbType.equalsIgnoreCase("sqlserver")){
if (dbType.equalsIgnoreCase("sqlserver")) {
sqlserverCdc(lakeSoulRecordConvert, conf, env);
}
if (dbType.equalsIgnoreCase("mongodb")) {
mongoCdc(lakeSoulRecordConvert, conf, env);
}

}

@@ -269,11 +280,39 @@ public static void sqlserverCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(sqlServerSource, context, lakeSoulRecordConvert);

DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("Sqlserver Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From sqlserver Database " + dbName);
}

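// Build an incremental MongoDB CDC source and route it through the same LakeSoul
// multi-table sink pipeline as the other databases. scanFullChangelog(true) asks the
// connector to emit full pre-/post-image change events, and batchSize sets the cursor batch size.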
private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Configuration conf, StreamExecutionEnvironment env) throws Exception {
MongoDBSource<BinarySourceRecord> mongoSource =
MongoDBSource.<BinarySourceRecord>builder()
.hosts(host)
.databaseList(dbName)
.collectionList(tableList)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(true)
.batchSize(batchSize)
.username(userName)
.password(passWord)
.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
.build();
NameSpaceManager manager = new NameSpaceManager();
manager.importOrSyncLakeSoulNamespace(dbName);
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = conf;
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(mongoSource, context, lakeSoulRecordConvert);
DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("mongodb Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink<BinarySourceRecord> dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From mongo Database " + dbName);

}
}
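
For readers who want the core source pattern without the LakeSoul internals, here is a minimal, self-contained sketch of the same MongoDB CDC source wired to a print sink. It assumes the Flink CDC 3.0 package layout; the connection details, database/collection names, and the use of JsonDebeziumDeserializationSchema are illustrative assumptions, not part of this PR.

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MongoCdcSketch {
    public static void main(String[] args) throws Exception {
        // Same builder calls as mongoCdc() above, but deserializing change events to JSON
        // strings instead of BinarySourceRecord; host and credentials are placeholders.
        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .databaseList("test_db")
                .collectionList("test_db.orders")
                .username("flinkuser")
                .password("flinkpw")
                .startupOptions(StartupOptions.initial()) // snapshot first, then tail the change stream
                .scanFullChangelog(true)                  // request pre-/post-image change events
                .batchSize(1024)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // the incremental CDC source needs checkpoints to commit offsets
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
                .print();
        env.execute("MongoDB CDC sketch");
    }
}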
MongoSinkUtils.java
@@ -0,0 +1,170 @@
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0

package org.apache.flink.lakesoul.entry;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.bson.*;
import org.bson.types.Decimal128;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class MongoSinkUtils {

static Table coll;
static List<String> structNameFiledList;

public static void createMongoColl(String database, String collName, String uri) {
MongoClient mongoClient = MongoClients.create(uri);
MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
mongoDatabase.createCollection(collName);
mongoClient.close();
}

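// Serializes retract-stream records (Tuple2<change flag, Row>) into BsonDocuments for the
// Flink MongoDB sink. Field names and nested-row field lists are recovered from the schema
// of the static `coll` Table, which callers must set before the sink runs.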
public static class MyMongoSerializationSchema implements MongoSerializationSchema<Tuple2<Boolean, Row>>, Serializable {
@Override
public WriteModel<BsonDocument> serialize(Tuple2<Boolean, Row> record, MongoSinkContext context) {
Row row = record.f1; // Extract the Row object from the Tuple2
BsonDocument document = new BsonDocument();
int fieldCount = row.getArity();
DataType[] fieldDataTypes = coll.getSchema().getFieldDataTypes();
for (int i = 0; i < fieldCount; i++) {
String fieldName = coll.getSchema().getFieldNames()[i];
Object fieldValue = row.getField(i);
if (fieldValue instanceof Row) {
DataType dataType = fieldDataTypes[i];
RowType rowType = (RowType) dataType.getLogicalType();
structNameFiledList = traverseRow(rowType);
}
if (fieldValue != null) {
try {
document.append(fieldName, convertTonBsonValue(fieldValue));
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}
return new InsertOneModel<>(document);
}


public static BsonValue convertTonBsonValue(Object value) throws ParseException {
if (value == null) {
return new BsonNull();
} else if (value instanceof Integer) {
return new BsonInt32((Integer) value);
} else if (value instanceof Long) {
return new BsonInt64((Long) value);
} else if (value instanceof String) {
return new BsonString((String) value);
} else if (value instanceof Boolean) {
return new BsonBoolean((Boolean) value);
} else if (value instanceof Double) {
return new BsonDouble((Double) value);
} else if (value instanceof BigDecimal) {
return new BsonDecimal128(new Decimal128((BigDecimal) value));
} else if (value instanceof Date) {
return new BsonDateTime(((Date) value).getTime());
} else if (value instanceof byte[]) {
return new BsonBinary((byte[]) value);
} else if (value instanceof Object[]) {
Object[] array = (Object[]) value;
BsonArray bsonArray = new BsonArray();
for (Object element : array) {
bsonArray.add(convertTonBsonValue(element));
}
return bsonArray;
} else if (isDateTimeString(value)) {
Date date = parseDateTime(value.toString());
return new BsonDateTime(date.getTime());
} else if (value instanceof Row) {
Row row = (Row) value;
BsonDocument bsonDocument = new BsonDocument();
for (int i = 0; i < row.getArity(); i++) {
Object fieldValue = row.getField(i);
List<String> stringList = new ArrayList<>(structNameFiledList);
String name = structNameFiledList.get(0);
stringList.remove(0);
structNameFiledList = stringList;
bsonDocument.append(name, convertTonBsonValue(fieldValue));
}
return bsonDocument;
} else {
throw new IllegalArgumentException("Unsupported data type: " + value.getClass());
}
}

public static List<String> traverseRow(RowType rowType) {
List<String> nameList = new ArrayList<>();
traverseField(rowType, nameList);
return nameList;
}

private static void traverseField(RowType rowType, List<String> nameList) {
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = rowType.getFieldNames().get(i);
LogicalType fieldType = rowType.getTypeAt(i);
nameList.add(fieldName);
if (fieldType instanceof RowType) {
traverseField((RowType) fieldType, nameList);
}
}
}

public static List<String> findDirectNestedNames(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel) {
List<String> nestedNames = new ArrayList<>();
findDirectNestedNamesHelper(jsonObject, targetFieldName, currentLevel, targetLevel, nestedNames);
return nestedNames;
}

public static void findDirectNestedNamesHelper(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel, List<String> nestedNames) {
if (currentLevel == targetLevel) {
if (jsonObject.getString("name").equals(targetFieldName)) {
JSONArray children = jsonObject.getJSONArray("children");
for (Object obj : children) {
JSONObject child = (JSONObject) obj;
nestedNames.add(child.getString("name"));
}
}
} else {
JSONArray children = jsonObject.getJSONArray("children");
for (Object obj : children) {
JSONObject child = (JSONObject) obj;
findDirectNestedNamesHelper(child, targetFieldName, currentLevel + 1, targetLevel, nestedNames);
}
}
}

public static boolean isDateTimeString(Object value) {
return value.toString().matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$");
}

public static Date parseDateTime(String value) throws ParseException {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
return dateFormat.parse(value);
}
}
}
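
The pom above also pulls in flink-connector-mongodb, which is where MongoSerializationSchema and MongoSinkContext come from. Below is a minimal sketch of how MyMongoSerializationSchema could be wired into that connector's MongoSink for the export direction; the source table, URI, and names are illustrative assumptions, and the static coll field must be populated first because the serializer reads field names and types from it.

package org.apache.flink.lakesoul.entry; // same package, so the package-private `coll` field is visible

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MongoExportSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Hypothetical source table; in this PR it would be a LakeSoul table registered in the catalog.
        Table table = tEnv.sqlQuery("SELECT * FROM source_table");
        MongoSinkUtils.coll = table; // MyMongoSerializationSchema reads field names/types from this schema

        MongoSink<Tuple2<Boolean, Row>> sink = MongoSink.<Tuple2<Boolean, Row>>builder()
                .setUri("mongodb://flinkuser:flinkpw@localhost:27017")
                .setDatabase("test_db")
                .setCollection("orders")
                .setBatchSize(1024)
                .setSerializationSchema(new MongoSinkUtils.MyMongoSerializationSchema())
                .build();

        // A retract stream pairs each Row with an insert/delete flag, matching Tuple2<Boolean, Row>;
        // note the serializer above only issues InsertOneModel writes.
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
        retractStream.sinkTo(sink);
        env.execute("LakeSoul to MongoDB export sketch");
    }
}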