[Flink] Integrate OpenLineage (#532)
* 1. Support lineage for Kafka and LakeSoul DataStream sources sinking into LakeSoul.
2. Support lineage for LakeSoul DataStream sources sinking to Doris, LakeSoul, and Kafka.
3. Support lineage for Flink SQL submit.
4. Support YARN and Kubernetes application mode.
(A condensed sketch of the lineage wiring follows the change summary below.)

Signed-off-by: maosen <[email protected]>

* modify lineage dependency

Signed-off-by: maosen <[email protected]>

* add lakesoul lineage listener

Signed-off-by: maosen <[email protected]>

* remove redundant items

Signed-off-by: maosen <[email protected]>

* default to jdk 11 for build

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: maosen <[email protected]>
Signed-off-by: chenxu <[email protected]>
Co-authored-by: maosen <[email protected]>
Co-authored-by: chenxu <[email protected]>
3 people authored Sep 24, 2024
1 parent 75fa412 commit 5d0b522
Showing 14 changed files with 317 additions and 28 deletions.
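At a glance, the integration is opt-in: lineage activates only when the LINEAGE_URL environment variable is set, in which case both entry points configure an HTTP OpenLineage transport on the Flink Configuration and register a job listener before submitting; the Kubernetes cluster id doubles as the lineage job name and LAKESOUL_CURRENT_DOMAIN as the namespace. A condensed sketch of that wiring, using only names that appear in the diffs below (not a verbatim excerpt):

    String lineageUrl = System.getenv("LINEAGE_URL");             // lineage is opt-in via this variable
    Configuration conf = new Configuration();
    if (lineageUrl != null) {
        conf.set(JobOptions.transportTypeOption, "http");         // emit OpenLineage events over HTTP
        conf.set(JobOptions.urlOption, lineageUrl);               // collector endpoint
        conf.set(JobOptions.execAttach, true);
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    if (lineageUrl != null) {
        String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID); // lineage job name
        String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");             // lineage namespace
        LakeSoulInAndOutputJobListener listener =
                new LakeSoulInAndOutputJobListener(lineageUrl);   // or (lineageUrl, "BATCH") for batch sync
        listener.jobName(appName, namespace);
        env.registerJobListener(listener);                        // fires on env.execute()/executeAsync()
    }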
4 changes: 2 additions & 2 deletions .github/workflows/consistency-ci.yml
@@ -56,10 +56,10 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Set up Python 3.9
12 changes: 6 additions & 6 deletions .github/workflows/deployment.yml
@@ -15,10 +15,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -51,10 +51,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
@@ -151,10 +151,10 @@ jobs:
with:
name: lakesoul-nativemetadata-x86_64-pc-windows-msvc
path: ./rust/target/release/
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
8 changes: 4 additions & 4 deletions .github/workflows/native-build.yml
@@ -27,10 +27,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -64,10 +64,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
16 changes: 15 additions & 1 deletion lakesoul-flink/pom.xml
@@ -69,6 +69,21 @@ SPDX-License-Identifier: Apache-2.0
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>


<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-flink</artifactId>
<version>1.19.0</version>
<scope>${local.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
@@ -475,7 +490,6 @@ SPDX-License-Identifier: Apache-2.0
<include>org.furyio:fury-core</include>
<include>com.google.guava:guava</include>
<include>com.google.guava:failureaccess</include>

<!-- casbin & aspectj -->
<include>org.casbin:jdbc-adapter</include>
<include>org.aspectj:aspectjrt</include>
lakesoul-flink: SyncDatabase.java
@@ -14,12 +14,16 @@
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener;
import org.apache.flink.lakesoul.metadata.LakeSoulCatalog;
import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.Table;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.types.DataType;
@@ -28,9 +32,9 @@

import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;

import static org.apache.flink.lakesoul.entry.MongoSinkUtils.*;
import static org.apache.flink.lakesoul.tool.JobOptions.FLINK_CHECKPOINT;
import static org.apache.flink.lakesoul.tool.JobOptions.JOB_CHECKPOINT_INTERVAL;
import static org.apache.flink.lakesoul.tool.LakeSoulSinkDatabasesOptions.*;

@@ -48,6 +52,8 @@ public class SyncDatabase {
static int sinkParallelism;
static String jdbcOrDorisOptions;
static int checkpointInterval;
static LakeSoulInAndOutputJobListener listener;
static String lineageUrl = null;

public static void main(String[] args) throws Exception {
StringBuilder connectorOptions = new StringBuilder();
@@ -84,9 +90,30 @@ public static void main(String[] args) throws Exception {
useBatch = parameter.getBoolean(BATHC_STREAM_SINK.key(), BATHC_STREAM_SINK.defaultValue());
Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT, "8081-8089");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
StreamExecutionEnvironment env = null;
lineageUrl = System.getenv("LINEAGE_URL");
if (lineageUrl != null) {
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
conf.set(JobOptions.transportTypeOption, "http");
conf.set(JobOptions.urlOption, lineageUrl);
conf.set(JobOptions.execAttach, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID);
String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
if (namespace == null) {
namespace = "public";
}
if (useBatch) {
listener = new LakeSoulInAndOutputJobListener(lineageUrl, "BATCH");
} else {
listener = new LakeSoulInAndOutputJobListener(lineageUrl);
}
listener.jobName(appName, namespace);
env.registerJobListener(listener);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
}
env.setParallelism(sinkParallelism);

switch (dbType) {
case "mysql":
xsyncToMysql(env);
@@ -219,6 +246,12 @@ public static String getTablePk(String sourceDataBae, String sourceTableName) {
return primaryKeys.size() == 0 ? null : stringBuilder.toString();
}

public static String getTableDomain(String sourceDatabase, String sourceTableName) {
DBManager dbManager = new DBManager();
TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(sourceTableName, sourceDatabase);
return tableInfo.getDomain();
}

public static void xsyncToPg(StreamExecutionEnvironment env) throws SQLException {
if (useBatch) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -354,7 +387,7 @@ public static void xsyncToMysql(StreamExecutionEnvironment env) throws SQLExcept
conn.close();
}

public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws SQLException {
public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes) throws Exception {
if (useBatch) {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
} else {
@@ -370,6 +403,14 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
DataType[] fieldDataTypes = lakesoulTable.getSchema().getFieldDataTypes();
String[] fieldNames = lakesoulTable.getSchema().getFieldNames();
String[] dorisFieldTypes = getDorisFieldTypes(fieldDataTypes);
if (lineageUrl != null) {
String inputName = "lakeSoul." + sourceDatabase + "." + sourceTableName;
String inputNamespace = getTableDomain(sourceDatabase, sourceTableName);
String[] inputTypes = Arrays.stream(fieldDataTypes).map(type -> type.toString()).collect(Collectors.toList()).toArray(new String[0]);
listener.inputFacets(inputName, inputNamespace, fieldNames, inputTypes);
String targetName = "doris." + targetDatabase + "." + targetTableName;
listener.outputFacets(targetName, "lake-public", fieldNames, dorisFieldTypes);
}
StringBuilder coulmns = new StringBuilder();
for (int i = 0; i < fieldDataTypes.length; i++) {
coulmns.append("`").append(fieldNames[i]).append("` ").append(dorisFieldTypes[i]);
@@ -404,7 +445,16 @@ public static void xsyncToDoris(StreamExecutionEnvironment env, String fenodes)
}

tEnvs.executeSql(sql);
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
if (lineageUrl != null) {
String insertSql = "insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName;
StreamStatementSet statements = tEnvs.createStatementSet();
statements.addInsertSql(insertSql);
statements.attachAsDataStream();
env.execute();
} else {
tEnvs.executeSql("insert into " + targetTableName + " select * from lakeSoul.`" + sourceDatabase + "`." + sourceTableName);
}
}

public static void xsyncToMongodb(StreamExecutionEnvironment env,
@@ -439,4 +489,4 @@ public static void xsyncToMongodb(StreamExecutionEnvironment env,
rowDataStream.sinkTo(sink).setParallelism(sinkParallelism);
env.execute();
}
}
}
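Why the Doris path stops calling tEnvs.executeSql(...) directly when lineage is enabled: a JobListener registered on the StreamExecutionEnvironment is only notified for jobs submitted through that environment, and statements executed by the TableEnvironment bypass it. Queuing the INSERT in a StreamStatementSet, attaching it as a data stream, and launching with env.execute() routes submission through the environment so LakeSoulInAndOutputJobListener observes the run. A sketch of the pattern, condensed from the diff above:

    // Register column-level input/output facets, then submit through env so the
    // registered job listener observes the run.
    listener.inputFacets("lakeSoul." + sourceDatabase + "." + sourceTableName,
            getTableDomain(sourceDatabase, sourceTableName), fieldNames, inputTypes);
    listener.outputFacets("doris." + targetDatabase + "." + targetTableName,
            "lake-public", fieldNames, dorisFieldTypes);

    StreamStatementSet statements = tEnvs.createStatementSet();
    statements.addInsertSql(insertSql); // queue the INSERT instead of running it via the TableEnvironment
    statements.attachAsDataStream();    // translate the queued INSERT into env's stream graph
    env.execute();                      // submitting via env triggers JobListener callbacks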
lakesoul-flink: ExecuteSql.java
@@ -42,7 +42,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();

StreamStatementSet statementSet = tableEnv.createStatementSet();

boolean hasModifiedOp = false;
for (String statement : statements) {
Operation operation;
try {
@@ -75,6 +75,7 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
} else if (operation instanceof ModifyOperation) {
System.out.println(MessageFormatter.format("\n======Executing insertion:\n{}", statement).getMessage());
// add insertion to statement set
hasModifiedOp = true;
statementSet.addInsertSql(statement);
} else if ((operation instanceof QueryOperation) || (operation instanceof AddJarOperation)) {
LOG.warn("SQL Statement {} is ignored", statement);
@@ -85,12 +86,16 @@ public static void executeSqlFileContent(String script, StreamTableEnvironment t
tableEnv.executeSql(statement).print();
}
}
statementSet.attachAsDataStream();
Configuration conf = (Configuration) env.getConfiguration();
if (hasModifiedOp) {
statementSet.attachAsDataStream();
Configuration conf = (Configuration) env.getConfiguration();

// try get k8s cluster name
String k8sClusterID = conf.getString("kubernetes.cluster-id", "");
env.execute(k8sClusterID.isEmpty() ? null : k8sClusterID + "-job");
// try get k8s cluster name
String k8sClusterID = conf.getString("kubernetes.cluster-id", "");
env.execute(k8sClusterID.isEmpty() ? null : k8sClusterID + "-job");
} else {
System.out.println("There's no INSERT INTO statement, the program will terminate");
}
}

public static List<String> parseStatements(String script) {
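The new hasModifiedOp flag guards the launch: attachAsDataStream() and env.execute() only make sense once at least one INSERT has been queued, so a script containing only DDL (which executes eagerly inside the loop) now terminates cleanly instead of trying to launch an empty pipeline. A condensed view of the dispatch, with error handling omitted:

    boolean hasModifiedOp = false;
    for (String statement : statements) {
        Operation operation = parser.parse(statement).get(0);
        if (operation instanceof ModifyOperation) {
            statementSet.addInsertSql(statement);   // queued for a single combined job
            hasModifiedOp = true;
        } else if (operation instanceof QueryOperation || operation instanceof AddJarOperation) {
            LOG.warn("SQL Statement {} is ignored", statement); // queries are not supported here
        } else {
            tableEnv.executeSql(statement).print(); // DDL and the like execute eagerly
        }
    }
    if (hasModifiedOp) {
        statementSet.attachAsDataStream();          // safe: the statement set is non-empty
        env.execute(null);                          // real code derives a name from kubernetes.cluster-id
    }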
lakesoul-flink: FlinkSqlSubmitter.java
@@ -4,13 +4,17 @@

package org.apache.flink.lakesoul.entry.sql.flink;

import io.openlineage.flink.OpenLineageFlinkJobListener;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.lakesoul.entry.sql.Submitter;
import org.apache.flink.lakesoul.entry.sql.common.FlinkOption;
import org.apache.flink.lakesoul.entry.sql.common.JobType;
import org.apache.flink.lakesoul.entry.sql.common.SubmitOption;
import org.apache.flink.lakesoul.entry.sql.utils.FileUtil;
import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -32,6 +36,13 @@ public FlinkSqlSubmitter(SubmitOption submitOption) {

@Override
public void submit() throws Exception {
String lineageUrl = System.getenv("LINEAGE_URL");
Configuration conf = new Configuration();
if (lineageUrl != null) {
conf.set(JobOptions.transportTypeOption, "http");
conf.set(JobOptions.urlOption, lineageUrl);
conf.set(JobOptions.execAttach, true);
}
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
if (submitOption.getJobType().equals(JobType.STREAM.getType())) {
@@ -47,10 +58,24 @@ public void submit() throws Exception {
} else {
throw new RuntimeException("jobType is not supported: " + submitOption.getJobType());
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
if (submitOption.getJobType().equals(JobType.STREAM.getType())) {
this.setCheckpoint(env);
}
if (lineageUrl != null) {
String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID);
String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
if (namespace == null) {
namespace = "lake-public";
}
LOG.info("----namespace:table----{}:{}", appName, namespace);
JobListener listener = OpenLineageFlinkJobListener.builder()
.executionEnvironment(env)
.jobName(appName)
.jobNamespace(namespace)
.build();
env.registerJobListener(listener);
}
tEnv = StreamTableEnvironment.create(env, settings);

String sql = FileUtil.readHDFSFile(submitOption.getSqlFilePath());
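Note the asymmetry between the two entry points: SyncDatabase registers the custom LakeSoulInAndOutputJobListener so it can attach per-table input/output facets, while the SQL submitter relies on OpenLineage's stock Flink listener, which derives lineage from the job itself. Condensed from the change above:

    String appName = env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID); // kubernetes.cluster-id
    String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
    JobListener listener = OpenLineageFlinkJobListener.builder()
            .executionEnvironment(env)
            .jobName(appName)
            .jobNamespace(namespace != null ? namespace : "lake-public")     // default namespace
            .build();
    env.registerJobListener(listener);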
(Diffs for the remaining changed files are not shown here.)