Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Resolve some TODOs #2143

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface Scan {
* </ul></li>
* <li><ul>
* <li>name: {@code tableRoot}, type: {@code string}</li>
* <li>Description: Absolute path of the table location. TODO: this is temporary. Will
* <li>Description: Absolute path of the table location. NOTE: this is temporary. Will
* be removed in future. @see <a href=https://github.com/delta-io/delta/issues/2089>
* </a></li>
* </ul></li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class Literal implements Expression {
* @return a {@link Literal} of type {@link BooleanType}
*/
public static Literal ofBoolean(boolean value) {
return new Literal(value, BooleanType.INSTANCE);
return new Literal(value, BooleanType.BOOLEAN);
}

/**
Expand All @@ -59,7 +59,7 @@ public static Literal ofBoolean(boolean value) {
* @return a {@link Literal} of type {@link ByteType}
*/
public static Literal ofByte(byte value) {
return new Literal(value, ByteType.INSTANCE);
return new Literal(value, ByteType.BYTE);
}

/**
Expand All @@ -69,7 +69,7 @@ public static Literal ofByte(byte value) {
* @return a {@link Literal} of type {@link ShortType}
*/
public static Literal ofShort(short value) {
return new Literal(value, ShortType.INSTANCE);
return new Literal(value, ShortType.SHORT);
}

/**
Expand All @@ -79,7 +79,7 @@ public static Literal ofShort(short value) {
* @return a {@link Literal} of type {@link IntegerType}
*/
public static Literal ofInt(int value) {
return new Literal(value, IntegerType.INSTANCE);
return new Literal(value, IntegerType.INTEGER);
}

/**
Expand All @@ -89,7 +89,7 @@ public static Literal ofInt(int value) {
* @return a {@link Literal} of type {@link LongType}
*/
public static Literal ofLong(long value) {
return new Literal(value, LongType.INSTANCE);
return new Literal(value, LongType.LONG);
}

/**
Expand All @@ -99,7 +99,7 @@ public static Literal ofLong(long value) {
* @return a {@link Literal} of type {@link FloatType}
*/
public static Literal ofFloat(float value) {
return new Literal(value, FloatType.INSTANCE);
return new Literal(value, FloatType.FLOAT);
}

/**
Expand All @@ -109,7 +109,7 @@ public static Literal ofFloat(float value) {
* @return a {@link Literal} of type {@link DoubleType}
*/
public static Literal ofDouble(double value) {
return new Literal(value, DoubleType.INSTANCE);
return new Literal(value, DoubleType.DOUBLE);
}

/**
Expand All @@ -119,7 +119,7 @@ public static Literal ofDouble(double value) {
* @return a {@link Literal} of type {@link StringType}
*/
public static Literal ofString(String value) {
return new Literal(value, StringType.INSTANCE);
return new Literal(value, StringType.STRING);
}

/**
Expand All @@ -129,7 +129,7 @@ public static Literal ofString(String value) {
* @return a {@link Literal} of type {@link BinaryType}
*/
public static Literal ofBinary(byte[] value) {
return new Literal(value, BinaryType.INSTANCE);
return new Literal(value, BinaryType.BINARY);
}

/**
Expand All @@ -139,7 +139,7 @@ public static Literal ofBinary(byte[] value) {
* @return a {@link Literal} of type {@link DateType}
*/
public static Literal ofDate(int daysSinceEpochUTC) {
return new Literal(daysSinceEpochUTC, DateType.INSTANCE);
return new Literal(daysSinceEpochUTC, DateType.DATE);
}

/**
Expand All @@ -149,7 +149,7 @@ public static Literal ofDate(int daysSinceEpochUTC) {
* @return a {@link Literal} with data type {@link TimestampType}
*/
public static Literal ofTimestamp(long microsSinceEpochUTC) {
return new Literal(microsSinceEpochUTC, TimestampType.INSTANCE);
return new Literal(microsSinceEpochUTC, TimestampType.TIMESTAMP);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class InternalScanFileUtils {
private InternalScanFileUtils() {}

private static final String TABLE_ROOT_COL_NAME = "tableRoot";
private static final DataType TABLE_ROOT_DATA_TYPE = StringType.INSTANCE;
private static final DataType TABLE_ROOT_DATA_TYPE = StringType.STRING;
/**
* {@link Column} expression referring to the `partitionValues` in scan `add` file.
*/
Expand All @@ -58,7 +58,7 @@ private InternalScanFileUtils() {}

public static final StructType SCAN_FILE_SCHEMA = new StructType()
.add("add", AddFile.SCHEMA)
// TODO: table root is temporary, until the path in `add.path` is converted to
// NOTE: table root is temporary, until the path in `add.path` is converted to
// an absolute path. https://github.com/delta-io/delta/issues/2089
.add(TABLE_ROOT_COL_NAME, TABLE_ROOT_DATA_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
* Delta log action representing an `AddFile`
*/
public class AddFile {
// TODO: there are more optional fields in `AddFile` according to the spec. We will be adding
// Note: these are more optional fields in `AddFile` according to the spec. We will be adding
// them in read schema as we support the related features.
public static final StructType SCHEMA = new StructType()
.add("path", StringType.INSTANCE, false /* nullable */)
.add("path", StringType.STRING, false /* nullable */)
.add("partitionValues",
new MapType(StringType.INSTANCE, StringType.INSTANCE, true),
new MapType(StringType.STRING, StringType.STRING, true),
false /* nullable*/)
.add("size", LongType.INSTANCE, false /* nullable*/)
.add("modificationTime", LongType.INSTANCE, false /* nullable*/)
.add("dataChange", BooleanType.INSTANCE, false /* nullable*/)
.add("size", LongType.LONG, false /* nullable*/)
.add("modificationTime", LongType.LONG, false /* nullable*/)
.add("dataChange", BooleanType.BOOLEAN, false /* nullable*/)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public static DeletionVectorDescriptor fromRow(Row row) {
public static final String UUID_DV_MARKER = "u";

public static final StructType READ_SCHEMA = new StructType()
.add("storageType", StringType.INSTANCE, false /* nullable*/)
.add("pathOrInlineDv", StringType.INSTANCE, false /* nullable*/)
.add("offset", IntegerType.INSTANCE, true /* nullable*/)
.add("sizeInBytes", IntegerType.INSTANCE, false /* nullable*/)
.add("cardinality", LongType.INSTANCE, false /* nullable*/);
.add("storageType", StringType.STRING, false /* nullable*/)
.add("pathOrInlineDv", StringType.STRING, false /* nullable*/)
.add("offset", IntegerType.INTEGER, true /* nullable*/)
.add("sizeInBytes", IntegerType.INTEGER, false /* nullable*/)
.add("cardinality", LongType.LONG, false /* nullable*/);

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, READ_SCHEMA.length())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public static Format fromRow(Row row) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("provider", StringType.INSTANCE, false /* nullable */)
.add("provider", StringType.STRING, false /* nullable */)
.add("options",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
new MapType(StringType.STRING, StringType.STRING, false),
true /* nullable */
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ public static Metadata fromRow(Row row, TableClient tableClient) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("id", StringType.INSTANCE, false /* nullable */)
.add("name", StringType.INSTANCE, true /* nullable */)
.add("description", StringType.INSTANCE, true /* nullable */)
.add("id", StringType.STRING, false /* nullable */)
.add("name", StringType.STRING, true /* nullable */)
.add("description", StringType.STRING, true /* nullable */)
.add("format", Format.READ_SCHEMA, false /* nullable */)
.add("schemaString", StringType.INSTANCE, false /* nullable */)
.add("schemaString", StringType.STRING, false /* nullable */)
.add("partitionColumns",
new ArrayType(StringType.INSTANCE, false /* contains null */),
new ArrayType(StringType.STRING, false /* contains null */),
false /* nullable */)
.add("createdTime", LongType.INSTANCE, true /* contains null */)
.add("createdTime", LongType.LONG, true /* contains null */)
.add("configuration",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
new MapType(StringType.STRING, StringType.STRING, false),
false /* nullable */);

private final String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static Protocol fromRow(Row row) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("minReaderVersion", IntegerType.INSTANCE, false /* nullable */)
.add("minWriterVersion", IntegerType.INSTANCE, false /* nullable */)
.add("readerFeatures", new ArrayType(StringType.INSTANCE, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.INSTANCE, false /* contains null */));
.add("minReaderVersion", IntegerType.INTEGER, false /* nullable */)
.add("minWriterVersion", IntegerType.INTEGER, false /* nullable */)
.add("readerFeatures", new ArrayType(StringType.STRING, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.STRING, false /* contains null */));

private final int minReaderVersion;
private final int minWriterVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public static CheckpointMetaData fromRow(Row row) {
);
}

// TODO: there are more optional fields
public static StructType READ_SCHEMA = new StructType()
.add("version", LongType.INSTANCE, false /* nullable */)
.add("size", LongType.INSTANCE, false /* nullable */)
.add("parts", LongType.INSTANCE);
.add("version", LongType.LONG, false /* nullable */)
.add("size", LongType.LONG, false /* nullable */)
.add("parts", LongType.LONG);

public final long version;
public final long size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ public Optional<CheckpointMetaData> readLastCheckpointFile(TableClient tableClie
*/
private Optional<CheckpointMetaData> loadMetadataFromFile(TableClient tableClient) {
try {
// TODO: we have no way to get the file size and modification time within the api
// module. Should we have a client API for that or make use of the
// `FileSystemClient#listFrom`?
// For now we use file size = 0 and modification time = 0, in the future we should use
// listFrom to retrieve the real values see delta-io/delta#2140
FileStatus lastCheckpointFile = FileStatus.of(lastCheckpointFilePath.toString(), 0, 0);
JsonHandler jsonHandler = tableClient.getJsonHandler();
try (CloseableIterator<FileReadContext> fileReadContextIter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
*/
public class ScanStateRow extends GenericRow {
private static final StructType SCHEMA = new StructType()
.add("configuration", new MapType(StringType.INSTANCE, StringType.INSTANCE, false))
.add("logicalSchemaString", StringType.INSTANCE)
.add("physicalSchemaString", StringType.INSTANCE)
.add("partitionColumns", new ArrayType(StringType.INSTANCE, false))
.add("minReaderVersion", IntegerType.INSTANCE)
.add("minWriterVersion", IntegerType.INSTANCE)
.add("tablePath", StringType.INSTANCE);
.add("configuration", new MapType(StringType.STRING, StringType.STRING, false))
.add("logicalSchemaString", StringType.STRING)
.add("physicalSchemaString", StringType.STRING)
.add("partitionColumns", new ArrayType(StringType.STRING, false))
.add("minReaderVersion", IntegerType.INTEGER)
.add("minWriterVersion", IntegerType.INTEGER)
.add("tablePath", StringType.STRING);

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, SCHEMA.length())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SelectionColumnVector(RoaringBitmapArray bitmap, ColumnVector rowIndices)

@Override
public DataType getDataType() {
return BooleanType.INSTANCE;
return BooleanType.BOOLEAN;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@

import static io.delta.kernel.internal.util.InternalUtils.checkArgument;

// TODO: add test suite
// If we implement additional methods (i.e. serialize) we can copy the test suite from delta-spark

/**
* A 64-bit extension of [[RoaringBitmap]] that is optimized for cases that usually fit within
* a 32-bit bitmap, but may run over by a few bits on occasion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
* <p>
* Taken from https://github.com/apache/hadoop/blob/branch-3.3
* .4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
* TODO: remove unused parts
*/
// TODO remove this class
public class Path
implements Comparable<Path>, Serializable, ObjectInputValidation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void prepareNext() {
.getEvaluator(
scanAddFiles.getSchema(),
Literal.ofString(tableRoot.toUri().toString()),
StringType.INSTANCE);
StringType.STRING);
}
ColumnVector tableRootVector = tableRootVectorGenerator.eval(scanAddFiles);
scanAddFiles = scanAddFiles.withNewColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class LogReplay implements Logging {
// the whether stats are needed or not: https://github.com/delta-io/delta/issues/1961
.add("add", AddFile.SCHEMA)
.add("remove", new StructType()
.add("path", StringType.INSTANCE, false /* nullable */)
.add("path", StringType.STRING, false /* nullable */)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.types;
package io.delta.kernel.internal.types;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

/**
* TODO:
* TODO: this is temporary
* This is a controversial data type to have, but we have no way to specify the schema
* of JSON serialized table schema. In order to use the
* {@link io.delta.kernel.client.JsonHandler#parseJson(ColumnVector, StructType)}, the Kernel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.MixedDataType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;
import io.delta.kernel.utils.VectorUtils;
import io.delta.kernel.internal.util.InternalUtils;

/**
* Utility class to serialize and deserialize the table schema which is of type {@link StructType}.
Expand Down Expand Up @@ -190,7 +190,7 @@ private static <R> R parseAndEvalSingleRow(
String jsonString,
StructType outputSchema,
Function<Row, R> evalFunction) {
ColumnVector columnVector = Utils.singletonColumnVector(jsonString);
ColumnVector columnVector = InternalUtils.singletonStringColumnVector(jsonString);
ColumnarBatch result = jsonHandler.parseJson(columnVector, outputSchema);

assert result.getSize() == 1;
Expand All @@ -207,11 +207,11 @@ private static <R> R parseAndEvalSingleRow(
* Schema of the one member ({@link StructField}) in {@link StructType}.
*/
private static final StructType STRUCT_FIELD_SCHEMA = new StructType()
.add("name", StringType.INSTANCE)
.add("name", StringType.STRING)
.add("type", MixedDataType.INSTANCE) // Data type can be a string or a object.
.add("nullable", BooleanType.INSTANCE)
.add("nullable", BooleanType.BOOLEAN)
.add("metadata",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false /* valueContainsNull */));
new MapType(StringType.STRING, StringType.STRING, false /* valueContainsNull */));

/**
* Schema of the serialized {@link StructType}.
Expand All @@ -238,9 +238,9 @@ private static <R> R parseAndEvalSingleRow(
*/
private static StructType ARRAY_TYPE_SCHEMA =
new StructType()
.add("type", StringType.INSTANCE)
.add("type", StringType.STRING)
.add("elementType", MixedDataType.INSTANCE)
.add("containsNull", BooleanType.INSTANCE);
.add("containsNull", BooleanType.BOOLEAN);

/**
* Example Map Type in serialized format
Expand All @@ -253,10 +253,10 @@ private static <R> R parseAndEvalSingleRow(
*/
private static StructType MAP_TYPE_SCHEMA =
new StructType()
.add("type", StringType.INSTANCE)
.add("type", StringType.STRING)
.add("keyType", MixedDataType.INSTANCE)
.add("valueType", MixedDataType.INSTANCE)
.add("valueContainsNull", BooleanType.INSTANCE);
.add("valueContainsNull", BooleanType.BOOLEAN);

private static Pattern DECIMAL_TYPE_PATTERN =
Pattern.compile("decimal\\(\\s*(?<precision>[0-9]+),\\s*(?<scale>[0-9]+)\\s*\\)");
Expand Down
Loading
Loading