Round one, just add support code
nicklan committed Oct 11, 2023
1 parent 2117d50 commit 5e8ab5d
Showing 10 changed files with 218 additions and 11 deletions.
2 changes: 2 additions & 0 deletions build.sbt
@@ -958,6 +958,8 @@ def flinkScalaVersion(scalaBinaryVersion: String): String = {

lazy val flink = (project in file("connectors/flink"))
.dependsOn(standaloneCosmetic % "provided")
.dependsOn(kernelApi)
.dependsOn(kernelDefaults)
.settings (
name := "delta-flink",
commonSettings,

@@ -278,7 +278,7 @@ protected String prepareInapplicableOptionMessage(
protected SourceSchema getSourceSchema() {
DeltaLog deltaLog =
DeltaLog.forTable(hadoopConfiguration, SourceUtils.pathToString(tablePath));
SnapshotSupplier snapshotSupplier = snapshotSupplierFactory.create(deltaLog);
SnapshotSupplier snapshotSupplier = snapshotSupplierFactory.create(deltaLog, hadoopConfiguration, tablePath);
Snapshot snapshot = snapshotSupplier.getSnapshot(sourceConfiguration);

try {

@@ -2,10 +2,13 @@

import io.delta.standalone.DeltaLog;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

public class BoundedSnapshotSupplierFactory implements SnapshotSupplierFactory {

@Override
public BoundedSourceSnapshotSupplier create(DeltaLog deltaLog) {
return new BoundedSourceSnapshotSupplier(deltaLog);
public BoundedSourceSnapshotSupplier create(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
return new BoundedSourceSnapshotSupplier(deltaLog, configuration, tablePath);
}
}
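
For context, the three-argument create added here mirrors the updated call in DeltaSourceBuilderBase.getSourceSchema above. A minimal, illustrative sketch of that call site (not part of this commit; the table path is a placeholder):

package io.delta.flink.source.internal.enumerator.supplier;

import io.delta.standalone.DeltaLog;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

// Sketch only: shows how the factory is expected to be invoked with the new signature.
public class FactoryUsageSketch {
    public static void main(String[] args) {
        Configuration hadoopConfiguration = new Configuration();
        Path tablePath = new Path("/tmp/my-delta-table"); // hypothetical table location

        DeltaLog deltaLog = DeltaLog.forTable(hadoopConfiguration, tablePath.getPath());

        BoundedSnapshotSupplierFactory factory = new BoundedSnapshotSupplierFactory();
        BoundedSourceSnapshotSupplier supplier =
            factory.create(deltaLog, hadoopConfiguration, tablePath);

        // The supplier is then asked for a Snapshot with the source's
        // DeltaConnectorConfiguration, exactly as before this change.
    }
}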

@@ -7,15 +7,18 @@
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

/**
* An implementation of {@link SnapshotSupplier} for
* {@link org.apache.flink.api.connector.source.Boundedness#BOUNDED} mode.
*/
public class BoundedSourceSnapshotSupplier extends SnapshotSupplier {

public BoundedSourceSnapshotSupplier(DeltaLog deltaLog) {
super(deltaLog);
public BoundedSourceSnapshotSupplier(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
super(deltaLog, configuration, tablePath);
}

/**

@@ -2,10 +2,13 @@

import io.delta.standalone.DeltaLog;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

public class ContinuousSnapshotSupplierFactory implements SnapshotSupplierFactory {

@Override
public ContinuousSourceSnapshotSupplier create(DeltaLog deltaLog) {
return new ContinuousSourceSnapshotSupplier(deltaLog);
public ContinuousSourceSnapshotSupplier create(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
return new ContinuousSourceSnapshotSupplier(deltaLog, configuration, tablePath);
}
}

@@ -9,15 +9,18 @@
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

/**
* An implementation of {@link SnapshotSupplier} for
* {@link org.apache.flink.api.connector.source.Boundedness#CONTINUOUS_UNBOUNDED} mode.
*/
public class ContinuousSourceSnapshotSupplier extends SnapshotSupplier {

public ContinuousSourceSnapshotSupplier(DeltaLog deltaLog) {
super(deltaLog);
public ContinuousSourceSnapshotSupplier(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
super(deltaLog, configuration, tablePath);
}

/**
@@ -0,0 +1,154 @@
package io.delta.flink.source.internal.enumerator.supplier;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import io.delta.kernel.data.ColumnVector;
import io.delta.standalone.DeltaScan;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import io.delta.standalone.expressions.Expression;


/**
* Wrap a {@link io.delta.kernel.Snapshot} so that it conforms to the {@link
* io.delta.standalone.Snapshot} interface.
*
* NB: Currently only supports the getMetadata and getVersion methods. All others throw exceptions.
*/

public class KernelSnapshotWrapper implements io.delta.standalone.Snapshot {

// Converting from kernelMetadata to metadata could be expensive, so don't do it until asked,
// and cache the result.
private Optional<Metadata> metadata = Optional.empty();
private io.delta.kernel.internal.SnapshotImpl kernelSnapshot;

protected KernelSnapshotWrapper(io.delta.kernel.internal.SnapshotImpl kernelSnapshot) {
this.kernelSnapshot = kernelSnapshot;
}

/**
* Converts the metadata in the kernel snapshot to a compatible Metadata type. The kernel
* makes heavier use of optional types; empty values are converted to `null`, since that is
* what the standalone Metadata expects.
*/
private Metadata convertMetadata() {
io.delta.kernel.internal.actions.Metadata kernelMetadata = kernelSnapshot.getMetadata();

// Convert the format type
io.delta.kernel.internal.actions.Format kernelFormat = kernelMetadata.getFormat();
io.delta.standalone.actions.Format format = new io.delta.standalone.actions.Format(
kernelFormat.getProvider(),
java.util.Collections.emptyMap() // TODO: Kernel doesn't currently support options
);

// Convert the partition columns from a ColumnVector to a List<String>
ColumnVector partitionsVec = kernelMetadata.getPartitionColumns().getElements();
ArrayList<String> partitionColumns = new ArrayList<String>(partitionsVec.getSize());
for(int i = 0; i < partitionsVec.getSize(); i++) {
partitionColumns.add(partitionsVec.getString(i));
}

// Convert over the schema StructType
List<io.delta.kernel.types.StructField> kernelFields = kernelMetadata.getSchema().fields();
io.delta.standalone.types.StructField[] fields =
new io.delta.standalone.types.StructField[kernelFields.size()];
int index = 0;
for (io.delta.kernel.types.StructField kernelField: kernelFields) {
io.delta.standalone.types.FieldMetadata.Builder metadataBuilder =
io.delta.standalone.types.FieldMetadata.builder();
for (java.util.Map.Entry<String,String> entry : kernelField.getMetadata().entrySet()) {
metadataBuilder.putString(entry.getKey(), entry.getValue());
}
fields[index] = new io.delta.standalone.types.StructField(
kernelField.getName(),
io.delta.standalone.types.DataType.fromJson(
kernelField.toJson()
),
kernelField.isNullable(),
metadataBuilder.build()
);
index++;
}
io.delta.standalone.types.StructType schema =
new io.delta.standalone.types.StructType(fields);

return new Metadata(
kernelMetadata.getId(),
kernelMetadata.getName().orElse(null),
kernelMetadata.getDescription().orElse(null),
format,
partitionColumns,
kernelMetadata.getConfiguration(),
kernelMetadata.getCreatedTime(),
schema
);
}

/**
* @return the table metadata for this snapshot.
*/
@Override
public Metadata getMetadata() {
if (!metadata.isPresent()) {
metadata = Optional.of(convertMetadata());
}
return metadata.get();
}

/**
* @return the version for this snapshot
*/
@Override
public long getVersion() {
// WARNING: getVersion in SnapshotImpl currently doesn't use the table client, so we can
// pass null here; if that ever changes, this code will break.
return kernelSnapshot.getVersion(null);
}

/**
* NOT SUPPORTED
* @return a {@link DeltaScan} of the files in this snapshot.
*/
@Override
public DeltaScan scan() {
throw new UnsupportedOperationException("not supported");
}

/**
* NOT SUPPORTED
* @param predicate the predicate to be used to filter the files in this snapshot.
* @return a {@link DeltaScan} of the files in this snapshot matching the pushed portion of
* {@code predicate}
*/
@Override
public DeltaScan scan(Expression predicate) {
throw new UnsupportedOperationException("not supported");
}

/**
* NOT SUPPORTED
* @return all of the files present in this snapshot
*/
@Override
public List<AddFile> getAllFiles() {
throw new UnsupportedOperationException("not supported");
}

/**
* NOT SUPPORTED
* Creates a {@link CloseableIterator} which can iterate over data belonging to this snapshot.
* It provides no iteration ordering guarantee among data.
*
* @return a {@link CloseableIterator} to iterate over data
*/
@Override
public CloseableIterator<RowRecord> open() {
throw new UnsupportedOperationException("not supported");
}
}
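
The wrapper's constructor is protected, so it is meant to be created from within this package after a snapshot has been loaded through the kernel. A minimal sketch of that flow (not part of this commit; the table path is a placeholder, and the sketch lives in the same package only because of the protected constructor):

package io.delta.flink.source.internal.enumerator.supplier;

import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.defaults.client.DefaultTableClient;

import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.Metadata;

import org.apache.hadoop.conf.Configuration;

// Sketch only: load the latest snapshot through delta-kernel, then wrap it so code that
// expects the standalone Snapshot interface keeps working.
public class KernelSnapshotWrapperSketch {
    public static void main(String[] args) throws TableNotFoundException {
        Configuration hadoopConf = new Configuration();
        TableClient client = DefaultTableClient.create(hadoopConf);

        Table table = Table.forPath(client, "/tmp/my-delta-table"); // hypothetical path
        io.delta.kernel.internal.SnapshotImpl kernelSnapshot =
            (io.delta.kernel.internal.SnapshotImpl) table.getLatestSnapshot(client);

        Snapshot snapshot = new KernelSnapshotWrapper(kernelSnapshot);

        // Only these two calls are supported by the wrapper today; anything else throws.
        long version = snapshot.getVersion();
        Metadata metadata = snapshot.getMetadata();
        System.out.println("version=" + version + ", table id=" + metadata.getId());
    }
}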

@@ -3,9 +3,17 @@
import io.delta.flink.internal.options.DeltaConnectorConfiguration;
import io.delta.flink.source.internal.utils.TransitiveOptional;

import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.defaults.client.DefaultTableClient;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

/**
* This class abstracts the logic needed to acquire a Delta table {@link Snapshot} based on {@link
* DeltaConnectorConfiguration} and any other implementation-specific logic.
@@ -16,9 +24,13 @@ public abstract class SnapshotSupplier {
* The {@link DeltaLog} instance that will be used to get the desired {@link Snapshot} instance.
*/
protected final DeltaLog deltaLog;
protected final Configuration configuration;
protected final Path tablePath;

protected SnapshotSupplier(DeltaLog deltaLog) {
protected SnapshotSupplier(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
this.deltaLog = deltaLog;
this.configuration = configuration;
this.tablePath = tablePath;
}

/**
@@ -38,4 +50,25 @@ protected SnapshotSupplier(DeltaLog deltaLog) {
protected TransitiveOptional<Snapshot> getHeadSnapshot() {
return TransitiveOptional.ofNullable(deltaLog.snapshot());
}

/**
* A helper method that returns the latest {@link Snapshot} at the moment this method is
* called.
* <p>
* This uses delta-kernel-java to load the snapshot.
*
* NB: The snapshot returned here currently ONLY supports the getMetadata and getVersion
* methods. All other calls on the returned snapshot will throw an exception.
*/
protected TransitiveOptional<Snapshot> getHeadSnapshotViaKernel() {
TableClient client = DefaultTableClient.create(configuration);
try {
Table table = Table.forPath(client, tablePath.getPath());
io.delta.kernel.internal.SnapshotImpl kernelSnapshot =
(io.delta.kernel.internal.SnapshotImpl)table.getLatestSnapshot(client);
return TransitiveOptional.ofNullable(new KernelSnapshotWrapper(kernelSnapshot));
} catch (TableNotFoundException e) {
return TransitiveOptional.ofNullable(null);
}
}
}
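
Nothing in this commit calls getHeadSnapshotViaKernel() yet; it is support code for a later change. As a hypothetical illustration (the class below is invented, and it assumes TransitiveOptional exposes the or(...)/get() chaining the existing suppliers use), a supplier wired to it could look like:

package io.delta.flink.source.internal.enumerator.supplier;

import io.delta.flink.internal.options.DeltaConnectorConfiguration;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;

import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

// Hypothetical subclass, not part of this commit: prefer the kernel-backed snapshot and
// fall back to the standalone head snapshot if the kernel cannot find the table.
public class KernelFirstSnapshotSupplier extends SnapshotSupplier {

    public KernelFirstSnapshotSupplier(DeltaLog deltaLog, Configuration configuration, Path tablePath) {
        super(deltaLog, configuration, tablePath);
    }

    @Override
    public Snapshot getSnapshot(DeltaConnectorConfiguration sourceConfiguration) {
        // Keep in mind the kernel-backed snapshot only supports getMetadata()/getVersion().
        return getHeadSnapshotViaKernel()
            .or(this::getHeadSnapshot)
            .get();
    }
}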
@@ -1,8 +1,10 @@
package io.delta.flink.source.internal.enumerator.supplier;

import io.delta.standalone.DeltaLog;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.conf.Configuration;

public interface SnapshotSupplierFactory {

SnapshotSupplier create(DeltaLog deltaLog);
SnapshotSupplier create(DeltaLog deltaLog, Configuration configuration, Path tablePath);
}

@@ -43,4 +43,8 @@ public static Format fromRow(Row row) {
public Format(String provider) {
this.provider = provider;
}

public String getProvider() {
return provider;
}
}
