diff --git a/.github/workflows/consistency-ci.yml b/.github/workflows/consistency-ci.yml
index f95902f97..253f5d2d9 100644
--- a/.github/workflows/consistency-ci.yml
+++ b/.github/workflows/consistency-ci.yml
@@ -56,10 +56,10 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- name: Set up Python 3.9
diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml
index 5b26dfa81..8d9ed61a1 100644
--- a/.github/workflows/deployment.yml
+++ b/.github/workflows/deployment.yml
@@ -15,10 +15,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -51,10 +51,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
@@ -151,10 +151,10 @@ jobs:
with:
name: lakesoul-nativemetadata-x86_64-pc-windows-msvc
path: ./rust/target/release/
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
diff --git a/.github/workflows/native-build.yml b/.github/workflows/native-build.yml
index a03df16a8..e3152226c 100644
--- a/.github/workflows/native-build.yml
+++ b/.github/workflows/native-build.yml
@@ -27,10 +27,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -64,10 +64,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- - name: Set up JDK 8
+ - name: Set up JDK 11
uses: actions/setup-java@v4
with:
- java-version: '8'
+ java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
diff --git a/lakesoul-flink/pom.xml b/lakesoul-flink/pom.xml
index 1a9d24282..f323a764a 100644
--- a/lakesoul-flink/pom.xml
+++ b/lakesoul-flink/pom.xml
@@ -69,6 +69,21 @@ SPDX-License-Identifier: Apache-2.0
            <version>${flink.version}</version>
            <scope>${local.scope}</scope>
+
+        <dependency>
+            <groupId>io.openlineage</groupId>
+            <artifactId>openlineage-flink</artifactId>
+            <version>1.19.0</version>
+            <scope>${local.scope}</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>${flink.version}</version>
+            <scope>${local.scope}</scope>
+        </dependency>
+
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
@@ -475,7 +490,6 @@ SPDX-License-Identifier: Apache-2.0
org.furyio:fury-core
com.google.guava:guava
com.google.guava:failureaccess
-
org.casbin:jdbc-adapter
org.aspectj:aspectjrt
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
index f9734c9d2..cf9c44feb 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/SyncDatabase.java
@@ -14,12 +14,16 @@
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
+import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.types.DataType;
@@ -28,9 +32,9 @@
import java.sql.*;
import java.util.*;
+import java.util.stream.Collectors;
import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*;
-import static org.apache.flink.lakesoul.tool.JobOptions.FLINK_CHECKPOINT;
import static org.apache.flink.lakesoul.tool.JobOptions.JOB_CHECKPOINT_INTERVAL;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;
@@ -48,6 +52,8 @@ public class SyncDatabase {
static int sinkParallelism;
static String jdbcOrDorisOptions;
static int checkpointInterval;
+ static LakeSoulInAndOutputJobListener listener;
+ static String lineageUrl = null;
public static void main(String[] args) throws Exception {
StringBuilder connectorOptions = new StringBuilder();
@@ -84,9 +90,30 @@ public static void main(String[] args) throws Exception {
useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ StreamExecutionEnvironment env = null;
+ lineageUrl = System.getenv("LINEAGE_URL");
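+        // When LINEAGE_URL is set, enable OpenLineage reporting: configure the HTTP transport and register a listener that emits run events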
+ if (lineageUrl != null) {
+ conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ conf.set(JobOptions.transportTypeOption, "http");
+ conf.set(JobOptions.urlOption, lineageUrl);
+ conf.set(JobOptions.execAttach, true);
+ env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID);
+ String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
+ if (namespace == null) {
+ namespace = "public";
+ }
+ if (useBatch) {
+ listener = new LakeSoulInAndOutputJobListener(lineageUrl, "BATCH");
+ } else {
+ listener = new LakeSoulInAndOutputJobListener(lineageUrl);
+ }
+ listener.jobName(appName, namespace);
+ env.registerJobListener(listener);
+ } else {
+ env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
+ }
env.setParallelism(sinkParallelism);
-
switch (dbType) {
case "mysql":
xsyncToMysql(env);
@@ -219,6 +246,12 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) {
return primaryKeys.size() == 0 ? null : stringBuilder.toString();
}
+    // Resolve the LakeSoul table's domain from metadata; it is used as the OpenLineage input dataset namespace
+    public static String getTableDomain(String sourceDatabase, String sourceTableName) {
+        DBManager dbManager = new DBManager();
+        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDatabase);
+        return tableInfo.getDomain();
+    }
+
public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException {
if (useBatch) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -354,7 +387,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
conn.close();
}
- public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws SQLException {
+ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws Exception {
if (useBatch) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
@@ -370,6 +403,14 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes();
String[] fieldNames = lakesoulTable.getSchema().getFieldNames();
String[] dorisFieldTypes = getDorisFieldTypes(fieldDataTypes);
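+        // Report the LakeSoul source schema and the Doris target schema to the lineage listener as input/output facets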
+        if (lineageUrl != null) {
+            String inputName = "lakeSoul." + sourceDatabase + "." + sourceTableName;
+            String inputNamespace = getTableDomain(sourceDatabase, sourceTableName);
+            String[] inputTypes = Arrays.stream(fieldDataTypes).map(type -> type.toString()).collect(Collectors.toList()).toArray(new String[0]);
+            listener.inputFacets(inputName, inputNamespace, fieldNames, inputTypes);
+            String targetName = "doris." + targetDatabase + "." + targetTableName;
+            listener.outputFacets(targetName, "lake-public", fieldNames, dorisFieldTypes);
+        }
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]);
@@ -404,7 +445,16 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
}
tEnvs.executeSql(sql);
-        tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
+        String insertSql = "insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName;
+        if (lineageUrl != null) {
+            // Attach the insert as a DataStream and execute explicitly so the registered lineage listener receives job events
+            StreamStatementSet statements = tEnvs.createStatementSet();
+            statements.addInsertSql(insertSql);
+            statements.attachAsDataStream();
+            env.execute();
+        } else {
+            tEnvs.executeSql(insertSql);
+        }
}
public static void xsyncToMongodb(StreamExecutionEnvironment env,
@@ -439,4 +489,4 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env,
rowDataStream.sinkTo(sink).setParallelism(sinkParallelism);
env.execute();
}
-}
\ No newline at end of file
+}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java
index ac59c6ed3..53b6f3bf0 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/ExecuteSql.java
@@ -42,7 +42,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
StreamStatementSet statementSet = tableEnv.createStatementSet();
-
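+        // Track whether the script contains any INSERT statement so we only attach and execute when there is something to run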
+        boolean hasModifiedOp = false;
for (String statement : statements) {
Operation operation;
try {
@@ -75,6 +75,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
} else if (operation instanceof ModifyOperation) {
System.out.println(MessageFormatter.format("\n======Executing insertion:\n{}", statement).getMessage());
// add insertion to statement set
+ hasModifiedOp = true;
statementSet.addInsertSql(statement);
} else if ((operation instanceof QueryOperation) || (operation instanceof AddJarOperation)) {
LOG.warn("SQL Statement {} is ignored", statement);
@@ -85,12 +86,16 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
tableEnv.executeSql(statement).print();
}
}
- statementSet.attachAsDataStream();
- Configuration conf = (Configuration) env.getConfiguration();
+ if (hasModifiedOp) {
+ statementSet.attachAsDataStream();
+ Configuration conf = (Configuration) env.getConfiguration();
- // try get k8s cluster name
- String k8sClusterID = conf.getString("kubernetes.cluster-id", "");
- env.execute(k8sClusterID.isEmpty() ? null : k8sClusterID + "-job");
+ // try get k8s cluster name
+ String k8sClusterID = conf.getString("kubernetes.cluster-id", "");
+ env.execute(k8sClusterID.isEmpty() ? null : k8sClusterID + "-job");
+ } else {
+            System.out.println("There is no INSERT INTO statement; the program will terminate.");
+ }
}
    public static List<String> parseStatements(String script) {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java
index 729859b56..15e40aa2c 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/FlinkSqlSubmitter.java
@@ -4,13 +4,17 @@
package org.apache.flink.lakesoul.entry.sql.flink;
+import io.openlineage.flink.OpenLineageFlinkJobListener;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.lakesoul.entry.sql.Submitter;
import org.apache.flink.lakesoul.entry.sql.common.FlinkOption;
import org.apache.flink.lakesoul.entry.sql.common.JobType;
import org.apache.flink.lakesoul.entry.sql.common.SubmitOption;
import org.apache.flink.lakesoul.entry.sql.utils.FileUtil;
+import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,6 +36,13 @@ public FlinkSqlSubmitter(SubmitOption submitOption) {
@Override
public void submit() throws Exception {
+ String lineageUrl = System.getenv("LINEAGE_URL");
+ Configuration conf = new Configuration();
+ if (lineageUrl != null) {
+ conf.set(JobOptions.transportTypeOption, "http");
+ conf.set(JobOptions.urlOption, lineageUrl);
+ conf.set(JobOptions.execAttach, true);
+ }
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
if (submitOption.getJobType().equals(JobType.STREAM.getType())) {
@@ -47,10 +58,24 @@ public void submit() throws Exception {
} else {
throw new RuntimeException("jobType is not supported: " + submitOption.getJobType());
}
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
if (submitOption.getJobType().equals(JobType.STREAM.getType())) {
this.setCheckpoint(env);
}
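+        // When LINEAGE_URL is set, register OpenLineage's Flink job listener so lineage events are sent over HTTP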
+ if (lineageUrl != null) {
+ String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID);
+ String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
+ if (namespace == null) {
+ namespace = "lake-public";
+ }
+            LOG.info("OpenLineage listener registered, jobName: {}, namespace: {}", appName, namespace);
+ JobListener listener = OpenLineageFlinkJobListener.builder()
+ .executionEnvironment(env)
+ .jobName(appName)
+ .jobNamespace(namespace)
+ .build();
+ env.registerJobListener(listener);
+ }
tEnv = StreamTableEnvironment.create(env, settings);
String sql = FileUtil.readHDFSFile(submitOption.getSqlFilePath());
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java
new file mode 100644
index 000000000..21310abbf
--- /dev/null
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/entry/sql/flink/LakeSoulInAndOutputJobListener.java
@@ -0,0 +1,158 @@
+// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+//
+// SPDX-License-Identifier: Apache-2.0
+package org.apache.flink.lakesoul.entry.sql.flink;
+
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineageClient;
+import io.openlineage.client.transports.HttpConfig;
+import io.openlineage.client.transports.HttpTransport;
+import io.openlineage.client.transports.Transport;
+import io.openlineage.flink.client.EventEmitter;
+import io.openlineage.flink.shaded.org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.execution.DetachedJobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+import java.time.ZonedDateTime;
+import java.util.*;
+
+public class LakeSoulInAndOutputJobListener implements JobListener {
+ private static final Logger log = LoggerFactory.getLogger(LakeSoulInAndOutputJobListener.class);
+ private OpenLineageClient client;
+ private OpenLineage openLineage;
+    private List<OpenLineage.InputDataset> inputDatasets;
+    private List<OpenLineage.OutputDataset> outputDatasets;
+ private String executeMode = "STREAM";
+ private UUID runId;
+ OpenLineage.Run run;
+ private OpenLineage.Job job;
+
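+    // Emits OpenLineage events over HTTP to the given endpoint; execution mode defaults to STREAM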
+ public LakeSoulInAndOutputJobListener(String url) {
+ HttpConfig config = new HttpConfig();
+ config.setUrl(URI.create(url));
+ Transport transport = new HttpTransport(config);
+ client = new OpenLineageClient(transport);
+ openLineage = new OpenLineage(EventEmitter.OPEN_LINEAGE_CLIENT_URI);
+ }
+
+ public LakeSoulInAndOutputJobListener(String url, String executeMode) {
+ this(url);
+ this.executeMode = executeMode;
+ }
+
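+    // Create a fresh run id and job descriptor (including the processing-type job facet) used in subsequent run events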
+ public LakeSoulInAndOutputJobListener jobName(String name, String namespace) {
+ this.runId = UUID.randomUUID();
+ this.run = openLineage.newRunBuilder().runId(this.runId).build();
+        OpenLineage.JobFacets jobFacets = openLineage.newJobFacetsBuilder()
+                .jobType(openLineage.newJobTypeJobFacetBuilder()
+                        .jobType("Flink Job")
+                        .integration("Flink")
+                        .processingType(this.executeMode)
+                        .build())
+                .build();
+ this.job = openLineage.newJobBuilder().name(name).namespace(namespace).facets(jobFacets).build();
+ return this;
+ }
+
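+    // Build the input dataset, attaching a schema facet when field names (and optionally types) are provided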
+ public LakeSoulInAndOutputJobListener inputFacets(String inputName, String inputNamespace, String[] inputSchemaNames, String[] inputSchemaTypes) {
+        List<OpenLineage.SchemaDatasetFacetFields> schemaFields = new ArrayList<>();
+        if (inputSchemaNames != null && inputSchemaTypes != null && inputSchemaNames.length == inputSchemaTypes.length) {
+ for (int i = 0; i < inputSchemaNames.length; i++) {
+ schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(inputSchemaNames[i]).type(inputSchemaTypes[i]).build());
+ }
+ }
+ if (inputSchemaNames != null && inputSchemaTypes == null) {
+ for (int i = 0; i < inputSchemaNames.length; i++) {
+ schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(inputSchemaNames[i]).build());
+ }
+ }
+
+ OpenLineage.SchemaDatasetFacet schemaFacet = openLineage.newSchemaDatasetFacetBuilder().fields(schemaFields).build();
+ this.inputDatasets = Arrays.asList(
+ openLineage.newInputDatasetBuilder().name(inputName).namespace(inputNamespace)
+ .facets(
+ openLineage.newDatasetFacetsBuilder().schema(schemaFacet).build()
+ ).build()
+ );
+ return this;
+ }
+
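+    // Build the output dataset, attaching a schema facet when field names (and optionally types) are provided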
+ public LakeSoulInAndOutputJobListener outputFacets(String outputName, String outputNamespace, String[] outputSchemaNames, String[] outputSchemaTypes) {
+
+        List<OpenLineage.SchemaDatasetFacetFields> schemaFields = new ArrayList<>();
+        if (outputSchemaNames != null && outputSchemaTypes != null && outputSchemaNames.length == outputSchemaTypes.length) {
+ for (int i = 0; i < outputSchemaNames.length; i++) {
+ schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(outputSchemaNames[i]).type(outputSchemaTypes[i]).build());
+ }
+ }
+ if (outputSchemaNames != null && outputSchemaTypes == null) {
+ for (int i = 0; i < outputSchemaNames.length; i++) {
+ schemaFields.add(openLineage.newSchemaDatasetFacetFieldsBuilder().name(outputSchemaNames[i]).build());
+ }
+ }
+
+ OpenLineage.SchemaDatasetFacet schemaFacet = openLineage.newSchemaDatasetFacetBuilder().fields(schemaFields).build();
+ this.outputDatasets = Arrays.asList(
+ openLineage.newOutputDatasetBuilder().name(outputName).namespace(outputNamespace)
+ .facets(
+ openLineage.newDatasetFacetsBuilder().schema(schemaFacet).build()
+ ).build()
+ );
+ return this;
+ }
+
+
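+    // Emit a RUNNING event carrying the collected input/output datasets once the job has been submitted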
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+        if (jobClient != null) {
+            log.info("------lake onjobsubmit----jobid:{}", jobClient.getJobID());
+        }
+
+ OpenLineage.RunEvent runStateUpdate =
+ openLineage.newRunEventBuilder()
+ .eventType(OpenLineage.RunEvent.EventType.RUNNING)
+ .eventTime(ZonedDateTime.now())
+ .run(this.run)
+ .job(this.job)
+ .inputs(this.inputDatasets)
+ .outputs(this.outputDatasets)
+ .build();
+ if(this.inputDatasets != null || this.outputDatasets != null) {
+ this.client.emit(runStateUpdate);
+ }
+ }
+
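+    // Emit COMPLETE on success, or FAIL with an error-message run facet when the job ends with an exception; detached results only log a warning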
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+ if (jobExecutionResult instanceof DetachedJobExecutionResult) {
+ log.warn("Job running in detached mode. Set execution.attached to true if you want to emit completed events.");
+ } else {
+ OpenLineage.RunEvent runStateUpdate = null;
+ if (jobExecutionResult != null) {
+                log.info("------onjobexecuted----jobresult:{}", jobExecutionResult.getJobExecutionResult().toString());
+ runStateUpdate =
+ openLineage.newRunEventBuilder()
+ .eventType(OpenLineage.RunEvent.EventType.COMPLETE)
+ .eventTime(ZonedDateTime.now())
+ .run(this.run)
+ .job(this.job)
+ .inputs(this.inputDatasets)
+ .outputs(this.outputDatasets)
+ .build();
+ } else {
+                log.info("------onjobexecuted----jobresult: null");
+                OpenLineage.Run failRun = openLineage.newRunBuilder()
+                        .runId(this.runId)
+                        .facets(openLineage.newRunFacetsBuilder()
+                                .errorMessage(openLineage.newErrorMessageRunFacet(throwable.getMessage(), "JAVA", ExceptionUtils.getStackTrace(throwable)))
+                                .build())
+                        .build();
+ runStateUpdate =
+ openLineage.newRunEventBuilder()
+ .eventType(OpenLineage.RunEvent.EventType.FAIL)
+ .eventTime(ZonedDateTime.now())
+ .run(failRun)
+ .job(this.job)
+ .inputs(this.inputDatasets)
+ .outputs(this.outputDatasets)
+ .build();
+ }
+ this.client.emit(runStateUpdate);
+ }
+ }
+
+}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
index f561bc49b..8a8984000 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/LakeSoulMultiTablesSink.java
@@ -133,4 +133,7 @@ public Collection getCompatibleStateNames() {
// StreamingFileSink
return Collections.singleton("lakesoul-cdc-multitable-bucket-states");
}
+    public BucketsBuilder getBucketsBuilder() {
+ return this.bucketsBuilder;
+ }
}
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
index 5f932f525..489e65a77 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/sink/bucket/DefaultOneTableBulkFormatBuilder.java
@@ -33,6 +33,9 @@ public DefaultOneTableBulkFormatBuilder(
super(basePath, conf, new DefaultLakeSoulWriterBucketFactory(conf));
this.identity = identity;
}
+    public TableSchemaIdentity getIdentity() {
+ return this.identity;
+ }
@Override
public AbstractLakeSoulMultiTableSinkWriter createWriter(Sink.InitContext context, int subTaskId) throws
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java
index 9fbb63336..a4552a5a1 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulLookupTableSource.java
@@ -67,7 +67,9 @@ public LakeSoulLookupTableSource(TableId tableId,
catalogTable.getOptions().forEach(configuration::setString);
validateLookupConfigurations();
}
-
+    public TableId getTableId() {
+ return this.tableId;
+ }
private void validateLookupConfigurations() {
String partitionInclude = configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE);
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java
index 05af12a12..12736ebde 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/source/LakeSoulSource.java
@@ -80,6 +80,9 @@ public LakeSoulSource(TableId tableId,
this.pushedFilter = pushedFilter;
this.partitionFilters = partitionFilters;
}
+    public TableId getTableId() {
+ return this.tableId;
+ }
@Override
public Boundedness getBoundedness() {
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
index a943e6baa..2e3cd4cd4 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/table/LakeSoulTableSource.java
@@ -89,7 +89,9 @@ public LakeSoulTableSource(TableId tableId,
this.optionParams = optionParams;
this.modificationContext = null;
}
-
+    public TableId getTableId() {
+ return this.tableId;
+ }
@Override
public DynamicTableSource copy() {
LakeSoulTableSource newInstance = new LakeSoulTableSource(this.tableId,
diff --git a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java
index a86d3b980..87cd97904 100644
--- a/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java
+++ b/lakesoul-flink/src/main/java/org/apache/flink/lakesoul/tool/JobOptions.java
@@ -11,6 +11,7 @@
import java.time.Duration;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
public class JobOptions {
@@ -120,4 +121,27 @@ public class JobOptions {
.defaultValue("file:///")
.withDescription("Option to set fs default scheme");
+    public static final ConfigOption<String> transportTypeOption =
+            ConfigOptions.key("openlineage.transport.type").stringType().defaultValue("http");
+    public static final ConfigOption<String> urlOption =
+            ConfigOptions.key("openlineage.transport.url").stringType().noDefaultValue();
+    public static final ConfigOption<Boolean> execAttach =
+            ConfigOptions.key("execution.attached").booleanType().defaultValue(false);
+    public static final ConfigOption<Boolean> lineageOption =
+            ConfigOptions.key("openlineage.executed").booleanType().defaultValue(false);
+
+    public static final ConfigOption<String> KUBE_CLUSTER_ID =
+ key("kubernetes.cluster-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. "
+ + "The id must only contain lowercase alphanumeric characters and \"-\". "
+ + "The required format is %s. "
+ + "If not set, the client will automatically generate it with a random ID.",
+ code("[a-z]([-a-z0-9]*[a-z0-9])"))
+ .build());
+
}