Skip to content

Commit

Permalink
Support partitioned writer scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Nov 26, 2024
1 parent b7501f9 commit f7a8487
Show file tree
Hide file tree
Showing 31 changed files with 283 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,8 @@
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.NOT_MATERIALIZED;
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.PARTIALLY_MATERIALIZED;
import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.TOO_MANY_PARTITIONS_MISSING;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.SINGLE_WRITER_PER_PARTITION_REQUIRED;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
Expand Down Expand Up @@ -2995,7 +2997,7 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent()) {
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
if (table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

Expand All @@ -3010,7 +3012,10 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
return Optional.of(new ConnectorNewTableLayout(
partitioningHandle,
partitionedBy,
isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED));
}
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty()
.orElseThrow(() -> new NoSuchElementException("Bucket property should be set"));
Expand Down Expand Up @@ -3055,7 +3060,7 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (!bucketProperty.isPresent()) {
List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
if (partitionedBy.isEmpty()) {
return Optional.empty();
}

Expand All @@ -3074,7 +3079,10 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
return Optional.of(new ConnectorNewTableLayout(
partitioningHandle,
partitionedBy,
isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED));
}
checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE),
"bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2296,6 +2296,7 @@ public void testBucketPruning()
{
Session session = getSession();
QueryRunner queryRunner = getQueryRunner();

queryRunner.execute("CREATE TABLE orders_bucketed WITH (bucket_count = 11, bucketed_by = ARRAY['orderkey']) AS " +
"SELECT * FROM orders");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public final class SystemSessionProperties
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";
public static final String NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED = "native_execution_scale_partitioned_writer_threads_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1824,7 +1825,11 @@ public SystemSessionProperties(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(),
false));
false),
booleanProperty(NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED,
"Automatically scale number of writer threads per partition for partitioned tables",
featuresConfig.isNativeExecutionScalePartitionedWritersThreadsEnabled(),
!featuresConfig.isNativeExecutionEnabled()));
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down Expand Up @@ -3100,4 +3105,9 @@ public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session
{
return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
}

public static boolean isNativeExecutionScalePartitionedWritersThreadsEnabled(Session session)
{
return session.getSystemProperty(NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
Optional.empty(),
false,
COLUMNAR,
Optional.empty(),
Optional.empty());

ExchangeNode writerRemoteSource = new ExchangeNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public class FeaturesConfig
private int eagerPlanValidationThreadPoolSize = 20;

private boolean prestoSparkExecutionEnvironment;
private boolean nativeExecutionScalePartitionedWritersThreadsEnabled;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2847,4 +2848,16 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec
this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment;
return this;
}

public boolean isNativeExecutionScalePartitionedWritersThreadsEnabled()
{
return nativeExecutionScalePartitionedWritersThreadsEnabled;
}

@Config("native-execution-scale-partitioned-writer-threads-enabled")
public FeaturesConfig setNativeExecutionScalePartitionedWritersThreadsEnabled(boolean nativeExecutionScalePartitionedWritersThreadsEnabled)
{
this.nativeExecutionScalePartitionedWritersThreadsEnabled = nativeExecutionScalePartitionedWritersThreadsEnabled;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
Expand Down Expand Up @@ -252,7 +253,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<FragmentProper
@Override
public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentProperties> context)
{
if (node.getTablePartitioningScheme().isPresent()) {
if (node.isSingleWriterPerPartitionRequired()) {
context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
return context.defaultRewrite(node, context.get());
Expand Down Expand Up @@ -283,6 +284,8 @@ public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProp
private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
checkArgument(exchange.getScope() == REMOTE_STREAMING, "Unexpected exchange scope: %s", exchange.getScope());
checkArgument(!exchange.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)),
"task scaling for partitioned tables is not yet supported");

PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();

Expand Down Expand Up @@ -329,6 +332,9 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
checkArgument(exchange.getType() == REPARTITION, "Unexpected exchange type: %s", exchange.getType());
checkArgument(exchange.getScope() == REMOTE_MATERIALIZED, "Unexpected exchange scope: %s", exchange.getScope());

checkArgument(!exchange.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)),
"task scaling for partitioned tables is not yet supported");

PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();

PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@
import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isOrderBySpillEnabled;
import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isTopNSpillEnabled;
import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isWindowSpillEnabled;
import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED;
import static com.facebook.presto.spi.StandardErrorCode.COMPILER_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
Expand Down Expand Up @@ -2680,7 +2681,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont
public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context)
{
// Set table writer count
if (node.getTablePartitioningScheme().isPresent()) {
if (node.isSingleWriterPerPartitionRequired()) {
context.setDriverInstanceCount(getTaskPartitionedWriterCount(session));
}
else {
Expand Down Expand Up @@ -3070,6 +3071,9 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan

private PhysicalOperation createLocalExchange(ExchangeNode node, LocalExecutionPlanContext context)
{
checkArgument(!node.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)),
"thread scaling for partitioned tables is only supported by native execution");

int driverInstanceCount;
if (node.getType() == ExchangeNode.Type.GATHER) {
driverInstanceCount = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,8 @@ private static Optional<PartitioningScheme> getPartitioningSchemeForTableWrite(O

partitioningScheme = Optional.of(new PartitioningScheme(
Partitioning.create(tableLayout.get().getPartitioning(), partitionFunctionArguments),
outputLayout));
outputLayout,
tableLayout.get().getWriterPolicy()));
}
return partitioningScheme;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta
outputPartitioningScheme.getHashColumn(),
outputPartitioningScheme.isReplicateNullsAndAny(),
outputPartitioningScheme.getEncoding(),
outputPartitioningScheme.getBucketToPartition()),
outputPartitioningScheme.getBucketToPartition(),
outputPartitioningScheme.getPartitionedTableWritePolicy()),
fragment.getStageExecutionDescriptor(),
fragment.isOutputTableWriterFragment(),
fragment.getStatsAndCosts(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange,
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());
exchange.getPartitioningScheme().getBucketToPartition(),
exchange.getPartitioningScheme().getPartitionedTableWritePolicy());

return new ExchangeNode(
aggregation.getSourceLocation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public Result apply(ProjectNode project, Captures captures, Context context)
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());
exchange.getPartitioningScheme().getBucketToPartition(),
exchange.getPartitioningScheme().getPartitionedTableWritePolicy());

PlanNode result = new ExchangeNode(
exchange.getSourceLocation(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
partitioningScheme.getBucketToPartition(),
partitioningScheme.getPartitionedTableWritePolicy()),
ImmutableList.of(assignUniqueId.getSource()),
ImmutableList.of(removeVariable(getOnlyElement(node.getInputs()), assignUniqueId.getIdVariable())),
node.isEnsureSourceOrdering(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
partitioningScheme.getBucketToPartition(),
partitioningScheme.getPartitionedTableWritePolicy()),
ImmutableList.of(groupIdNode.getSource()),
ImmutableList.of(inputs),
node.isEnsureSourceOrdering(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion
// guaranteed regardless of this optimizer. The level of local parallelism will be
// determined by LocalExecutionPlanner separately, and shouldn't be a concern of
// this optimizer.
.matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent())
.matching(tableWriter -> !tableWriter.isSingleWriterPerPartitionRequired())
.with(source().matching(union().capturedAs(CHILD)));

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,17 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProper
PlanWithProperties source = accept(node.getSource(), preferredProperties);

Optional<PartitioningScheme> shufflePartitioningScheme = node.getTablePartitioningScheme();
if (!shufflePartitioningScheme.isPresent()) {
if (!node.isSingleWriterPerPartitionRequired()) {
if (scaleWriters) {
shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables()));
}
else if (redistributeWrites) {
shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables()));
}
else {
// TODO: refactor
return rebaseAndDeriveProperties(node, source);
}
}

if (shufflePartitioningScheme.isPresent() &&
Expand Down Expand Up @@ -1032,6 +1036,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p
Optional.empty(),
true,
COLUMNAR,
Optional.empty(),
Optional.empty())),
filteringSource.getProperties());
}
Expand Down Expand Up @@ -1074,6 +1079,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p
Optional.empty(),
true,
COLUMNAR,
Optional.empty(),
Optional.empty())),
filteringSource.getProperties());
}
Expand Down Expand Up @@ -1250,6 +1256,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties parentP
Optional.empty(),
nullsAndAnyReplicated,
COLUMNAR,
Optional.empty(),
Optional.empty())),
source.getProperties());
}
Expand Down
Loading

0 comments on commit f7a8487

Please sign in to comment.