Skip to content

Commit

Permalink
Temporary tables (#2962)
Browse files Browse the repository at this point in the history
* First version of InMemoryQueueCursor.

* Add hierarchy test.

* Make InMemoryQueueCursor operator sychronously on QueryResults.

- rework continuation semantics and add few continuation tests.

* Checkpoint.

- new implementation of RecursiveUnorderedUnionCursor, incomplete, requires testing.
- new implementation of RecordQueryRecursiveUnorderedUnionPlan, incomplete, requires testing.
- new implementation of DoubleBufferCursor, requires more testing.

* WIP.

* table queue implementation.

* Add tests.

* refactoring.

* refactoring, addressing comments.

* refactoring, addressing further comments.

* refactoring, addressing further comments.

- Massive renaming, now we have `TempTableScanExpression`, `TempTableInsertExpression`, respective `TempTableInsertPlan`, and `TempTableScanPlan`, PB names are also consistent with that.
- Moved `PEnumLightValue`, `PUUID`, `PFDBRecordVersion`, `PComparableObject`, `PQueryResult`, and `PTempTable` to `record_metadata.proto` since these are runtime objects.
Removed the name from `TempTable`.
Use `Value` to “refer” to a `TempTable` in the `EvaluationContext` instead of a plain `CorrelationIdentifier`, in all `TempTableScanExpression`, `TempTableInsertExpression`, respective `TempTableInsertPlan`, and `TempTableScanPlan`. Made the expressions’ constructors private and added two factory methods that enable the customer to create an expression using a constant binding, or a correlated binding.
- Fixed the `translateCorrelations` implementations in `UpdateExpression`, `InsertExpression`, and `DeleteExpression`. Now it rebuilds the expression using the correct translated inner.

* refactoring, addressing comments.

* fix checkstyle violations.

* address further comments.

* remove extra blank lines.
  • Loading branch information
hatyo authored Nov 12, 2024
1 parent c68e818 commit 3e84644
Show file tree
Hide file tree
Showing 44 changed files with 1,805 additions and 105 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ with the new code.
* **Performance** Improvement 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support temporary table planning and execution [(Issue #2962)](https://github.com/FoundationDB/fdb-record-layer/pull/2962)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,13 @@ public Object getBinding(@Nonnull String name) {
/**
* Get the value bound to a single parameter.
*
* @param type the type of the parameter
* @param alias the correlation identifier
*
* @return the value bound to the given parameter
*/
public Object getBinding(@Nonnull CorrelationIdentifier alias) {
return bindings.get(Bindings.Internal.CORRELATION.bindingName(alias.getId()));
public Object getBinding(@Nonnull final Bindings.Internal type, @Nonnull final CorrelationIdentifier alias) {
return bindings.get(type.bindingName(alias.getId()));
}

/**
Expand Down Expand Up @@ -201,11 +203,13 @@ public EvaluationContext withBinding(@Nonnull String bindingName, @Nullable Obje
* context included all bindings except that it will bind an additional
* parameter to an additional value.
*
* @param type the type of the binding.
* @param alias the alias determining the binding name to add
* @param value the value to bind the name to
*
* @return a new <code>EvaluationContext</code> with the new binding
*/
public EvaluationContext withBinding(@Nonnull CorrelationIdentifier alias, @Nullable Object value) {
return childBuilder().setBinding(Bindings.Internal.CORRELATION.bindingName(alias.getId()), value).build(typeRepository);
public EvaluationContext withBinding(final Bindings.Internal type, @Nonnull CorrelationIdentifier alias, @Nullable Object value) {
return childBuilder().setBinding(type.bindingName(alias.getId()), value).build(typeRepository);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.cursors.aggregate;

import com.apple.foundationdb.record.Bindings;
import com.apple.foundationdb.record.EvaluationContext;
import com.apple.foundationdb.record.RecordCursorResult;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordStoreBase;
Expand Down Expand Up @@ -183,13 +184,13 @@ private void finalizeGroup(Object nextGroup) {
}

private void accumulate(@Nullable Object currentObject) {
EvaluationContext nestedContext = context.withBinding(alias, currentObject);
EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
final Object partial = aggregateValue.evalToPartial(store, nestedContext);
accumulator.accumulate(partial);
}

private Object evalGroupingKey(@Nullable final Object currentObject) {
final EvaluationContext nestedContext = context.withBinding(alias, currentObject);
final EvaluationContext nestedContext = context.withBinding(Bindings.Internal.CORRELATION, alias, currentObject);
return Objects.requireNonNull(groupingKeyValue).eval(store, nestedContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.apple.foundationdb.record.query.plan.plans.RecordQueryInValuesJoinPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIndexPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryInsertPlan;
import com.apple.foundationdb.record.query.plan.plans.TempTableInsertPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionOnKeyExpressionPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionOnValuesPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryIntersectionPlan;
Expand All @@ -58,6 +59,7 @@
import com.apple.foundationdb.record.query.plan.plans.RecordQueryScoreForRankPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQuerySelectorPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryStreamingAggregationPlan;
import com.apple.foundationdb.record.query.plan.plans.TempTableScanPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryTextIndexPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryTypeFilterPlan;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryUnionOnKeyExpressionPlan;
Expand Down Expand Up @@ -403,6 +405,14 @@ public PlanStringRepresentation visitInsertPlan(@Nonnull RecordQueryInsertPlan e
.append(element.getTargetRecordType());
}

@Nonnull
@Override
public PlanStringRepresentation visitTempTableInsertPlan(@Nonnull final TempTableInsertPlan element) {
return visit(element.getChild())
.append(" | TEMP TABLE INSERT INTO ")
.append(element.getTargetRecordType());
}

@Nonnull
private PlanStringRepresentation visitIntersectionPlan(@Nonnull RecordQueryIntersectionPlan element) {
return appendItems(element.getChildren(), " ∩ ");
Expand Down Expand Up @@ -577,6 +587,14 @@ public PlanStringRepresentation visitSortPlan(@Nonnull RecordQuerySortPlan eleme
.append(element.getKey());
}

@Nonnull
@Override
public PlanStringRepresentation visitTempTableScanPlan(@Nonnull final TempTableScanPlan element) {
return append("TEMP TABLE SCAN ([")
.append(element.getResultValue())
.append("])");
}

@Nonnull
@Override
public PlanStringRepresentation visitDefault(@Nonnull RecordQueryPlan element) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInJoinRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInUnionRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementInsertRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTempTableInsertRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementIntersectionRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementNestedLoopJoinRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementPhysicalScanRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementSimpleSelectRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementStreamingAggregationRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTempTableScanRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementTypeFilterRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementUniqueRule;
import com.apple.foundationdb.record.query.plan.cascades.rules.ImplementUnorderedUnionRule;
Expand Down Expand Up @@ -136,6 +138,7 @@ public class PlannerRuleSet {
);

private static final List<CascadesRule<? extends RelationalExpression>> IMPLEMENTATION_RULES = ImmutableList.of(
new ImplementTempTableScanRule(),
new ImplementTypeFilterRule(),
new ImplementFilterRule(),
new PushTypeFilterBelowFilterRule(),
Expand Down Expand Up @@ -169,6 +172,7 @@ public class PlannerRuleSet {
new ImplementStreamingAggregationRule(),
new ImplementDeleteRule(),
new ImplementInsertRule(),
new ImplementTempTableInsertRule(),
new ImplementUpdateRule()
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* TempTable.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2024 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.apple.foundationdb.record.query.plan.cascades;

import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordMetaDataProto;
import com.apple.foundationdb.record.cursors.ListCursor;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
import com.apple.foundationdb.tuple.ByteArrayUtil2;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ZeroCopyByteString;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
* A mutable, temporary, serializable, and in-memory buffer of {@link QueryResult}s. It is aimed to be used as a temporary
* placeholder for computation results produced by some physical operator, but can be leveraged to represent, for example,
* a SQL temporary tables as well.
*/
public class TempTable {

@Nonnull
private final Queue<QueryResult> underlyingBuffer;

private TempTable() {
this(new LinkedList<>());
}

private TempTable(@Nonnull Queue<QueryResult> buffer) {
this.underlyingBuffer = buffer;
}

/**
* Add a new {@link QueryResult} element to the queue.
* @param element the new element to be added.
*/
public void add(@Nonnull QueryResult element) {
underlyingBuffer.add(element);
}

/**
* Add a new {@link QueryResult} elements to the queue.
* @param elements the new elements to be added.
*/
public void add(@Nonnull QueryResult... elements) {
Arrays.stream(elements).forEach(this::add);
}


@Nonnull
public Queue<QueryResult> getReadBuffer() {
return underlyingBuffer;
}

@SuppressWarnings("unchecked")
@Nonnull
public RecordCursor<QueryResult> getReadCursor(@Nullable byte[] continuation) {
return new ListCursor<>((List<QueryResult>)getReadBuffer(), continuation);
}

private void serializeBuffer(@Nonnull RecordMetaDataProto.PTempTable.Builder protoMessageBuilder) {
for (final var element : underlyingBuffer) {
final var elementByteString = element.toByteString();
protoMessageBuilder.addBufferItems(elementByteString);
}
}

@Nonnull
public RecordMetaDataProto.PTempTable toProto() {
final var builder = RecordMetaDataProto.PTempTable.newBuilder();
serializeBuffer(builder);
return builder.build();
}

@Nonnull
public ByteString toByteString() {
return toProto().toByteString();
}

@Nonnull
public byte[] toBytes() {
return toByteString().toByteArray();
}

@Nonnull
public static TempTable deserialize(@Nullable Descriptors.Descriptor descriptor, @Nonnull byte[]bytes) {
return deserialize(descriptor, ZeroCopyByteString.wrap(bytes));
}

@Nonnull
public static TempTable deserialize(@Nullable Descriptors.Descriptor descriptor, @Nonnull ByteString byteString) {
final RecordMetaDataProto.PTempTable tempTableProto;
try {
tempTableProto = RecordMetaDataProto.PTempTable.parseFrom(byteString);
} catch (InvalidProtocolBufferException ex) {
throw new RecordCoreException("invalid bytes", ex)
.addLogInfo(LogMessageKeys.RAW_BYTES, ByteArrayUtil2.loggable(byteString.toByteArray()));
}
return fromProto(tempTableProto, descriptor);
}

@Nonnull
public static TempTable fromProto(@Nonnull final RecordMetaDataProto.PTempTable tempTableProto,
@Nullable Descriptors.Descriptor descriptor) {
final var underlyingBuffer = new LinkedList<QueryResult>();
for (final var element : tempTableProto.getBufferItemsList()) {
underlyingBuffer.add(QueryResult.deserialize(descriptor, element));
}
return new TempTable(underlyingBuffer);
}

@Nonnull
public static TempTable newInstance() {
return new TempTable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public class NodeInfo {
NodeIcon.DATA_ACCESS_OPERATOR,
"Scan",
"A scan operator loads a set of records from the database that are within the given range of primary keys.");
public static final NodeInfo TEMP_TABLE_SCAN_OPERATOR = new NodeInfo(
"TempTableScanOperator",
NodeIcon.DATA_ACCESS_OPERATOR,
"Temp Table Scan",
"A scan operator loads a set of records from an in-memory temporary buffer.");
public static final NodeInfo SCORE_FOR_RANK_OPERATOR = new NodeInfo(
"ScoreForRankOperator",
NodeIcon.DATA_ACCESS_OPERATOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,26 @@ String getToolTip() {
}
}

/**
* Node class for temporary data objects.
*/
public static class TemporaryDataNodeWithInfo extends DataNodeWithInfo {

public TemporaryDataNodeWithInfo(@Nonnull final Type type, @Nullable final List<String> sources) {
super(NodeInfo.TEMPORARY_BUFFER_DATA, type, sources);
}

public TemporaryDataNodeWithInfo(@Nonnull Type type, @Nullable final List<String> sources, @Nonnull final Map<String, Attribute> additionalAttributes) {
super(NodeInfo.TEMPORARY_BUFFER_DATA, type, sources, additionalAttributes);
}

@Nonnull
@Override
public String getFillColor() {
return "goldenrod2";
}
}

/**
* Node class for actual plan operators.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public List<? extends Quantifier> getQuantifiers() {
@Nonnull
@Override
public DeleteExpression translateCorrelations(@Nonnull final TranslationMap translationMap, @Nonnull final List<? extends Quantifier> translatedQuantifiers) {
return new DeleteExpression(inner, targetRecordType);
return new DeleteExpression(Iterables.getOnlyElement(translatedQuantifiers).narrow(Quantifier.ForEach.class), targetRecordType);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public List<? extends Quantifier> getQuantifiers() {
@Nonnull
@Override
public InsertExpression translateCorrelations(@Nonnull final TranslationMap translationMap, @Nonnull final List<? extends Quantifier> translatedQuantifiers) {
return new InsertExpression(inner, targetRecordType, targetType);
return new InsertExpression(Iterables.getOnlyElement(translatedQuantifiers).narrow(Quantifier.ForEach.class), targetRecordType, targetType);
}

@Nonnull
Expand Down
Loading

0 comments on commit 3e84644

Please sign in to comment.