Commit

Merge the dev branch.

shaomengwang committed Apr 22, 2022
1 parent e95ab78 commit f145b02
Showing 299 changed files with 6,160 additions and 321 deletions.
66 changes: 66 additions & 0 deletions connectors/connector-parquet-0.11/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>alink_connectors</artifactId>
		<groupId>com.alibaba.alink</groupId>
		<version>1.5-SNAPSHOT</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>

	<artifactId>alink_connector_parquet_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
	<packaging>jar</packaging>
	<name>alink_connector_parquet</name>

	<dependencies>
		<dependency>
			<groupId>com.alibaba.alink</groupId>
			<artifactId>alink_core_flink-${alink.flink.major.version}_${alink.scala.major.version}</artifactId>
			<version>${project.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<!-- Use the project-wide Scala suffix property instead of hard-coding 2.11. -->
			<artifactId>flink-parquet_${alink.scala.major.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.20</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-core</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<!-- Track the project's Flink version rather than pinning 1.12.0. -->
			<version>${flink.version}</version>
			<scope>compile</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>
@@ -0,0 +1,156 @@
package com.alibaba.alink.common.io.parquet.plugin;

import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.formats.parquet.utils.ParquetRecordReader;
import org.apache.flink.formats.parquet.utils.RowReadSupport;
import org.apache.flink.metrics.Counter;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.copy.FileInputFormat;
import com.alibaba.alink.common.io.parquet.plugin.ParquetUtil.ParquetInputFile;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class GenericParquetInputFormat extends FileInputFormat<Row>
	implements CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

	private static final Logger LOG = LoggerFactory.getLogger(GenericParquetInputFormat.class);

	/**
	 * The flag to specify whether to skip corrupted records.
	 */
	private boolean skipCorruptedRecord = false;

	/**
	 * The flag to track whether the current split should be skipped.
	 */
	private boolean skipThisSplit = false;

	private MessageType readSchema;

	private FilePath parquetFilePath;
	private BaseFileSystem<?> fileSystem;

	private FilterPredicate filterPredicate;

	private transient Counter recordConsumed;

	private transient ParquetRecordReader<Row> parquetRecordReader;

	public GenericParquetInputFormat(FilePath filePath) {
		super(filePath.getPath(), filePath.getFileSystem());
		this.parquetFilePath = filePath;
		this.fileSystem = filePath.getFileSystem();
		// Read the whole parquet file as a single file split.
		this.unsplittable = true;
	}

	@Override
	public void configure(Configuration configuration) {
		super.configure(configuration);

		// Fall back to the configuration only when the flag has not been enabled already.
		if (!this.skipCorruptedRecord) {
			this.skipCorruptedRecord = configuration.getBoolean(PARQUET_SKIP_CORRUPTED_RECORD, false);
		}
	}

	@Override
	public void open(FileInputSplit split) throws IOException {
		// Reset the flag when opening a new split.
		this.skipThisSplit = false;
		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
		InputFile inputFile = new ParquetInputFile(parquetFilePath);

		readSchema = ParquetUtil.getReadSchemaFromParquetFile(parquetFilePath);

		ParquetReadOptions options = ParquetReadOptions.builder().build();
		ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
		if (skipThisSplit) {
			LOG.warn(
				String.format(
					"Escaped the file split [%s] due to mismatch of file schema to expected result schema",
					split.getPath().toString()));
		} else {
			this.parquetRecordReader =
				new ParquetRecordReader<>(
					new RowReadSupport(),
					readSchema,
					filterPredicate == null
						? FilterCompat.NOOP
						: FilterCompat.get(filterPredicate));
			this.parquetRecordReader.initialize(fileReader, configuration);
			this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);

			if (this.recordConsumed == null) {
				this.recordConsumed =
					getRuntimeContext().getMetricGroup().counter("parquet-records-consumed");
			}

			LOG.debug(
				String.format(
					"Open ParquetInputFormat with FileInputSplit [%s]",
					split.getPath().toString()));
		}
	}

	@Override
	public boolean reachedEnd() throws IOException {
		if (skipThisSplit) {
			return true;
		}

		return parquetRecordReader.reachEnd();
	}

	@Override
	public Row nextRecord(Row row) throws IOException {
		if (reachedEnd()) {
			return null;
		}

		recordConsumed.inc();
		return parquetRecordReader.nextRecord();
	}

	@Override
	public void close() throws IOException {
		if (parquetRecordReader != null) {
			parquetRecordReader.close();
		}
	}

	@Override
	public Tuple2<Long, Long> getCurrentState() {
		return parquetRecordReader.getCurrentReadPosition();
	}

	@Override
	public void reopen(FileInputSplit split, Tuple2<Long, Long> state) throws IOException {
		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
		this.open(split);
		// Seek to the read position in the split that we were at when the checkpoint was taken.
		parquetRecordReader.seek(state.f0, state.f1);
	}

	/**
	 * The config parameter which defines whether to skip corrupted records.
	 */
	public static final String PARQUET_SKIP_CORRUPTED_RECORD = "skip.corrupted.record";

}
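
To see where this input format fits, here is a minimal usage sketch for a Flink batch job; it is not part of this commit. The single-argument FilePath constructor and the file path are assumptions for illustration, and the produced row type is derived from the file footer via the ParquetUtil helper added later in this commit.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.parquet.plugin.GenericParquetInputFormat;
import com.alibaba.alink.common.io.parquet.plugin.ParquetUtil;

public class ParquetReadExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Assumed single-argument constructor; the path is a placeholder.
		FilePath filePath = new FilePath("/tmp/example.parquet");

		// Derive the produced row type from the parquet footer, then wire in the format.
		TableSchema schema = ParquetUtil.getTableSchemaFromParquetFile(filePath);
		DataSet<Row> rows = env.createInput(
			new GenericParquetInputFormat(filePath),
			new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));

		rows.print();
	}
}
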
@@ -0,0 +1,30 @@
package com.alibaba.alink.common.io.parquet.plugin;

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.parquet.ParquetSourceFactory;
import com.alibaba.alink.params.io.ParquetSourceParams;

public class ParquetSourceFactoryImpl implements ParquetSourceFactory {
	@Override
	public Tuple2<RichInputFormat<Row, FileInputSplit>, TableSchema> createParquetSourceFunction(Params params) {
		FilePath filePath = FilePath.deserialize(params.get(ParquetSourceParams.FILE_PATH));

		TableSchema schema;
		try {
			schema = ParquetUtil.getTableSchemaFromParquetFile(filePath);
		} catch (Exception e) {
			// Preserve the root cause instead of swallowing it.
			throw new IllegalArgumentException("Cannot read footer from parquet file", e);
		}

		GenericParquetInputFormat parquetInputFormat = new GenericParquetInputFormat(filePath);

		return Tuple2.of(parquetInputFormat, schema);
	}
}
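
A hedged sketch of how this factory might be driven. It assumes FilePath.serialize() exists as the counterpart of the FilePath.deserialize() call above, along with the single-argument FilePath constructor; the path is a placeholder.

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.parquet.plugin.ParquetSourceFactoryImpl;
import com.alibaba.alink.params.io.ParquetSourceParams;

public class ParquetSourceFactoryExample {
	public static void main(String[] args) {
		// FilePath.serialize() is assumed; the factory expects the serialized form.
		Params params = new Params()
			.set(ParquetSourceParams.FILE_PATH, new FilePath("/tmp/example.parquet").serialize());

		Tuple2<RichInputFormat<Row, FileInputSplit>, TableSchema> source =
			new ParquetSourceFactoryImpl().createParquetSourceFunction(params);

		// The schema read from the parquet footer.
		System.out.println(source.f1);
	}
}
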
@@ -0,0 +1,110 @@
package com.alibaba.alink.common.io.parquet.plugin;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
import org.apache.flink.shaded.guava18.com.google.common.collect.BiMap;
import org.apache.flink.shaded.guava18.com.google.common.collect.HashBiMap;
import org.apache.flink.table.api.TableSchema;

import com.alibaba.alink.common.io.filesystem.FilePath;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetUtil {
	private static final BiMap<PrimitiveTypeName, TypeInformation<?>> PRIMITIVE_TYPE_MAP = HashBiMap.create();

	static {
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.BOOLEAN, Types.BOOLEAN);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.BINARY, Types.STRING);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.INT32, Types.INT);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.INT64, Types.LONG);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.INT96, Types.SQL_TIMESTAMP);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.DOUBLE, Types.DOUBLE);
		PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.FLOAT, Types.FLOAT);
	}

	public static MessageType getReadSchemaFromParquetFile(FilePath filePath) throws IOException {
		MessageType messageType = readSchemaFromFile(filePath);
		List<String[]> paths = messageType.getPaths();
		List<Type> types = new ArrayList<>();

		// Keep only the columns whose primitive type has a Flink counterpart.
		for (String[] path : paths) {
			Type type = messageType.getType(path);
			if (type.isPrimitive()
				&& PRIMITIVE_TYPE_MAP.containsKey(type.asPrimitiveType().getPrimitiveTypeName())) {
				types.add(type);
			}
		}
		return new MessageType("alink_parquet_source", types);
	}

	public static TableSchema getTableSchemaFromParquetFile(FilePath filePath) throws IOException {
		MessageType messageType = getReadSchemaFromParquetFile(filePath);
		RowTypeInfo schema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
		return new TableSchema(schema.getFieldNames(), schema.getFieldTypes());
	}

	public static MessageType readSchemaFromFile(FilePath filePath) throws IOException {
		try (ParquetFileReader fileReader
			= new ParquetFileReader(new ParquetInputFile(filePath), ParquetReadOptions.builder().build())) {
			return fileReader.getFileMetaData().getSchema();
		}
	}

	public static class ParquetInputFile implements InputFile {
		private final FilePath filePath;

		public ParquetInputFile(FilePath filePath) {
			this.filePath = filePath;
		}

		@Override
		public long getLength() throws IOException {
			return filePath.getFileSystem().getFileStatus(filePath.getPath()).getLen();
		}

		@Override
		public SeekableInputStream newStream() throws IOException {
			return new MyDelegatingSeekableInputStream(filePath.getFileSystem().open(filePath.getPath()));
		}

		private static class MyDelegatingSeekableInputStream extends DelegatingSeekableInputStream {

			private final FSDataInputStream fsDataInputStream;

			public MyDelegatingSeekableInputStream(FSDataInputStream fsDataInputStream) {
				super(fsDataInputStream);
				this.fsDataInputStream = fsDataInputStream;
			}

			@Override
			public long getPos() throws IOException {
				return fsDataInputStream.getPos();
			}

			@Override
			public void seek(long newPos) throws IOException {
				fsDataInputStream.seek(newPos);
			}
		}
	}
}
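
For orientation, a small sketch of how these helpers relate, reusing the same hypothetical FilePath constructor as above: readSchemaFromFile returns the raw footer schema, getReadSchemaFromParquetFile prunes it to the primitive types in PRIMITIVE_TYPE_MAP, and getTableSchemaFromParquetFile converts the pruned schema to a Flink TableSchema.

import org.apache.flink.table.api.TableSchema;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.parquet.plugin.ParquetUtil;
import org.apache.parquet.schema.MessageType;

public class ParquetSchemaInspect {
	public static void main(String[] args) throws Exception {
		// Assumed constructor; the path is a placeholder.
		FilePath filePath = new FilePath("/tmp/example.parquet");

		MessageType fileSchema = ParquetUtil.readSchemaFromFile(filePath);           // raw footer schema
		MessageType readSchema = ParquetUtil.getReadSchemaFromParquetFile(filePath); // pruned to supported primitives
		TableSchema tableSchema = ParquetUtil.getTableSchemaFromParquetFile(filePath);

		System.out.println(fileSchema);
		System.out.println(readSchema);
		System.out.println(tableSchema);
	}
}
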
@@ -0,0 +1 @@
com.alibaba.alink.common.io.parquet.plugin.ParquetSourceFactoryImpl
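
This one-line file registers ParquetSourceFactoryImpl for service discovery; the diff does not show its path, but the format matches a standard META-INF/services entry. Assuming plain java.util.ServiceLoader semantics (Alink's plugin loader may wrap this mechanism), discovery would look roughly like this:

import java.util.ServiceLoader;

import com.alibaba.alink.common.io.parquet.ParquetSourceFactory;

public class FactoryDiscoveryExample {
	public static void main(String[] args) {
		// Iterate all ParquetSourceFactory implementations registered on the classpath.
		for (ParquetSourceFactory factory : ServiceLoader.load(ParquetSourceFactory.class)) {
			System.out.println(factory.getClass().getName());
		}
	}
}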