merge openlinage into merge_main
MR-title: [Flink] Integrate open lineage (lakesoul-io#532)
Created-by: hw_syl_cyh
Author-id: 7155560
MR-id: 7279028
Commit-by: hw_syl_cyh;ChenYunHey;zenghua;hw_syl_zenghua;maosen;fphantam;Ceng;hw_jnj_syl;Xu Chen;moresun
Merged-by: hw_syl_cyh
Description: merge "openlinage" into "merge_main"
[Flink] Integrate open lineage (lakesoul-io#532)

* 1. Support lineage for Kafka and LakeSoul DataStream sources sinking into LakeSoul (a minimal wiring sketch follows below)
2. Support lineage for LakeSoul DataStream sources sinking to Doris, LakeSoul, and Kafka
3. Support lineage for Flink SQL submission
4. Support YARN and K8s application mode
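
As a rough illustration of item 1, the sketch below condenses the lineage wiring this MR adds to the Kafka entry point: when the LINEAGE_URL environment variable is set, the job configures the OpenLineage HTTP transport and registers a LakeSoulInAndOutputJobListener that declares the Kafka topic as the job's input dataset. Only classes and options that appear in this diff (LakeSoulInAndOutputJobListener, JobOptions, FileUtil) are used; the wrapper class and method name are hypothetical, and the diff additionally sets a statically imported lineageOption flag whose defining class is not shown in this excerpt.

    // Minimal sketch (not the exact MR code): build a StreamExecutionEnvironment
    // that reports OpenLineage events over HTTP when LINEAGE_URL is set.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener;
    import org.apache.flink.lakesoul.entry.sql.utils.FileUtil;
    import org.apache.flink.lakesoul.tool.JobOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LineageWiringSketch { // hypothetical wrapper class
        static StreamExecutionEnvironment buildEnv(Configuration conf, String kafkaTopic) {
            String lineageUrl = System.getenv("LINEAGE_URL");
            if (lineageUrl == null) {
                // No lineage collector configured: behave exactly as before.
                return StreamExecutionEnvironment.getExecutionEnvironment(conf);
            }
            // Route OpenLineage run events to the HTTP collector.
            conf.set(JobOptions.transportTypeOption, "http");
            conf.set(JobOptions.urlOption, lineageUrl);
            conf.set(JobOptions.execAttach, false);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

            // Job name is derived from the Kubernetes cluster id; the namespace
            // falls back to "public" when no LakeSoul domain is set.
            String appName = FileUtil.getSubNameFromBatch(
                    env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID));
            String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
            if (namespace == null) {
                namespace = "public";
            }

            // The listener declares the Kafka topic as the job's input dataset and
            // emits OpenLineage run events through the registered JobListener hooks.
            LakeSoulInAndOutputJobListener listener = new LakeSoulInAndOutputJobListener(lineageUrl);
            listener.jobName(appName, namespace);
            listener.inputFacets("kafka." + kafkaTopic, "kafka-public", null, null);
            env.registerJobListener(listener);
            return env;
        }
    }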

Signed-off-by: maosen <[email protected]>

* modify lineage dependency

Signed-off-by: maosen <[email protected]>

* add lakesoul lineage listener

Signed-off-by: maosen <[email protected]>

* remove redundant items

Signed-off-by: maosen <[email protected]>

* default to jdk 11 for build

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: maosen <[email protected]>
Signed-off-by: chenxu <[email protected]>
Co-authored-by: maosen <[email protected]>
Co-authored-by: chenxu <[email protected]>,
add kafka lineage into and out of lakesoul,
extract job name for batch schedule
change job run state from running to completed,
[Flink] Fix arrow sink config (lakesoul-io#546)

* fix arrow sink config

Signed-off-by: chenxu <[email protected]>

* fix ut

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>,
add scheduleTime replacer in flink sql submitter,
fix clean task bug when path is null

Signed-off-by: fphantam <[email protected]>,
[NativeIO/Fix] Add error info of native writer && fix case of aux_sort_cols (lakesoul-io#547)

* add error info of native writer && fix case of aux_sort_cols

Signed-off-by: zenghua <[email protected]>

* fix clippy

Signed-off-by: zenghua <[email protected]>

* do cargo fmt

Signed-off-by: zenghua <[email protected]>

---------

Signed-off-by: zenghua <[email protected]>
Co-authored-by: zenghua <[email protected]>,
add LakeSoulLocalJavaWriter

Signed-off-by: zenghua <[email protected]>,
fix case of native reader timestamp convert

Signed-off-by: zenghua <[email protected]>,
support cdc table

Signed-off-by: zenghua <[email protected]>,
add debug log for LakeSoulLocalJavaWriter

Signed-off-by: zenghua <[email protected]>,
add compaction ut and update transactionCommit interface

Signed-off-by: zenghua <[email protected]>,
compact with file number limit

Signed-off-by: zenghua <[email protected]>,
[WIP] support compaction with changing bucket num

Signed-off-by: zenghua <[email protected]>,
fix changing bucket num

Signed-off-by: zenghua <[email protected]>,
update pg tableinfo with bucketnum,
update assertion of ut

Signed-off-by: zenghua <[email protected]>,
add CompactionTask parameter file_num_limit

Signed-off-by: zenghua <[email protected]>,
fix case of cleanOldCompaction

Signed-off-by: zenghua <[email protected]>,
continue compaction operation when newBucketNum exists

Signed-off-by: fphantam <[email protected]>,
fileNumLimit compaction does not filter 'delete'

Signed-off-by: fphantam <[email protected]>,
fix compaction filter bug

Signed-off-by: fphantam <[email protected]>,
optimize clean sql and add reconnect function to compaction task

Signed-off-by: fphantam <[email protected]>,
add limit push down for table source for batch and stream, but it does not work with the order by clause because of Flink,
fix output,
update the build.yml,
update the build.yml,
add compaction paras of size limit

Signed-off-by: zenghua <[email protected]>,
fix LakeSoulFileWriter bucketId

Signed-off-by: zenghua <[email protected]>,
copy compacted file by fs directly instead of spark read&write

Signed-off-by: zenghua <[email protected]>,
fix ci

Signed-off-by: zenghua <[email protected]>,
compaction with file size condition in parallel

Signed-off-by: zenghua <[email protected]>,
..

Signed-off-by: ChenYunHey <[email protected]>

See merge request: 42b369588d84469d95d7b738fc58da8e/LakeSoul/for-nanhang!9
hw_syl_cyh committed Nov 4, 2024
2 parents 4630749 + cef7920 commit 84ec85c
Showing 18 changed files with 483 additions and 48 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/consistency-ci.yml
@@ -56,10 +56,10 @@ jobs:
- name: Init PG
run: |
./script/meta_init_for_local_test.sh -j 2
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Set up Python 3.9
12 changes: 6 additions & 6 deletions .github/workflows/deployment.yml
@@ -15,10 +15,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -51,10 +51,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
@@ -151,10 +151,10 @@ jobs:
with:
name: lakesoul-nativemetadata-x86_64-pc-windows-msvc
path: ./rust/target/release/
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
8 changes: 4 additions & 4 deletions .github/workflows/native-build.yml
@@ -27,10 +27,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
@@ -64,10 +64,10 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: maven
- name: Install Protoc
16 changes: 15 additions & 1 deletion lakesoul-flink/pom.xml
@@ -92,6 +92,21 @@ SPDX-License-Identifier: Apache-2.0
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>


<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-flink</artifactId>
<version>1.19.0</version>
<scope>${local.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
<scope>${local.scope}</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
@@ -526,7 +541,6 @@ SPDX-License-Identifier: Apache-2.0
<include>org.furyio:fury-core</include>
<include>com.google.guava:guava</include>
<include>com.google.guava:failureaccess</include>

<!-- casbin & aspectj -->
<include>org.casbin:jdbc-adapter</include>
<include>org.aspectj:aspectjrt</include>
@@ -26,7 +26,10 @@
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener;
import org.apache.flink.lakesoul.entry.sql.utils.FileUtil;
import org.apache.flink.lakesoul.sink.LakeSoulMultiTableSinkStreamBuilder;
import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.*;
import org.apache.flink.streaming.api.CheckpointingMode;
@@ -43,6 +46,7 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@@ -53,11 +57,10 @@
public class KafkaCdc {

/**
* @param args
* --bootstrap_servers localhost:9092 --topic t_test --auto_offset_reset earliest --group_id test
* --source.parallelism 4 --sink.parallelism 4 --job.checkpoint_interval 5000
* --warehouse_path /tmp/lakesoul/kafka
* --flink.checkpoint /tmp/lakesoul/chk
* @param args --bootstrap_servers localhost:9092 --topic t_test --auto_offset_reset earliest --group_id test
* --source.parallelism 4 --sink.parallelism 4 --job.checkpoint_interval 5000
* --warehouse_path /tmp/lakesoul/kafka
* --flink.checkpoint /tmp/lakesoul/chk
* @throws Exception
*/
public static void main(String[] args) throws Exception {
@@ -164,7 +167,30 @@ public static void main(String[] args) throws Exception {
conf.set(LakeSoulSinkOptions.LOGICALLY_DROP_COLUM, logicallyDropColumn);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
String lineageUrl = System.getenv("LINEAGE_URL");
LakeSoulInAndOutputJobListener listener = null;
StreamExecutionEnvironment env;
String appName = null;
String namespace = null;
if (lineageUrl != null) {
conf.set(JobOptions.transportTypeOption, "http");
conf.set(JobOptions.urlOption, lineageUrl);
conf.set(JobOptions.execAttach, false);
conf.set(lineageOption, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID));
namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
if (namespace == null) {
namespace = "public";
}
listener = new LakeSoulInAndOutputJobListener(lineageUrl);
listener.jobName(appName, namespace);
listener.inputFacets("kafka." + kafkaTopic, "kafka-public", null, null);
env.registerJobListener(listener);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
}

env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);
ParameterTool pt = ParameterTool.fromMap(conf.toMap());
env.getConfig().setGlobalJobParameters(pt);
@@ -207,7 +233,16 @@ public static void main(String[] args) throws Exception {

LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = (Configuration) env.getConfiguration();
if (lineageUrl != null) {
Map<String, String> confs = ((Configuration) env.getConfiguration()).toMap();
confs.put(linageJobName.key(), appName);
confs.put(linageJobNamespace.key(), namespace);
confs.put(lineageJobUUID.key(), listener.getRunId());
confs.put(lineageOption.key(), "true");
context.conf = Configuration.fromMap(confs);
} else {
context.conf = (Configuration) env.getConfiguration();
}
LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(kafkaSource, context, lakeSoulRecordConvert);
DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("Kafka Source");

@@ -7,6 +7,7 @@
import com.dmetasoul.lakesoul.meta.DBManager;
import com.dmetasoul.lakesoul.meta.DBUtil;
import com.dmetasoul.lakesoul.meta.entity.TableInfo;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -23,7 +24,10 @@
import org.apache.flink.formats.avro.RowDataToAvroConverters.RowDataToAvroConverter;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.lakesoul.entry.sql.flink.LakeSoulInAndOutputJobListener;
import org.apache.flink.lakesoul.entry.sql.utils.FileUtil;
import org.apache.flink.lakesoul.source.arrow.LakeSoulArrowSource;
import org.apache.flink.lakesoul.tool.JobOptions;
import org.apache.flink.lakesoul.tool.LakeSoulKeyGen;
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.LakSoulKafkaPartitioner;
@@ -137,7 +141,42 @@ public static void main(String[] args) throws Exception {
conf.set(LakeSoulSinkOptions.BUCKET_PARALLELISM, sinkParallelism);
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
String lineageUrl = System.getenv("LINEAGE_URL");
LakeSoulInAndOutputJobListener listener;
StreamExecutionEnvironment env;
if (lineageUrl != null) {
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
conf.set(JobOptions.transportTypeOption, "http");
conf.set(JobOptions.urlOption, lineageUrl);
conf.set(JobOptions.execAttach, false);
conf.set(lineageOption, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
String appName = FileUtil.getSubNameFromBatch(env.getConfiguration().get(JobOptions.KUBE_CLUSTER_ID));
String namespace = System.getenv("LAKESOUL_CURRENT_DOMAIN");
if (namespace == null) {
namespace = "public";
}
listener = new LakeSoulInAndOutputJobListener(lineageUrl);
listener.jobName(appName, namespace);
listener.outputFacets("Kafka." + kafkaTopic, "kafka-public", null, null);
DBManager lakesoulDBManager = new DBManager();
TableInfo tableInfo = lakesoulDBManager.getTableInfoByNameAndNamespace(lakeSoulTableName, lakeSoulDBName);
String tableSchema = tableInfo.getTableSchema();
Schema schema = Schema.fromJSON(tableSchema);
int size = schema.getFields().size();
String[] colNames = new String[size];
String[] colTypes = new String[size];
for (int i = 0; i < size; i++) {
Field field = schema.getFields().get(i);
colNames[i] = field.getName();
colTypes[i] = field.getType().toString();
}
listener.inputFacets("lakesoul." + lakeSoulDBName + "." + lakeSoulTableName, tableInfo.getDomain(), colNames, colTypes);
env.registerJobListener(listener);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
}

ParameterTool pt = ParameterTool.fromMap(conf.toMap());
env.getConfig().setGlobalJobParameters(pt);
env.enableCheckpointing(checkpointInterval);
@@ -183,15 +222,15 @@ public static void main(String[] args) throws Exception {
.build();


Tuple4<ConfluentRegistryAvroSerializationSchema, RowDataToAvroConverter, RowType, RowData.FieldGetter[]> keyInfo = getKeyInfo(lakeSoulDBName,
Tuple4<ConfluentRegistryAvroSerializationSchema, RowDataToAvroConverter, RowType, RowData.FieldGetter[]> keyInfo = getKeyInfo(lakeSoulDBName,
lakeSoulTableName, kafkaTopic, schemaRegistryUrl, props);
ConfluentRegistryAvroSerializationSchema keySerialization;
RowDataToAvroConverter keyRowDataToAvroConverter;
final RowType keyRowType;
FieldGetter[] keyFieldGetters;
if (keyInfo != null) {
keySerialization = keyInfo.f0;
keyRowDataToAvroConverter = keyInfo.f1 ;
keyRowDataToAvroConverter = keyInfo.f1;
keyRowType = keyInfo.f2;
keyFieldGetters = keyInfo.f3;
} else {
@@ -221,7 +260,7 @@ public void flatMap(LakeSoulArrowWrapper lakeSoulArrowWrapper, Collector<Tuple3<
RowData kafkaRowData = toKafkaAvroRawData(rowData, rowType, String.format("%s.%s", lakeSoulDBName, lakeSoulTableName));

byte[] keyBytes = null;
if (keySerialization != null ) {
if (keySerialization != null) {
GenericRecord keyGenericRecord = (GenericRecord) keyRowDataToAvroConverter.convert(
AvroSchemaConverter.convertToSchema(keyRowType, false),
createProjectedRow(rowData, RowKind.INSERT, keyFieldGetters));
@@ -269,7 +308,8 @@ public static Tuple4<ConfluentRegistryAvroSerializationSchema, RowDataToAvroConv

final RowData.FieldGetter[] keyFieldGetters;
RowType keyRowType;
int[] keyIndex;;
int[] keyIndex;
;

if (primaryKeys.size() > 0) {
try {
