Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporary tables #2962

Merged
merged 18 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ with the new code.
* **Performance** Improvement 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support temporary table planning and execution [(Issue #2962)](https://github.com/FoundationDB/fdb-record-layer/pull/2962)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,13 @@ public Object getBinding(@Nonnull String name) {
/**
* Get the value bound to a single parameter.
*
* @param type the type of the parameter
* @param alias the correlation identifier
*
* @return the value bound to the given parameter
*/
public Object getBinding(@Nonnull CorrelationIdentifier alias) {
return bindings.get(Bindings.Internal.CORRELATION.bindingName(alias.getId()));
public Object getBinding(@Nonnull final Bindings.Internal type, @Nonnull final CorrelationIdentifier alias) {
return bindings.get(type.bindingName(alias.getId()));
}

/**
Expand Down Expand Up @@ -201,11 +203,13 @@ public EvaluationContext withBinding(@Nonnull String bindingName, @Nullable Obje
* context included all bindings except that it will bind an additional
* parameter to an additional value.
*
* @param type the type of the binding.
* @param alias the alias determining the binding name to add
* @param value the value to bind the name to
*
* @return a new <code>EvaluationContext</code> with the new binding
*/
public EvaluationContext withBinding(@Nonnull CorrelationIdentifier alias, @Nullable Object value) {
return childBuilder().setBinding(Bindings.Internal.CORRELATION.bindingName(alias.getId()), value).build(typeRepository);
public EvaluationContext withBinding(final Bindings.Internal type, @Nonnull CorrelationIdentifier alias, @Nullable Object value) {
return childBuilder().setBinding(type.bindingName(alias.getId()), value).build(typeRepository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.cursors.aggregate;

import com.apple.foundationdb.record.Bindings;
import com.apple.foundationdb.record.EvaluationContext;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase;
Expand Down Expand Up @@ -183,13 +184,13 @@ private void finalizeGroup(Object nextGroup) {
}

private void accumulate(@Nullable Object currentObject) {
EvaluationContext nestedContext = context.withBinding(alias, currentObject);
EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
final Object partial = aggregateValue.evalToPartial(store, nestedContext);
accumulator.accumulate(partial);
}

private Object evalGroupingKey(@Nullable final Object currentObject) {
final EvaluationContext nestedContext = context.withBinding(alias, currentObject);
final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.apple.foundationdb.record.query.plan.plans.RecordQueryInValuesJoinPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIndexPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryInsertPlan;
import com.apple.foundationdb.record.query.plan.plans.TempTableInsertPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionOnKeyExpressionPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionOnValuesPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionPlan;
Expand All @@ -58,6 +59,7 @@
import com.apple.foundationdb.record.query.plan.plans.RecordQueryScoreForRankPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQuerySelectorPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryStreamingAggregationPlan;
import com.apple.foundationdb.record.query.plan.plans.TempTableScanPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryTextIndexPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryTypeFilterPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryUnionOnKeyExpressionPlan;
Expand Down Expand Up @@ -403,6 +405,14 @@ public PlanStringRepresentation visitInsertPlan(@Nonnull RecordQueryInsertPlan e
.append(element.getTargetRecordType());
}

@Nonnull
@Override
public PlanStringRepresentation visitTempTableInsertPlan(@Nonnull final TempTableInsertPlan element) {
return visit(element.getChild())
.append(" | TEMP TABLE INSERT INTO ")
.append(element.getTargetRecordType());
}

@Nonnull
private PlanStringRepresentation visitIntersectionPlan(@Nonnull RecordQueryIntersectionPlan element) {
return appendItems(element.getChildren(), " ∩ ");
Expand Down Expand Up @@ -577,6 +587,14 @@ public PlanStringRepresentation visitSortPlan(@Nonnull RecordQuerySortPlan eleme
.append(element.getKey());
}

@Nonnull
@Override
public PlanStringRepresentation visitTempTableScanPlan(@Nonnull final TempTableScanPlan element) {
return append("TEMP TABLE SCAN ([")
.append(element.getResultValue())
.append("])");
}

@Nonnull
@Override
public PlanStringRepresentation visitDefault(@Nonnull RecordQueryPlan element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInJoinRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInUnionRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInsertRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTempTableInsertRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementIntersectionRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementNestedLoopJoinRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementPhysicalScanRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementSimpleSelectRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementStreamingAggregationRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTempTableScanRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTypeFilterRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementUniqueRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementUnorderedUnionRule;
Expand Down Expand Up @@ -136,6 +138,7 @@ public class PlannerRuleSet {
);

private static final List<CascadesRule<? extends RelationalExpression>> IMPLEMENTATION_RULES = ImmutableList.of(
new ImplementTempTableScanRule(),
new ImplementTypeFilterRule(),
new ImplementFilterRule(),
new PushTypeFilterBelowFilterRule(),
Expand Down Expand Up @@ -169,6 +172,7 @@ public class PlannerRuleSet {
new ImplementStreamingAggregationRule(),
new ImplementDeleteRule(),
new ImplementInsertRule(),
new ImplementTempTableInsertRule(),
normen662 marked this conversation as resolved.
Show resolved Hide resolved
new ImplementUpdateRule()
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* TempTable.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.query.plan.cascades;

import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordMetaDataProto;
import com.apple.foundationdb.record.cursors.ListCursor;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ZeroCopyByteString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
* A mutable, temporary, serializable, and in-memory buffer of {@link QueryResult}s. It is aimed to be used as a temporary
* placeholder for computation results produced by some physical operator, but can be leveraged to represent, for example,
* a SQL temporary tables as well.
*/
public class TempTable {

@Nonnull
private final Queue<QueryResult> underlyingBuffer;

private TempTable() {
this(new LinkedList<>());
}

private TempTable(@Nonnull Queue<QueryResult> buffer) {
this.underlyingBuffer = buffer;
}

/**
* Add a new {@link QueryResult} element to the queue.
* @param element the new element to be added.
*/
public void add(@Nonnull QueryResult element) {
underlyingBuffer.add(element);
}

/**
* Add a new {@link QueryResult} elements to the queue.
* @param elements the new elements to be added.
*/
public void add(@Nonnull QueryResult... elements) {
Arrays.stream(elements).forEach(this::add);
}


@Nonnull
public Queue<QueryResult> getReadBuffer() {
return underlyingBuffer;
}

@SuppressWarnings("unchecked")
@Nonnull
public RecordCursor<QueryResult> getReadCursor(@Nullable byte[] continuation) {
return new ListCursor<>((List<QueryResult>)getReadBuffer(), continuation);
}

private void serializeBuffer(@Nonnull RecordMetaDataProto.PTempTable.Builder protoMessageBuilder) {
for (final var element : underlyingBuffer) {
normen662 marked this conversation as resolved.
Show resolved Hide resolved
final var elementByteString = element.toByteString();
protoMessageBuilder.addBufferItems(elementByteString);
}
}

@Nonnull
public RecordMetaDataProto.PTempTable toProto() {
final var builder = RecordMetaDataProto.PTempTable.newBuilder();
serializeBuffer(builder);
return builder.build();
}

@Nonnull
public ByteString toByteString() {
return toProto().toByteString();
}

@Nonnull
public byte[] toBytes() {
return toByteString().toByteArray();
normen662 marked this conversation as resolved.
Show resolved Hide resolved
}

@Nonnull
public static TempTable deserialize(@Nullable Descriptors.Descriptor descriptor, @Nonnull byte[]bytes) {
return deserialize(descriptor, ZeroCopyByteString.wrap(bytes));
}

@Nonnull
public static TempTable deserialize(@Nullable Descriptors.Descriptor descriptor, @Nonnull ByteString byteString) {
final RecordMetaDataProto.PTempTable tempTableProto;
try {
tempTableProto = RecordMetaDataProto.PTempTable.parseFrom(byteString);
} catch (InvalidProtocolBufferException ex) {
throw new RecordCoreException("invalid bytes", ex)
.addLogInfo(LogMessageKeys.RAW_BYTES, ByteArrayUtil2.loggable(byteString.toByteArray()));
}
return fromProto(tempTableProto, descriptor);
}

@Nonnull
public static TempTable fromProto(@Nonnull final RecordMetaDataProto.PTempTable tempTableProto,
@Nullable Descriptors.Descriptor descriptor) {
final var underlyingBuffer = new LinkedList<QueryResult>();
for (final var element : tempTableProto.getBufferItemsList()) {
underlyingBuffer.add(QueryResult.deserialize(descriptor, element));
}
return new TempTable(underlyingBuffer);
}

@Nonnull
public static TempTable newInstance() {
return new TempTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public class NodeInfo {
NodeIcon.DATA_ACCESS_OPERATOR,
"Scan",
"A scan operator loads a set of records from the database that are within the given range of primary keys.");
public static final NodeInfo TEMP_TABLE_SCAN_OPERATOR = new NodeInfo(
"TempTableScanOperator",
NodeIcon.DATA_ACCESS_OPERATOR,
"Temp Table Scan",
"A scan operator loads a set of records from an in-memory temporary buffer.");
public static final NodeInfo SCORE_FOR_RANK_OPERATOR = new NodeInfo(
"ScoreForRankOperator",
NodeIcon.DATA_ACCESS_OPERATOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,26 @@ String getToolTip() {
}
}

/**
* Node class for temporary data objects.
*/
public static class TemporaryDataNodeWithInfo extends DataNodeWithInfo {

public TemporaryDataNodeWithInfo(@Nonnull final Type type, @Nullable final List<String> sources) {
super(NodeInfo.TEMPORARY_BUFFER_DATA, type, sources);
}

public TemporaryDataNodeWithInfo(@Nonnull Type type, @Nullable final List<String> sources, @Nonnull final Map<String, Attribute> additionalAttributes) {
super(NodeInfo.TEMPORARY_BUFFER_DATA, type, sources, additionalAttributes);
}

@Nonnull
@Override
public String getFillColor() {
return "goldenrod2";
}
}

/**
* Node class for actual plan operators.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public List<? extends Quantifier> getQuantifiers() {
@Nonnull
@Override
public DeleteExpression translateCorrelations(@Nonnull final TranslationMap translationMap, @Nonnull final List<? extends Quantifier> translatedQuantifiers) {
return new DeleteExpression(inner, targetRecordType);
return new DeleteExpression(Iterables.getOnlyElement(translatedQuantifiers).narrow(Quantifier.ForEach.class), targetRecordType);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public List<? extends Quantifier> getQuantifiers() {
@Nonnull
@Override
public InsertExpression translateCorrelations(@Nonnull final TranslationMap translationMap, @Nonnull final List<? extends Quantifier> translatedQuantifiers) {
return new InsertExpression(inner, targetRecordType, targetType);
return new InsertExpression(Iterables.getOnlyElement(translatedQuantifiers).narrow(Quantifier.ForEach.class), targetRecordType, targetType);
}

@Nonnull
Expand Down
Loading