diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java index abad533ec81d..30c62cef4edc 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java @@ -333,6 +333,8 @@ import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.NOT_MATERIALIZED; import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.PARTIALLY_MATERIALIZED; import static com.facebook.presto.spi.MaterializedViewStatus.MaterializedViewState.TOO_MANY_PARTITIONS_MISSING; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.SINGLE_WRITER_PER_PARTITION_REQUIRED; import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; @@ -2995,7 +2997,7 @@ public Optional getInsertLayout(ConnectorSession sessio Optional hiveBucketHandle = getHiveBucketHandle(session, table); if (!hiveBucketHandle.isPresent()) { - if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) { + if (table.getPartitionColumns().isEmpty()) { return Optional.empty(); } @@ -3010,7 +3012,10 @@ public Optional getInsertLayout(ConnectorSession sessio .map(Column::getName) .collect(toList()); - return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); + return Optional.of(new ConnectorNewTableLayout( + partitioningHandle, + partitionedBy, + isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)); } HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty() .orElseThrow(() -> new NoSuchElementException("Bucket property should be set")); @@ -3055,7 +3060,7 @@ public Optional getNewTableLayout(ConnectorSession sess Optional bucketProperty = getBucketProperty(tableMetadata.getProperties()); if (!bucketProperty.isPresent()) { List partitionedBy = getPartitionedBy(tableMetadata.getProperties()); - if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) { + if (partitionedBy.isEmpty()) { return Optional.empty(); } @@ -3074,7 +3079,10 @@ public Optional getNewTableLayout(ConnectorSession sess .collect(toList()), OptionalInt.empty()); - return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy)); + return Optional.of(new ConnectorNewTableLayout( + partitioningHandle, + partitionedBy, + isShufflePartitionedColumnsForTableWriteEnabled(session) ? SINGLE_WRITER_PER_PARTITION_REQUIRED : MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)); } checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE), "bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s", diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java index 4b10aa81a45a..fa2a064dc958 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java @@ -2296,6 +2296,7 @@ public void testBucketPruning() { Session session = getSession(); QueryRunner queryRunner = getQueryRunner(); + queryRunner.execute("CREATE TABLE orders_bucketed WITH (bucket_count = 11, bucketed_by = ARRAY['orderkey']) AS " + "SELECT * FROM orders"); diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index f66aeaf89a2e..41a57b369227 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -332,6 +332,7 @@ public final class SystemSessionProperties private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments"; public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled"; public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding"; + public static final String NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED = "native_execution_scale_partitioned_writer_threads_enabled"; private final List> sessionProperties; @@ -1824,7 +1825,11 @@ public SystemSessionProperties( NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, "Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only", queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(), - false)); + false), + booleanProperty(NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED, + "Automatically scale number of writer threads per partition for partitioned tables", + featuresConfig.isNativeExecutionScalePartitionedWritersThreadsEnabled(), + !featuresConfig.isNativeExecutionEnabled())); } public static boolean isSpoolingOutputBufferEnabled(Session session) @@ -3100,4 +3105,9 @@ public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session { return session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class); } + + public static boolean isNativeExecutionScalePartitionedWritersThreadsEnabled(Session session) + { + return session.getSystemProperty(NATIVE_EXECUTION_SCALE_PARTITIONED_WRITER_THREADS_ENABLED, Boolean.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java index f40bc95a8d0a..82f894676665 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java @@ -289,6 +289,7 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( Optional.empty(), false, COLUMNAR, + Optional.empty(), Optional.empty()); ExchangeNode writerRemoteSource = new ExchangeNode( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 5c2040e6ade8..df8f3621a82f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -288,6 +288,7 @@ public class FeaturesConfig private int eagerPlanValidationThreadPoolSize = 20; private boolean prestoSparkExecutionEnvironment; + private boolean nativeExecutionScalePartitionedWritersThreadsEnabled; public enum PartitioningPrecisionStrategy { @@ -2847,4 +2848,16 @@ public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExec this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment; return this; } + + public boolean isNativeExecutionScalePartitionedWritersThreadsEnabled() + { + return nativeExecutionScalePartitionedWritersThreadsEnabled; + } + + @Config("native-execution-scale-partitioned-writer-threads-enabled") + public FeaturesConfig setNativeExecutionScalePartitionedWritersThreadsEnabled(boolean nativeExecutionScalePartitionedWritersThreadsEnabled) + { + this.nativeExecutionScalePartitionedWritersThreadsEnabled = nativeExecutionScalePartitionedWritersThreadsEnabled; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java index eba7eeb25efd..0b18392b1ad5 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/BasePlanFragmenter.java @@ -61,6 +61,7 @@ import java.util.Set; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables; import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames; @@ -252,7 +253,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) { - if (node.getTablePartitioningScheme().isPresent()) { + if (node.isSingleWriterPerPartitionRequired()) { context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session); } return context.defaultRewrite(node, context.get()); @@ -283,6 +284,8 @@ public PlanNode visitExchange(ExchangeNode exchange, RewriteContext context) { checkArgument(exchange.getScope() == REMOTE_STREAMING, "Unexpected exchange scope: %s", exchange.getScope()); + checkArgument(!exchange.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)), + "task scaling for partitioned tables is not yet supported"); PartitioningScheme partitioningScheme = exchange.getPartitioningScheme(); @@ -329,6 +332,9 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite checkArgument(exchange.getType() == REPARTITION, "Unexpected exchange type: %s", exchange.getType()); checkArgument(exchange.getScope() == REMOTE_MATERIALIZED, "Unexpected exchange scope: %s", exchange.getScope()); + checkArgument(!exchange.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)), + "task scaling for partitioned tables is not yet supported"); + PartitioningScheme partitioningScheme = exchange.getPartitioningScheme(); PartitioningHandle partitioningHandle = partitioningScheme.getPartitioning().getHandle(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index c1b80bdf1ad3..4f6a2d8ead2f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -308,6 +308,7 @@ import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isOrderBySpillEnabled; import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isTopNSpillEnabled; import static com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider.isWindowSpillEnabled; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; import static com.facebook.presto.spi.StandardErrorCode.COMPILER_ERROR; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; @@ -2680,7 +2681,7 @@ public PhysicalOperation visitSemiJoin(SemiJoinNode node, LocalExecutionPlanCont public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPlanContext context) { // Set table writer count - if (node.getTablePartitioningScheme().isPresent()) { + if (node.isSingleWriterPerPartitionRequired()) { context.setDriverInstanceCount(getTaskPartitionedWriterCount(session)); } else { @@ -3070,6 +3071,9 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan private PhysicalOperation createLocalExchange(ExchangeNode node, LocalExecutionPlanContext context) { + checkArgument(!node.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)), + "thread scaling for partitioned tables is only supported by native execution"); + int driverInstanceCount; if (node.getType() == ExchangeNode.Type.GATHER) { driverInstanceCount = 1; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java index 9d0d1aaaf180..df3afb20a23f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java @@ -633,7 +633,8 @@ private static Optional getPartitioningSchemeForTableWrite(O partitioningScheme = Optional.of(new PartitioningScheme( Partitioning.create(tableLayout.get().getPartitioning(), partitionFunctionArguments), - outputLayout)); + outputLayout, + tableLayout.get().getWriterPolicy())); } return partitioningScheme; } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index d93be0f969d4..13e67e028b23 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -228,7 +228,8 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta outputPartitioningScheme.getHashColumn(), outputPartitioningScheme.isReplicateNullsAndAny(), outputPartitioningScheme.getEncoding(), - outputPartitioningScheme.getBucketToPartition()), + outputPartitioningScheme.getBucketToPartition(), + outputPartitioningScheme.getPartitionedTableWritePolicy()), fragment.getStageExecutionDescriptor(), fragment.isOutputTableWriterFragment(), fragment.getStatsAndCosts(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java index d3bc4c8b562b..2a46cde6148b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushPartialAggregationThroughExchange.java @@ -234,7 +234,8 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange, exchange.getPartitioningScheme().getHashColumn(), exchange.getPartitioningScheme().isReplicateNullsAndAny(), exchange.getPartitioningScheme().getEncoding(), - exchange.getPartitioningScheme().getBucketToPartition()); + exchange.getPartitioningScheme().getBucketToPartition(), + exchange.getPartitioningScheme().getPartitionedTableWritePolicy()); return new ExchangeNode( aggregation.getSourceLocation(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java index cc568f94a01e..1cef96b480c6 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushProjectionThroughExchange.java @@ -147,7 +147,8 @@ public Result apply(ProjectNode project, Captures captures, Context context) exchange.getPartitioningScheme().getHashColumn(), exchange.getPartitioningScheme().isReplicateNullsAndAny(), exchange.getPartitioningScheme().getEncoding(), - exchange.getPartitioningScheme().getBucketToPartition()); + exchange.getPartitioningScheme().getBucketToPartition(), + exchange.getPartitioningScheme().getPartitionedTableWritePolicy()); PlanNode result = new ExchangeNode( exchange.getSourceLocation(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java index c98ddfe1c3e2..f3e886d4e8e5 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughAssignUniqueId.java @@ -82,7 +82,8 @@ public Result apply(ExchangeNode node, Captures captures, Context context) partitioningScheme.getHashColumn(), partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getEncoding(), - partitioningScheme.getBucketToPartition()), + partitioningScheme.getBucketToPartition(), + partitioningScheme.getPartitionedTableWritePolicy()), ImmutableList.of(assignUniqueId.getSource()), ImmutableList.of(removeVariable(getOnlyElement(node.getInputs()), assignUniqueId.getIdVariable())), node.isEnsureSourceOrdering(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java index 085dcc44f2f6..cecf13567ae2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushRemoteExchangeThroughGroupId.java @@ -153,7 +153,8 @@ public Result apply(ExchangeNode node, Captures captures, Context context) partitioningScheme.getHashColumn(), partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getEncoding(), - partitioningScheme.getBucketToPartition()), + partitioningScheme.getBucketToPartition(), + partitioningScheme.getPartitionedTableWritePolicy()), ImmutableList.of(groupIdNode.getSource()), ImmutableList.of(inputs), node.isEnsureSourceOrdering(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java index 9df8e9d8963a..bb10b2185f7b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PushTableWriteThroughUnion.java @@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion // guaranteed regardless of this optimizer. The level of local parallelism will be // determined by LocalExecutionPlanner separately, and shouldn't be a concern of // this optimizer. - .matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent()) + .matching(tableWriter -> !tableWriter.isSingleWriterPerPartitionRequired()) .with(source().matching(union().capturedAs(CHILD))); @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 6e96c4b29c2c..9e9df03b5de1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -650,13 +650,17 @@ public PlanWithProperties visitTableWriter(TableWriterNode node, PreferredProper PlanWithProperties source = accept(node.getSource(), preferredProperties); Optional shufflePartitioningScheme = node.getTablePartitioningScheme(); - if (!shufflePartitioningScheme.isPresent()) { + if (!node.isSingleWriterPerPartitionRequired()) { if (scaleWriters) { shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(SCALED_WRITER_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables())); } else if (redistributeWrites) { shufflePartitioningScheme = Optional.of(new PartitioningScheme(Partitioning.create(FIXED_ARBITRARY_DISTRIBUTION, ImmutableList.of()), source.getNode().getOutputVariables())); } + else { + // TODO: refactor + return rebaseAndDeriveProperties(node, source); + } } if (shufflePartitioningScheme.isPresent() && @@ -1032,6 +1036,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p Optional.empty(), true, COLUMNAR, + Optional.empty(), Optional.empty())), filteringSource.getProperties()); } @@ -1074,6 +1079,7 @@ public PlanWithProperties visitSemiJoin(SemiJoinNode node, PreferredProperties p Optional.empty(), true, COLUMNAR, + Optional.empty(), Optional.empty())), filteringSource.getProperties()); } @@ -1250,6 +1256,7 @@ public PlanWithProperties visitUnion(UnionNode node, PreferredProperties parentP Optional.empty(), nullsAndAnyReplicated, COLUMNAR, + Optional.empty(), Optional.empty())), source.getProperties()); } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index d9dc2ff02e5a..7dd0ace7300f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -71,6 +71,7 @@ import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled; import static com.facebook.presto.SystemSessionProperties.isEnforceFixedDistributionForOutputOperator; import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled; +import static com.facebook.presto.SystemSessionProperties.isNativeExecutionScalePartitionedWritersThreadsEnabled; import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled; import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; @@ -538,120 +539,103 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, StreamPrefe // @Override - public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNode, StreamPreferredProperties parentPreferences) + public PlanWithProperties visitTableWriter(TableWriterNode tableWrite, StreamPreferredProperties parentPreferences) { - if (originalTableWriterNode.getTablePartitioningScheme().isPresent() && getTaskPartitionedWriterCount(session) == 1) { - return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session)); + if (tableWrite.isSingleWriterPerPartitionRequired()) { + if (getTaskPartitionedWriterCount(session) == 1) { + return planAndEnforceChildren(tableWrite, singleStream(), defaultParallelism(session)); + } + PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session)); + PlanWithProperties exchange = deriveProperties( + partitionedExchange( + idAllocator.getNextId(), + LOCAL, + source.getNode(), + tableWrite.getTablePartitioningScheme().get()), + source.getProperties()); + return planTableWriteWithTableWriteMerge(tableWrite, exchange); } - if (!originalTableWriterNode.getTablePartitioningScheme().isPresent() && getTaskWriterCount(session) == 1) { - return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session)); + if (getTaskWriterCount(session) == 1) { + return planAndEnforceChildren(tableWrite, singleStream(), defaultParallelism(session)); } - Optional statisticAggregations = originalTableWriterNode - .getStatisticsAggregation() - .map(aggregations -> splitIntoPartialAndIntermediate( - aggregations, - variableAllocator, - metadata.getFunctionAndTypeManager())); - - PlanWithProperties tableWriter; - - if (!originalTableWriterNode.getTablePartitioningScheme().isPresent()) { - int taskWriterCount = getTaskWriterCount(session); - int taskConcurrency = getTaskConcurrency(session); - if (taskWriterCount == taskConcurrency) { - tableWriter = planAndEnforceChildren( - new TableWriterNode( - originalTableWriterNode.getSourceLocation(), - originalTableWriterNode.getId(), - originalTableWriterNode.getStatsEquivalentPlanNode(), - originalTableWriterNode.getSource(), - originalTableWriterNode.getTarget(), - variableAllocator.newVariable("partialrowcount", BIGINT), - variableAllocator.newVariable("partialfragments", VARBINARY), - variableAllocator.newVariable("partialcontext", VARBINARY), - originalTableWriterNode.getColumns(), - originalTableWriterNode.getColumnNames(), - originalTableWriterNode.getNotNullColumnVariables(), - originalTableWriterNode.getTablePartitioningScheme(), - statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter(), - originalTableWriterNode.getIsTemporaryTableWriter()), - fixedParallelism(), - fixedParallelism()); - } - else { - PlanWithProperties source = accept(originalTableWriterNode.getSource(), defaultParallelism(session)); - PlanWithProperties exchange = deriveProperties( - roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()), - source.getProperties()); - tableWriter = deriveProperties( - new TableWriterNode( - originalTableWriterNode.getSourceLocation(), - originalTableWriterNode.getId(), - originalTableWriterNode.getStatsEquivalentPlanNode(), - exchange.getNode(), - originalTableWriterNode.getTarget(), - variableAllocator.newVariable("partialrowcount", BIGINT), - variableAllocator.newVariable("partialfragments", VARBINARY), - variableAllocator.newVariable("partialcontext", VARBINARY), - originalTableWriterNode.getColumns(), - originalTableWriterNode.getColumnNames(), - originalTableWriterNode.getNotNullColumnVariables(), - originalTableWriterNode.getTablePartitioningScheme(), - statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter(), - originalTableWriterNode.getIsTemporaryTableWriter()), - exchange.getProperties()); - } - } - else { - PlanWithProperties source = accept(originalTableWriterNode.getSource(), defaultParallelism(session)); + if (nativeExecution + // preferred partitioning (MULTIPLE_WRITERS_PER_PARTITION_ALLOWED) + && tableWrite.getTablePartitioningScheme().isPresent() + && isNativeExecutionScalePartitionedWritersThreadsEnabled(session)) { + PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session)); PlanWithProperties exchange = deriveProperties( partitionedExchange( idAllocator.getNextId(), LOCAL, source.getNode(), - originalTableWriterNode.getTablePartitioningScheme().get()), + tableWrite.getTablePartitioningScheme().get()), source.getProperties()); - tableWriter = deriveProperties( - new TableWriterNode( - originalTableWriterNode.getSourceLocation(), - originalTableWriterNode.getId(), - originalTableWriterNode.getStatsEquivalentPlanNode(), - exchange.getNode(), - originalTableWriterNode.getTarget(), - variableAllocator.newVariable("partialrowcount", BIGINT), - variableAllocator.newVariable("partialfragments", VARBINARY), - variableAllocator.newVariable("partialcontext", VARBINARY), - originalTableWriterNode.getColumns(), - originalTableWriterNode.getColumnNames(), - originalTableWriterNode.getNotNullColumnVariables(), - originalTableWriterNode.getTablePartitioningScheme(), - statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), - originalTableWriterNode.getTaskCountIfScaledWriter(), - originalTableWriterNode.getIsTemporaryTableWriter()), - exchange.getProperties()); - } - - PlanWithProperties gatheringExchange = deriveProperties( + return planTableWriteWithTableWriteMerge(tableWrite, exchange); + } + + int taskWriterCount = getTaskWriterCount(session); + int taskConcurrency = getTaskConcurrency(session); + if (taskWriterCount == taskConcurrency) { + return planTableWriteWithTableWriteMerge( + tableWrite, + planAndEnforce(tableWrite.getSource(), fixedParallelism(), fixedParallelism())); + } + else { + PlanWithProperties source = accept(tableWrite.getSource(), defaultParallelism(session)); + PlanWithProperties exchange = deriveProperties( + roundRobinExchange(idAllocator.getNextId(), LOCAL, source.getNode()), + source.getProperties()); + return planTableWriteWithTableWriteMerge(tableWrite, exchange); + } + } + + private PlanWithProperties planTableWriteWithTableWriteMerge(TableWriterNode tableWrite, PlanWithProperties source) + { + Optional statisticAggregations = tableWrite + .getStatisticsAggregation() + .map(aggregations -> splitIntoPartialAndIntermediate( + aggregations, + variableAllocator, + metadata.getFunctionAndTypeManager())); + + PlanWithProperties tableWriteWithProperties = deriveProperties( + new TableWriterNode( + tableWrite.getSourceLocation(), + tableWrite.getId(), + tableWrite.getStatsEquivalentPlanNode(), + source.getNode(), + tableWrite.getTarget(), + variableAllocator.newVariable("partialrowcount", BIGINT), + variableAllocator.newVariable("partialfragments", VARBINARY), + variableAllocator.newVariable("partialcontext", VARBINARY), + tableWrite.getColumns(), + tableWrite.getColumnNames(), + tableWrite.getNotNullColumnVariables(), + tableWrite.getTablePartitioningScheme(), + statisticAggregations.map(StatisticAggregations.Parts::getPartialAggregation), + tableWrite.getTaskCountIfScaledWriter(), + tableWrite.getIsTemporaryTableWriter()), + source.getProperties()); + + PlanWithProperties gatherExchangeWithProperties = deriveProperties( gatheringExchange( idAllocator.getNextId(), LOCAL, - tableWriter.getNode()), - tableWriter.getProperties()); + tableWriteWithProperties.getNode()), + tableWriteWithProperties.getProperties()); return deriveProperties( new TableWriterMergeNode( - originalTableWriterNode.getSourceLocation(), + tableWrite.getSourceLocation(), idAllocator.getNextId(), - gatheringExchange.getNode(), - originalTableWriterNode.getRowCountVariable(), - originalTableWriterNode.getFragmentVariable(), - originalTableWriterNode.getTableCommitContextVariable(), + gatherExchangeWithProperties.getNode(), + tableWrite.getRowCountVariable(), + tableWrite.getFragmentVariable(), + tableWrite.getTableCommitContextVariable(), statisticAggregations.map(StatisticAggregations.Parts::getIntermediateAggregation)), - gatheringExchange.getProperties()); + gatherExchangeWithProperties.getProperties()); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java index 995a687c98b2..23cbe152960b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/HashGenerationOptimizer.java @@ -588,7 +588,8 @@ public PlanWithProperties visitExchange(ExchangeNode node, HashComputationSet pa partitionVariables.map(newHashVariables::get), partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getEncoding(), - partitioningScheme.getBucketToPartition()); + partitioningScheme.getBucketToPartition(), + partitioningScheme.getPartitionedTableWritePolicy()); // add hash variables to sources ImmutableList.Builder> newInputs = ImmutableList.builder(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java index 8db400fb2b98..aef8b048326b 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/MergePartialAggregationsWithFilter.java @@ -357,7 +357,8 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext context node.getPartitioningScheme().getHashColumn(), node.getPartitioningScheme().isReplicateNullsAndAny(), node.getPartitioningScheme().getEncoding(), - node.getPartitioningScheme().getBucketToPartition()); + node.getPartitioningScheme().getBucketToPartition(), + node.getPartitioningScheme().getPartitionedTableWritePolicy()); return new ExchangeNode( node.getSourceLocation(), diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java index 4cbd60d20c47..859673a4034a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PartitioningUtils.java @@ -344,7 +344,14 @@ public static PartitioningScheme translateOutputLayout(PartitioningScheme partit .map(oldOutputLayout::indexOf) .map(newOutputLayout::get); - return new PartitioningScheme(newPartitioning, newOutputLayout, newHashSymbol, partitioningScheme.isReplicateNullsAndAny(), partitioningScheme.getEncoding(), partitioningScheme.getBucketToPartition()); + return new PartitioningScheme( + newPartitioning, + newOutputLayout, + newHashSymbol, + partitioningScheme.isReplicateNullsAndAny(), + partitioningScheme.getEncoding(), + partitioningScheme.getBucketToPartition(), + partitioningScheme.getPartitionedTableWritePolicy()); } // Translates VariableReferenceExpression in arguments according to translator, keeps other arguments unchanged. diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java index 3b00ebd61a33..1b486d81b51f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PropertyDerivations.java @@ -90,6 +90,7 @@ import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; import static com.facebook.presto.SystemSessionProperties.planWithTableNodePartitioning; import static com.facebook.presto.common.predicate.TupleDomain.toLinkedMap; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; import static com.facebook.presto.spi.relation.DomainTranslator.BASIC_COLUMN_EXTRACTOR; import static com.facebook.presto.spi.relation.ExpressionOptimizer.Level.OPTIMIZED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.ARBITRARY_DISTRIBUTION; @@ -634,14 +635,23 @@ else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) { .local(localProperties.build()) .constants(constants) .build(); - case REPARTITION: + case REPARTITION: { + Global globalPartitioning; + if (node.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED))) { + // no strict partitioning guarantees when multiple writers per partitions allowed (scaled writers) + globalPartitioning = arbitraryPartition(); + } + else { + globalPartitioning = partitionedOn( + node.getPartitioningScheme().getPartitioning(), + Optional.of(node.getPartitioningScheme().getPartitioning())) + .withReplicatedNulls(node.getPartitioningScheme().isReplicateNullsAndAny()); + } return ActualProperties.builder() - .global(partitionedOn( - node.getPartitioningScheme().getPartitioning(), - Optional.of(node.getPartitioningScheme().getPartitioning())) - .withReplicatedNulls(node.getPartitioningScheme().isReplicateNullsAndAny())) + .global(globalPartitioning) .constants(constants) .build(); + } case REPLICATE: // TODO: this should have the same global properties as the stream taking the replicated data return ActualProperties.builder() diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java index fe8c1e11ec75..8d60aaff99c2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PruneUnreferencedOutputs.java @@ -170,7 +170,8 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext rewrittenSources = ImmutableList.builder(); for (int i = 0; i < node.getSources().size(); i++) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java index 375a86df1dd4..b562f5cf1b4d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/StreamPropertyDerivations.java @@ -77,6 +77,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; import static com.facebook.presto.sql.planner.optimizations.PropertyDerivations.extractFixedValuesToConstantExpressions; import static com.facebook.presto.sql.planner.optimizations.StreamPropertyDerivations.StreamProperties.StreamDistribution.FIXED; @@ -341,7 +342,9 @@ public StreamProperties visitExchange(ExchangeNode node, List case GATHER: return StreamProperties.singleStream(); case REPARTITION: - if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION)) { + if (node.getPartitioningScheme().getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) || + // no strict partitioning guarantees when multiple writers per partitions are allows (scaled writers) + node.getPartitioningScheme().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED))) { return new StreamProperties(FIXED, Optional.empty(), false); } checkArgument( diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java index 2bab1de7f0ce..ad5d9ee4c3f4 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/SymbolMapper.java @@ -305,7 +305,8 @@ private PartitioningScheme canonicalize(PartitioningScheme scheme, PlanNode sour scheme.getHashColumn().map(this::map), scheme.isReplicateNullsAndAny(), scheme.getEncoding(), - scheme.getBucketToPartition()); + scheme.getBucketToPartition(), + scheme.getPartitionedTableWritePolicy()); } private StatisticAggregations map(StatisticAggregations statisticAggregations) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java index 512945f28567..b5650c91578f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/UnaliasSymbolReferences.java @@ -335,7 +335,8 @@ public PlanNode visitExchange(ExchangeNode node, RewriteContext context) canonicalize(node.getPartitioningScheme().getHashColumn()), node.getPartitioningScheme().isReplicateNullsAndAny(), node.getPartitioningScheme().getEncoding(), - node.getPartitioningScheme().getBucketToPartition()); + node.getPartitioningScheme().getBucketToPartition(), + node.getPartitioningScheme().getPartitionedTableWritePolicy()); Optional orderingScheme = node.getOrderingScheme().map(this::canonicalizeAndDistinct); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java index 12928728b6d8..f92acfaaffc7 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/plan/ExchangeNode.java @@ -193,6 +193,7 @@ public static ExchangeNode partitionedExchange(PlanNodeId id, Scope scope, PlanN hashColumn, replicateNullsAndAny, COLUMNAR, + Optional.empty(), Optional.empty())); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 4ec67782a1ab..5e1530677ad4 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -246,7 +246,8 @@ public void testDefaults() .setInlineProjectionsOnValues(false) .setEagerPlanValidationEnabled(false) .setEagerPlanValidationThreadPoolSize(20) - .setPrestoSparkExecutionEnvironment(false)); + .setPrestoSparkExecutionEnvironment(false) + .setNativeExecutionScalePartitionedWritersThreadsEnabled(false)); } @Test @@ -442,6 +443,7 @@ public void testExplicitPropertyMappings() .put("eager-plan-validation-enabled", "true") .put("eager-plan-validation-thread-pool-size", "2") .put("presto-spark-execution-environment", "true") + .put("native-execution-scale-partitioned-writer-threads-enabled", "true") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -634,7 +636,8 @@ public void testExplicitPropertyMappings() .setInlineProjectionsOnValues(true) .setEagerPlanValidationEnabled(true) .setEagerPlanValidationThreadPoolSize(2) - .setPrestoSparkExecutionEnvironment(true); + .setPrestoSparkExecutionEnvironment(true) + .setNativeExecutionScalePartitionedWritersThreadsEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorNewTableLayout.java b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorNewTableLayout.java index ea4cbecefb5c..46fcf1eb665f 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorNewTableLayout.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/ConnectorNewTableLayout.java @@ -18,17 +18,25 @@ import java.util.List; import java.util.Objects; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.SINGLE_WRITER_PER_PARTITION_REQUIRED; import static java.util.Objects.requireNonNull; public class ConnectorNewTableLayout { private final ConnectorPartitioningHandle partitioning; private final List partitionColumns; + private final PartitionedTableWritePolicy writerPolicy; public ConnectorNewTableLayout(ConnectorPartitioningHandle partitioning, List partitionColumns) + { + this(partitioning, partitionColumns, SINGLE_WRITER_PER_PARTITION_REQUIRED); + } + + public ConnectorNewTableLayout(ConnectorPartitioningHandle partitioning, List partitionColumns, PartitionedTableWritePolicy writerPolicy) { this.partitioning = requireNonNull(partitioning, "partitioning is null"); this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); + this.writerPolicy = requireNonNull(writerPolicy, "writerPolicy is null"); } public ConnectorPartitioningHandle getPartitioning() @@ -41,6 +49,11 @@ public List getPartitionColumns() return partitionColumns; } + public PartitionedTableWritePolicy getWriterPolicy() + { + return writerPolicy; + } + @Override public boolean equals(Object o) { @@ -52,12 +65,13 @@ public boolean equals(Object o) } ConnectorNewTableLayout that = (ConnectorNewTableLayout) o; return Objects.equals(partitioning, that.partitioning) && - Objects.equals(partitionColumns, that.partitionColumns); + Objects.equals(partitionColumns, that.partitionColumns) && + Objects.equals(writerPolicy, that.writerPolicy); } @Override public int hashCode() { - return Objects.hash(partitioning, partitionColumns); + return Objects.hash(partitioning, partitionColumns, writerPolicy); } } diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/NewTableLayout.java b/presto-spi/src/main/java/com/facebook/presto/spi/NewTableLayout.java index bd9d743fe6a4..202b25c9a492 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/NewTableLayout.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/NewTableLayout.java @@ -64,6 +64,11 @@ public List getPartitionColumns() return layout.getPartitionColumns(); } + public PartitionedTableWritePolicy getWriterPolicy() + { + return layout.getWriterPolicy(); + } + @Override public boolean equals(Object o) { diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/PartitionedTableWritePolicy.java b/presto-spi/src/main/java/com/facebook/presto/spi/PartitionedTableWritePolicy.java new file mode 100644 index 000000000000..9d49184ea81e --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/PartitionedTableWritePolicy.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +public enum PartitionedTableWritePolicy +{ + SINGLE_WRITER_PER_PARTITION_REQUIRED, + MULTIPLE_WRITERS_PER_PARTITION_ALLOWED, +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java index 244772fd50b8..bd6461fca2ef 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/PartitioningScheme.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.spi.plan; +import com.facebook.presto.spi.PartitionedTableWritePolicy; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -39,6 +40,19 @@ public class PartitioningScheme private final boolean replicateNullsAndAny; private final ExchangeEncoding encoding; private final Optional bucketToPartition; + private final Optional partitionedTableWritePolicy; + + public PartitioningScheme(Partitioning partitioning, List outputLayout, PartitionedTableWritePolicy partitionedTableWritePolicy) + { + this( + partitioning, + outputLayout, + Optional.empty(), + false, + COLUMNAR, + Optional.empty(), + Optional.of(partitionedTableWritePolicy)); + } public PartitioningScheme(Partitioning partitioning, List outputLayout) { @@ -48,6 +62,7 @@ public PartitioningScheme(Partitioning partitioning, List hashColumn, @JsonProperty("replicateNullsAndAny") boolean replicateNullsAndAny, @JsonProperty("encoding") ExchangeEncoding encoding, - @JsonProperty("bucketToPartition") Optional bucketToPartition) + @JsonProperty("bucketToPartition") Optional bucketToPartition, + @JsonProperty("partitionedTableWritePolicy") Optional partitionedTableWritePolicy) { this.partitioning = requireNonNull(partitioning, "partitioning is null"); this.outputLayout = unmodifiableList(requireNonNull(outputLayout, "outputLayout is null")); @@ -87,6 +104,7 @@ public PartitioningScheme( this.replicateNullsAndAny = replicateNullsAndAny; this.encoding = requireNonNull(encoding, "encoding is null"); this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null"); + this.partitionedTableWritePolicy = requireNonNull(partitionedTableWritePolicy, "partitionedTableWritePolicy is null"); } @JsonProperty @@ -125,14 +143,34 @@ public Optional getBucketToPartition() return bucketToPartition; } + @JsonProperty + public Optional getPartitionedTableWritePolicy() + { + return partitionedTableWritePolicy; + } + public PartitioningScheme withBucketToPartition(Optional bucketToPartition) { - return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, encoding, bucketToPartition); + return new PartitioningScheme( + partitioning, + outputLayout, + hashColumn, + replicateNullsAndAny, + encoding, + bucketToPartition, + partitionedTableWritePolicy); } public PartitioningScheme withRowWiseEncoding() { - return new PartitioningScheme(partitioning, outputLayout, hashColumn, replicateNullsAndAny, ROW_WISE, bucketToPartition); + return new PartitioningScheme( + partitioning, + outputLayout, + hashColumn, + replicateNullsAndAny, + ROW_WISE, + bucketToPartition, + partitionedTableWritePolicy); } public boolean isSingleOrBroadcastOrArbitrary() diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java index 0f15ad358d8b..39675857df72 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/plan/TableWriterNode.java @@ -35,6 +35,7 @@ import java.util.Set; import static com.facebook.presto.common.Utils.checkArgument; +import static com.facebook.presto.spi.PartitionedTableWritePolicy.MULTIPLE_WRITERS_PER_PARTITION_ALLOWED; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -269,6 +270,13 @@ public PlanNode assignStatsEquivalentPlanNode(Optional statsEquivalent taskCountIfScaledWriter, isTemporaryTableWriter); } + public boolean isSingleWriterPerPartitionRequired() + { + return tablePartitioningScheme.isPresent() + // if partitionedTableWritePolicy is not set for whatever reason consider it as SINGLE_WRITER_PER_PARTITION_REQUIRED for extra safety + && !tablePartitioningScheme.get().getPartitionedTableWritePolicy().equals(Optional.of(MULTIPLE_WRITERS_PER_PARTITION_ALLOWED)); + } + // only used during planning -- will not be serialized @SuppressWarnings({"EmptyClass", "ClassMayBeInterface"}) public abstract static class WriterTarget