From 2906c6102a5d984e46105db1a3b8e821721a6cde Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Mon, 11 Mar 2024 15:00:56 +0800
Subject: [PATCH 01/13] support mongodb cdc
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
lakesoul-flink/pom.xml | 18 +-
.../apache/flink/lakesoul/entry/JdbcCDC.java | 39 ++-
.../sink/committer/LakeSoulSinkCommitter.java | 3 +
.../LakeSoulSinkGlobalCommitter.java | 15 +-
.../lakesoul/types/BinarySourceRecord.java | 8 +-
.../lakesoul/types/LakeSoulRecordConvert.java | 278 +++++++++++++-----
.../flink/lakesoul/types/ParseDocument.java | 78 +++++
.../flink/lakesoul/types/StructConvert.java | 106 +++++++
8 files changed, 464 insertions(+), 81 deletions(-)
create mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
create mode 100644 lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml
index 89a57f4e7..6462cf692 100644
--- a/lakesoul-flink/pom.xml
+++ b/lakesoul-flink/pom.xml
@@ -23,11 +23,17 @@ SPDX-License-Identifier: Apache-2.0
2.12
2.12.10
2.17.2
- 2.4.2
+ 3.0.0
provided
+
+ org.hamcrest
+ hamcrest
+ 2.1
+ test
+
com.dmetasoul
lakesoul-common
@@ -141,6 +147,16 @@ SPDX-License-Identifier: Apache-2.0
flink-sql-connector-oracle-cdc
${cdc.version}
+
+ com.ververica
+ flink-sql-connector-mongodb-cdc
+ ${cdc.version}
+
+
+ org.mongodb
+ bson
+ 4.3.4
+
org.apache.doris
flink-doris-connector-1.17
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index 00c0471da..aa5a6317f 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -11,6 +11,8 @@
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
+import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
+import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
@@ -62,7 +64,7 @@ public static void main(String[] args) throws Exception {
host = parameter.get(SOURCE_DB_HOST.key());
port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
//Postgres Oracle
- if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres")) {
+ if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres") || dbType.equalsIgnoreCase("mongodb")) {
schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String[] tables = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
tableList = new String[tables.length];
@@ -92,6 +94,7 @@ public static void main(String[] args) throws Exception {
conf.set(SOURCE_DB_PORT, port);
conf.set(WAREHOUSE_PATH, databasePrefixPath);
conf.set(SERVER_TIME_ZONE, serverTimezone);
+ conf.set(SOURCE_DB_TYPE,dbType);
// parameters for mutil tables dml sink
conf.set(LakeSoulSinkOptions.USE_CDC, true);
@@ -136,9 +139,12 @@ public static void main(String[] args) throws Exception {
if (dbType.equalsIgnoreCase("oracle")) {
oracleCdc(lakeSoulRecordConvert, conf, env);
}
- if (dbType.equalsIgnoreCase("sqlserver")){
+ if (dbType.equalsIgnoreCase("sqlserver")) {
sqlserverCdc(lakeSoulRecordConvert, conf, env);
}
+ if (dbType.equalsIgnoreCase("mongodb")) {
+ mongoCdc(lakeSoulRecordConvert, conf, env);
+ }
}
@@ -276,4 +282,33 @@ public static void sqlserverCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From sqlserver Database " + dbName);
}
+
+ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Configuration conf, StreamExecutionEnvironment env) throws Exception {
+ MongoDBSource mongoSource =
+ MongoDBSource.builder()
+ .hosts(host)
+ .databaseList(schemaList) // 设置捕获的数据库,支持正则表达式
+ .collectionList(tableList) //设置捕获的集合,支持正则表达式
+ .startupOptions(StartupOptions.initial())
+ .scanFullChangelog(true)
+ .batchSize(splitSize)
+ .username(userName)
+ .password(passWord)
+ .deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
+ .build();
+ NameSpaceManager manager = new NameSpaceManager();
+ manager.importOrSyncLakeSoulNamespace(dbName);
+ LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
+ context.env = env;
+ context.conf = conf;
+ LakeSoulMultiTableSinkStreamBuilder
+ builder =
+ new LakeSoulMultiTableSinkStreamBuilder(mongoSource, context, lakeSoulRecordConvert);
+ DataStreamSource source = builder.buildMultiTableSource("mongodb Source");
+
+ DataStream stream = builder.buildHashPartitionedCDCStream(source);
+ DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
+ env.execute("LakeSoul CDC Sink From mongo Database " + dbName);
+
+ }
}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
index 0ae7f13a7..ed7eb2bba 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkCommitter.java
@@ -99,6 +99,9 @@ public List commit(List commit(
LOG.info("Committing: {}", globalCommittable);
int index = 0;
+ String dbType = this.conf.getString(SOURCE_DB_TYPE,"");
+
for (Map.Entry, List> entry :
globalCommittable.getGroupedCommitables()
.entrySet()) {
@@ -118,6 +122,9 @@ public List commit(
List lakeSoulMultiTableSinkCommittable = entry.getValue();
String tableName = identity.tableId.table();
String tableNamespace = identity.tableId.schema();
+ if (tableNamespace==null){
+ tableNamespace = identity.tableId.catalog();
+ }
boolean isCdc = identity.useCDC;
Schema msgSchema = FlinkUtil.toArrowSchema(identity.rowType, isCdc ? Optional.of(
identity.cdcColumn) :
@@ -154,7 +161,7 @@ public List commit(
!new HashSet<>(partitionKeys.rangeKeys).containsAll(identity.partitionKeyList)) {
throw new IOException("Change of partition key column of table " + tableName + " is forbidden");
}
- StructType origSchema = null;
+ StructType origSchema ;
if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
@@ -193,7 +200,11 @@ public List commit(
msgSchema,
identity.useCDC,
identity.cdcColumn);
- dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
+ if (dbType.equals("mongodb")){
+ dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
+ }else {
+ dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
+ }
if (JSONObject.parseObject(tableInfo.getProperties()).containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
dbManager.removeLogicallyDropColumn(tableInfo.getTableId());
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
index 94f0f9a30..b327f6c3c 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
@@ -100,8 +100,12 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord
}
long sortField = (binlogFileIndex << 32) + binlogPosition;
LakeSoulRowDataWrapper data = convert.toLakeSoulDataType(valueSchema, value, tableId, tsMs, sortField);
- String tablePath = new Path(new Path(basePath, tableId.schema()), tableId.table()).toString();
-
+ String tablePath;
+ if (tableId.schema()==null){
+ tablePath = new Path(new Path(basePath, tableId.catalog()), tableId.table()).toString();
+ }else {
+ tablePath = new Path(new Path(basePath, tableId.schema()), tableId.table()).toString();
+ }
return new BinarySourceRecord(sourceRecord.topic(), primaryKeys, tableId, FlinkUtil.makeQualifiedPath(tablePath).toString(),
Collections.emptyList(), false, data, null);
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index 31877780a..829cc6eb6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -4,10 +4,13 @@
package org.apache.flink.lakesoul.types;
+import com.amazonaws.services.dynamodbv2.xspec.S;
+import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Decimal;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Enum;
import io.debezium.data.EnumSet;
@@ -33,6 +36,8 @@
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.RowKind;
@@ -49,6 +54,7 @@
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.CDC_CHANGE_COLUMN_DEFAULT;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.SORT_FIELD;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkOptions.USE_CDC;
+import static org.apache.flink.lakesoul.types.ParseDocument.convertBSONToStruct;
public class LakeSoulRecordConvert implements Serializable {
@@ -60,6 +66,7 @@ public class LakeSoulRecordConvert implements Serializable {
List partitionFields;
+
public LakeSoulRecordConvert(Configuration conf, String serverTimeZone) {
this(conf, serverTimeZone, Collections.emptyList());
}
@@ -111,46 +118,98 @@ private boolean partitionFieldsChanged(RowType beforeType, RowData beforeData, R
}
public LakeSoulRowDataWrapper toLakeSoulDataType(Schema sch, Struct value, TableId tableId, long tsMs, long sortField) throws Exception {
- Envelope.Operation op = getOperation(sch, value);
- Schema valueSchema = value.schema();
LakeSoulRowDataWrapper.Builder builder = LakeSoulRowDataWrapper.newBuilder().setTableId(tableId)
.setUseCDC(useCDC).setCDCColumn(cdcColumn);
- if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
- Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
- Struct after = value.getStruct(Envelope.FieldName.AFTER);
- RowData insert = convert(after, afterSchema, RowKind.INSERT, sortField);
- RowType rt = toFlinkRowType(afterSchema);
- insert.setRowKind(RowKind.INSERT);
- builder.setOperation("insert").setAfterRowData(insert).setAfterType(rt);
- } else if (op == Envelope.Operation.DELETE) {
- Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
- Struct before = value.getStruct(Envelope.FieldName.BEFORE);
- RowData delete = convert(before, beforeSchema, RowKind.DELETE, sortField);
- RowType rt = toFlinkRowType(beforeSchema);
- builder.setOperation("delete").setBeforeRowData(delete).setBeforeRowType(rt);
- delete.setRowKind(RowKind.DELETE);
+ boolean isMongoDDL = true;
+ try {
+ value.getWithoutDefault(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+ } catch (DataException e) {
+ isMongoDDL = false;
+ }
+ if (isMongoDDL) {
+ String op = value.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD);
+ if (op.equals("insert")) {
+ String fullDocument = value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+ Struct bsonStruct = convertBSONToStruct(fullDocument);
+ Schema documentSchema = bsonStruct.schema();
+ RowData insert = convert(bsonStruct, documentSchema, RowKind.INSERT, sortField);
+ RowType mongoRt = toFlinkRowType(documentSchema,true);
+ insert.setRowKind(RowKind.INSERT);
+ builder.setOperation("insert").setAfterRowData(insert).setAfterType(mongoRt);
+ } else if (op.equals("delete")) {
+ String fullDocumentValue = value.getString("fullDocumentBeforeChange");
+ Struct before = convertBSONToStruct(fullDocumentValue);
+ Schema beforSchema = before.schema();
+ RowData delete = convert(before,beforSchema,RowKind.DELETE,sortField);
+ RowType rt = toFlinkRowType(beforSchema, true);
+ builder.setOperation("delete").setBeforeRowData(delete).setBeforeRowType(rt);
+ delete.setRowKind(RowKind.DELETE);
+ } else {
+ String fullDocumentBeforChange = value.getString("fullDocumentBeforeChange");
+ Struct before = convertBSONToStruct(fullDocumentBeforChange);
+ Schema beforeSchema = before.schema();
+ RowData beforeData = convert(before, beforeSchema, RowKind.UPDATE_BEFORE, sortField);
+ beforeData.setRowKind(RowKind.UPDATE_BEFORE);
+ RowType beforeRT = toFlinkRowType(beforeSchema, true);
+ String fullDocument = value.getString(MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+ Struct after = convertBSONToStruct(fullDocument);
+ Schema afterSchema = after.schema();
+ RowData afterData = convert(after, afterSchema, RowKind.UPDATE_AFTER, sortField);
+ afterData.setRowKind(RowKind.UPDATE_AFTER);
+ RowType afterRT = toFlinkRowType(afterSchema, true);
+ if (partitionFieldsChanged(beforeRT, beforeData, afterRT, afterData)) {
+ // partition fields changed. we need to emit both before and after RowData
+ builder.setOperation("update").setBeforeRowData(beforeData).setBeforeRowType(beforeRT)
+ .setAfterRowData(afterData).setAfterType(afterRT);
+ } else {
+ // otherwise we only need to keep the after RowData
+ builder.setOperation("update")
+ .setAfterRowData(afterData).setAfterType(afterRT);
+ }
+ }
} else {
- Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
- Struct before = value.getStruct(Envelope.FieldName.BEFORE);
- RowData beforeData = convert(before, beforeSchema, RowKind.UPDATE_BEFORE, sortField);
- RowType beforeRT = toFlinkRowType(beforeSchema);
- beforeData.setRowKind(RowKind.UPDATE_BEFORE);
- Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
- Struct after = value.getStruct(Envelope.FieldName.AFTER);
- RowData afterData = convert(after, afterSchema, RowKind.UPDATE_AFTER, sortField);
- RowType afterRT = toFlinkRowType(afterSchema);
- afterData.setRowKind(RowKind.UPDATE_AFTER);
- if (partitionFieldsChanged(beforeRT, beforeData, afterRT, afterData)) {
- // partition fields changed. we need to emit both before and after RowData
- builder.setOperation("update").setBeforeRowData(beforeData).setBeforeRowType(beforeRT)
- .setAfterRowData(afterData).setAfterType(afterRT);
+ Envelope.Operation op = getOperation(sch, value);
+ Schema valueSchema = value.schema();
+ if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+ Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ RowData insert = convert(after, afterSchema, RowKind.INSERT, sortField);
+ boolean afterNullable = afterSchema.isOptional();
+ RowType rt = toFlinkRowType(afterSchema,afterNullable);
+ insert.setRowKind(RowKind.INSERT);
+ builder.setOperation("insert").setAfterRowData(insert).setAfterType(rt);
+ } else if (op == Envelope.Operation.DELETE) {
+ Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+ Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+ RowData delete = convert(before, beforeSchema, RowKind.DELETE, sortField);
+ boolean nullable = beforeSchema.isOptional();
+ RowType rt = toFlinkRowType(beforeSchema,nullable);
+ builder.setOperation("delete").setBeforeRowData(null).setBeforeRowType(rt);
+ delete.setRowKind(RowKind.DELETE);
} else {
- // otherwise we only need to keep the after RowData
- builder.setOperation("update")
- .setAfterRowData(afterData).setAfterType(afterRT);
+ Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+ Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+ RowData beforeData = convert(before, beforeSchema, RowKind.UPDATE_BEFORE, sortField);
+ boolean beforNullable = beforeSchema.isOptional();
+ RowType beforeRT = toFlinkRowType(beforeSchema,beforNullable);
+ beforeData.setRowKind(RowKind.UPDATE_BEFORE);
+ Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+ Struct after = value.getStruct(Envelope.FieldName.AFTER);
+ RowData afterData = convert(after, afterSchema, RowKind.UPDATE_AFTER, sortField);
+ boolean afterNullable = afterSchema.isOptional();
+ RowType afterRT = toFlinkRowType(afterSchema,afterNullable);
+ afterData.setRowKind(RowKind.UPDATE_AFTER);
+ if (partitionFieldsChanged(beforeRT, beforeData, afterRT, afterData)) {
+ // partition fields changed. we need to emit both before and after RowData
+ builder.setOperation("update").setBeforeRowData(beforeData).setBeforeRowType(beforeRT)
+ .setAfterRowData(afterData).setAfterType(afterRT);
+ } else {
+ // otherwise we only need to keep the after RowData
+ builder.setOperation("update")
+ .setAfterRowData(afterData).setAfterType(afterRT);
+ }
}
}
-
return builder.setTsMs(tsMs).build();
}
@@ -169,7 +228,7 @@ public RowType toFlinkRowTypeCDC(RowType rowType) {
return RowType.of(colTypes, colNames);
}
- public RowType toFlinkRowType(Schema schema) {
+ public RowType toFlinkRowType(Schema schema, boolean nullable) {
int arity = schema.fields().size() + 1;
if (useCDC) ++arity;
String[] colNames = new String[arity];
@@ -178,7 +237,7 @@ public RowType toFlinkRowType(Schema schema) {
for (int i = 0; i < (useCDC ? arity - 2 : arity - 1); i++) {
Field item = fieldNames.get(i);
colNames[i] = item.name();
- colTypes[i] = convertToLogical(item.schema());
+ colTypes[i] = convertToLogical(item.schema(),nullable);
}
// colNames[useCDC ? arity - 3 : arity - 2] = BINLOG_FILE_INDEX;
// colTypes[useCDC ? arity - 3 : arity - 2] = new BigIntType();
@@ -191,16 +250,30 @@ public RowType toFlinkRowType(Schema schema) {
return RowType.of(colTypes, colNames);
}
- public LogicalType convertToLogical(Schema fieldSchema) {
+ public LogicalType convertToLogical(Schema fieldSchema, boolean nullable) {
if (isPrimitiveType(fieldSchema)) {
- return primitiveLogicalType(fieldSchema);
+ return primitiveLogicalType(fieldSchema, nullable);
} else {
- return otherLogicalType(fieldSchema);
+ return otherLogicalType(fieldSchema, nullable);
}
}
- private LogicalType primitiveLogicalType(Schema fieldSchema) {
- boolean nullable = fieldSchema.isOptional();
+ public List getRowFields(Schema schema) {
+ List rowFields = new ArrayList<>();
+
+ for (Field field : schema.fields()) {
+ String fieldName = field.name();
+ Schema fieldype = field.schema();
+ LogicalType logicalType = convertToLogical(fieldype ,true);
+ RowType.RowField rowField = new RowType.RowField(fieldName, logicalType);
+ rowFields.add(rowField);
+ }
+ return rowFields;
+ }
+
+ private LogicalType primitiveLogicalType(Schema fieldSchema,boolean nullable) {
+// boolean nullable = fieldSchema.isOptional();
+// if (isMongoDDl) nullable = true;
switch (fieldSchema.type()) {
case BOOLEAN:
return new BooleanType(nullable);
@@ -216,6 +289,9 @@ private LogicalType primitiveLogicalType(Schema fieldSchema) {
return new DoubleType(nullable);
case STRING:
return new VarCharType(nullable, Integer.MAX_VALUE);
+ case STRUCT:
+ List rowFields = getRowFields(fieldSchema);
+ return new RowType(nullable, rowFields);
case BYTES:
Map paras = fieldSchema.parameters();
int byteLen = Integer.MAX_VALUE;
@@ -229,8 +305,8 @@ private LogicalType primitiveLogicalType(Schema fieldSchema) {
}
}
- private LogicalType otherLogicalType(Schema fieldSchema) {
- boolean nullable = fieldSchema.isOptional();
+ private LogicalType otherLogicalType(Schema fieldSchema, boolean nullable) {
+ //boolean nullable = fieldSchema.isOptional();
switch (fieldSchema.name()) {
case Enum.LOGICAL_NAME:
case Json.LOGICAL_NAME:
@@ -361,11 +437,47 @@ public RowData convert(Struct struct, Schema schema, RowKind rowKind, long sortF
return row;
}
+ public RowData convertDocumentStruct(Struct struct) {
+ Schema schema = struct.schema();
+ int arity = schema.fields().size();
+ List fieldNames = schema.fields();
+ BinaryRowData row = new BinaryRowData(arity);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+
+ for (int i = 0; i < schema.fields().size(); i++) {
+ Field field = fieldNames.get(i);
+ String fieldName = field.name();
+ Schema fieldSchema = schema.field(fieldName).schema();
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ sqlSchemaAndFieldWrite(writer, i,fieldValue, fieldSchema, serverTimeZone);
+ }
+ writer.complete();
+ return row;
+ }
+
+
+ private void convertNestedStruct(BinaryRowWriter writer, int index, Struct nestedStruct, Schema nestedSchema) {
+ int nestedArity = nestedSchema.fields().size();
+ List nestedFields = nestedSchema.fields();
+ BinaryRowData nestedRow = new BinaryRowData(nestedArity);
+ BinaryRowWriter nestedWriter = new BinaryRowWriter(nestedRow);
+ for (int i = 0; i < nestedArity; i++) {
+ Field field = nestedFields.get(i);
+ String nestedFieldName = field.name();
+ Schema nestedFieldType = nestedSchema.field(nestedFieldName).schema();
+ Object nestedFieldValue = nestedStruct.getWithoutDefault(nestedFieldName);
+ sqlSchemaAndFieldWrite(nestedWriter, i, nestedFieldValue,nestedFieldType, serverTimeZone);
+ }
+ nestedWriter.complete();
+ RowDataSerializer rowDataSerializer = new RowDataSerializer();
+ writer.writeRow(index, nestedRow, rowDataSerializer);
+ }
+
private boolean isPrimitiveType(Schema fieldSchema) {
return fieldSchema.name() == null;
}
- private void sqlSchemaAndFieldWrite(BinaryRowWriter writer, int index, Object fieldValue, Schema fieldSchema, ZoneId serverTimeZone) {
+ public void sqlSchemaAndFieldWrite(BinaryRowWriter writer, int index, Object fieldValue, Schema fieldSchema, ZoneId serverTimeZone) {
if (isPrimitiveType(fieldSchema)) {
primitiveTypeWrite(writer, index, fieldValue, fieldSchema);
} else {
@@ -374,32 +486,41 @@ private void sqlSchemaAndFieldWrite(BinaryRowWriter writer, int index, Object fi
}
private void primitiveTypeWrite(BinaryRowWriter writer, int index, Object fieldValue, Schema fieldSchema) {
- switch (fieldSchema.type()) {
- case BOOLEAN:
- writeBoolean(writer, index, fieldValue);
- break;
- case INT8:
- case INT16:
- case INT32:
- writeInt(writer, index, fieldValue);
- break;
- case INT64:
- writeLong(writer, index, fieldValue);
- break;
- case FLOAT32:
- writeFloat(writer, index, fieldValue);
- break;
- case FLOAT64:
- writeDouble(writer, index, fieldValue);
- break;
- case STRING:
- writeString(writer, index, fieldValue);
- break;
- case BYTES:
- writeBinary(writer, index, fieldValue);
- break;
- default:
- throw new UnsupportedOperationException("LakeSoul doesn't support type: " + fieldSchema.type());
+ if (fieldValue == null){
+ writer.setNullAt(index);
+ }else if (fieldSchema.type().getName().equals("struct")){
+ convertNestedStruct(writer, index, (Struct) fieldValue,fieldSchema);
+ }else {
+ switch (fieldSchema.type()) {
+ case BOOLEAN:
+ writeBoolean(writer, index, fieldValue);
+ break;
+ case INT8:
+ case INT16:
+ case INT32:
+ writeInt(writer, index, fieldValue);
+ break;
+ case INT64:
+ writeLong(writer, index, fieldValue);
+ break;
+ case FLOAT32:
+ writeFloat(writer, index, fieldValue);
+ break;
+ case FLOAT64:
+ writeDouble(writer, index, fieldValue);
+ break;
+ case STRING:
+ writeString(writer, index, fieldValue);
+ break;
+ case BYTES:
+ writeBinary(writer, index, fieldValue);
+ break;
+// case STRUCT:
+// writeRow(writer, index, fieldValue);
+// break;
+ default:
+ throw new UnsupportedOperationException("LakeSoul doesn't support type: " + fieldSchema.type());
+ }
}
}
@@ -491,10 +612,9 @@ public Object convertToDecimal(Object dbzObj, Schema schema) {
}
}
Map paras = schema.parameters();
- if (paras==null){
+ if (paras == null) {
return DecimalData.fromBigDecimal(bigDecimal, 38, 30);
- }
- else {
+ } else {
return DecimalData.fromBigDecimal(bigDecimal, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
}
}
@@ -653,6 +773,16 @@ public void writeString(BinaryRowWriter writer, int index, Object dbzObj) {
writer.writeString(index, StringData.fromString(dbzObj.toString()));
}
+ public void writeRow(BinaryRowWriter writer, int index, Object dbzObj) {
+ RowData rowData = null;
+ if (dbzObj instanceof Struct) {
+ Struct struct = (Struct) dbzObj;
+ rowData = convertDocumentStruct(struct);
+ }
+ RowDataSerializer rowDataSerializer = new RowDataSerializer();
+ writer.writeRow(index, rowData, rowDataSerializer);
+ }
+
public void writeBinary(BinaryRowWriter writer, int index, Object dbzObj) {
if (dbzObj instanceof byte[]) {
writer.writeBinary(index, (byte[]) dbzObj);
@@ -666,4 +796,4 @@ public void writeBinary(BinaryRowWriter writer, int index, Object dbzObj) {
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
-}
+}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
new file mode 100644
index 000000000..79e8d86b5
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
@@ -0,0 +1,78 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.apache.flink.lakesoul.types;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import org.bson.Document;
+
+import java.util.Map;
+
+public class ParseDocument {
+ public static Struct convertBSONToStruct(String value) {
+ Document bsonDocument = Document.parse(value);
+ // 创建根 Struct 的 Schema
+ SchemaBuilder structSchemaBuilder = SchemaBuilder.struct();
+ Struct struct = new Struct(buildSchema(bsonDocument, structSchemaBuilder));
+ // 填充 Struct 的值
+ fillStructValues(bsonDocument, struct);
+
+ return struct;
+ }
+
+ private static Schema buildSchema(Document bsonDocument, SchemaBuilder structSchemaBuilder) {
+ for (Map.Entry entry : bsonDocument.entrySet()) {
+ String fieldName = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof Document) {
+ // 处理嵌套的 Document
+ SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
+ structSchemaBuilder.field(fieldName, buildSchema((Document) value, nestedStructSchemaBuilder));
+ } else {
+ // 处理普通字段
+ structSchemaBuilder.field(fieldName, getSchemaForValue(value));
+ }
+ }
+ return structSchemaBuilder.build();
+ }
+
+ private static void fillStructValues(Document bsonDocument, Struct struct) {
+ for (Map.Entry entry : bsonDocument.entrySet()) {
+ String fieldName = entry.getKey();
+ Object value = entry.getValue();
+ if (value instanceof Document) {
+ // 处理嵌套的 Document
+ Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
+ fillStructValues((Document) value, nestedStruct);
+ struct.put(fieldName, nestedStruct);
+ } else {
+ // 处理普通字段
+ struct.put(fieldName, value);
+ }
+ }
+ }
+
+ private static Schema getSchemaForValue(Object value) {
+ // 根据值的类型返回对应的 Schema
+ if (value instanceof String) {
+ return Schema.STRING_SCHEMA;
+ } else if (value instanceof Integer) {
+ return Schema.INT32_SCHEMA;
+ } else if (value instanceof Long) {
+ return Schema.INT64_SCHEMA;
+ } else if (value instanceof Double) {
+ return Schema.FLOAT64_SCHEMA;
+ } else if (value instanceof Boolean) {
+ return Schema.BOOLEAN_SCHEMA;
+ } else if (value instanceof Byte) {
+ return Schema.BYTES_SCHEMA;
+ } else {
+ // 处理其他类型,可以根据实际情况添加更多类型
+ return Schema.STRING_SCHEMA;
+ }
+ }
+}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
new file mode 100644
index 000000000..1917d8367
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
@@ -0,0 +1,106 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.apache.flink.lakesoul.types;
+
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
+import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
+import org.apache.flink.table.data.*;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+import java.util.List;
+public class StructConvert {
+ public RowData convert(Struct struct) {
+ // 获取Struct的schema和字段个数
+ Schema schema = struct.schema();
+ int arity = schema.fields().size() + 1; // +1 for extra event sortField
+ List fieldNames = schema.fields();
+ // 创建BinaryRowData
+ BinaryRowData row = new BinaryRowData(arity);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ // 遍历Struct的字段
+ for (int i = 0; i < schema.fields().size(); i++) {
+ Field field = fieldNames.get(i);
+ String fieldName = field.name();
+ Schema fieldSchema = schema.field(fieldName).schema();
+ // 获取Struct字段的值
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ // 将字段值写入BinaryRowData
+ writeField(writer, i, fieldSchema, fieldValue);
+ }
+ // 关闭BinaryRowWriter
+ writer.complete();
+ return row;
+ }
+
+ public RowData convert1(Struct struct){
+ Schema schema = struct.schema();
+ int arity = schema.fields().size()+1;
+ List fieldNames = schema.fields();
+ BinaryRowData row = new BinaryRowData(arity);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ Field field = fieldNames.get(0);
+ String fieldName = field.name();;
+ Schema fieldSchema = schema.field(fieldName).schema();
+ Object fieldValue = struct.getWithoutDefault(fieldName);
+ RowDataSerializer rowDataSerializer = new RowDataSerializer();
+ writer.writeRow(0,(RowData) fieldValue ,rowDataSerializer);
+ writer.complete();
+ return row;
+ }
+ // 递归处理嵌套的Struct类型
+ private void writeField(BinaryRowWriter writer, int index, Schema schema, Object fieldValue) {
+
+ if (fieldValue == null) {
+ // 如果字段值为null,则写入null
+ writer.setNullAt(index);
+ } else if (schema.type().getName().equals("struct")) {
+ // 如果字段类型是StructType,递归处理
+ convertNestedStruct(writer, index, (Struct) fieldValue, schema);
+ } else {
+ // 根据字段类型写入值
+ // 这里根据实际情况,可能需要根据字段类型进行不同的处理
+ switch (schema.type()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ writer.writeInt(index, (Integer) fieldValue);
+ break;
+ case STRING:
+ writer.writeString(index, StringData.fromString(fieldValue.toString()));
+ break;
+ default:
+ writer.writeString(index, StringData.fromString(fieldValue.toString()));
+ }
+
+ }
+ }
+ // 递归处理嵌套的Struct类型
+ private void convertNestedStruct(BinaryRowWriter writer, int index, Struct nestedStruct, Schema nestedSchema) {
+ // 获取嵌套Struct的schema和字段个数
+ int nestedArity = nestedSchema.fields().size();
+ List nestedFields = nestedSchema.fields();
+ // 创建嵌套Struct的BinaryRowData
+ BinaryRowData nestedRow = new BinaryRowData(nestedArity);
+ BinaryRowWriter nestedWriter = new BinaryRowWriter(nestedRow);
+ // 遍历嵌套Struct的字段
+ for (int i = 0; i < nestedArity; i++) {
+ Field field = nestedFields.get(i);
+ String nestedFieldName = field.name();
+ Schema nestedFieldType = nestedSchema.field(nestedFieldName).schema();
+ // 获取嵌套Struct字段的值
+ Object nestedFieldValue = nestedStruct.getWithoutDefault(nestedFieldName);
+ // 将嵌套Struct字段值写入BinaryRowData
+ writeField(nestedWriter, i, nestedFieldType, nestedFieldValue);
+ }
+ // 关闭嵌套Struct的BinaryRowWriter
+ nestedWriter.complete();
+ RowDataSerializer rowDataSerializer = new RowDataSerializer();
+ writer.writeRow(index, nestedRow, rowDataSerializer);
+ writer.complete();
+ }
+}
\ No newline at end of file
From fb1eb116251da1bac5a3ba668ba96450eb41e029 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Mon, 18 Mar 2024 14:54:49 +0800
Subject: [PATCH 02/13] support mongodb sync
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
lakesoul-flink/pom.xml | 15 ++
.../apache/flink/lakesoul/entry/JdbcCDC.java | 17 +-
.../flink/lakesoul/entry/SyncDatabase.java | 161 ++++++++++++++++--
.../LakeSoulSinkGlobalCommitter.java | 3 +
.../tool/LakeSoulSinkDatabasesOptions.java | 20 ++-
.../lakesoul/tool/LakeSoulSinkOptions.java | 11 ++
.../lakesoul/types/LakeSoulRecordConvert.java | 53 +++++-
.../flink/lakesoul/types/ParseDocument.java | 41 ++++-
8 files changed, 292 insertions(+), 29 deletions(-)
diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml
index 6462cf692..5218fc67d 100644
--- a/lakesoul-flink/pom.xml
+++ b/lakesoul-flink/pom.xml
@@ -147,11 +147,26 @@ SPDX-License-Identifier: Apache-2.0
flink-sql-connector-oracle-cdc
${cdc.version}
+
+ org.mongodb
+ mongo-java-driver
+ 3.12.14
+
com.ververica
flink-sql-connector-mongodb-cdc
${cdc.version}
+
+ org.apache.flink
+ flink-connector-mongodb
+ 1.0.1-1.17
+
+
+ org.mongodb
+ mongodb-driver-sync
+ 3.10.1
+
org.mongodb
bson
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index aa5a6317f..d1c9cb1e5 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -54,6 +54,8 @@ public class JdbcCDC {
private static String[] tableList;
private static String serverTimezone;
private static String pluginName;
+ private static int batchSize;
+ private static String mongoDatabase;
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
@@ -64,7 +66,7 @@ public static void main(String[] args) throws Exception {
host = parameter.get(SOURCE_DB_HOST.key());
port = parameter.getInt(SOURCE_DB_PORT.key(), MysqlDBManager.DEFAULT_MYSQL_PORT);
//Postgres Oracle
- if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres") || dbType.equalsIgnoreCase("mongodb")) {
+ if (dbType.equalsIgnoreCase("oracle") || dbType.equalsIgnoreCase("postgres") ) {
schemaList = parameter.get(SOURCE_DB_SCHEMA_LIST.key()).split(",");
String[] tables = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
tableList = new String[tables.length];
@@ -73,7 +75,12 @@ public static void main(String[] args) throws Exception {
}
splitSize = parameter.getInt(SOURCE_DB_SPLIT_SIZE.key(), SOURCE_DB_SPLIT_SIZE.defaultValue());
}
- if (dbType.equalsIgnoreCase("sqlserver")){
+ if (dbType.equalsIgnoreCase("sqlserver") ){
+ tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
+ }
+ if ( dbType.equalsIgnoreCase("mongodb")){
+ mongoDatabase = parameter.get(MONGO_DB_DATABASE.key());
+ batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
pluginName = parameter.get(PLUGIN_NAME.key(), PLUGIN_NAME.defaultValue());
@@ -287,11 +294,11 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts(host)
- .databaseList(schemaList) // 设置捕获的数据库,支持正则表达式
+ .databaseList(mongoDatabase) // 设置捕获的数据库,支持正则表达式
.collectionList(tableList) //设置捕获的集合,支持正则表达式
.startupOptions(StartupOptions.initial())
.scanFullChangelog(true)
- .batchSize(splitSize)
+ .batchSize(batchSize)
.username(userName)
.password(passWord)
.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)))
@@ -305,7 +312,7 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
builder =
new LakeSoulMultiTableSinkStreamBuilder(mongoSource, context, lakeSoulRecordConvert);
DataStreamSource source = builder.buildMultiTableSource("mongodb Source");
-
+ source.print();
DataStream stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From mongo Database " + dbName);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
index aaf13d0c3..725707caf 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
@@ -5,18 +5,27 @@
package org.apache.flink.lakesoul.entry;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.amazonaws.services.dynamodbv2.xspec.S;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.mongodb.MongoClientURI;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.mongodb.sink.MongoSink;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
@@ -24,25 +33,38 @@
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
+import org.apache.flink.types.Row;
+import org.bson.*;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
import java.sql.*;
-import java.util.List;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.Date;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;
public class SyncDatabase {
- public static void main(String[] args) throws SQLException {
+ static Table coll;
+ public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
String sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
String sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
- String targetSyncName = parameter.get(TARGET_DATABASE_TYPE.key());
+ String dbType = parameter.get(TARGET_DATABASE_TYPE.key());
String targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
String targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
String url = parameter.get(TARGET_DB_URL.key());
- String username = parameter.get(TARGET_DB_USER.key());
- String password = parameter.get(TARGET_DB_PASSWORD.key());
+ String username = null;
+ String password = null;
+ if (!dbType.equals("mongodb")){
+ username = parameter.get(TARGET_DB_USER.key());
+ password = parameter.get(TARGET_DB_PASSWORD.key());
+ }
int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
boolean useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());
@@ -54,7 +76,7 @@ public static void main(String[] args) throws SQLException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(sinkParallelism);
- switch (targetSyncName) {
+ switch (dbType) {
case "mysql":
xsyncToMysql(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
break;
@@ -64,8 +86,14 @@ public static void main(String[] args) throws SQLException {
case "doris":
xsyncToDoris(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName, fenodes);
break;
+ case "mongodb":
+ String uri = parameter.get(MONGO_DB_URI.key());
+ int batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
+ int batchIntervalMs = parameter.getInt(BATCH_INTERVAL_MS.key(), BATCH_INTERVAL_MS.defaultValue());
+ xsyncToMongodb(env,useBatch,uri,sourceDatabase,targetDatabase,sourceTableName,targetTableName,batchSize,sinkParallelism,batchIntervalMs);
+ break;
default:
- throw new RuntimeException("not supported the database: " + targetSyncName);
+ throw new RuntimeException("not supported the database: " + dbType);
}
}
@@ -296,4 +324,117 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, boolean batchXyn
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into "+targetTableName+" select * from lakeSoul.`"+sourceDatabase+"`."+sourceTableName);
}
+
+ public static void xsyncToMongodb(StreamExecutionEnvironment env,
+ boolean batchXync,
+ String uri ,
+ String sourceDatabase,
+ String targetDatabase,
+ String sourceTableName,
+ String targetCollection,
+ int batchSize,
+ int sinkParallelism,
+ int batchInservalMs) throws Exception {
+ createMongoColl(targetDatabase,targetCollection,uri);
+ if (batchXync) {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ }
+ StreamTableEnvironment tEnvs = StreamTableEnvironment.create(env);
+ Catalog lakesoulCatalog = new LakeSoulCatalog();
+ tEnvs.registerCatalog("lakeSoul",lakesoulCatalog);
+ coll = tEnvs.sqlQuery("select * from lakeSoul.`"+sourceDatabase+"`.`"+sourceTableName+"`");
+ tEnvs.registerTable("mongodbTbl",coll);
+ Table table = tEnvs.sqlQuery("select * from mongodbTbl");
+ DataStream> rowDataStream = tEnvs.toRetractStream(table, Row.class);
+
+ MongoClientURI mongUri = new MongoClientURI(uri);
+ MongoClient mongoClient = new MongoClient(mongUri);
+ MongoDatabase database = mongoClient.getDatabase(targetDatabase);
+ database.createCollection(targetCollection);
+ MongoSink> sink = MongoSink.>builder()
+ .setUri(uri)
+ .setDatabase(targetDatabase)
+ .setCollection(targetCollection)
+ .setBatchSize(batchSize)
+ .setBatchIntervalMs(batchInservalMs)
+ .setMaxRetries(3)
+ .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .setSerializationSchema(new MyMongoSerializationSchema())
+ .build();
+ rowDataStream.sinkTo(sink).setParallelism(sinkParallelism);
+ env.execute();
+ mongoClient.close();
+ }
+ public static void createMongoColl(String database,String collName,String uri){
+ MongoClientURI clientURI = new MongoClientURI(uri);
+ MongoClient mongoClient = new MongoClient(clientURI);
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+ mongoDatabase.createCollection(collName);
+ mongoClient.close();
+ }
+ public static class MyMongoSerializationSchema implements MongoSerializationSchema>, Serializable {
+ @Override
+ public WriteModel serialize(Tuple2 record, MongoSinkContext context) {
+ Row row = record.f1; // Extract the Row object from the Tuple2
+ BsonDocument document = new BsonDocument();
+ int fieldCount = row.getArity();
+ for (int i = 0; i < fieldCount; i++) {
+ String fieldName = coll.getSchema().getFieldNames()[i];
+ Object fieldValue = row.getField(i);
+ if (fieldValue != null) {
+ try {
+ document.append(fieldName, convertTonBsonValue(fieldValue));
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return new InsertOneModel<>(document);
+ }
+
+ public static BsonValue convertTonBsonValue(Object value) throws ParseException {
+ if (value == null) {
+ return new BsonNull();
+ } else if (value instanceof Integer) {
+ return new BsonInt32((Integer) value);
+ } else if (value instanceof Long) {
+ return new BsonInt64((Long) value);
+ } else if (value instanceof String) {
+ return new BsonString((String) value);
+ } else if (value instanceof Boolean) {
+ return new BsonBoolean( (Boolean) value);
+ } else if (value instanceof Double) {
+ return new BsonDouble( (Double) value);
+ } else if (value instanceof DecimalType) {
+ return new BsonDecimal128((Decimal128) value);
+ } else if (value instanceof Date) {
+ return new BsonDateTime( (long) value);
+ } else if (value instanceof Object[]) {
+ Object[] array = (Object[]) value;
+ BsonArray bsonArray = new BsonArray();
+ for (Object element : array) {
+ bsonArray.add(convertTonBsonValue(element));
+ }
+ return bsonArray;
+ } else if (isDateTimeString(value)) {
+ Date date = parseDateTime(value.toString());
+ return new BsonDateTime(date.getTime());
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + value.getClass());
+ }
+ }
+ private static boolean isDateTimeString(Object value) {
+ return value.toString().matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$");
+ }
+ private static Date parseDateTime(String value) throws ParseException {
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return dateFormat.parse(value);
+ }
+
+ }
}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index de3c129dc..ecd8c3573 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -201,6 +201,9 @@ public List commit(
identity.useCDC,
identity.cdcColumn);
if (dbType.equals("mongodb")){
+ System.out.println(msgSchema.toJson());
+ System.out.println(origSchema.json());
+ System.out.println(mergeStructType.json());
dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
}else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
index 6b9aa5fd3..278e2a6c5 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
@@ -35,7 +35,23 @@ public class LakeSoulSinkDatabasesOptions extends LakeSoulSinkOptions {
.noDefaultValue()
.withDescription("source database user_name");
+ public static final ConfigOption MONGO_DB_URI = ConfigOptions
+ .key("mongodb.uri")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("mongodb uri");
+ public static final ConfigOption BATCH_SIZE = ConfigOptions
+ .key("batchSize")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Sets the maximum number of actions to buffer for each batch request");
+
+ public static final ConfigOption BATCH_INTERVAL_MS = ConfigOptions
+ .key("batchIntervalMs")
+ .intType()
+ .defaultValue(1000)
+ .withDescription("Sets the batch flush interval, in milliseconds");
public static final ConfigOption TARGET_DB_PASSWORD = ConfigOptions
.key("target_db.password")
.stringType()
@@ -49,21 +65,18 @@ public class LakeSoulSinkDatabasesOptions extends LakeSoulSinkOptions {
.noDefaultValue()
.withDescription("mysql,postgres,doris");
-
public static final ConfigOption TARGET_DB_DB_NAME = ConfigOptions
.key("target_db.db_name")
.stringType()
.noDefaultValue()
.withDescription("target ddatabase name");
-
public static final ConfigOption SOURCE_DB_LAKESOUL_TABLE = ConfigOptions
.key("source_db.table_name")
.stringType()
.noDefaultValue()
.withDescription("lakesoul table");
-
public static final ConfigOption TARGET_DB_TABLE_NAME = ConfigOptions
.key("target_db.table_name")
.stringType()
@@ -84,7 +97,6 @@ public class LakeSoulSinkDatabasesOptions extends LakeSoulSinkOptions {
.defaultValue(1)
.withDescription("parallelism settings for out-of-the-lake");
-
public static final ConfigOption BATHC_STREAM_SINK = ConfigOptions
.key("use_batch")
.booleanType()
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
index 87051d84e..2d4e6fe4c 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkOptions.java
@@ -141,6 +141,17 @@ public class LakeSoulSinkOptions {
.stringType()
.noDefaultValue()
.withDescription("source database schemaList");
+ public static final ConfigOption MONGO_DB_DATABASE = ConfigOptions
+ .key("mongodb_database")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("mongodb database");
+
+ public static final ConfigOption BATCH_SIZE = ConfigOptions
+ .key("batchSize")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("The cursor batch size");
public static final ConfigOption SOURCE_DB_SCHEMA_TABLES = ConfigOptions
.key("source_db.schema_tables")
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index 829cc6eb6..a8a27015e 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -36,6 +36,7 @@
import org.apache.flink.table.data.*;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.runtime.typeutils.ArrayDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.*;
@@ -237,7 +238,7 @@ public RowType toFlinkRowType(Schema schema, boolean nullable) {
for (int i = 0; i < (useCDC ? arity - 2 : arity - 1); i++) {
Field item = fieldNames.get(i);
colNames[i] = item.name();
- colTypes[i] = convertToLogical(item.schema(),nullable);
+ colTypes[i] = convertToLogical(item.schema(), !item.name().equals("_id") && nullable);
}
// colNames[useCDC ? arity - 3 : arity - 2] = BINLOG_FILE_INDEX;
// colTypes[useCDC ? arity - 3 : arity - 2] = new BigIntType();
@@ -292,6 +293,8 @@ private LogicalType primitiveLogicalType(Schema fieldSchema,boolean nullable) {
case STRUCT:
List rowFields = getRowFields(fieldSchema);
return new RowType(nullable, rowFields);
+ case ARRAY:
+ return new ArrayType(nullable, Objects.requireNonNull(primitiveLogicalType(fieldSchema.valueSchema(), nullable)));
case BYTES:
Map paras = fieldSchema.parameters();
int byteLen = Integer.MAX_VALUE;
@@ -325,13 +328,18 @@ private LogicalType otherLogicalType(Schema fieldSchema, boolean nullable) {
return new LocalZonedTimestampType(nullable, 9);
case Decimal.LOGICAL_NAME:
Map paras = fieldSchema.parameters();
- return new DecimalType(nullable, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
+ if (paras.get("connect.decimal.precision") == null){
+ return new DecimalType(nullable, 38, 30);
+ }else {
+ return new DecimalType(nullable, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
+ }
case Date.SCHEMA_NAME:
return new DateType(nullable);
case Year.SCHEMA_NAME:
return new IntType(nullable);
case ZonedTime.SCHEMA_NAME:
case ZonedTimestamp.SCHEMA_NAME:
+ case com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME:
return new LocalZonedTimestampType(nullable, LocalZonedTimestampType.DEFAULT_PRECISION);
case Geometry.LOGICAL_NAME:
case VariableScaleDecimal.LOGICAL_NAME:
@@ -515,6 +523,9 @@ private void primitiveTypeWrite(BinaryRowWriter writer, int index, Object fieldV
case BYTES:
writeBinary(writer, index, fieldValue);
break;
+ case ARRAY:
+ writeArray(writer,index,fieldValue, fieldSchema);
+ break;
// case STRUCT:
// writeRow(writer, index, fieldValue);
// break;
@@ -540,6 +551,7 @@ private void otherTypeWrite(BinaryRowWriter writer, int index,
case Timestamp.SCHEMA_NAME:
case MicroTimestamp.SCHEMA_NAME:
case NanoTimestamp.SCHEMA_NAME:
+ case com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME:
writeTimeStamp(writer, index, fieldValue, fieldSchema, serverTimeZone);
break;
case Decimal.LOGICAL_NAME:
@@ -561,6 +573,7 @@ private void otherTypeWrite(BinaryRowWriter writer, int index,
case MicroDuration.SCHEMA_NAME:
writeLong(writer, index, fieldValue);
break;
+
// Geometry and Point can not support now
// case Geometry.LOGICAL_NAME:
// Object object = convertToGeometry(fieldValue, fieldSchema);
@@ -612,7 +625,7 @@ public Object convertToDecimal(Object dbzObj, Schema schema) {
}
}
Map paras = schema.parameters();
- if (paras == null) {
+ if (paras.get("connect.decimal.precision") == null) {
return DecimalData.fromBigDecimal(bigDecimal, 38, 30);
} else {
return DecimalData.fromBigDecimal(bigDecimal, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
@@ -711,6 +724,40 @@ private Object convertToPoint(Object dbzObj, Schema schema) {
"Unsupported Struct value type: " + dbzObj.getClass().getSimpleName());
}
}
+ public void writeArray(BinaryRowWriter writer, int index, Object dbzObj, Schema schema) {
+ if (dbzObj instanceof ArrayList) {
+ ArrayList
-
- com.ververica
- flink-sql-connector-postgres-cdc
- ${cdc.version}
-
+
com.ververica
flink-sql-connector-sqlserver-cdc
@@ -148,9 +144,9 @@ SPDX-License-Identifier: Apache-2.0
${cdc.version}
- org.mongodb
- mongo-java-driver
- 3.12.14
+ com.ververica
+ flink-sql-connector-postgres-cdc
+ ${cdc.version}
com.ververica
@@ -162,15 +158,10 @@ SPDX-License-Identifier: Apache-2.0
flink-connector-mongodb
1.0.1-1.17
-
- org.mongodb
- mongodb-driver-sync
- 3.10.1
-
org.mongodb
bson
- 4.3.4
+ 4.3.4
org.apache.doris
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index d1c9cb1e5..fd365ece6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -12,7 +12,7 @@
import com.ververica.cdc.connectors.oracle.source.OracleSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
-import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
+
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
@@ -218,6 +218,7 @@ private static void postgresCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
DataStreamSource source = builder.buildMultiTableSource("Postgres Source");
DataStream stream = builder.buildHashPartitionedCDCStream(source);
+ stream.print();
DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From Postgres Database " + dbName);
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
new file mode 100644
index 000000000..7e4b1f06a
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
@@ -0,0 +1,171 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package org.apache.flink.lakesoul.entry;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.WriteModel;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
+import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.bson.*;
+import org.bson.types.Decimal128;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+public class MongoSinkUtils {
+
+ static Table coll;
+ static List structNameFiledList;
+
+ public static void createMongoColl(String database, String collName, String uri) {
+ MongoClient mongoClient = MongoClients.create(uri);
+ MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
+ mongoDatabase.createCollection(collName);
+ mongoClient.close();
+ }
+
+ public static class MyMongoSerializationSchema implements MongoSerializationSchema>, Serializable {
+ @Override
+ public WriteModel serialize(Tuple2 record, MongoSinkContext context) {
+ Row row = record.f1; // Extract the Row object from the Tuple2
+ BsonDocument document = new BsonDocument();
+ int fieldCount = row.getArity();
+ DataType[] fieldDataTypes = coll.getSchema().getFieldDataTypes();
+ for (int i = 0; i < fieldCount; i++) {
+ String fieldName = coll.getSchema().getFieldNames()[i];
+ Object fieldValue = row.getField(i);
+ if (fieldValue instanceof Row) {
+ DataType dataType = fieldDataTypes[i];
+ RowType rowType = (RowType) dataType.getLogicalType();
+ structNameFiledList = traverseRow(rowType);
+ }
+ if (fieldValue != null) {
+ try {
+ document.append(fieldName, convertTonBsonValue(fieldValue, fieldName));
+ } catch (ParseException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return new InsertOneModel<>(document);
+ }
+
+
+ public static BsonValue convertTonBsonValue(Object value, String fieldName) throws ParseException {
+ if (value == null) {
+ return new BsonNull();
+ } else if (value instanceof Integer) {
+ return new BsonInt32((Integer) value);
+ } else if (value instanceof Long) {
+ return new BsonInt64((Long) value);
+ } else if (value instanceof String) {
+ return new BsonString((String) value);
+ } else if (value instanceof Boolean) {
+ return new BsonBoolean((Boolean) value);
+ } else if (value instanceof Double) {
+ return new BsonDouble((Double) value);
+ } else if (value instanceof BigDecimal) {
+ return new BsonDecimal128(new Decimal128((BigDecimal) value));
+ } else if (value instanceof Date) {
+ return new BsonDateTime((long) value);
+ } else if (value instanceof BinaryType) {
+ return new BsonBinary((byte[]) value);
+ } else if (value instanceof byte[]) {
+ return new BsonBinary((byte[]) value);
+ } else if (value instanceof Object[]) {
+ Object[] array = (Object[]) value;
+ BsonArray bsonArray = new BsonArray();
+ for (Object element : array) {
+ bsonArray.add(convertTonBsonValue(element, fieldName));
+ }
+ return bsonArray;
+ } else if (isDateTimeString(value)) {
+ Date date = parseDateTime(value.toString());
+ return new BsonDateTime(date.getTime());
+ } else if (value instanceof Row) {
+ Row row = (Row) value;
+ BsonDocument bsonDocument = new BsonDocument();
+ for (int i = 0; i < row.getArity(); i++) {
+ Object fieldValue = row.getField(i);
+ List stringList = new ArrayList<>(structNameFiledList);
+ String name = structNameFiledList.get(0);
+ stringList.remove(0);
+ structNameFiledList = stringList;
+ bsonDocument.append(name, convertTonBsonValue(fieldValue, name));
+ }
+ return bsonDocument;
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + value.getClass());
+ }
+ }
+
+ public static List traverseRow(RowType rowType) {
+ List nameList = new ArrayList<>();
+ traverseField(rowType, nameList);
+ return nameList;
+ }
+
+
+ private static void traverseField(RowType rowType, List nameList) {
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ String fieldName = rowType.getFieldNames().get(i);
+ LogicalType fieldType = rowType.getTypeAt(i);
+ nameList.add(fieldName);
+ if (fieldType instanceof RowType) {
+ traverseField((RowType) fieldType, nameList);
+ }
+ }
+ }
+
+ public static List findDirectNestedNames(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel) {
+ List nestedNames = new ArrayList<>();
+ findDirectNestedNamesHelper(jsonObject, targetFieldName, currentLevel, targetLevel, nestedNames);
+ return nestedNames;
+ }
+
+ public static void findDirectNestedNamesHelper(JSONObject jsonObject, String targetFieldName, int currentLevel, int targetLevel, List nestedNames) {
+ if (currentLevel == targetLevel) {
+ if (jsonObject.getString("name").equals(targetFieldName)) {
+ JSONArray children = jsonObject.getJSONArray("children");
+ for (Object obj : children) {
+ JSONObject child = (JSONObject) obj;
+ nestedNames.add(child.getString("name"));
+ }
+ }
+ } else {
+ JSONArray children = jsonObject.getJSONArray("children");
+ for (Object obj : children) {
+ JSONObject child = (JSONObject) obj;
+ findDirectNestedNamesHelper(child, targetFieldName, currentLevel + 1, targetLevel, nestedNames);
+ }
+ }
+ }
+
+ public static boolean isDateTimeString(Object value) {
+ return value.toString().matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$");
+ }
+
+ public static Date parseDateTime(String value) throws ParseException {
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return dateFormat.parse(value);
+ }
+ }
+}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
index 725707caf..14b67bbaa 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
@@ -2,24 +2,16 @@
//
// SPDX-License-Identifier: Apache-2.0
-
package org.apache.flink.lakesoul.entry;
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
-import com.mongodb.MongoClientURI;
-import com.mongodb.MongoClient;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.InsertOneModel;
-import com.mongodb.client.model.WriteModel;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
-import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
-import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
@@ -34,39 +26,41 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.*;
import org.apache.flink.types.Row;
-import org.bson.*;
-import org.bson.types.Decimal128;
-
-import java.io.Serializable;
import java.sql.*;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
import java.util.*;
-import java.util.Date;
+import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;
public class SyncDatabase {
- static Table coll;
+ static String targetTableName;
+ static String dbType;
+ static String sourceDatabase;
+ static String sourceTableName;
+ static String targetDatabase;
+ static String url;
+ static String username;
+ static String password;
+ static boolean useBatch;
+ static int sinkParallelism;
+
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
- String sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
- String sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
- String dbType = parameter.get(TARGET_DATABASE_TYPE.key());
- String targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
- String targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
- String url = parameter.get(TARGET_DB_URL.key());
- String username = null;
- String password = null;
- if (!dbType.equals("mongodb")){
+ sourceDatabase = parameter.get(SOURCE_DB_DB_NAME.key());
+ sourceTableName = parameter.get(SOURCE_DB_LAKESOUL_TABLE.key()).toLowerCase();
+ dbType = parameter.get(TARGET_DATABASE_TYPE.key());
+ targetDatabase = parameter.get(TARGET_DB_DB_NAME.key());
+ targetTableName = parameter.get(TARGET_DB_TABLE_NAME.key()).toLowerCase();
+ url = parameter.get(TARGET_DB_URL.key());
+ if (!dbType.equals("mongodb")) {
username = parameter.get(TARGET_DB_USER.key());
password = parameter.get(TARGET_DB_PASSWORD.key());
}
- int sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
- boolean useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
+ sinkParallelism = parameter.getInt(SINK_PARALLELISM.key(), SINK_PARALLELISM.defaultValue());
+ useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
//int replicationNum = parameter.getInt(DORIS_REPLICATION_NUM.key(), DORIS_REPLICATION_NUM.defaultValue());
String fenodes = parameter.get(DORIS_FENODES.key(), DORIS_FENODES.defaultValue());
@@ -78,19 +72,19 @@ public static void main(String[] args) throws Exception {
switch (dbType) {
case "mysql":
- xsyncToMysql(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
+ xsyncToMysql(env);
break;
case "postgres":
- xsyncToPg(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName);
+ xsyncToPg(env);
break;
case "doris":
- xsyncToDoris(env, useBatch, url, sourceDatabase, username, password, targetDatabase, sourceTableName, targetTableName, fenodes);
+ xsyncToDoris(env, fenodes);
break;
case "mongodb":
String uri = parameter.get(MONGO_DB_URI.key());
int batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
int batchIntervalMs = parameter.getInt(BATCH_INTERVAL_MS.key(), BATCH_INTERVAL_MS.defaultValue());
- xsyncToMongodb(env,useBatch,uri,sourceDatabase,targetDatabase,sourceTableName,targetTableName,batchSize,sinkParallelism,batchIntervalMs);
+ xsyncToMongodb(env, uri, batchSize, batchIntervalMs);
break;
default:
throw new RuntimeException("not supported the database: " + dbType);
@@ -109,7 +103,7 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]
createTableQuery.append(", ");
}
}
- if (pk!=null){
+ if (pk != null) {
createTableQuery.append(" ,PRIMARY KEY(").append(pk);
createTableQuery.append(")");
}
@@ -119,11 +113,10 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]
public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
String[] stringFieldTypes = new String[fieldTypes.length];
-
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
- if (pk!=null){
+ if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
@@ -137,6 +130,8 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
stringFieldTypes[i] = "BOOLEAN";
+ } else if (fieldTypes[i].getLogicalType() instanceof VarBinaryType) {
+ stringFieldTypes[i] = "BLOB";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
@@ -150,7 +145,7 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
- if (pk!=null){
+ if (pk != null) {
if (pk.contains(fieldNames[i])) {
mysqlType = "VARCHAR(100)";
}
@@ -164,6 +159,8 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
stringFieldTypes[i] = "BYTEA";
} else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
+ } else if (fieldTypes[i].getLogicalType() instanceof VarBinaryType) {
+ stringFieldTypes[i] = "BYTEA";
} else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
@@ -174,13 +171,13 @@ public static String[] getPgFieldsTypes(DataType[] fieldTypes, String[] fieldNam
public static String[] getDorisFieldTypes(DataType[] fieldTypes) {
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
- if (fieldTypes[i].getLogicalType() instanceof TimestampType){
+ if (fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "DATETIME";
- }
- else if (fieldTypes[i].getLogicalType() instanceof VarCharType){
+ } else if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
stringFieldTypes[i] = "VARCHAR";
-
- } else {
+ } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
+ stringFieldTypes[i] = "TIMESTAMP";
+ } else {
stringFieldTypes[i] = fieldTypes[i].toString();
}
}
@@ -192,19 +189,20 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) {
TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDataBae);
String partitions = tableInfo.getPartitions();
DBUtil.TablePartitionKeys keys = DBUtil.parseTableInfoPartitions(partitions);
+
List primaryKeys = keys.primaryKeys;
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < primaryKeys.size(); i++) {
stringBuilder.append(primaryKeys.get(i));
- if (i> rowDataStream = tEnvs.toRetractStream(table, Row.class);
-
- MongoClientURI mongUri = new MongoClientURI(uri);
- MongoClient mongoClient = new MongoClient(mongUri);
- MongoDatabase database = mongoClient.getDatabase(targetDatabase);
- database.createCollection(targetCollection);
MongoSink> sink = MongoSink.>builder()
.setUri(uri)
.setDatabase(targetDatabase)
- .setCollection(targetCollection)
+ .setCollection(targetTableName)
.setBatchSize(batchSize)
.setBatchIntervalMs(batchInservalMs)
.setMaxRetries(3)
@@ -367,74 +357,5 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env,
.build();
rowDataStream.sinkTo(sink).setParallelism(sinkParallelism);
env.execute();
- mongoClient.close();
- }
- public static void createMongoColl(String database,String collName,String uri){
- MongoClientURI clientURI = new MongoClientURI(uri);
- MongoClient mongoClient = new MongoClient(clientURI);
- MongoDatabase mongoDatabase = mongoClient.getDatabase(database);
- mongoDatabase.createCollection(collName);
- mongoClient.close();
- }
- public static class MyMongoSerializationSchema implements MongoSerializationSchema>, Serializable {
- @Override
- public WriteModel serialize(Tuple2 record, MongoSinkContext context) {
- Row row = record.f1; // Extract the Row object from the Tuple2
- BsonDocument document = new BsonDocument();
- int fieldCount = row.getArity();
- for (int i = 0; i < fieldCount; i++) {
- String fieldName = coll.getSchema().getFieldNames()[i];
- Object fieldValue = row.getField(i);
- if (fieldValue != null) {
- try {
- document.append(fieldName, convertTonBsonValue(fieldValue));
- } catch (ParseException e) {
- throw new RuntimeException(e);
- }
- }
- }
- return new InsertOneModel<>(document);
- }
-
- public static BsonValue convertTonBsonValue(Object value) throws ParseException {
- if (value == null) {
- return new BsonNull();
- } else if (value instanceof Integer) {
- return new BsonInt32((Integer) value);
- } else if (value instanceof Long) {
- return new BsonInt64((Long) value);
- } else if (value instanceof String) {
- return new BsonString((String) value);
- } else if (value instanceof Boolean) {
- return new BsonBoolean( (Boolean) value);
- } else if (value instanceof Double) {
- return new BsonDouble( (Double) value);
- } else if (value instanceof DecimalType) {
- return new BsonDecimal128((Decimal128) value);
- } else if (value instanceof Date) {
- return new BsonDateTime( (long) value);
- } else if (value instanceof Object[]) {
- Object[] array = (Object[]) value;
- BsonArray bsonArray = new BsonArray();
- for (Object element : array) {
- bsonArray.add(convertTonBsonValue(element));
- }
- return bsonArray;
- } else if (isDateTimeString(value)) {
- Date date = parseDateTime(value.toString());
- return new BsonDateTime(date.getTime());
- } else {
- throw new IllegalArgumentException("Unsupported data type: " + value.getClass());
- }
- }
- private static boolean isDateTimeString(Object value) {
- return value.toString().matches("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z$");
- }
- private static Date parseDateTime(String value) throws ParseException {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
- dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- return dateFormat.parse(value);
- }
-
}
}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index ecd8c3573..32cf835c0 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -164,7 +164,9 @@ public List commit(
StructType origSchema ;
if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
+ System.out.println(arrowSchema.toJson());
origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
+ System.out.println(origSchema.json());
} else {
origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
}
@@ -177,7 +179,20 @@ public List commit(
String equalOrCanCast = equalOrCanCastTuple3._1();
boolean schemaChanged = (boolean) equalOrCanCastTuple3._2();
StructType mergeStructType = equalOrCanCastTuple3._3();
- if (equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST())) {
+
+ System.out.println(origSchema.json());
+ System.out.println(msgSchema.toJson());
+ System.out.println(mergeStructType.json());
+ System.out.println(ArrowUtils.toArrowSchema(mergeStructType, "UTC").toJson());
+ boolean schemaChangeFound = false;
+ if (dbType.equals("mongodb")){
+ if (mergeStructType.length() > origSchema.size()){
+ schemaChangeFound = schemaChanged;
+ }
+ }else {
+ schemaChangeFound = equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST());
+ }
+ if (schemaChangeFound) {
LOG.warn("Schema change found, origin schema = {}, changed schema = {}",
origSchema.json(),
msgSchema.toJson());
@@ -201,10 +216,7 @@ public List commit(
identity.useCDC,
identity.cdcColumn);
if (dbType.equals("mongodb")){
- System.out.println(msgSchema.toJson());
- System.out.println(origSchema.json());
- System.out.println(mergeStructType.json());
- dbManager.updateTableSchema(tableInfo.getTableId(), mergeStructType.json());
+ dbManager.updateTableSchema(tableInfo.getTableId(), ArrowUtils.toArrowSchema(mergeStructType,"UTC").toJson());
}else {
dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
index 278e2a6c5..157a42061 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/LakeSoulSinkDatabasesOptions.java
@@ -108,4 +108,6 @@ public class LakeSoulSinkDatabasesOptions extends LakeSoulSinkOptions {
.key("doris.fenodes")
.stringType()
.defaultValue("127.0.0.1:8030");
+
+
}
\ No newline at end of file
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
index b327f6c3c..cb7bba789 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
@@ -13,6 +13,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.lakesoul.tool.FlinkUtil;
+import org.apache.flink.table.data.TimestampData;
import java.util.ArrayList;
import java.util.Collections;
@@ -100,6 +101,8 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord
}
long sortField = (binlogFileIndex << 32) + binlogPosition;
LakeSoulRowDataWrapper data = convert.toLakeSoulDataType(valueSchema, value, tableId, tsMs, sortField);
+// TimestampData timestamp = data.getAfter().getTimestamp(1, 18);
+// System.out.println(timestamp);
String tablePath;
if (tableId.schema()==null){
tablePath = new Path(new Path(basePath, tableId.catalog()), tableId.table()).toString();
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index a8a27015e..666e1aee6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -552,7 +552,7 @@ private void otherTypeWrite(BinaryRowWriter writer, int index,
case MicroTimestamp.SCHEMA_NAME:
case NanoTimestamp.SCHEMA_NAME:
case com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME:
- writeTimeStamp(writer, index, fieldValue, fieldSchema, serverTimeZone);
+ writeTimeStamp(writer, index, fieldValue, fieldSchema,serverTimeZone);
break;
case Decimal.LOGICAL_NAME:
writeDecimal(writer, index, fieldValue, fieldSchema);
@@ -625,7 +625,7 @@ public Object convertToDecimal(Object dbzObj, Schema schema) {
}
}
Map paras = schema.parameters();
- if (paras.get("connect.decimal.precision") == null) {
+ if ( paras==null || paras.get("connect.decimal.precision") == null) {
return DecimalData.fromBigDecimal(bigDecimal, 38, 30);
} else {
return DecimalData.fromBigDecimal(bigDecimal, Integer.parseInt(paras.get("connect.decimal.precision")), Integer.parseInt(paras.get("scale")));
@@ -700,8 +700,14 @@ public Object convertToTimeStamp(Object dbzObj, Schema schema, ZoneId serverTime
return TimestampData.fromInstant(zonedDateTime.toInstant());
}
return null;
+ } else if (dbzObj instanceof java.util.Date) {
+ java.util.Date date = (java.util.Date)dbzObj;
+ long timestamp = date.toInstant().toEpochMilli();
+ Instant instant = TimestampData.fromEpochMillis( timestamp).toInstant();
+ return TimestampData.fromInstant(instant);
}
// fallback to zoned timestamp
+
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, ZoneId.of("UTC"));
return TimestampData.fromLocalDateTime(localDateTime);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
index c2899a1fc..f619c2357 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
@@ -5,7 +5,9 @@
package org.apache.flink.lakesoul.types;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.*;
+import org.bson.BsonTimestamp;
import org.bson.Document;
+import org.bson.types.Binary;
import org.bson.types.Decimal128;
import java.math.BigDecimal;
@@ -72,6 +74,12 @@ private static void fillStructValues(Document bsonDocument, Struct struct) {
} else if (value instanceof Decimal128) {
BigDecimal decimalValue = new BigDecimal(value.toString());
struct.put(fieldName, decimalValue);
+ } else if (value instanceof Binary) {
+ Binary binaryData = (Binary) value;
+ struct.put(fieldName,binaryData.getData());
+ } else if (value instanceof BsonTimestamp) {
+ BsonTimestamp bsonTimestamp = (BsonTimestamp) value;
+ struct.put(fieldName,bsonTimestamp.getValue());
} else {
struct.put(fieldName,value);
}
@@ -95,8 +103,12 @@ private static Schema getSchemaForValue(Object value) {
return Decimal.schema(decimalValue.scale());
} else if (value instanceof Byte) {
return Schema.BYTES_SCHEMA;
+ } else if (value instanceof Binary) {
+ return Schema.BYTES_SCHEMA;
} else if (value instanceof Date) {
return Timestamp.SCHEMA;
+ } else if (value instanceof BsonTimestamp) {
+ return Schema.INT64_SCHEMA;
} else {
// 处理其他类型,可以根据实际情况添加更多类型
return Schema.STRING_SCHEMA;
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
index 1917d8367..649d1db21 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
@@ -15,43 +15,22 @@
import java.util.List;
public class StructConvert {
public RowData convert(Struct struct) {
- // 获取Struct的schema和字段个数
Schema schema = struct.schema();
int arity = schema.fields().size() + 1; // +1 for extra event sortField
List fieldNames = schema.fields();
- // 创建BinaryRowData
BinaryRowData row = new BinaryRowData(arity);
BinaryRowWriter writer = new BinaryRowWriter(row);
- // 遍历Struct的字段
for (int i = 0; i < schema.fields().size(); i++) {
Field field = fieldNames.get(i);
String fieldName = field.name();
Schema fieldSchema = schema.field(fieldName).schema();
- // 获取Struct字段的值
Object fieldValue = struct.getWithoutDefault(fieldName);
- // 将字段值写入BinaryRowData
writeField(writer, i, fieldSchema, fieldValue);
}
- // 关闭BinaryRowWriter
writer.complete();
return row;
}
- public RowData convert1(Struct struct){
- Schema schema = struct.schema();
- int arity = schema.fields().size()+1;
- List fieldNames = schema.fields();
- BinaryRowData row = new BinaryRowData(arity);
- BinaryRowWriter writer = new BinaryRowWriter(row);
- Field field = fieldNames.get(0);
- String fieldName = field.name();;
- Schema fieldSchema = schema.field(fieldName).schema();
- Object fieldValue = struct.getWithoutDefault(fieldName);
- RowDataSerializer rowDataSerializer = new RowDataSerializer();
- writer.writeRow(0,(RowData) fieldValue ,rowDataSerializer);
- writer.complete();
- return row;
- }
// 递归处理嵌套的Struct类型
private void writeField(BinaryRowWriter writer, int index, Schema schema, Object fieldValue) {
@@ -81,23 +60,17 @@ private void writeField(BinaryRowWriter writer, int index, Schema schema, Object
}
// 递归处理嵌套的Struct类型
private void convertNestedStruct(BinaryRowWriter writer, int index, Struct nestedStruct, Schema nestedSchema) {
- // 获取嵌套Struct的schema和字段个数
int nestedArity = nestedSchema.fields().size();
List nestedFields = nestedSchema.fields();
- // 创建嵌套Struct的BinaryRowData
BinaryRowData nestedRow = new BinaryRowData(nestedArity);
BinaryRowWriter nestedWriter = new BinaryRowWriter(nestedRow);
- // 遍历嵌套Struct的字段
for (int i = 0; i < nestedArity; i++) {
Field field = nestedFields.get(i);
String nestedFieldName = field.name();
Schema nestedFieldType = nestedSchema.field(nestedFieldName).schema();
- // 获取嵌套Struct字段的值
Object nestedFieldValue = nestedStruct.getWithoutDefault(nestedFieldName);
- // 将嵌套Struct字段值写入BinaryRowData
writeField(nestedWriter, i, nestedFieldType, nestedFieldValue);
}
- // 关闭嵌套Struct的BinaryRowWriter
nestedWriter.complete();
RowDataSerializer rowDataSerializer = new RowDataSerializer();
writer.writeRow(index, nestedRow, rowDataSerializer);
From d025d1a77c71a07b21cc4253de5b8be18a87277a Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Wed, 27 Mar 2024 15:58:07 +0800
Subject: [PATCH 04/13]
lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java | 4 ----
1 file changed, 4 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index 32cf835c0..0320655de 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -180,10 +180,6 @@ public List commit(
boolean schemaChanged = (boolean) equalOrCanCastTuple3._2();
StructType mergeStructType = equalOrCanCastTuple3._3();
- System.out.println(origSchema.json());
- System.out.println(msgSchema.toJson());
- System.out.println(mergeStructType.json());
- System.out.println(ArrowUtils.toArrowSchema(mergeStructType, "UTC").toJson());
boolean schemaChangeFound = false;
if (dbType.equals("mongodb")){
if (mergeStructType.length() > origSchema.size()){
From 10f63f644e52d74182e668d93e12754f0c6c0197 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Wed, 27 Mar 2024 16:24:50 +0800
Subject: [PATCH 05/13] fix some bugs
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../flink/lakesoul/entry/SyncDatabase.java | 19 ++++++-------------
1 file changed, 6 insertions(+), 13 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
index 38596cd8c..14b67bbaa 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
@@ -114,7 +114,6 @@ public static String pgAndMsqlCreateTableSql(String[] stringFieldTypes, String[]
public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] fieldNames, String pk) {
String[] stringFieldTypes = new String[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
- String typeName = fieldTypes[i].getLogicalType().toString();
if (fieldTypes[i].getLogicalType() instanceof VarCharType) {
String mysqlType = "TEXT";
if (pk != null) {
@@ -127,9 +126,7 @@ public static String[] getMysqlFieldsTypes(DataType[] fieldTypes, String[] field
stringFieldTypes[i] = "FLOAT";
} else if (fieldTypes[i].getLogicalType() instanceof BinaryType) {
stringFieldTypes[i] = "BINARY";
- } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType || fieldTypes[i].getLogicalType() instanceof TimestampType) {
- stringFieldTypes[i] = "TIMESTAMP";
- } else if (fieldTypes[i].getLogicalType().toString().equals("TIMESTAMP_LTZ(6)")) {
+ } else if (fieldTypes[i].getLogicalType() instanceof LocalZonedTimestampType | fieldTypes[i].getLogicalType() instanceof TimestampType) {
stringFieldTypes[i] = "TIMESTAMP";
} else if (fieldTypes[i].getLogicalType() instanceof BooleanType) {
stringFieldTypes[i] = "BOOLEAN";
@@ -263,21 +260,19 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
DataType[] fieldDataTypes = schemaResult.getTableSchema().getFieldDataTypes();
String[] fieldNames = schemaResult.getTableSchema().getFieldNames();
String tablePk = getTablePk(sourceDatabase, sourceTableName);
- String[] mysqlFieldTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
- //String[] stringFieldsTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
- String createTableSql = pgAndMsqlCreateTableSql(mysqlFieldTypes, fieldNames, targetTableName, tablePk);
+ String[] stringFieldsTypes = getMysqlFieldsTypes(fieldDataTypes, fieldNames, tablePk);
+ String createTableSql = pgAndMsqlCreateTableSql(stringFieldsTypes, fieldNames, targetTableName, tablePk);
Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
Statement statement = conn.createStatement();
// Create the target table in MySQL
statement.executeUpdate(createTableSql.toString());
StringBuilder coulmns = new StringBuilder();
-
- for (int i = 0; i < mysqlFieldTypes.length; i++) {
- if (mysqlFieldTypes[i].equals("VARBINARY(40)")) {
+ for (int i = 0; i < fieldDataTypes.length; i++) {
+ if (stringFieldsTypes[i].equals("BLOB")) {
coulmns.append("`").append(fieldNames[i]).append("` ").append("BYTES");
} else {
- coulmns.append("`").append(fieldNames[i]).append("` ").append(mysqlFieldTypes[i]);
+ coulmns.append("`").append(fieldNames[i]).append("` ").append(stringFieldsTypes[i]);
}
if (i < fieldDataTypes.length - 1) {
coulmns.append(",");
@@ -291,7 +286,6 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
} else {
sql = String.format("create table %s(%s) with ('connector' = '%s', 'url' = '%s', 'table-name' = '%s', 'username' = '%s', 'password' = '%s' , 'sink.parallelism' = '%s')",
targetTableName, coulmns, "jdbc", jdbcUrl, targetTableName, username, password, sinkParallelism);
-
}
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
@@ -330,7 +324,6 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
targetTableName, coulmns, "doris", jdbcUrl, fenodes, targetDatabase + "." + targetTableName, username, password);
tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
-
}
public static void xsyncToMongodb(StreamExecutionEnvironment env,
From 0079162c075e55feba804a8fbfd4768b6913b99d Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Wed, 27 Mar 2024 18:29:30 +0800
Subject: [PATCH 06/13] add mongodb cdc docs
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../flink/lakesoul/entry/MongoSinkUtils.java | 1 -
.../docs/03-Usage Docs/05-flink-cdc-sync.md | 30 +++++++-
.../03-Usage Docs/14-export-to-databases.md | 24 +++++-
.../03-Usage Docs/05-flink-cdc-sync.md | 11 ++-
.../03-Usage Docs/14-export-to-databases.md | 74 ++++++++++++-------
5 files changed, 107 insertions(+), 33 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
index 7e4b1f06a..ca281ca4f 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
@@ -122,7 +122,6 @@ public static List traverseRow(RowType rowType) {
return nameList;
}
-
private static void traverseField(RowType rowType, List nameList) {
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = rowType.getFieldNames().get(i);
diff --git a/website/docs/03-Usage Docs/05-flink-cdc-sync.md b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
index 8bf30c7b3..f8fccd907 100644
--- a/website/docs/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
@@ -63,8 +63,8 @@ Description of required parameters:
| -c | The task runs the main function entry class | org.apache.flink.lakesoul.entry.MysqlCdc |
| Main package | Task running jar | lakesoul-flink-flink-1.17-VAR::VERSION.jar |
| --source_db.type | source database type | mysql postgres oracle |
-| --source_db.host | The address of the source database | |
-| --source_db.port | source database port | |
+| --source_db.host | The address of the source database,When mongodb enters the lake, you need to bring the port. | |
+| --source_db.port | source database port,This parameter is not required when mongodb enters the lake. | |
| --source_db.user | source database username | |
| --source_db.password | Password for source database | |
| --source.parallelism | The parallelism of a single table read task affects the data reading speed. The larger the value, the greater the pressure on source database. | The parallelism can be adjusted according to the write QPS of source database |
@@ -89,6 +89,8 @@ For MySQL, the following additional parameters need to be configured
| --source_db.exclude_tables| optional | A list of data table names that do not need to be synchronized, separated by commas, the default is empty | --source_db.exclude_tables test_1,test_2 |
| --server_time_zone=Asia/Shanghai | optional | MySQL server time zone, Flink side defaults to "Asia/Shanghai" | Refer to [JDK ZoneID 文档](https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html) |
+
+
Synchronous mysql job example
For Mysql configuration, please refer to https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc.html
@@ -172,6 +174,30 @@ For Postgresql configuration,please refer to https://ververica.github.io/flink-
--flink.checkpoint s3://bucket/lakesoul/flink/checkpoints \
--flink.savepoint s3://bucket/lakesoul/flink/savepoints
```
+
+For mongoDB, the following additinal parameters need to be configured
+
+| Parameter | Required | Meaning Description | Parameter Filling Format |
+|-------------|----------|-----------------------|----------------------|
+| --batchSize | optional | Get data size for each batch| --batchSize 1024 |
+
+```bash
+./bin/flink run -c org.apache.flink.lakesoul.entry.JdbcCDC \
+ lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
+ --source_db.db_name "cdc" \
+ --source_db.user "flinkuser" \
+ --source.parallelism 1 \
+ --mongodb_database cdc \
+ --server_time_zone Asia/Shanghai \
+ --source_db.db_type "mongodb" \
+ --source_db.password "flinkpw" \
+ --source_db.host "172.18.0.2:27017" \
+ --source_db.schema_tables "cdc.bincoll" \
+ --sink.parallelism 1 \
+ --job.checkpoint_interval 1000 \
+ --flink.checkpoint "file:/tmp/data/lakesoul/227/" \
+ --warehouse_path "file:/home/cyh/data/lakesoul/mongo"
+```
## LakeSoul Flink CDC Sink job execution process
In the initialization phase after the LakeSoul Flink job starts, it will first read all the tables in the configured MySQL DB (excluding tables that do not need to be synchronized). For each table, first determine whether it exists in LakeSoul. If it does not exist, a LakeSoul table is automatically created, and its schema is consistent with the corresponding table in MySQL.
diff --git a/website/docs/03-Usage Docs/14-export-to-databases.md b/website/docs/03-Usage Docs/14-export-to-databases.md
index 12daea2c3..dcda3d2fb 100644
--- a/website/docs/03-Usage Docs/14-export-to-databases.md
+++ b/website/docs/03-Usage Docs/14-export-to-databases.md
@@ -29,6 +29,12 @@ Synchronize table to doris,additional configuration parameters are required
|-----------------|----------|----------------------------------------------------------------------------------------------------|
| --doris.fenodes | optional | Doris FE http address, multiple addresses are separated by commas,
the default is 127.0.0.1:8030 |
+Synchronize table to mongodb,additioanl configuration parameters are
+
+| Parameter | Required | Meaning Description |
+|---------------|----------|-------------------------------------------------------------|
+| --mongodb.uri | require | mongodb uri sush as :mongodb://user:password@127.0.0.1:2701 |
+
## Examples
Synchronize table to MySQL task
@@ -41,7 +47,7 @@ Synchronize table to MySQL task
--target_db.user root \
--target_db.password 123456 \
--target_db.table_name t1 \
- --source_db.db_name
+ --source_db.db_name \
--source_db.table_name t1 \
--sink_parallelism 1 \
--use_batch true
@@ -78,7 +84,21 @@ Synchronize table to doris task
--doris.fenodes 127.0.0.1:8030 \
--use_batch false
```
-
+Synchronize table to mongodb task
+```bash
+./bin/flink run -c org.apache.flink.lakesoul.entry.SyncDatabase \
+ lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
+ --mongodb.uri "mongodb://user:password@127.0.0.1:27017" \
+ --source_db.db_name cdc \
+ --target_db.db_name cdc \
+ --target_db.db_type mongodb \
+ --target_db.table_name bincoll \
+ --source_db.table_name bincoll \
+ --sink_parallelism 2 \
+ --use_batch true \
+ --batchSize 1
+```
+For streaming out of the lake, batchsize is set to 1 or 0
## Instructions for use
1. For data exported to both PostgreSQL and MySQL, users have the option to manually create tables according to users' requirements or table will be automatically created by the program. If users have to specific data type, it is recommended to create the tables in target databases in advacne.
2. If the exported table is partitioned, users must manually create the target table; otherwise, the synchronized table will lack partition fields.
diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
index ca6549799..02c19f0e2 100644
--- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
@@ -65,8 +65,8 @@ export LAKESOUL_PG_PASSWORD=root
| -c | 任务运行main函数入口类 | org.apache.flink.lakesoul.entry.JdbcCDC |
| 主程序包 | 任务运行jar包 | lakesoul-flink-flink-1.17-VAR::VERSION.jar |
| --source_db.type | 源数据库类型 | mysql postgres oracle |
-| --source_db.host | 源数据库的地址 | |
-| --source_db.port | 源数据库的端口 | |
+| --source_db.host | 源数据库的地址,mongodb入湖则需要带上port | |
+| --source_db.port | 源数据库的端口,mongodb入湖不需要这个参数 | |
| --source_db.user | 源数据库的用户名 | |
| --source_db.password | 源数据库的密码 | |
| --source.parallelism | 单表读取任务并行度,影响数据读取速度,值越大对 MySQL 压力越大 | 可以根据 MySQL 的写入 QPS 来调整并行度 |
@@ -91,6 +91,13 @@ export LAKESOUL_PG_PASSWORD=root
|----------------------|------|------------------------------|------------------------------------------|
| --source_db.exclude_tables| 否 | 不需要同步的数据表名列表,表名之间用逗号分隔,默认为空 | --source_db.exclude_tables test_1,test_2 |
| --server_time_zone=Asia/Shanghai | 否 | MySQL 服务端时区,Flink 端默认为 "Asia/Shanghai" | 参考 [JDK ZoneID 文档](https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html) |
+对于mongodb需要额外配置以下参数
+
+| 参数 | 是否必须 | 含义说明 | 参数填写格式 |
+|-------------|------|--------------------|-------------|
+| --batchSize | 否 | 每批次获取数据的大小,默认值1024 | --batchSize |
+
+
同步mysql作业示例
对于Mysql数据库配置,可参考https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc.html
diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/14-export-to-databases.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/14-export-to-databases.md
index bc760a77d..6d0ea8dc2 100644
--- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/14-export-to-databases.md
+++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/14-export-to-databases.md
@@ -6,22 +6,22 @@ SPDX-FileCopyrightText: 2023 LakeSoul Contributors
SPDX-License-Identifier: Apache-2.0
-->
## 支持出湖的目标库
-LakeSoul 至 2.5.0 开始,支持单表数据以批同步出湖,流同步出湖,现支持 LakeSoul 表导出到 MySQL,Apache Doris,PostgreSQL 以及兼容这些数据库协议的数据库。
+LakeSoul 至 2.5.1 开始,支持单表数据以批同步出湖,流同步出湖,现支持 LakeSoul 表导出到 MySQL,Apache Doris,PostgreSQL,MongoDB 以及兼容这些数据库协议的数据库。
## 参数配置
-| 参数名称 | 是否必须 | 含义 |
-|------------------------|------|--------------------------------|
-| --target_db.url | 是 | 目标数据库的url,‘/’结尾 |
-| --target_db.db_type | 是 | 目标数据库的类型(doris,mysql,postgres) |
-| --target_db.db_name | 是 | 目标数据库库名字 |
-| --target_db.user | 是 | 目标数据库用户名 |
-| --target_db.password | 是 | 用户密码 |
-| --target_db.table_name | 是 | 目标数据库的表名 |
-| --source_db.db_name | 是 | lakesoul库名 |
-| --source_db.table_name | 是 | lakesoul表名 |
-| --sink_parallelism | 否 | 同步作业的并行度,默认1 |
-| --use_batch | 否 | true表示批同步,false表示流同步,默认采用批同步 |
+| 参数名称 | 是否必须 | 含义 |
+|------------------------|------|----------------------------------------|
+| --target_db.url | 是 | 目标数据库的url,‘/’结尾,mongodb不需要配此参数 |
+| --target_db.db_type | 是 | 目标数据库的类型(doris,mysql,postgres,mongodb) |
+| --target_db.db_name | 是 | 目标数据库库名字 |
+| --target_db.user | 是 | 目标数据库用户名 |
+| --target_db.password | 是 | 用户密码 |
+| --target_db.table_name | 是 | 目标数据库的表名 |
+| --source_db.db_name | 是 | lakesoul库名 |
+| --source_db.table_name | 是 | lakesoul表名 |
+| --sink_parallelism | 否 | 同步作业的并行度,默认1 |
+| --use_batch | 否 | true表示批同步,false表示流同步,默认采用批同步 |
对于到doris的出湖,需要额外配置:
@@ -29,6 +29,14 @@ LakeSoul 至 2.5.0 开始,支持单表数据以批同步出湖,流同步出
|-----------------------|------|---------------------------------------------------|
| --doris.fenodes | 否 | Doris FE http 地址,多个地址之间使用逗号隔开,默认为
127.0.0.1:8030 |
+对于到mongodb的出湖,需要额外配置
+
+| 参数名称 | 是否必须 | 含义 |
+|---------------|------|-----------------------------------------------------|
+| --mongodb.uri | 是 | mongodb uri:mongodb://user:password@127.0.0.1:27017 |
+
+
+
## 启动示例
出湖mysql任务启动
@@ -64,20 +72,34 @@ LakeSoul 至 2.5.0 开始,支持单表数据以批同步出湖,流同步出
出湖到doris任务启动
```bash
./bin/flink run -c org.apache.flink.lakesoul.entry.SyncDatabase \
-lakesoul-flink-flink-1.17-VAR::VERSION.jar \
---target_db.url "jdbc:mysql://172.17.0.2:9030/" \
---source_db.db_name test \
---target_db.db_name test \
---target_db.user root \
---target_db.password 123456 \
---target_db.db_type doris \
---target_db.table_name tb \
---source_db.table_name tb \
---sink_parallelism 1 \
---doris.fenodes 127.0.0.1:8030 \
---use_batch false
+ lakesoul-flink-flink-1.17-VAR::VERSION.jar \
+ --target_db.url "jdbc:mysql://172.17.0.2:9030/" \
+ --source_db.db_name test \
+ --target_db.db_name test \
+ --target_db.user root \
+ --target_db.password 123456 \
+ --target_db.db_type doris \
+ --target_db.table_name tb \
+ --source_db.table_name tb \
+ --sink_parallelism 1 \
+ --doris.fenodes 127.0.0.1:8030 \
+ --use_batch false
```
-
+出湖到mongodb任务启动
+```bash
+./bin/flink run -c org.apache.flink.lakesoul.entry.SyncDatabase \
+ lakesoul-flink-2.4.0-flink-1.17-SNAPSHOT.jar \
+ --mongodb.uri "mongodb://flinkuser:flinkpw@127.0.0.1:27017" \
+ --source_db.db_name cdc \
+ --target_db.db_name cdc \
+ --target_db.db_type mongodb \
+ --target_db.table_name bincoll \
+ --source_db.table_name bincoll \
+ --sink_parallelism 2 \
+ --use_batch true \
+ --batchSize 1
+```
+当流式出湖时,batchSize的值请设置为1或者0
## 使用事项
1. 出湖到 PostgreSQL 和 MySql,可以支持用户根据需求手动建表,也支持程序自动建表,如果用户对数据的类型要求自定义控制(例如 varchar),那么建议用户提前在目标库建表
2. 如果出湖的 LakeSoul 表是分区表,那么需要用户手动在目标库建表,否则同步后的表无分区字段
From 4a323f590f64b16c64ae019b4a1dbf7359fada99 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Thu, 28 Mar 2024 11:01:29 +0800
Subject: [PATCH 07/13] Add some documentation
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../org/apache/flink/lakesoul/entry/JdbcCDC.java | 4 ++--
.../flink/lakesoul/entry/MongoSinkUtils.java | 8 ++++----
.../apache/flink/lakesoul/types/ParseDocument.java | 10 ----------
website/docs/03-Usage Docs/05-flink-cdc-sync.md | 13 +++++++++++++
.../current/03-Usage Docs/05-flink-cdc-sync.md | 14 ++++++++++++++
5 files changed, 33 insertions(+), 16 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index fd365ece6..475889679 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -295,8 +295,8 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts(host)
- .databaseList(mongoDatabase) // 设置捕获的数据库,支持正则表达式
- .collectionList(tableList) //设置捕获的集合,支持正则表达式
+ .databaseList(mongoDatabase)
+ .collectionList(tableList)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(true)
.batchSize(batchSize)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
index ca281ca4f..b37392516 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/MongoSinkUtils.java
@@ -58,7 +58,7 @@ public WriteModel serialize(Tuple2 record, MongoSink
}
if (fieldValue != null) {
try {
- document.append(fieldName, convertTonBsonValue(fieldValue, fieldName));
+ document.append(fieldName, convertTonBsonValue(fieldValue));
} catch (ParseException e) {
throw new RuntimeException(e);
}
@@ -68,7 +68,7 @@ public WriteModel serialize(Tuple2 record, MongoSink
}
- public static BsonValue convertTonBsonValue(Object value, String fieldName) throws ParseException {
+ public static BsonValue convertTonBsonValue(Object value) throws ParseException {
if (value == null) {
return new BsonNull();
} else if (value instanceof Integer) {
@@ -93,7 +93,7 @@ public static BsonValue convertTonBsonValue(Object value, String fieldName) thro
Object[] array = (Object[]) value;
BsonArray bsonArray = new BsonArray();
for (Object element : array) {
- bsonArray.add(convertTonBsonValue(element, fieldName));
+ bsonArray.add(convertTonBsonValue(element));
}
return bsonArray;
} else if (isDateTimeString(value)) {
@@ -108,7 +108,7 @@ public static BsonValue convertTonBsonValue(Object value, String fieldName) thro
String name = structNameFiledList.get(0);
stringList.remove(0);
structNameFiledList = stringList;
- bsonDocument.append(name, convertTonBsonValue(fieldValue, name));
+ bsonDocument.append(name, convertTonBsonValue(fieldValue));
}
return bsonDocument;
} else {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
index f619c2357..4704ecf04 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/ParseDocument.java
@@ -18,10 +18,8 @@
public class ParseDocument {
public static Struct convertBSONToStruct(String value) {
Document bsonDocument = Document.parse(value);
- // 创建根 Struct 的 Schema
SchemaBuilder structSchemaBuilder = SchemaBuilder.struct();
Struct struct = new Struct(buildSchema(bsonDocument, structSchemaBuilder));
- // 填充 Struct 的值
fillStructValues(bsonDocument, struct);
return struct;
@@ -32,16 +30,13 @@ private static Schema buildSchema(Document bsonDocument, SchemaBuilder structSch
String fieldName = entry.getKey();
Object value = entry.getValue();
if (value instanceof Document) {
- // 处理嵌套的 Document
SchemaBuilder nestedStructSchemaBuilder = SchemaBuilder.struct();
structSchemaBuilder.field(fieldName, buildSchema((Document) value, nestedStructSchemaBuilder));
} else if (value instanceof List) {
- // 处理ArrayList类型
List> arrayList = (List>) value;
Schema arraySchema = getSchemaForArrayList(arrayList);
structSchemaBuilder.field(fieldName, arraySchema);
} else {
- // 处理普通字段
structSchemaBuilder.field(fieldName, getSchemaForValue(value));
}
}
@@ -49,7 +44,6 @@ private static Schema buildSchema(Document bsonDocument, SchemaBuilder structSch
}
private static Schema getSchemaForArrayList(List> arrayList) {
- // 假设 ArrayList 中的元素都是整数,您可以根据实际情况调整
Schema elementSchema = null;
if (!arrayList.isEmpty()) {
Object firstElement = arrayList.get(0);
@@ -63,12 +57,10 @@ private static void fillStructValues(Document bsonDocument, Struct struct) {
String fieldName = entry.getKey();
Object value = entry.getValue();
if (value instanceof Document) {
- // 处理嵌套的 Document
Struct nestedStruct = new Struct(struct.schema().field(fieldName).schema());
fillStructValues((Document) value, nestedStruct);
struct.put(fieldName, nestedStruct);
} else if (value instanceof List) {
- // 处理ArrayList类型
List> arrayList = (List>) value;
struct.put(fieldName, arrayList);
} else if (value instanceof Decimal128) {
@@ -87,7 +79,6 @@ private static void fillStructValues(Document bsonDocument, Struct struct) {
}
private static Schema getSchemaForValue(Object value) {
- // 根据值的类型返回对应的 Schema
if (value instanceof String) {
return Schema.STRING_SCHEMA;
} else if (value instanceof Integer) {
@@ -110,7 +101,6 @@ private static Schema getSchemaForValue(Object value) {
} else if (value instanceof BsonTimestamp) {
return Schema.INT64_SCHEMA;
} else {
- // 处理其他类型,可以根据实际情况添加更多类型
return Schema.STRING_SCHEMA;
}
}
diff --git a/website/docs/03-Usage Docs/05-flink-cdc-sync.md b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
index f8fccd907..b3eec2e24 100644
--- a/website/docs/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/docs/03-Usage Docs/05-flink-cdc-sync.md
@@ -286,6 +286,19 @@ Type mapping relationship between Oracle and LakeSoul
| CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType |
| BLOB | org.apache.spark.sql.types.DataTypes.BinaryType |
+Type mapping relationship between Oracle and Lakesoul
+
+| STRING | org.apache.spark.sql.types.DataTypes.StringTypes |
+|-------------|----------------------------------------|
+| DOUBLE | org.apache.spark.sql.types.DataTypes.DoubleType |
+| INTEGER | org.apache.spark.sql.types.DataTypes.Integer |
+| BOOLEAN | org.apache.spark.sql.types.DataTypes.DoubleType |
+| DATE | org.apache.spark.sql.types.DataTypes.DateType |
+| ARRAYS | org.apache.spark.sql.types.ArrayType |
+| BINARY DATA | org.apache.spark.sql.types.DataTypes.BinaryTYpe |
+| LONG | org.apache.spark.sql.types.DataTypes.LongType |
+| STRUCT | org.apache.spark.sql.types.StructField |
+| DECIMAL | org.apache.spark.sql.types.DecimalType(M,D) |
## Precautions
diff --git a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
index 02c19f0e2..bef0b7c9c 100644
--- a/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
+++ b/website/i18n/zh-Hans/docusaurus-plugin-content-docs/current/03-Usage Docs/05-flink-cdc-sync.md
@@ -272,6 +272,20 @@ Oracle与LakeSoul的映射关系
| CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
SYS.XMLTYPE | org.apache.spark.sql.types.DataTypes.StringType |
| BLOB | org.apache.spark.sql.types.DataTypes.BinaryType |
+MongoDB与LakeSoul的类型映射关系
+
+| STRING | org.apache.spark.sql.types.DataTypes.StringTypes |
+|-------------|----------------------------------------|
+| DOUBLE | org.apache.spark.sql.types.DataTypes.DoubleType |
+| INTEGER | org.apache.spark.sql.types.DataTypes.Integer |
+| BOOLEAN | org.apache.spark.sql.types.DataTypes.DoubleType |
+| DATE | org.apache.spark.sql.types.DataTypes.DateType |
+| ARRAYS | org.apache.spark.sql.types.ArrayType |
+| BINARY DATA | org.apache.spark.sql.types.DataTypes.BinaryTYpe |
+| LONG | org.apache.spark.sql.types.DataTypes.LongType |
+| STRUCT | org.apache.spark.sql.types.StructField |
+| DECIMAL | org.apache.spark.sql.types.DecimalType(M,D) |
+
## 注意事项
1. 源数据库中的表必须存在主键,无主键表目前暂不支持;
From d4d3e5729dd78bbac3102d50c2f4cf6367cd454d Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Thu, 28 Mar 2024 11:32:18 +0800
Subject: [PATCH 08/13] format the code
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../java/org/apache/flink/lakesoul/entry/JdbcCDC.java | 8 +++-----
.../sink/committer/LakeSoulSinkGlobalCommitter.java | 2 --
.../org/apache/flink/lakesoul/types/StructConvert.java | 7 +------
3 files changed, 4 insertions(+), 13 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index 475889679..f246acdf0 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -79,7 +79,7 @@ public static void main(String[] args) throws Exception {
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
if ( dbType.equalsIgnoreCase("mongodb")){
- mongoDatabase = parameter.get(MONGO_DB_DATABASE.key());
+ //mongoDatabase = parameter.get(MONGO_DB_DATABASE.key());
batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
@@ -218,7 +218,6 @@ private static void postgresCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
DataStreamSource source = builder.buildMultiTableSource("Postgres Source");
DataStream stream = builder.buildHashPartitionedCDCStream(source);
- stream.print();
DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From Postgres Database " + dbName);
}
@@ -283,7 +282,6 @@ public static void sqlserverCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Con
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(sqlServerSource, context, lakeSoulRecordConvert);
-
DataStreamSource source = builder.buildMultiTableSource("Sqlserver Source");
DataStream stream = builder.buildHashPartitionedCDCStream(source);
@@ -295,7 +293,7 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
MongoDBSource mongoSource =
MongoDBSource.builder()
.hosts(host)
- .databaseList(mongoDatabase)
+ .databaseList(dbName)
.collectionList(tableList)
.startupOptions(StartupOptions.initial())
.scanFullChangelog(true)
@@ -313,7 +311,7 @@ private static void mongoCdc(LakeSoulRecordConvert lakeSoulRecordConvert, Config
builder =
new LakeSoulMultiTableSinkStreamBuilder(mongoSource, context, lakeSoulRecordConvert);
DataStreamSource source = builder.buildMultiTableSource("mongodb Source");
- source.print();
+
DataStream stream = builder.buildHashPartitionedCDCStream(source);
DataStreamSink dmlSink = builder.buildLakeSoulDMLSink(stream);
env.execute("LakeSoul CDC Sink From mongo Database " + dbName);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
index 0320655de..52b6f1e95 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/committer/LakeSoulSinkGlobalCommitter.java
@@ -164,9 +164,7 @@ public List commit(
StructType origSchema ;
if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
- System.out.println(arrowSchema.toJson());
origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
- System.out.println(origSchema.json());
} else {
origSchema = (StructType) StructType.fromJson(tableInfo.getTableSchema());
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
index 649d1db21..e9577b64f 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/StructConvert.java
@@ -31,18 +31,13 @@ public RowData convert(Struct struct) {
return row;
}
- // 递归处理嵌套的Struct类型
private void writeField(BinaryRowWriter writer, int index, Schema schema, Object fieldValue) {
if (fieldValue == null) {
- // 如果字段值为null,则写入null
writer.setNullAt(index);
} else if (schema.type().getName().equals("struct")) {
- // 如果字段类型是StructType,递归处理
convertNestedStruct(writer, index, (Struct) fieldValue, schema);
} else {
- // 根据字段类型写入值
- // 这里根据实际情况,可能需要根据字段类型进行不同的处理
switch (schema.type()) {
case INT8:
case INT16:
@@ -58,7 +53,7 @@ private void writeField(BinaryRowWriter writer, int index, Schema schema, Object
}
}
- // 递归处理嵌套的Struct类型
+
private void convertNestedStruct(BinaryRowWriter writer, int index, Struct nestedStruct, Schema nestedSchema) {
int nestedArity = nestedSchema.fields().size();
List nestedFields = nestedSchema.fields();
From 6498ff878103755680122c5e88bcbc6f40d3f8f3 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Thu, 28 Mar 2024 14:48:04 +0800
Subject: [PATCH 09/13] Fix field attribute issues
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../apache/flink/lakesoul/entry/JdbcCDC.java | 2 --
.../lakesoul/types/LakeSoulRecordConvert.java | 26 ++++++++++---------
2 files changed, 14 insertions(+), 14 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
index f246acdf0..4ded3508a 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/JdbcCDC.java
@@ -55,7 +55,6 @@ public class JdbcCDC {
private static String serverTimezone;
private static String pluginName;
private static int batchSize;
- private static String mongoDatabase;
public static void main(String[] args) throws Exception {
ParameterTool parameter = ParameterTool.fromArgs(args);
@@ -79,7 +78,6 @@ public static void main(String[] args) throws Exception {
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
if ( dbType.equalsIgnoreCase("mongodb")){
- //mongoDatabase = parameter.get(MONGO_DB_DATABASE.key());
batchSize = parameter.getInt(BATCH_SIZE.key(), BATCH_SIZE.defaultValue());
tableList = parameter.get(SOURCE_DB_SCHEMA_TABLES.key()).split(",");
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index 666e1aee6..fbd120059 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -175,30 +175,30 @@ public LakeSoulRowDataWrapper toLakeSoulDataType(Schema sch, Struct value, Table
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
RowData insert = convert(after, afterSchema, RowKind.INSERT, sortField);
- boolean afterNullable = afterSchema.isOptional();
- RowType rt = toFlinkRowType(afterSchema,afterNullable);
+ //boolean afterNullable = afterSchema.isOptional();
+ RowType rt = toFlinkRowType(afterSchema,false);
insert.setRowKind(RowKind.INSERT);
builder.setOperation("insert").setAfterRowData(insert).setAfterType(rt);
} else if (op == Envelope.Operation.DELETE) {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
RowData delete = convert(before, beforeSchema, RowKind.DELETE, sortField);
- boolean nullable = beforeSchema.isOptional();
- RowType rt = toFlinkRowType(beforeSchema,nullable);
+// boolean nullable = beforeSchema.isOptional();
+ RowType rt = toFlinkRowType(beforeSchema,false);
builder.setOperation("delete").setBeforeRowData(null).setBeforeRowType(rt);
delete.setRowKind(RowKind.DELETE);
} else {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
RowData beforeData = convert(before, beforeSchema, RowKind.UPDATE_BEFORE, sortField);
- boolean beforNullable = beforeSchema.isOptional();
- RowType beforeRT = toFlinkRowType(beforeSchema,beforNullable);
+ //boolean beforNullable = beforeSchema.isOptional();
+ RowType beforeRT = toFlinkRowType(beforeSchema,false);
beforeData.setRowKind(RowKind.UPDATE_BEFORE);
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
RowData afterData = convert(after, afterSchema, RowKind.UPDATE_AFTER, sortField);
- boolean afterNullable = afterSchema.isOptional();
- RowType afterRT = toFlinkRowType(afterSchema,afterNullable);
+ //boolean afterNullable = afterSchema.isOptional();
+ RowType afterRT = toFlinkRowType(afterSchema,false);
afterData.setRowKind(RowKind.UPDATE_AFTER);
if (partitionFieldsChanged(beforeRT, beforeData, afterRT, afterData)) {
// partition fields changed. we need to emit both before and after RowData
@@ -229,7 +229,7 @@ public RowType toFlinkRowTypeCDC(RowType rowType) {
return RowType.of(colTypes, colNames);
}
- public RowType toFlinkRowType(Schema schema, boolean nullable) {
+ public RowType toFlinkRowType(Schema schema, boolean isMongoDDL) {
int arity = schema.fields().size() + 1;
if (useCDC) ++arity;
String[] colNames = new String[arity];
@@ -238,7 +238,11 @@ public RowType toFlinkRowType(Schema schema, boolean nullable) {
for (int i = 0; i < (useCDC ? arity - 2 : arity - 1); i++) {
Field item = fieldNames.get(i);
colNames[i] = item.name();
- colTypes[i] = convertToLogical(item.schema(), !item.name().equals("_id") && nullable);
+ if (isMongoDDL){
+ colTypes[i] = convertToLogical(item.schema(), !item.name().equals("_id"));
+ }else {
+ colTypes[i] = convertToLogical(item.schema(), item.schema().isOptional());
+ }
}
// colNames[useCDC ? arity - 3 : arity - 2] = BINLOG_FILE_INDEX;
// colTypes[useCDC ? arity - 3 : arity - 2] = new BigIntType();
@@ -273,8 +277,6 @@ public List getRowFields(Schema schema) {
}
private LogicalType primitiveLogicalType(Schema fieldSchema,boolean nullable) {
-// boolean nullable = fieldSchema.isOptional();
-// if (isMongoDDl) nullable = true;
switch (fieldSchema.type()) {
case BOOLEAN:
return new BooleanType(nullable);
From 69ab76bdd1c6b5e3b2e0f02123e3936e31e09735 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Thu, 28 Mar 2024 18:00:44 +0800
Subject: [PATCH 10/13] test
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../apache/spark/sql/lakesoul/benchmark/Benchmark.scala | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
index 4d681c0f1..2e853b5d2 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
@@ -155,6 +155,12 @@ object Benchmark {
val diff1 = jdbcDF.rdd.subtract(lakesoulDF.rdd)
val diff2 = lakesoulDF.rdd.subtract(jdbcDF.rdd)
+ println("*******************jdbcDF count"+jdbcDF.count()+"***************")
+ println("*******************jdbcDF count"+lakesoulDF.count()+"***************")
+ println("*********diff1.count:"+diff1.count()+"*******")
+ println("*********diff1.count:"+diff2.count()+"*******")
+ println("jdbcDF schema"+jdbcDF.printSchema())
+ println("lakesoulDF schema"+lakesoulDF.printSchema())
val result = diff1.count() == 0 && diff2.count() == 0
if (!result) {
@@ -177,4 +183,4 @@ object Benchmark {
.withColumn("col_20", col("col_20").cast("string"))
.withColumn("col_23", col("col_23").cast("string"))
}
-}
+}
\ No newline at end of file
From f0c51a96825359a6655e9f248fabad6a2ebcb0a1 Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Thu, 28 Mar 2024 18:43:43 +0800
Subject: [PATCH 11/13] test
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../spark/sql/lakesoul/benchmark/Benchmark.scala | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
index 2e853b5d2..cdac08413 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
@@ -155,19 +155,13 @@ object Benchmark {
val diff1 = jdbcDF.rdd.subtract(lakesoulDF.rdd)
val diff2 = lakesoulDF.rdd.subtract(jdbcDF.rdd)
- println("*******************jdbcDF count"+jdbcDF.count()+"***************")
- println("*******************jdbcDF count"+lakesoulDF.count()+"***************")
- println("*********diff1.count:"+diff1.count()+"*******")
- println("*********diff1.count:"+diff2.count()+"*******")
- println("jdbcDF schema"+jdbcDF.printSchema())
- println("lakesoulDF schema"+lakesoulDF.printSchema())
val result = diff1.count() == 0 && diff2.count() == 0
if (!result) {
println(printLine + table + " result: " + result + printLine)
- println("*************diff1**************")
+ println("*************diff1**************"+"dfff1.count:"+diff1.count()+"schema"+jdbcDF.printSchema()+"jdbcDF count:"+jdbcDF.count())
spark.createDataFrame(diff1, lakesoulDF.schema).show()
- println("*************diff2**************")
+ println("*************diff2**************"+"diff2.count:"+diff2.count()+"schema"+lakesoulDF.printSchema()+"lakesoulDF count:"+lakesoulDF.count())
spark.createDataFrame(diff2, lakesoulDF.schema).show()
println(table + " data verification ERROR!!!")
System.exit(1)
From 6254383130da0488232b9e2cd97a47824f42e9fb Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Fri, 29 Mar 2024 18:17:36 +0800
Subject: [PATCH 12/13] fix data delete event
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../flink/lakesoul/types/LakeSoulRecordConvert.java | 11 ++++++++++-
.../spark/sql/lakesoul/benchmark/Benchmark.scala | 4 ++--
2 files changed, 12 insertions(+), 3 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index fbd120059..b58f548b6 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -185,8 +185,9 @@ public LakeSoulRowDataWrapper toLakeSoulDataType(Schema sch, Struct value, Table
RowData delete = convert(before, beforeSchema, RowKind.DELETE, sortField);
// boolean nullable = beforeSchema.isOptional();
RowType rt = toFlinkRowType(beforeSchema,false);
- builder.setOperation("delete").setBeforeRowData(null).setBeforeRowType(rt);
delete.setRowKind(RowKind.DELETE);
+ builder.setOperation("delete").setBeforeRowData(delete).setBeforeRowType(rt);
+
} else {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
@@ -370,6 +371,14 @@ public long computeBinarySourceRecordPrimaryKeyHash(BinarySourceRecord sourceRec
RowData rowData = Objects.equals(data.getOp(), "delete") ? data.getBefore() : data.getAfter();
List pks = sourceRecord.getPrimaryKeys();
long hash = 42;
+ if (Objects.equals(data.getOp(),"delete")){
+ for (String pk : pks) {
+ int typeIndex = rowType.getFieldIndex(pk);
+ LogicalType type = rowType.getTypeAt(typeIndex);
+ Object fieldOrNull = RowData.createFieldGetter(type, typeIndex).getFieldOrNull(rowData);
+ //hash = LakeSoulKeyGen.getHash(type, fieldOrNull, hash);
+ }
+ }
for (String pk : pks) {
int typeIndex = rowType.getFieldIndex(pk);
LogicalType type = rowType.getTypeAt(typeIndex);
diff --git a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
index cdac08413..28512ade0 100644
--- a/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
+++ b/lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/Benchmark.scala
@@ -159,9 +159,9 @@ object Benchmark {
val result = diff1.count() == 0 && diff2.count() == 0
if (!result) {
println(printLine + table + " result: " + result + printLine)
- println("*************diff1**************"+"dfff1.count:"+diff1.count()+"schema"+jdbcDF.printSchema()+"jdbcDF count:"+jdbcDF.count())
+ println("*************diff1**************")
spark.createDataFrame(diff1, lakesoulDF.schema).show()
- println("*************diff2**************"+"diff2.count:"+diff2.count()+"schema"+lakesoulDF.printSchema()+"lakesoulDF count:"+lakesoulDF.count())
+ println("*************diff2**************")
spark.createDataFrame(diff2, lakesoulDF.schema).show()
println(table + " data verification ERROR!!!")
System.exit(1)
From 06b100d7747ee910d1d76dad61c7f1155e45494c Mon Sep 17 00:00:00 2001
From: ChenYunHey <1908166778@qq.com>
Date: Fri, 29 Mar 2024 21:14:40 +0800
Subject: [PATCH 13/13] Remove irrelevant code
Signed-off-by: ChenYunHey <1908166778@qq.com>
---
.../apache/flink/lakesoul/types/BinarySourceRecord.java | 2 --
.../flink/lakesoul/types/LakeSoulRecordConvert.java | 8 --------
2 files changed, 10 deletions(-)
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
index cb7bba789..0db4354d5 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/BinarySourceRecord.java
@@ -101,8 +101,6 @@ public static BinarySourceRecord fromMysqlSourceRecord(SourceRecord sourceRecord
}
long sortField = (binlogFileIndex << 32) + binlogPosition;
LakeSoulRowDataWrapper data = convert.toLakeSoulDataType(valueSchema, value, tableId, tsMs, sortField);
-// TimestampData timestamp = data.getAfter().getTimestamp(1, 18);
-// System.out.println(timestamp);
String tablePath;
if (tableId.schema()==null){
tablePath = new Path(new Path(basePath, tableId.catalog()), tableId.table()).toString();
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
index b58f548b6..66d961a46 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/types/LakeSoulRecordConvert.java
@@ -371,14 +371,6 @@ public long computeBinarySourceRecordPrimaryKeyHash(BinarySourceRecord sourceRec
RowData rowData = Objects.equals(data.getOp(), "delete") ? data.getBefore() : data.getAfter();
List pks = sourceRecord.getPrimaryKeys();
long hash = 42;
- if (Objects.equals(data.getOp(),"delete")){
- for (String pk : pks) {
- int typeIndex = rowType.getFieldIndex(pk);
- LogicalType type = rowType.getTypeAt(typeIndex);
- Object fieldOrNull = RowData.createFieldGetter(type, typeIndex).getFieldOrNull(rowData);
- //hash = LakeSoulKeyGen.getHash(type, fieldOrNull, hash);
- }
- }
for (String pk : pks) {
int typeIndex = rowType.getFieldIndex(pk);
LogicalType type = rowType.getTypeAt(typeIndex);