Skip to content

Commit

Permalink
expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Sep 11, 2023
1 parent d9ba620 commit c43b7ba
Show file tree
Hide file tree
Showing 39 changed files with 2,070 additions and 869 deletions.
23 changes: 9 additions & 14 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@
import io.delta.kernel.client.FileReadContext;
import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.DataReadResult;
import io.delta.kernel.data.FileDataReadResult;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.Tuple2;
import io.delta.kernel.utils.Utils;
import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE;

import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
import io.delta.kernel.internal.data.AddFileColumnarBatch;
Expand All @@ -63,9 +59,9 @@ public interface Scan {
* Get the remaining filter that is not guaranteed to be satisfied for the data Delta Kernel
* returns. This filter is used by Delta Kernel to do data skipping when possible.
*
* @return the remaining filter as an {@link Expression}.
* @return the remaining filter as a {@link Predicate}.
*/
Optional<Expression> getRemainingFilter();
Optional<Predicate> getRemainingFilter();

/**
* Get the scan state associated with the current scan. This state is common across all
Expand All @@ -84,9 +80,8 @@ public interface Scan {
* @param scanFileRowIter an iterator of {@link Row}s. Each {@link Row} represents one scan file
* from the {@link ColumnarBatch} returned by
* {@link Scan#getScanFiles(TableClient)}
* @param filter An optional filter that can be used for data skipping while reading
* the
* scan files.
* @param predicate An optional predicate that can be used for data skipping while reading
* the scan files.
* @return Data read from the input scan files as an iterator of {@link DataReadResult}s. Each
* {@link DataReadResult} instance contains the data read and an optional selection
* vector that indicates data rows as valid or invalid. It is the responsibility of the
Expand All @@ -97,7 +92,7 @@ static CloseableIterator<DataReadResult> readData(
TableClient tableClient,
Row scanState,
CloseableIterator<Row> scanFileRowIter,
Optional<Expression> filter) throws IOException {
Optional<Predicate> predicate) throws IOException {
StructType physicalSchema = Utils.getPhysicalSchema(tableClient, scanState);
StructType logicalSchema = Utils.getLogicalSchema(tableClient, scanState);
List<String> partitionColumns = Utils.getPartitionColumns(scanState);
Expand All @@ -118,7 +113,7 @@ static CloseableIterator<DataReadResult> readData(
CloseableIterator<FileReadContext> filesReadContextsIter =
parquetHandler.contextualizeFileReads(
scanFileRowIter,
filter.orElse(Literal.TRUE));
predicate.orElse(ALWAYS_TRUE));

CloseableIterator<FileDataReadResult> data = parquetHandler.readParquetFiles(
filesReadContextsIter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.delta.kernel;

import io.delta.kernel.client.TableClient;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;

/**
Expand All @@ -30,10 +30,10 @@ public interface ScanBuilder {
* the given filter.
*
* @param tableClient {@link TableClient} instance to use in Delta Kernel.
* @param filter an {@link Expression} which evaluates to boolean.
* @param predicate a {@link Predicate} to prune the metadata or data.
* @return A {@link ScanBuilder} with filter applied.
*/
ScanBuilder withFilter(TableClient tableClient, Expression filter);
ScanBuilder withFilter(TableClient tableClient, Predicate predicate);

/**
* Apply the given <i>readSchema</i>. If the builder already has a projection applied, calling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.client;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.ExpressionEvaluator;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;

/**
Expand All @@ -28,12 +29,18 @@
public interface ExpressionHandler {
/**
* Create an {@link ExpressionEvaluator} that can evaluate the given <i>expression</i> on
* {@link io.delta.kernel.data.ColumnarBatch}s with the given <i>batchSchema</i>.
* {@link ColumnarBatch}s with the given <i>batchSchema</i>. The <i>expression</i> is
* expected to be a scalar expression that produces exactly one output value
* for each input row.
*
* @param batchSchema Schema of the input data.
* @param inputSchema Input data schema
* @param expression Expression to evaluate.
* @param outputType Expected result data type.
* @return An {@link ExpressionEvaluator} instance bound to the given expression and
* batchSchema.
* <i>inputSchema</i>.
*/
ExpressionEvaluator getEvaluator(StructType batchSchema, Expression expression);
ExpressionEvaluator getEvaluator(
StructType inputSchema,
Expression expression,
DataType outputType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package io.delta.kernel.client;

import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Expression;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.fs.FileStatus;
import io.delta.kernel.utils.CloseableIterator;

Expand Down Expand Up @@ -45,5 +45,5 @@ public interface FileHandler {
*/
CloseableIterator<FileReadContext> contextualizeFileReads(
CloseableIterator<Row> fileIter,
Expression predicate);
Predicate predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collections;

/**
* Evaluates {@code expr1} = {@code expr2} for {@code new EqualTo(expr1, expr2)}.
* Predicate which always evaluates to {@code false}.
*/
public final class EqualTo extends BinaryComparison implements Predicate {
public final class AlwaysFalse extends Predicate {
public static final AlwaysFalse ALWAYS_FALSE = new AlwaysFalse();

public EqualTo(Expression left, Expression right) {
super(left, right, "=");
}

@Override
protected Object nullSafeEval(Object leftResult, Object rightResult) {
return compare(leftResult, rightResult) == 0;
private AlwaysFalse() {
super("ALWAYS_FALSE", Collections.emptyList());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collections;
import java.util.List;
import java.util.Set;

/**
* An {@link Expression} with no children.
* Predicate which always evaluates to {@code true}.
*/
public abstract class LeafExpression implements Expression {

protected LeafExpression() {}

@Override
public List<Expression> children() {
return Collections.emptyList();
}
public final class AlwaysTrue extends Predicate {
public static final AlwaysTrue ALWAYS_TRUE = new AlwaysTrue();

@Override
public Set<String> references() {
return Collections.emptySet();
private AlwaysTrue() {
super("ALWAYS_TRUE", Collections.emptyList());
}

public abstract boolean equals(Object o);

public abstract int hashCode();
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,42 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.expressions;

import java.util.Collection;

import io.delta.kernel.types.BooleanType;
import java.util.Arrays;

/**
* Evaluates logical {@code expr1} AND {@code expr2} for {@code new And(expr1, expr2)}.
* {@code AND} expression
* <p>
* Definition:
* <p>
* Requires both left and right input expressions evaluate to booleans.
* <ul>
* <li>Logical {@code expr1} AND {@code expr2} on two inputs.</li>
* <li>Requires both left and right input expressions to be of type {@link Predicate}.</li>
* <li>Result is null if at least one of the inputs is null.</li>
* </ul>
*/
public final class And extends BinaryOperator implements Predicate {

public static And apply(Collection<Expression> conjunctions) {
if (conjunctions.size() == 0) {
throw new IllegalArgumentException("And.apply must be called with at least 1 element");
}

return (And) conjunctions
.stream()
// we start off with And(true, true)
// then we get the 1st expression: And(And(true, true), expr1)
// then we get the 2nd expression: And(And(true, true), expr1), expr2) etc.
.reduce(new And(Literal.TRUE, Literal.TRUE), And::new);
public final class And extends Predicate {
public And(Predicate left, Predicate right) {
super("AND", Arrays.asList(left, right));
}

public And(Expression left, Expression right) {
super(left, right, "&&");
if (!(left.dataType() instanceof BooleanType) ||
!(right.dataType() instanceof BooleanType)) {
/**
* @return Left side operand.
*/
public Predicate getLeft() {
return (Predicate) getChildren().get(0);
}

throw new IllegalArgumentException(
String.format(
"'And' requires expressions of type boolean. Got %s and %s.",
left.dataType(),
right.dataType()
)
);
}
/**
* @return Right side operand.
*/
public Predicate getRight() {
return (Predicate) getChildren().get(1);
}

@Override
public Object nullSafeEval(Object leftResult, Object rightResult) {
return (boolean) leftResult && (boolean) rightResult;
public String toString() {
return "(" + getLeft() + " AND " + getRight() + ")";
}
}

This file was deleted.

Loading

0 comments on commit c43b7ba

Please sign in to comment.