Skip to content

Commit

Permalink
Assign base row ID to AddFile actions
Browse files Browse the repository at this point in the history
  • Loading branch information
qiyuandong-db committed Nov 21, 2024
1 parent b32c56a commit 54b77cc
Show file tree
Hide file tree
Showing 10 changed files with 568 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ public static ConcurrentWriteException concurrentDomainMetadataAction(
return new ConcurrentWriteException(message);
}

public static KernelException rowIDAssignmentWithoutStats() {
return new KernelException(
"Cannot assign baseRowId to add action. The number of records in this data file is missing.");
}

/* ------------------------ HELPER METHODS ----------------------------- */
private static String formatTimestamp(long millisSinceEpochUTC) {
return new Timestamp(millisSinceEpochUTC).toInstant().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ public Path getDataPath() {
return dataPath;
}

/**
* Returns the log replay object for this snapshot. Visible for testing.
*
* @return the {@link LogReplay} object
*/
public LogReplay getLogReplay() {
return logReplay;
}

/**
* Returns the timestamp of the latest commit of this snapshot. For an uninitialized snapshot,
* this returns -1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class TableFeatures {
add("typeWidening-preview");
add("typeWidening");
add(DOMAIN_METADATA_FEATURE_NAME);
add(ROW_TRACKING_FEATURE_NAME);
}
});

Expand All @@ -61,6 +62,9 @@ public class TableFeatures {
/** The feature name for domain metadata. */
public static final String DOMAIN_METADATA_FEATURE_NAME = "domainMetadata";

/** The feature name for row tracking. */
public static final String ROW_TRACKING_FEATURE_NAME = "rowTracking";

/** The minimum writer version required to support domain metadata. */
public static final int TABLE_FEATURES_MIN_WRITER_VERSION = 7;

Expand Down Expand Up @@ -100,7 +104,8 @@ public static void validateReadSupportedTable(
* <li>protocol writer version 1.
* <li>protocol writer version 2 only with appendOnly feature enabled.
* <li>protocol writer version 7 with {@code appendOnly}, {@code inCommitTimestamp}, {@code
* columnMapping}, {@code typeWidening}, {@code domainMetadata} feature enabled.
* columnMapping}, {@code typeWidening}, {@code domainMetadata}, {@code rowTracking} feature
* enabled.
* </ul>
*
* @param protocol Table protocol
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.ConflictChecker;
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowIDAssignmentResult;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -69,10 +71,15 @@ public class TransactionImpl implements Transaction {
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private final List<DomainMetadata> domainMetadatas = new ArrayList<>();
private Metadata metadata;
private boolean shouldUpdateMetadata;

// Contains domain metadata actions known prior to iterating and writing the data actions
private final List<DomainMetadata> domainMetadatas = new ArrayList<>();
// Contains domain metadata actions generated on the fly while writing the data actions
private CloseableIterator<DomainMetadata> domainMetadataIter =
toCloseableIterator(Collections.emptyIterator());

private boolean closed; // To avoid trying to commit the same transaction again.

public TransactionImpl(
Expand Down Expand Up @@ -145,6 +152,16 @@ public TransactionCommitResult commit(Engine engine, CloseableIterable<Row> data
CommitInfo attemptCommitInfo = generateCommitAction(engine);
updateMetadataWithICTIfRequired(
engine, attemptCommitInfo.getInCommitTimestamp(), readSnapshot.getVersion(engine));

// For Row Tracking, we assign base row IDs to all AddFiles inside dataActions that
// do not have it yet. If the high water mark has changed, we also emit a
// DomainMetadata action with the new high water mark.
RowIDAssignmentResult rowIDAssignmentResult =
RowTracking.assignBaseRowId(protocol, readSnapshot, FULL_SCHEMA, dataActions);
dataActions = rowIDAssignmentResult.getDataActions();
domainMetadataIter =
domainMetadataIter.combine(rowIDAssignmentResult.getDomainMetadatasIter());

int numRetries = 0;
do {
logger.info("Committing transaction as version = {}.", commitAsVersion);
Expand Down Expand Up @@ -235,17 +252,14 @@ private TransactionCommitResult doCommit(
}
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow())));

// Check for duplicate domain metadata and if the protocol supports
DomainMetadataUtils.validateDomainMetadatas(domainMetadatas, protocol);

domainMetadatas.forEach(
dm -> metadataActions.add(createDomainMetadataSingleAction(dm.toRow())));

try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) {
// Create a new CloseableIterator that will return the metadata actions followed by the
// data actions.
// data actions, and then DomainMetadata actions. The order is crucial as DomainMetadata
// actions may depend on the data actions.
CloseableIterator<Row> dataAndMetadataActions =
toCloseableIterator(metadataActions.iterator()).combine(stageDataIter);
toCloseableIterator(metadataActions.iterator())
.combine(stageDataIter)
.combine(getDomainMetadataActions());

if (commitAsVersion == 0) {
// New table, create a delta log directory
Expand Down Expand Up @@ -329,6 +343,29 @@ private Map<String, String> getOperationParameters() {
return Collections.emptyMap();
}

/**
* Returns an iterator of all domain metadata actions to be committed. Domain metadata actions
* exist in two places:
*
* <ol>
* <li>{@code List<DomainMetadata> domainMetadatas}: Contains domain metadata actions known
* prior to iterating and writing the data actions.
* <li>{@code CloseableIterator<DomainMetadata> domainMetadataIter}`: Contains domain metadata
* actions generated on the fly while writing the data actions.
* </ol>
*
* This method combines both sources of domain metadata actions and returns an iterator of all
* domain metadata action rows to be committed.
*
* @return a {@link CloseableIterator} of domain metadata action rows
*/
private CloseableIterator<Row> getDomainMetadataActions() {
// TODO: Implement the check for duplicate domain metadata and if the protocol supports here
return toCloseableIterator(domainMetadatas.listIterator())
.combine(domainMetadataIter)
.map(domainMetadata -> createDomainMetadataSingleAction(domainMetadata.toRow()));
}

/**
* Get the part of the schema of the table that needs the statistics to be collected per file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@
package io.delta.kernel.internal.actions;

import static io.delta.kernel.internal.util.InternalUtils.relativizePath;
import static io.delta.kernel.internal.util.InternalUtils.requireNonNull;
import static io.delta.kernel.internal.util.PartitionUtils.serializePartitionMap;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;

import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.types.*;
import io.delta.kernel.utils.DataFileStatistics;
import io.delta.kernel.utils.DataFileStatus;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;

/** Delta log action representing an `AddFile` */
Expand Down Expand Up @@ -55,7 +60,8 @@ public class AddFile {
.add(
"tags",
new MapType(StringType.STRING, StringType.STRING, true /* valueContainsNull */),
true /* nullable */);
true /* nullable */)
.add("baseRowId", LongType.LONG, true /* nullable */);

public static final StructType SCHEMA_WITH_STATS = SCHEMA_WITHOUT_STATS.add(JSON_STATS_FIELD);

Expand Down Expand Up @@ -93,4 +99,121 @@ public static Row convertDataFileStatus(
// any fields not present in the valueMap are considered null
return new GenericRow(FULL_SCHEMA, valueMap);
}

/**
* Extracts the fields of an `AddFile` from a {@link Row}.
*
* @param row the row to extract the fields from
* @return the extracted `AddFile`
*/
public static AddFile fromRow(Row row) {
if (row == null) {
return null;
}
assert row.getSchema().equals(FULL_SCHEMA);

return new AddFile(
requireNonNull(row, COL_NAME_TO_ORDINAL.get("path"), "path")
.getString(COL_NAME_TO_ORDINAL.get("path")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("partitionValues"), "partitionValues")
.getMap(COL_NAME_TO_ORDINAL.get("partitionValues")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("size"), "size")
.getLong(COL_NAME_TO_ORDINAL.get("size")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("modificationTime"), "modificationTime")
.getLong(COL_NAME_TO_ORDINAL.get("modificationTime")),
requireNonNull(row, COL_NAME_TO_ORDINAL.get("dataChange"), "dataChange")
.getBoolean(COL_NAME_TO_ORDINAL.get("dataChange")),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("deletionVector"))
? null
: DeletionVectorDescriptor.fromRow(
row.getStruct(COL_NAME_TO_ORDINAL.get("deletionVector")))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("tags"))
? null
: row.getMap(COL_NAME_TO_ORDINAL.get("tags"))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("baseRowId"))
? null
: row.getLong(COL_NAME_TO_ORDINAL.get("baseRowId"))),
Optional.ofNullable(
row.isNullAt(COL_NAME_TO_ORDINAL.get("stats"))
? null
: DataFileStatistics.deserializeFromJson(
row.getString(COL_NAME_TO_ORDINAL.get("stats")))
.orElse(null)));
}

private final String path;
private final MapValue partitionValues;
private final long size;
private final long modificationTime;
private final boolean dataChange;
private final Optional<DeletionVectorDescriptor> deletionVector;
private final Optional<MapValue> tags;
private final Optional<Long> baseRowId;
private final Optional<DataFileStatistics> stats;

public AddFile(
String path,
MapValue partitionValues,
long size,
long modificationTime,
boolean dataChange,
Optional<DeletionVectorDescriptor> deletionVector,
Optional<MapValue> tags,
Optional<Long> baseRowId,
Optional<DataFileStatistics> stats) {
this.path = requireNonNull(path, "path is null");
this.partitionValues = requireNonNull(partitionValues, "partitionValues is null");
this.size = size;
this.modificationTime = modificationTime;
this.dataChange = dataChange;
this.deletionVector = deletionVector;
this.tags = tags;
this.baseRowId = baseRowId;
this.stats = stats;
}

public Row toRow() {
Map<Integer, Object> valueMap = new HashMap<>();
valueMap.put(COL_NAME_TO_ORDINAL.get("path"), path);
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionValues"), partitionValues);
valueMap.put(COL_NAME_TO_ORDINAL.get("size"), size);
valueMap.put(COL_NAME_TO_ORDINAL.get("modificationTime"), modificationTime);
valueMap.put(COL_NAME_TO_ORDINAL.get("dataChange"), dataChange);
deletionVector.ifPresent(dv -> valueMap.put(COL_NAME_TO_ORDINAL.get("deletionVector"), dv));
tags.ifPresent(tags -> valueMap.put(COL_NAME_TO_ORDINAL.get("tags"), tags));
baseRowId.ifPresent(rowId -> valueMap.put(COL_NAME_TO_ORDINAL.get("baseRowId"), rowId));
stats.ifPresent(
stats -> valueMap.put(COL_NAME_TO_ORDINAL.get("stats"), stats.serializeAsJson()));
return new GenericRow(FULL_SCHEMA, valueMap);
}

public Optional<Long> getBaseRowId() {
return baseRowId;
}

public Optional<DataFileStatistics> getStats() {
return stats;
}

/**
* Creates a new AddFile instance with the specified base row ID.
*
* @param baseRowId the new base row ID to be assigned
* @return a new AddFile instance with the updated base row ID
*/
public AddFile withNewBaseRowId(long baseRowId) {
return new AddFile(
path,
partitionValues,
size,
modificationTime,
dataChange,
deletionVector,
tags,
Optional.of(baseRowId),
stats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.rowtracking;

import io.delta.kernel.data.Row;
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.utils.CloseableIterable;
import io.delta.kernel.utils.CloseableIterator;

/**
* Contains the result of a base row ID assignment. Returned by {@link RowTracking#assignBaseRowId}
*
* @since 3.2.0
*/
public class RowIDAssignmentResult {
private final CloseableIterable<Row> dataActions;
private final CloseableIterator<DomainMetadata> domainMetadatasIter;

public RowIDAssignmentResult(
CloseableIterable<Row> dataActions, CloseableIterator<DomainMetadata> domainMetadatasIter) {
this.dataActions = dataActions;
this.domainMetadatasIter = domainMetadatasIter;
}

public CloseableIterable<Row> getDataActions() {
return dataActions;
}

public CloseableIterator<DomainMetadata> getDomainMetadatasIter() {
return domainMetadatasIter;
}
}
Loading

0 comments on commit 54b77cc

Please sign in to comment.