[Kernel] Refactor expressions #1997

Merged
4 commits merged on Sep 12, 2023
Changes from all commits
23 changes: 9 additions & 14 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
@@ -26,18 +26,14 @@
import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.DataReadResult;
import io.delta.kernel.data.FileDataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
import io.delta.kernel.utils.Utils;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.AddFileColumnarBatch;
@@ -67,9 +63,9 @@ public interface Scan {
* Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
* returns. This filter is used by Delta Kernel to do data skipping when possible.
*
* @return the remaining filter as an {@link Expression}.
* @return the remaining filter as a {@link Predicate}.
*/
Optional<Expression> getRemainingFilter();
Optional<Predicate> getRemainingFilter();

/**
* Get the scan state associated with the current scan. This state is common across all
@@ -88,9 +84,8 @@ public interface Scan {
* @param scanFileRowIter an iterator of {@link Row}s. Each {@link Row} represents one scan file
* from the {@link ColumnarBatch} returned by
* {@link Scan#getScanFiles(TableClient)}
* @param filter An optional filter that can be used for data skipping while reading
* the
* scan files.
* @param predicate An optional predicate that can be used for data skipping while reading
* the scan files.
* @return Data read from the input scan files as an iterator of {@link DataReadResult}s. Each
* {@link DataReadResult} instance contains the data read and an optional selection
* vector that indicates data rows as valid or invalid. It is the responsibility of the
@@ -101,7 +96,7 @@ static CloseableIterator<DataReadResult> readData(
TableClient tableClient,
Row scanState,
CloseableIterator<Row> scanFileRowIter,
Optional<Expression> filter) throws IOException {
Optional<Predicate> predicate) throws IOException {
StructType physicalSchema = Utils.getPhysicalSchema(tableClient, scanState);
StructType logicalSchema = Utils.getLogicalSchema(tableClient, scanState);
List<String> partitionColumns = Utils.getPartitionColumns(scanState);
@@ -122,7 +117,7 @@ static CloseableIterator<DataReadResult> readData(
CloseableIterator<FileReadContext> filesReadContextsIter =
parquetHandler.contextualizeFileReads(
scanFileRowIter,
filter.orElse(Literal.TRUE));
predicate.orElse(ALWAYS_TRUE));

CloseableIterator<FileDataReadResult> data = parquetHandler.readParquetFiles(
filesReadContextsIter,
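For orientation (not part of this PR's changes), a caller of the refactored entry point might look like the sketch below. It uses only the signatures visible in this file: the filter argument is now an Optional<Predicate>, and Scan.readData itself falls back to ALWAYS_TRUE when it is empty. The class and method names in the sketch are illustrative.

import java.io.IOException;
import java.util.Optional;

import io.delta.kernel.Scan;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.DataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.utils.CloseableIterator;

class ReadWithPredicateSketch {
    // Read the given scan files, optionally pushing down a data-skipping predicate.
    static CloseableIterator<DataReadResult> read(
            TableClient tableClient,
            Row scanState,
            CloseableIterator<Row> scanFileRows,
            Optional<Predicate> dataSkippingPredicate) throws IOException {
        // An empty Optional is valid: readData substitutes AlwaysTrue.ALWAYS_TRUE internally,
        // as shown in the hunk above.
        return Scan.readData(tableClient, scanState, scanFileRows, dataSkippingPredicate);
    }
}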
kernel/kernel-api/src/main/java/io/delta/kernel/ScanBuilder.java
@@ -18,7 +18,7 @@

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;

/**
@@ -34,10 +34,10 @@ public interface ScanBuilder {
* the given filter.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @param filter an {@link Expression} which evaluates to boolean.
* @param predicate a {@link Predicate} to prune the metadata or data.
* @return A {@link ScanBuilder} with filter applied.
*/
ScanBuilder withFilter(TableClient tableClient, Expression filter);
ScanBuilder withFilter(TableClient tableClient, Predicate predicate);

/**
* Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
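As a hedged sketch of the new withFilter contract (not taken from this PR): the predicate is built with the Predicate(String, List<Expression>) constructor that AlwaysTrue and AlwaysFalse also call further down in this diff, assuming Predicate is directly instantiable as in the published kernel API. The "=" name, the build() call on ScanBuilder, and the way the operands are obtained are assumptions about the surrounding API rather than part of this change.

import java.util.Arrays;
import java.util.Optional;

import io.delta.kernel.Scan;
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;

class WithFilterSketch {
    // Build an equality predicate over two operands (e.g. a column reference and a literal)
    // and hand it to the scan builder for metadata/data pruning.
    static Scan buildFilteredScan(
            TableClient tableClient,
            ScanBuilder builder,
            Expression column,   // e.g. a Column (construction not shown in this diff)
            Expression value) {  // e.g. a Literal
        // "=" is an illustrative predicate name; supported names depend on the expression handler.
        Predicate equals = new Predicate("=", Arrays.asList(column, value));
        Scan scan = builder.withFilter(tableClient, equals).build();
        // Whatever the kernel could not guarantee is reported back as a Predicate as well.
        Optional<Predicate> remaining = scan.getRemainingFilter();
        remaining.ifPresent(p -> System.out.println("Remaining filter: " + p));
        return scan;
    }
}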
kernel/kernel-api/src/main/java/io/delta/kernel/client/ExpressionHandler.java
@@ -13,12 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.client;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.ExpressionEvaluator;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

/**
@@ -32,12 +33,16 @@
public interface ExpressionHandler {
/**
* Create an {@link ExpressionEvaluator} that can evaluate the given <i>expression</i> on
* {@link io.delta.kernel.data.ColumnarBatch}s with the given <i>batchSchema</i>.
* {@link ColumnarBatch}s with the given <i>batchSchema</i>. The <i>expression</i> is
* expected to be a scalar expression that produces one output value for each input row.
*
* @param batchSchema Schema of the input data.
* @param inputSchema Input data schema
* @param expression Expression to evaluate.
* @return An {@link ExpressionEvaluator} instance bound to the given expression and
* batchSchema.
* @param outputType Expected result data type.
*/
ExpressionEvaluator getEvaluator(StructType batchSchema, Expression expression);
ExpressionEvaluator getEvaluator(
StructType inputSchema,
Expression expression,
Collaborator:
Should this be ScalarExpression if we're adding that as an expression type?

Collaborator Author:
I am OK with updating this, but then we need to start adding more methods to the interface if we end up adding aggregation expressions, or introduce another interface between Expression and ScalarExpression.

@tdas let me know what you think.

Contributor:
Yeah, it's likely we have to for aggregate expressions... because for aggregates, would ExpressionEvaluator produce a ColumnVector?

Contributor:
So it makes sense to make this a ScalarExpression.

Collaborator Author:
Currently Column and Literal are not derived from ScalarExpression. Should we make them extend ScalarExpression before making this change? Otherwise we cannot populate partition values.

Collaborator:
This makes sense to me; it seems like Literal and Column should be scalar expressions.

Collaborator Author:
Literal and Column are leaf expressions which don't take any input expressions. There isn't a one-input, one-output concept here. Also, Spark DS v2 doesn't extend Literal and Column from ScalarExpression.

I am inclined to keep it this way. If there is a strong reason, I will follow up with the change in a separate PR.

Collaborator:
Okay, good to merge. Let's follow up on it with @tdas next week?

DataType outputType);
}
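To illustrate the new three-argument getEvaluator (a sketch under assumptions, not part of this PR): the evaluator is bound once to an input schema, expression, and expected output type, then applied per batch. It assumes ExpressionEvaluator exposes an eval(ColumnarBatch) method returning a ColumnVector with one value per input row, which is not shown in this hunk.

import io.delta.kernel.client.ExpressionHandler;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.ExpressionEvaluator;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

class EvaluatorSketch {
    // Bind a scalar expression to the batch schema once, then evaluate it against a batch.
    static ColumnVector evaluate(
            ExpressionHandler handler,
            StructType inputSchema,
            Expression expression,
            DataType outputType,   // for a Predicate this would be the boolean data type
            ColumnarBatch batch) {
        ExpressionEvaluator evaluator =
            handler.getEvaluator(inputSchema, expression, outputType);
        // Assumption: eval consumes a ColumnarBatch and produces one output value per input row.
        return evaluator.eval(batch);
    }
}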
kernel/kernel-api/src/main/java/io/delta/kernel/client/FileHandler.java
@@ -18,7 +18,7 @@

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.utils.CloseableIterator;

@@ -49,5 +49,5 @@ public interface FileHandler {
*/
CloseableIterator<FileReadContext> contextualizeFileReads(
CloseableIterator<Row> fileIter,
Expression predicate);
Predicate predicate);
}
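Since contextualizeFileReads now requires a Predicate rather than a general Expression, a caller holding an Optional<Predicate> can default it exactly the way Scan.readData does above. A small sketch, assuming FileHandler lives in io.delta.kernel.client alongside the other handler interfaces:

import java.util.Optional;

import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.client.FileHandler;
import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.utils.CloseableIterator;

class ContextualizeSketch {
    // Absent filters become ALWAYS_TRUE, the neutral predicate that skips nothing.
    static CloseableIterator<FileReadContext> contextualize(
            FileHandler handler,
            CloseableIterator<Row> fileRows,
            Optional<Predicate> predicate) {
        return handler.contextualizeFileReads(fileRows, predicate.orElse(ALWAYS_TRUE));
    }
}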
kernel/kernel-api/src/main/java/io/delta/kernel/expressions/AlwaysFalse.java (previously EqualTo.java)
@@ -13,21 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collections;

import io.delta.kernel.annotation.Evolving;

/**
* Evaluates {@code expr1} = {@code expr2} for {@code new EqualTo(expr1, expr2)}.
* Predicate which always evaluates to {@code false}.
*
* @since 3.0.0
*/
public final class EqualTo extends BinaryComparison implements Predicate {
@Evolving
public final class AlwaysFalse extends Predicate {
public static final AlwaysFalse ALWAYS_FALSE = new AlwaysFalse();

public EqualTo(Expression left, Expression right) {
super(left, right, "=");
}

@Override
protected Object nullSafeEval(Object leftResult, Object rightResult) {
return compare(leftResult, rightResult) == 0;
private AlwaysFalse() {
super("ALWAYS_FALSE", Collections.emptyList());
}
}

kernel/kernel-api/src/main/java/io/delta/kernel/expressions/AlwaysTrue.java (previously LeafExpression.java)
@@ -13,31 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collections;
import java.util.List;
import java.util.Set;

import io.delta.kernel.annotation.Evolving;

/**
* An {@link Expression} with no children.
* Predicate which always evaluates to {@code true}.
*
* @since 3.0.0
*/
public abstract class LeafExpression implements Expression {

protected LeafExpression() {}
@Evolving
public final class AlwaysTrue extends Predicate {
public static final AlwaysTrue ALWAYS_TRUE = new AlwaysTrue();

@Override
public List<Expression> children() {
return Collections.emptyList();
private AlwaysTrue() {
super("ALWAYS_TRUE", Collections.emptyList());
}

@Override
public Set<String> references() {
return Collections.emptySet();
}

public abstract boolean equals(Object o);

public abstract int hashCode();
}
kernel/kernel-api/src/main/java/io/delta/kernel/expressions/And.java
@@ -13,50 +13,47 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collection;
import java.util.Arrays;

import io.delta.kernel.types.BooleanType;
import io.delta.kernel.annotation.Evolving;

/**
* Evaluates logical {@code expr1} AND {@code expr2} for {@code new And(expr1, expr2)}.
* {@code AND} expression
* <p>
* Definition:
* <p>
* Requires both left and right input expressions evaluate to booleans.
* <ul>
* <li>Logical {@code expr1} AND {@code expr2} on two inputs.</li>
* <li>Requires both left and right input expressions of type {@link Predicate}.</li>
* <li>Result is null if at least one of the inputs is null.</li>
* </ul>
*
* @since 3.0.0
*/
public final class And extends BinaryOperator implements Predicate {

public static And apply(Collection<Expression> conjunctions) {
if (conjunctions.size() == 0) {
throw new IllegalArgumentException("And.apply must be called with at least 1 element");
}

return (And) conjunctions
.stream()
// we start off with And(true, true)
// then we get the 1st expression: And(And(true, true), expr1)
// then we get the 2nd expression: And(And(true, true), expr1), expr2) etc.
.reduce(new And(Literal.TRUE, Literal.TRUE), And::new);
@Evolving
public final class And extends Predicate {
public And(Predicate left, Predicate right) {
super("AND", Arrays.asList(left, right));
}

public And(Expression left, Expression right) {
super(left, right, "&&");
if (!(left.dataType() instanceof BooleanType) ||
!(right.dataType() instanceof BooleanType)) {
/**
* @return Left side operand.
*/
public Predicate getLeft() {
Collaborator:
Wouldn't this fit best inside of a BinaryPredicate class?

Collaborator Author:
We are trying to minimize the number of interfaces. The goal is to have just one Predicate class that captures all the predicates. And and Or are just two special expressions.

return (Predicate) getChildren().get(0);
}

throw new IllegalArgumentException(
String.format(
"'And' requires expressions of type boolean. Got %s and %s.",
left.dataType(),
right.dataType()
)
);
}
/**
* @return Right side operand.
*/
public Predicate getRight() {
return (Predicate) getChildren().get(1);
}

@Override
public Object nullSafeEval(Object leftResult, Object rightResult) {
return (boolean) leftResult && (boolean) rightResult;
public String toString() {
return "(" + getLeft() + " AND " + getRight() + ")";
}
}
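This refactor drops the And.apply(Collection<Expression>) helper shown as removed above. Callers that used it to fold a list of conjuncts can do the same with the new two-argument constructor, seeded with ALWAYS_TRUE just as the old helper seeded with Literal.TRUE. A sketch using only classes from this PR; the helper name is illustrative:

import java.util.List;

import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;

class CombinePredicatesSketch {
    // Fold a list of predicates into a left-deep AND tree. Like the removed And.apply,
    // the result keeps a harmless ALWAYS_TRUE leaf; an empty list degenerates to ALWAYS_TRUE.
    static Predicate and(List<Predicate> conjuncts) {
        Predicate combined = ALWAYS_TRUE;
        for (Predicate conjunct : conjuncts) {
            combined = new And(combined, conjunct);
        }
        return combined;
    }
}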

This file was deleted.
