Skip to content

Commit

Permalink
[Kernel] Resolve some TODOs (delta-io#2143)
Browse files Browse the repository at this point in the history
## Description

Resolves some miscellaneous TODOs in the Kernel code including
- Moves `MixedDataType` to an internal package since it is a temporary type
- Change `INSTANCE` to `$TYPENAME` for all simple data types
- Move `singletonColumnVector` to internal utilities
- Remove logging to stdout for now
- Remove miscellaneous TODO comments that don't need to be in the code

## How was this patch tested?

Existing tests suffice.
  • Loading branch information
allisonport-db authored and xupefei committed Oct 31, 2023
1 parent fc15f52 commit 2e21220
Show file tree
Hide file tree
Showing 67 changed files with 380 additions and 400 deletions.
2 changes: 1 addition & 1 deletion kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public interface Scan {
* </ul></li>
* <li><ul>
* <li>name: {@code tableRoot}, type: {@code string}</li>
* <li>Description: Absolute path of the table location. TODO: this is temporary. Will
* <li>Description: Absolute path of the table location. NOTE: this is temporary. Will
* be removed in future. @see <a href=https://github.com/delta-io/delta/issues/2089>
* </a></li>
* </ul></li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class Literal implements Expression {
* @return a {@link Literal} of type {@link BooleanType}
*/
public static Literal ofBoolean(boolean value) {
return new Literal(value, BooleanType.INSTANCE);
return new Literal(value, BooleanType.BOOLEAN);
}

/**
Expand All @@ -59,7 +59,7 @@ public static Literal ofBoolean(boolean value) {
* @return a {@link Literal} of type {@link ByteType}
*/
public static Literal ofByte(byte value) {
return new Literal(value, ByteType.INSTANCE);
return new Literal(value, ByteType.BYTE);
}

/**
Expand All @@ -69,7 +69,7 @@ public static Literal ofByte(byte value) {
* @return a {@link Literal} of type {@link ShortType}
*/
public static Literal ofShort(short value) {
return new Literal(value, ShortType.INSTANCE);
return new Literal(value, ShortType.SHORT);
}

/**
Expand All @@ -79,7 +79,7 @@ public static Literal ofShort(short value) {
* @return a {@link Literal} of type {@link IntegerType}
*/
public static Literal ofInt(int value) {
return new Literal(value, IntegerType.INSTANCE);
return new Literal(value, IntegerType.INTEGER);
}

/**
Expand All @@ -89,7 +89,7 @@ public static Literal ofInt(int value) {
* @return a {@link Literal} of type {@link LongType}
*/
public static Literal ofLong(long value) {
return new Literal(value, LongType.INSTANCE);
return new Literal(value, LongType.LONG);
}

/**
Expand All @@ -99,7 +99,7 @@ public static Literal ofLong(long value) {
* @return a {@link Literal} of type {@link FloatType}
*/
public static Literal ofFloat(float value) {
return new Literal(value, FloatType.INSTANCE);
return new Literal(value, FloatType.FLOAT);
}

/**
Expand All @@ -109,7 +109,7 @@ public static Literal ofFloat(float value) {
* @return a {@link Literal} of type {@link DoubleType}
*/
public static Literal ofDouble(double value) {
return new Literal(value, DoubleType.INSTANCE);
return new Literal(value, DoubleType.DOUBLE);
}

/**
Expand All @@ -119,7 +119,7 @@ public static Literal ofDouble(double value) {
* @return a {@link Literal} of type {@link StringType}
*/
public static Literal ofString(String value) {
return new Literal(value, StringType.INSTANCE);
return new Literal(value, StringType.STRING);
}

/**
Expand All @@ -129,7 +129,7 @@ public static Literal ofString(String value) {
* @return a {@link Literal} of type {@link BinaryType}
*/
public static Literal ofBinary(byte[] value) {
return new Literal(value, BinaryType.INSTANCE);
return new Literal(value, BinaryType.BINARY);
}

/**
Expand All @@ -139,7 +139,7 @@ public static Literal ofBinary(byte[] value) {
* @return a {@link Literal} of type {@link DateType}
*/
public static Literal ofDate(int daysSinceEpochUTC) {
return new Literal(daysSinceEpochUTC, DateType.INSTANCE);
return new Literal(daysSinceEpochUTC, DateType.DATE);
}

/**
Expand All @@ -149,7 +149,7 @@ public static Literal ofDate(int daysSinceEpochUTC) {
* @return a {@link Literal} with data type {@link TimestampType}
*/
public static Literal ofTimestamp(long microsSinceEpochUTC) {
return new Literal(microsSinceEpochUTC, TimestampType.INSTANCE);
return new Literal(microsSinceEpochUTC, TimestampType.TIMESTAMP);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class InternalScanFileUtils {
private InternalScanFileUtils() {}

private static final String TABLE_ROOT_COL_NAME = "tableRoot";
private static final DataType TABLE_ROOT_DATA_TYPE = StringType.INSTANCE;
private static final DataType TABLE_ROOT_DATA_TYPE = StringType.STRING;
/**
* {@link Column} expression referring to the `partitionValues` in scan `add` file.
*/
Expand All @@ -58,7 +58,7 @@ private InternalScanFileUtils() {}

public static final StructType SCAN_FILE_SCHEMA = new StructType()
.add("add", AddFile.SCHEMA)
// TODO: table root is temporary, until the path in `add.path` is converted to
// NOTE: table root is temporary, until the path in `add.path` is converted to
// an absolute path. https://github.com/delta-io/delta/issues/2089
.add(TABLE_ROOT_COL_NAME, TABLE_ROOT_DATA_TYPE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
* Delta log action representing an `AddFile`
*/
public class AddFile {
// TODO: there are more optional fields in `AddFile` according to the spec. We will be adding
// Note: these are more optional fields in `AddFile` according to the spec. We will be adding
// them in read schema as we support the related features.
public static final StructType SCHEMA = new StructType()
.add("path", StringType.INSTANCE, false /* nullable */)
.add("path", StringType.STRING, false /* nullable */)
.add("partitionValues",
new MapType(StringType.INSTANCE, StringType.INSTANCE, true),
new MapType(StringType.STRING, StringType.STRING, true),
false /* nullable*/)
.add("size", LongType.INSTANCE, false /* nullable*/)
.add("modificationTime", LongType.INSTANCE, false /* nullable*/)
.add("dataChange", BooleanType.INSTANCE, false /* nullable*/)
.add("size", LongType.LONG, false /* nullable*/)
.add("modificationTime", LongType.LONG, false /* nullable*/)
.add("dataChange", BooleanType.BOOLEAN, false /* nullable*/)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public static DeletionVectorDescriptor fromRow(Row row) {
public static final String UUID_DV_MARKER = "u";

public static final StructType READ_SCHEMA = new StructType()
.add("storageType", StringType.INSTANCE, false /* nullable*/)
.add("pathOrInlineDv", StringType.INSTANCE, false /* nullable*/)
.add("offset", IntegerType.INSTANCE, true /* nullable*/)
.add("sizeInBytes", IntegerType.INSTANCE, false /* nullable*/)
.add("cardinality", LongType.INSTANCE, false /* nullable*/);
.add("storageType", StringType.STRING, false /* nullable*/)
.add("pathOrInlineDv", StringType.STRING, false /* nullable*/)
.add("offset", IntegerType.INTEGER, true /* nullable*/)
.add("sizeInBytes", IntegerType.INTEGER, false /* nullable*/)
.add("cardinality", LongType.LONG, false /* nullable*/);

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, READ_SCHEMA.length())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public static Format fromRow(Row row) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("provider", StringType.INSTANCE, false /* nullable */)
.add("provider", StringType.STRING, false /* nullable */)
.add("options",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
new MapType(StringType.STRING, StringType.STRING, false),
true /* nullable */
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ public static Metadata fromRow(Row row, TableClient tableClient) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("id", StringType.INSTANCE, false /* nullable */)
.add("name", StringType.INSTANCE, true /* nullable */)
.add("description", StringType.INSTANCE, true /* nullable */)
.add("id", StringType.STRING, false /* nullable */)
.add("name", StringType.STRING, true /* nullable */)
.add("description", StringType.STRING, true /* nullable */)
.add("format", Format.READ_SCHEMA, false /* nullable */)
.add("schemaString", StringType.INSTANCE, false /* nullable */)
.add("schemaString", StringType.STRING, false /* nullable */)
.add("partitionColumns",
new ArrayType(StringType.INSTANCE, false /* contains null */),
new ArrayType(StringType.STRING, false /* contains null */),
false /* nullable */)
.add("createdTime", LongType.INSTANCE, true /* contains null */)
.add("createdTime", LongType.LONG, true /* contains null */)
.add("configuration",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false),
new MapType(StringType.STRING, StringType.STRING, false),
false /* nullable */);

private final String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ public static Protocol fromRow(Row row) {
}

public static final StructType READ_SCHEMA = new StructType()
.add("minReaderVersion", IntegerType.INSTANCE, false /* nullable */)
.add("minWriterVersion", IntegerType.INSTANCE, false /* nullable */)
.add("readerFeatures", new ArrayType(StringType.INSTANCE, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.INSTANCE, false /* contains null */));
.add("minReaderVersion", IntegerType.INTEGER, false /* nullable */)
.add("minWriterVersion", IntegerType.INTEGER, false /* nullable */)
.add("readerFeatures", new ArrayType(StringType.STRING, false /* contains null */))
.add("writerFeatures", new ArrayType(StringType.STRING, false /* contains null */));

private final int minReaderVersion;
private final int minWriterVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public static CheckpointMetaData fromRow(Row row) {
);
}

// TODO: there are more optional fields
public static StructType READ_SCHEMA = new StructType()
.add("version", LongType.INSTANCE, false /* nullable */)
.add("size", LongType.INSTANCE, false /* nullable */)
.add("parts", LongType.INSTANCE);
.add("version", LongType.LONG, false /* nullable */)
.add("size", LongType.LONG, false /* nullable */)
.add("parts", LongType.LONG);

public final long version;
public final long size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,8 @@ public Optional<CheckpointMetaData> readLastCheckpointFile(TableClient tableClie
*/
private Optional<CheckpointMetaData> loadMetadataFromFile(TableClient tableClient) {
try {
// TODO: we have no way to get the file size and modification time within the api
// module. Should we have a client API for that or make use of the
// `FileSystemClient#listFrom`?
// For now we use file size = 0 and modification time = 0, in the future we should use
// listFrom to retrieve the real values see delta-io/delta#2140
FileStatus lastCheckpointFile = FileStatus.of(lastCheckpointFilePath.toString(), 0, 0);
JsonHandler jsonHandler = tableClient.getJsonHandler();
try (CloseableIterator<FileReadContext> fileReadContextIter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@
*/
public class ScanStateRow extends GenericRow {
private static final StructType SCHEMA = new StructType()
.add("configuration", new MapType(StringType.INSTANCE, StringType.INSTANCE, false))
.add("logicalSchemaString", StringType.INSTANCE)
.add("physicalSchemaString", StringType.INSTANCE)
.add("partitionColumns", new ArrayType(StringType.INSTANCE, false))
.add("minReaderVersion", IntegerType.INSTANCE)
.add("minWriterVersion", IntegerType.INSTANCE)
.add("tablePath", StringType.INSTANCE);
.add("configuration", new MapType(StringType.STRING, StringType.STRING, false))
.add("logicalSchemaString", StringType.STRING)
.add("physicalSchemaString", StringType.STRING)
.add("partitionColumns", new ArrayType(StringType.STRING, false))
.add("minReaderVersion", IntegerType.INTEGER)
.add("minWriterVersion", IntegerType.INTEGER)
.add("tablePath", StringType.STRING);

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
IntStream.range(0, SCHEMA.length())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public SelectionColumnVector(RoaringBitmapArray bitmap, ColumnVector rowIndices)

@Override
public DataType getDataType() {
return BooleanType.INSTANCE;
return BooleanType.BOOLEAN;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@

import static io.delta.kernel.internal.util.InternalUtils.checkArgument;

// TODO: add test suite
// If we implement additional methods (i.e. serialize) we can copy the test suite from delta-spark

/**
* A 64-bit extension of [[RoaringBitmap]] that is optimized for cases that usually fit within
* a 32-bit bitmap, but may run over by a few bits on occasion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
* <p>
* Taken from https://github.com/apache/hadoop/blob/branch-3.3
* .4/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Path.java
* TODO: remove unused parts
*/
// TODO remove this class
public class Path
implements Comparable<Path>, Serializable, ObjectInputValidation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private void prepareNext() {
.getEvaluator(
scanAddFiles.getSchema(),
Literal.ofString(tableRoot.toUri().toString()),
StringType.INSTANCE);
StringType.STRING);
}
ColumnVector tableRootVector = tableRootVectorGenerator.eval(scanAddFiles);
scanAddFiles = scanAddFiles.withNewColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class LogReplay implements Logging {
// the whether stats are needed or not: https://github.com/delta-io/delta/issues/1961
.add("add", AddFile.SCHEMA)
.add("remove", new StructType()
.add("path", StringType.INSTANCE, false /* nullable */)
.add("path", StringType.STRING, false /* nullable */)
.add("deletionVector", DeletionVectorDescriptor.READ_SCHEMA, true /* nullable */)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.types;
package io.delta.kernel.internal.types;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

/**
* TODO:
* TODO: this is temporary
* This is a controversial data type to have, but we have no way to specify the schema
* of JSON serialized table schema. In order to use the
* {@link io.delta.kernel.client.JsonHandler#parseJson(ColumnVector, StructType)}, the Kernel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.DecimalType;
import io.delta.kernel.types.MapType;
import io.delta.kernel.types.MixedDataType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Utils;
import io.delta.kernel.utils.VectorUtils;
import io.delta.kernel.internal.util.InternalUtils;

/**
* Utility class to serialize and deserialize the table schema which is of type {@link StructType}.
Expand Down Expand Up @@ -190,7 +190,7 @@ private static <R> R parseAndEvalSingleRow(
String jsonString,
StructType outputSchema,
Function<Row, R> evalFunction) {
ColumnVector columnVector = Utils.singletonColumnVector(jsonString);
ColumnVector columnVector = InternalUtils.singletonStringColumnVector(jsonString);
ColumnarBatch result = jsonHandler.parseJson(columnVector, outputSchema);

assert result.getSize() == 1;
Expand All @@ -207,11 +207,11 @@ private static <R> R parseAndEvalSingleRow(
* Schema of the one member ({@link StructField}) in {@link StructType}.
*/
private static final StructType STRUCT_FIELD_SCHEMA = new StructType()
.add("name", StringType.INSTANCE)
.add("name", StringType.STRING)
.add("type", MixedDataType.INSTANCE) // Data type can be a string or a object.
.add("nullable", BooleanType.INSTANCE)
.add("nullable", BooleanType.BOOLEAN)
.add("metadata",
new MapType(StringType.INSTANCE, StringType.INSTANCE, false /* valueContainsNull */));
new MapType(StringType.STRING, StringType.STRING, false /* valueContainsNull */));

/**
* Schema of the serialized {@link StructType}.
Expand All @@ -238,9 +238,9 @@ private static <R> R parseAndEvalSingleRow(
*/
private static StructType ARRAY_TYPE_SCHEMA =
new StructType()
.add("type", StringType.INSTANCE)
.add("type", StringType.STRING)
.add("elementType", MixedDataType.INSTANCE)
.add("containsNull", BooleanType.INSTANCE);
.add("containsNull", BooleanType.BOOLEAN);

/**
* Example Map Type in serialized format
Expand All @@ -253,10 +253,10 @@ private static <R> R parseAndEvalSingleRow(
*/
private static StructType MAP_TYPE_SCHEMA =
new StructType()
.add("type", StringType.INSTANCE)
.add("type", StringType.STRING)
.add("keyType", MixedDataType.INSTANCE)
.add("valueType", MixedDataType.INSTANCE)
.add("valueContainsNull", BooleanType.INSTANCE);
.add("valueContainsNull", BooleanType.BOOLEAN);

private static Pattern DECIMAL_TYPE_PATTERN =
Pattern.compile("decimal\\(\\s*(?<precision>[0-9]+),\\s*(?<scale>[0-9]+)\\s*\\)");
Expand Down
Loading

0 comments on commit 2e21220

Please sign in to comment.