Skip to content

Commit

Permalink
[Kernel] Misc. cleanup of code in kernel-api module around scan fil…
Browse files Browse the repository at this point in the history
…e generation

## Description
Various refactorings which together reduce the code size by 1000+ lines.
* Change `Scan.getScanFiles()` to return `FilteredColumnarBatch` instead of `ColumnarBatch`. The former carries an additional selection vector, which avoids rewriting the `ColumnarBatch`es generated by the readers of Delta log commit/checkpoint files. Before this change, the `kernel-api` module rewrote those `ColumnarBatch`es.
* Remove the POJO based `ColumnarBatch` and `Row`. They are no longer needed.
* Create a `ScanFile` API class that contains the schema of the scan file rows returned by `Scan.getScanFiles`
  * Create an extension `InternalScanFile` for utility methods that are internal only to `kernel-api` module. 
* Clean up the `ScanStateRow` (move related APIs from `Utils.java` to `ScanStateRow.java`)
* Remove unneeded `Action` classes

## How was this patch tested?
Existing tests.
  • Loading branch information
vkorukanti authored Sep 22, 2023
1 parent c7a39da commit 87f80ce
Show file tree
Hide file tree
Showing 34 changed files with 518 additions and 1,563 deletions.
62 changes: 49 additions & 13 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
import io.delta.kernel.utils.Utils;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.AddFileColumnarBatch;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.data.SelectionColumnVector;
import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
Expand All @@ -54,10 +53,46 @@ public interface Scan {
* Get an iterator of data files to scan.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @return iterator of {@link ColumnarBatch}s where each selected row in
* the batch corresponds to one scan file
* @return iterator of {@link FilteredColumnarBatch}s where each selected row in
* the batch corresponds to one scan file. Schema of each row is defined as follows:
* <p>
* <ol>
* <li><ul>
* <li>name: {@code add}, type: {@code struct}</li>
* <li>Description: Represents `AddFile` DeltaLog action</li>
* <li><ul>
* <li>name: {@code path}, type: {@code string}, description: location of the file.</li>
* <li>name: {@code partitionValues}, type: {@code map(string, string)},
* description: A map from partition column to value for this logical file. </li>
* <li>name: {@code size}, type: {@code long}, description: size of the file.</li>
* <li>name: {@code modificationTime}, type: {@code long}, description: the time this
* logical file was created, as milliseconds since the epoch.</li>
* <li>name: {@code dataChange}, type: {@code boolean}, description: When false the
* logical file must already be present in the table or the records in the added file
* must be contained in one or more remove actions in the same version</li>
* <li>name: {@code deletionVector}, type: {@code struct}, description: Either null
* (or absent in JSON) when no DV is associated with this data file, or a struct
* (described below) that contains necessary information about the DV that is part of
* this logical file. For a description of each member of `deletionVector`, see the
* <a href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Deletion-Vectors">
* Protocol</a>.<ul>
* <li>name: {@code storageType}, type: {@code string}</li>
* <li>name: {@code pathOrInlineDv}, type: {@code string}</li>
* <li>name: {@code offset}, type: {@code long}</li>
* <li>name: {@code sizeInBytes}, type: {@code long}</li>
* <li>name: {@code cardinality}, type: {@code long}</li>
* </ul></li>
* </ul></li>
* </ul></li>
* <li><ul>
* <li>name: {@code tableRoot}, type: {@code string}</li>
* <li>Description: Absolute path of the table location. TODO: this is temporary and will
* be removed in the future. See <a href="https://github.com/delta-io/delta/issues/2089">
* delta-io/delta#2089</a>.</li>
* </ul></li>
* </ol>
*/
CloseableIterator<ColumnarBatch> getScanFiles(TableClient tableClient);
CloseableIterator<FilteredColumnarBatch> getScanFiles(TableClient tableClient);

/**
* Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
Expand Down Expand Up @@ -97,9 +132,9 @@ static CloseableIterator<FilteredColumnarBatch> readData(
Row scanState,
CloseableIterator<Row> scanFileRowIter,
Optional<Predicate> predicate) throws IOException {
StructType physicalSchema = Utils.getPhysicalSchema(tableClient, scanState);
StructType logicalSchema = Utils.getLogicalSchema(tableClient, scanState);
List<String> partitionColumns = Utils.getPartitionColumns(scanState);
StructType physicalSchema = ScanStateRow.getPhysicalSchema(tableClient, scanState);
StructType logicalSchema = ScanStateRow.getLogicalSchema(tableClient, scanState);
List<String> partitionColumns = ScanStateRow.getPartitionColumns(scanState);
Set<String> partitionColumnsSet = new HashSet<>(partitionColumns);

StructType readSchemaWithoutPartitionColumns =
Expand All @@ -123,7 +158,7 @@ static CloseableIterator<FilteredColumnarBatch> readData(
filesReadContextsIter,
readSchema);

String tablePath = ScanStateRow.getTablePath(scanState);
String tablePath = ScanStateRow.getTableRoot(scanState);

return new CloseableIterator<FilteredColumnarBatch>() {
RoaringBitmapArray currBitmap = null;
Expand All @@ -144,8 +179,8 @@ public FilteredColumnarBatch next() {
FileDataReadResult fileDataReadResult = data.next();

Row scanFileRow = fileDataReadResult.getScanFileRow();
DeletionVectorDescriptor dv = DeletionVectorDescriptor.fromRow(
scanFileRow.getStruct(AddFileColumnarBatch.getDeletionVectorColOrdinal()));
DeletionVectorDescriptor dv =
InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFileRow);

int rowIndexOrdinal = fileDataReadResult.getData().getSchema()
.indexOf(StructField.ROW_INDEX_COLUMN_NAME);
Expand Down Expand Up @@ -177,12 +212,13 @@ public FilteredColumnarBatch next() {
tableClient.getExpressionHandler(),
updatedBatch,
readSchemaWithoutPartitionColumns,
Utils.getPartitionValues(fileDataReadResult.getScanFileRow()),
InternalScanFileUtils
.getPartitionValues(fileDataReadResult.getScanFileRow()),
physicalSchema
);

// Change back to logical schema
String columnMappingMode = Utils.getColumnMappingMode(scanState);
String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState);
switch (columnMappingMode) {
case "name":
updatedBatch = updatedBatch.withNewSchema(logicalSchema);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import io.delta.kernel.Scan;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;

/**
 * Utilities to extract information out of the scan file rows returned by
 * {@link Scan#getScanFiles(TableClient)}.
 *
 * <p>A scan file row follows {@link #SCAN_FILE_SCHEMA}: an {@code add} struct (the
 * {@code AddFile} action) followed by a {@code tableRoot} string. All ordinals used by the
 * accessors below are resolved once, from that schema, at class-initialization time.
 */
public class InternalScanFileUtils {
    private InternalScanFileUtils() {}

    private static final String TABLE_ROOT_COL_NAME = "tableRoot";
    private static final DataType TABLE_ROOT_DATA_TYPE = StringType.INSTANCE;

    /**
     * {@link StructField} describing the {@code tableRoot} column: a non-nullable string with
     * empty metadata. Declared {@code final} so the shared constant cannot be reassigned.
     */
    public static final StructField TABLE_ROOT_STRUCT_FIELD = new StructField(
        TABLE_ROOT_COL_NAME,
        TABLE_ROOT_DATA_TYPE,
        false, /* nullable */
        Collections.emptyMap());

    /**
     * Schema of the scan file rows: the {@code add} struct followed by the table root column.
     */
    // NOTE(review): the table root column is added here by name and type rather than through
    // TABLE_ROOT_STRUCT_FIELD, so its nullability is whatever `StructType.add(name, type)`
    // defaults to and may differ from the non-nullable field declared above - confirm.
    public static final StructType SCAN_FILE_SCHEMA = new StructType()
        .add("add", AddFile.SCHEMA)
        // TODO: table root is temporary, until the path in `add.path` is converted to
        // an absolute path. https://github.com/delta-io/delta/issues/2089
        .add(TABLE_ROOT_COL_NAME, TABLE_ROOT_DATA_TYPE);

    // Ordinal of the `add` struct within a scan file row.
    private static final int ADD_FILE_ORDINAL = SCAN_FILE_SCHEMA.indexOf("add");

    // Schema of the `add` struct; every ADD_FILE_*_ORDINAL below indexes into this schema.
    private static final StructType ADD_FILE_SCHEMA =
        (StructType) SCAN_FILE_SCHEMA.get("add").getDataType();

    private static final int ADD_FILE_PATH_ORDINAL = ADD_FILE_SCHEMA.indexOf("path");

    private static final int ADD_FILE_PARTITION_VALUES_ORDINAL =
        ADD_FILE_SCHEMA.indexOf("partitionValues");

    private static final int ADD_FILE_SIZE_ORDINAL = ADD_FILE_SCHEMA.indexOf("size");

    private static final int ADD_FILE_MOD_TIME_ORDINAL =
        ADD_FILE_SCHEMA.indexOf("modificationTime");

    private static final int ADD_FILE_DATA_CHANGE_ORDINAL = ADD_FILE_SCHEMA.indexOf("dataChange");

    private static final int ADD_FILE_DV_ORDINAL = ADD_FILE_SCHEMA.indexOf("deletionVector");

    // Ordinal of the `tableRoot` column within a scan file row.
    private static final int TABLE_ROOT_ORDINAL = SCAN_FILE_SCHEMA.indexOf(TABLE_ROOT_COL_NAME);

    /**
     * Get the {@link FileStatus} of {@code AddFile} from given scan file {@link Row}. The
     * {@link FileStatus} carries the path, size and modification time of the file. The path is
     * built by resolving {@code add.path} against the row's {@code tableRoot} value.
     *
     * @param scanFileInfo {@link Row} representing one scan file.
     * @return a {@link FileStatus} object created from the given scan file row.
     * @throws IllegalArgumentException If the scan file row doesn't contain {@code add} file
     *                                  entry.
     */
    public static FileStatus getAddFileStatus(Row scanFileInfo) {
        Row addFile = getAddFileEntry(scanFileInfo);
        String path = addFile.getString(ADD_FILE_PATH_ORDINAL);
        // Primitive locals: there is no reason to box the values on their way to FileStatus.
        long size = addFile.getLong(ADD_FILE_SIZE_ORDINAL);
        long modificationTime = addFile.getLong(ADD_FILE_MOD_TIME_ORDINAL);

        // TODO: this is hack until the path in `add.path` is converted to an absolute path
        String tableRoot = scanFileInfo.getString(TABLE_ROOT_ORDINAL);
        String absolutePath = new Path(tableRoot, path).toString();

        return FileStatus.of(absolutePath, size, modificationTime);
    }

    /**
     * Get the partition columns and values belonging to the {@code AddFile} from given scan file
     * row.
     *
     * @param scanFileInfo {@link Row} representing one scan file.
     * @return Map of partition column name to partition column value.
     * @throws IllegalArgumentException If the scan file row doesn't contain {@code add} file
     *                                  entry.
     */
    public static Map<String, String> getPartitionValues(Row scanFileInfo) {
        Row addFile = getAddFileEntry(scanFileInfo);
        return addFile.getMap(ADD_FILE_PARTITION_VALUES_ORDINAL);
    }

    /**
     * Helper method to get the {@code AddFile} struct from the scan file row.
     *
     * @param scanFileInfo {@link Row} representing one scan file.
     * @return {@link Row} representing the {@code AddFile}
     * @throws IllegalArgumentException If the scan file row doesn't contain {@code add} file entry.
     */
    protected static Row getAddFileEntry(Row scanFileInfo) {
        if (scanFileInfo.isNullAt(ADD_FILE_ORDINAL)) {
            throw new IllegalArgumentException("There is no `add` entry in the scan file row");
        }
        return scanFileInfo.getStruct(ADD_FILE_ORDINAL);
    }

    /**
     * Create a scan file row conforming to the schema {@link #SCAN_FILE_SCHEMA} for
     * given file status. This is used when creating the ScanFile row for reading commit or
     * checkpoint files.
     *
     * <p>Only {@code path}, {@code size} and {@code modificationTime} are populated from the
     * file status; {@code partitionValues}, {@code dataChange} and {@code deletionVector} are
     * set to {@code null}.
     *
     * @param fileStatus status of the file to wrap in a scan file row.
     * @return a {@link Row} with schema {@link #SCAN_FILE_SCHEMA} describing the given file.
     */
    public static Row generateScanFileRow(FileStatus fileStatus) {
        // Plain maps instead of double-brace initialization, which would create a new
        // anonymous HashMap subclass for each of the two maps built here.
        Map<Integer, Object> addFileValues = new HashMap<>();
        addFileValues.put(ADD_FILE_PATH_ORDINAL, fileStatus.getPath());
        addFileValues.put(ADD_FILE_PARTITION_VALUES_ORDINAL, null); // partitionValues
        addFileValues.put(ADD_FILE_SIZE_ORDINAL, fileStatus.getSize());
        addFileValues.put(ADD_FILE_MOD_TIME_ORDINAL, fileStatus.getModificationTime());
        addFileValues.put(ADD_FILE_DATA_CHANGE_ORDINAL, null); // dataChange
        addFileValues.put(ADD_FILE_DV_ORDINAL, null); // deletionVector
        Row addFile = new GenericRow(ADD_FILE_SCHEMA, addFileValues);

        Map<Integer, Object> scanFileValues = new HashMap<>();
        scanFileValues.put(ADD_FILE_ORDINAL, addFile);
        // NOTE(review): "/" is presumably used because the path in `fileStatus` is already
        // absolute, so resolving it against the root leaves it unchanged - confirm.
        scanFileValues.put(TABLE_ROOT_ORDINAL, "/");
        return new GenericRow(SCAN_FILE_SCHEMA, scanFileValues);
    }

    /**
     * Create a {@link DeletionVectorDescriptor} from {@code add} entry in the given scan file row.
     *
     * @param scanFile {@link Row} representing one scan file.
     * @return the result of {@link DeletionVectorDescriptor#fromRow(Row)} applied to the
     *         {@code add.deletionVector} struct of the given row.
     * @throws IllegalArgumentException If the scan file row doesn't contain {@code add} file
     *                                  entry.
     */
    public static DeletionVectorDescriptor getDeletionVectorDescriptorFromRow(Row scanFile) {
        Row addFile = getAddFileEntry(scanFile);
        return DeletionVectorDescriptor.fromRow(addFile.getStruct(ADD_FILE_DV_ORDINAL));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import io.delta.kernel.types.TimestampType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;

import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -41,7 +41,7 @@ public class ScanBuilderImpl
implements ScanBuilder {

private final StructType snapshotSchema;
private final CloseableIterator<AddFile> filesIter;
private final CloseableIterator<FilteredColumnarBatch> filesIter;
private final Lazy<Tuple2<Protocol, Metadata>> protocolAndMetadata;
private final TableClient tableClient;
private final Path dataPath;
Expand All @@ -53,7 +53,7 @@ public ScanBuilderImpl(
Path dataPath,
Lazy<Tuple2<Protocol, Metadata>> protocolAndMetadata,
StructType snapshotSchema,
CloseableIterator<AddFile> filesIter,
CloseableIterator<FilteredColumnarBatch> filesIter,
TableClient tableClient) {
this.dataPath = dataPath;
this.snapshotSchema = snapshotSchema;
Expand Down
Loading

0 comments on commit 87f80ce

Please sign in to comment.