diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java b/src/java/org/apache/cassandra/concurrent/Stage.java index cd2ef6934528..d2b964c93796 100644 --- a/src/java/org/apache/cassandra/concurrent/Stage.java +++ b/src/java/org/apache/cassandra/concurrent/Stage.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -32,14 +33,18 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.ThreadPoolMetrics; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.utils.ExecutorUtils; @@ -141,26 +146,27 @@ public static Stage fromPoolName(String stageName) } // Convenience functions to execute on this stage - public void execute(Runnable command) { executor().execute(command); } - public void execute(Runnable command, ExecutorLocals locals) { executor().execute(command, locals); } - public void maybeExecuteImmediately(Runnable command) { executor().maybeExecuteImmediately(command); } + public void execute(Runnable command) { executor().execute(withTimeMeasurement(command)); } + + public void execute(Runnable command, ExecutorLocals locals) { executor().execute(withTimeMeasurement(command), locals); } + public void maybeExecuteImmediately(Runnable command) { executor().maybeExecuteImmediately(withTimeMeasurement(command)); } public CompletableFuture submit(Callable task) { return CompletableFuture.supplyAsync(() -> { try { - return task.call(); + return withTimeMeasurement(task).call(); } catch (Exception e) { throw Throwables.unchecked(e); } }, executor()); } - public CompletableFuture submit(Runnable task) { return CompletableFuture.runAsync(task, executor()); } + public CompletableFuture submit(Runnable task) { return CompletableFuture.runAsync(withTimeMeasurement(task), executor()); } public CompletableFuture submit(Runnable task, T result) { return CompletableFuture.supplyAsync(() -> { - task.run(); + withTimeMeasurement(task).run(); return result; }, executor()); } - public LocalAwareExecutorService executor() + private LocalAwareExecutorService executor() { if (executor == null) { @@ -266,6 +272,42 @@ static LocalAwareExecutorService customExecutor(String jmxName, String jmxType, return customStageExecutorFactory.init(jmxName, jmxType, numThreads, onSetMaximumPoolSize); } + public int getPendingTaskCount() + { + return executor().getPendingTaskCount(); + } + + public int getActiveTaskCount() + { + return executor().getActiveTaskCount(); + } + + /** + * return additional, executor-related ThreadPoolMetrics for the executor + * @param metricsExtractor function to extract metrics from the executor + * @return ThreadPoolMetrics or null if the extractor was not able to extract metrics + */ + public @Nullable ThreadPoolMetrics getExecutorMetrics(Function metricsExtractor) + { + return metricsExtractor.apply(executor()); + } + + public boolean isShutdown() + { + return executor().isShutdown(); + } + + public boolean runsInSingleThread(Thread thread) + { + return (executor() instanceof JMXEnabledSingleThreadExecutor) && + ((JMXEnabledSingleThreadExecutor) executor()).isExecutedBy(thread); + } + + public boolean isTerminated() + { + return executor().isTerminated(); + } + @FunctionalInterface public interface ExecutorServiceInitialiser { @@ -337,4 +379,37 @@ public int getPendingTaskCount() return getQueue().size(); } } + + private Runnable withTimeMeasurement(Runnable command) + { + long queueStartTime = System.nanoTime(); + return () -> { + long executionStartTime = System.nanoTime(); + try + { + TaskExecutionCallback.instance.onDequeue(this, executionStartTime - queueStartTime); + command.run(); + } + finally + { + TaskExecutionCallback.instance.onCompleted(this, System.nanoTime() - executionStartTime); + } + }; + } + + private Callable withTimeMeasurement(Callable command) + { + return () -> { + long startTime = System.nanoTime(); + try + { + return command.call(); + } + finally + { + TaskExecutionCallback.instance.onCompleted(this, System.nanoTime() - startTime); + } + }; + } + } diff --git a/src/java/org/apache/cassandra/concurrent/TaskExecutionCallback.java b/src/java/org/apache/cassandra/concurrent/TaskExecutionCallback.java new file mode 100644 index 000000000000..6317987f1615 --- /dev/null +++ b/src/java/org/apache/cassandra/concurrent/TaskExecutionCallback.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.concurrent; + +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_TASK_EXECUTION_CALLBACK_CLASS; + +public interface TaskExecutionCallback +{ + TaskExecutionCallback instance = CUSTOM_TASK_EXECUTION_CALLBACK_CLASS.isPresent() ? + FBUtilities.construct(CUSTOM_TASK_EXECUTION_CALLBACK_CLASS.getString(), "Task execution callback") : + new NoopTaskExecutionCallback(); + + void onCompleted(Stage stage, long executionDurationNanos); + void onDequeue(Stage stage, long enqueuedDurationNanos); + + class NoopTaskExecutionCallback implements TaskExecutionCallback + { + @Override + public void onCompleted(Stage stage, long executionDurationNanos) + { + } + + @Override + public void onDequeue(Stage stage, long enqueuedDurationNanos) + { + } + } +} diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 302306c28037..c5792af7b728 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -333,6 +333,11 @@ public enum CassandraRelevantProperties */ CUSTOM_GUARDRAILS_FACTORY_PROPERTY("cassandra.custom_guardrails_factory_class"), + /** + * Which class to use when notifying about stage task execution + */ + CUSTOM_TASK_EXECUTION_CALLBACK_CLASS("cassandra.custom_task_execution_callback_class"), + /** * Used to support directory creation for different file system and remote/local conversion */ diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index b576907c42e8..4b6663f7a294 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -29,6 +29,7 @@ import org.apache.cassandra.cache.IRowCacheEntry; import org.apache.cassandra.cache.RowCacheKey; import org.apache.cassandra.cache.RowCacheSentinel; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.lifecycle.*; @@ -574,7 +575,7 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExe { assert executionController != null && executionController.validForReadOn(cfs); if (Tracing.traceSinglePartitions()) - Tracing.trace("Executing single-partition query on {}", cfs.name); + Tracing.trace("Executing single-partition query on {}; stage READ pending: {}, active: {}", cfs.name, Stage.READ.getPendingTaskCount(), Stage.READ.getActiveTaskCount()); return queryMemtableAndDiskInternal(cfs, executionController, System.nanoTime()); } @@ -586,7 +587,7 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, { assert executionController != null && executionController.validForReadOn(cfs); if (Tracing.traceSinglePartitions()) - Tracing.trace("Executing single-partition query on {}", cfs.name); + Tracing.trace("Executing single-partition query on {}; stage READ pending: {}, active: {}", cfs.name, Stage.READ.getPendingTaskCount(), Stage.READ.getActiveTaskCount()); return queryMemtableAndDiskInternal(cfs, view, rowTransformer, executionController, System.nanoTime()); } diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 894191c9ad68..d7e39f60cf27 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -281,7 +281,7 @@ private static long getVeryLongTime() private static boolean isInGossipStage() { - return ((JMXEnabledSingleThreadExecutor) Stage.GOSSIP.executor()).isExecutedBy(Thread.currentThread()); + return Stage.GOSSIP.runsInSingleThread(Thread.currentThread()); } private static void checkProperThreadForStateMutation() @@ -1048,7 +1048,7 @@ void doStatusCheck() long now = System.currentTimeMillis(); long nowNano = System.nanoTime(); - long pending = ((JMXEnabledThreadPoolExecutor) Stage.GOSSIP.executor()).metrics.pendingTasks.getValue(); + long pending = Stage.GOSSIP.getPendingTaskCount(); if (pending > 0 && lastProcessedMessageAt < now - 1000) { // if some new messages just arrived, give the executor some time to work on them diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 79b0f45e78f6..dfdb918713ec 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -499,7 +499,7 @@ public void shutdown(long timeout, TimeUnit units, boolean shutdownGracefully, b isShuttingDown = true; logger.info("Waiting for messaging service to quiesce"); // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first - assert !MUTATION.executor().isShutdown(); + assert !MUTATION.isShutdown(); if (shutdownGracefully) { @@ -567,7 +567,7 @@ public void shutdownAbrubtly() isShuttingDown = true; logger.info("Waiting for messaging service to quiesce"); // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first - assert !MUTATION.executor().isShutdown(); + assert !MUTATION.isShutdown(); callbacks.shutdownNow(false); inboundSockets.close(); diff --git a/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java b/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java index 22dddb8cb35a..b41946065d50 100644 --- a/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java +++ b/src/java/org/apache/cassandra/schema/DefaultSchemaUpdateHandler.java @@ -67,7 +67,7 @@ public class DefaultSchemaUpdateHandler implements SchemaUpdateHandler, IEndpoin private MigrationCoordinator createMigrationCoordinator(MessagingService messagingService) { return new MigrationCoordinator(messagingService, - Stage.MIGRATION.executor(), + Stage.MIGRATION, ScheduledExecutors.scheduledTasks, MAX_OUTSTANDING_VERSION_REQUESTS, Gossiper.instance, diff --git a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java index a685c76c0a9b..6f8283bdd016 100644 --- a/src/java/org/apache/cassandra/schema/MigrationCoordinator.java +++ b/src/java/org/apache/cassandra/schema/MigrationCoordinator.java @@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.exceptions.RequestFailureReason; @@ -223,7 +224,7 @@ public String toString() private final Supplier schemaVersion; private final BiConsumer> schemaUpdateCallback; - final ExecutorService executor; + final Stage executor; /** * Creates but does not start migration coordinator instance. @@ -232,7 +233,7 @@ public String toString() * @param periodicCheckExecutor executor on which the periodic checks are scheduled */ MigrationCoordinator(MessagingService messagingService, - ExecutorService executor, + Stage executor, ScheduledExecutorService periodicCheckExecutor, int maxOutstandingVersionRequests, Gossiper gossiper, @@ -619,7 +620,7 @@ private CompletableFuture submitToMigrationIfNotShutdown(Runnable task) } else { - return CompletableFuture.runAsync(task, executor); + return executor.submit(task); } } diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 05c3e6dea455..74e651ef17f1 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -371,8 +371,7 @@ private StorageProxy() { EndpointsForToken selected = targets.contacts().withoutSelf(); Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548 - Stage.COUNTER_MUTATION.executor() - .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter)); + Stage.COUNTER_MUTATION.execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter)); }; ReadRepairMetrics.init(); diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java index e204c944b764..29534a01acb1 100644 --- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java +++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java @@ -104,13 +104,13 @@ protected void waitForPendingEvents() void executeMutation(final Mutation mutation) { - CompletableFuture fut = CompletableFuture.runAsync(new WrappedRunnable() + CompletableFuture fut = Stage.TRACING.submit(new WrappedRunnable() { protected void runMayThrow() { mutateWithCatch(clientState, mutation); } - }, Stage.TRACING.executor()); + }); boolean ret = pendingFutures.add(fut); if (!ret) diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java index 8d6680dcd0e9..5ae76e3a3bd9 100644 --- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java +++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java @@ -434,8 +434,8 @@ private void updateView(String query, Object... params) throws Throwable private void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable { executeNet(version, query, params); - while (!(((SEPExecutor) Stage.VIEW_MUTATION.executor()).getPendingTaskCount() == 0 - && ((SEPExecutor) Stage.VIEW_MUTATION.executor()).getActiveTaskCount() == 0)) + while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0 + && Stage.VIEW_MUTATION.getActiveTaskCount() == 0)) { Thread.sleep(1); } diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 178517b3e252..5a6ec34b02ef 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -1295,7 +1295,7 @@ protected void waitForTracingEvents() { try { - Stage.TRACING.executor().submit(() -> {}).get(); + Stage.TRACING.submit(() -> {}).get(); } catch (Throwable t) { diff --git a/test/unit/org/apache/cassandra/cql3/ViewAbstractTest.java b/test/unit/org/apache/cassandra/cql3/ViewAbstractTest.java index bbd21dced906..905add5d32ba 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewAbstractTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewAbstractTest.java @@ -102,7 +102,7 @@ protected static void waitForViewMutations() { Awaitility.await() .atMost(5, TimeUnit.MINUTES) - .until(() -> Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0 - && Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0); + .until(() -> Stage.VIEW_MUTATION.getPendingTaskCount() == 0 + && Stage.VIEW_MUTATION.getActiveTaskCount() == 0); } } diff --git a/test/unit/org/apache/cassandra/cql3/ViewComplexTester.java b/test/unit/org/apache/cassandra/cql3/ViewComplexTester.java index cffe04da30ed..60e867e8949f 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewComplexTester.java +++ b/test/unit/org/apache/cassandra/cql3/ViewComplexTester.java @@ -123,8 +123,8 @@ protected void updateView(String query, Object... params) throws Throwable protected void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable { executeNet(version, query, params); - while (!(Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0 - && Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0)) + while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0 + && Stage.VIEW_MUTATION.getActiveTaskCount() == 0)) { Thread.sleep(1); } diff --git a/test/unit/org/apache/cassandra/cql3/ViewFilteringTester.java b/test/unit/org/apache/cassandra/cql3/ViewFilteringTester.java index d1f89d4c76a6..4003db9cdbee 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewFilteringTester.java +++ b/test/unit/org/apache/cassandra/cql3/ViewFilteringTester.java @@ -107,8 +107,8 @@ protected void createView(String name, String query) throws Throwable protected void updateView(String query, Object... params) throws Throwable { executeNet(version, query, params); - while (!(Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0 - && Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0)) + while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0 + && Stage.VIEW_MUTATION.getActiveTaskCount() == 0)) { Thread.sleep(1); } diff --git a/test/unit/org/apache/cassandra/cql3/ViewSchemaTest.java b/test/unit/org/apache/cassandra/cql3/ViewSchemaTest.java index f6c249b4530c..ddc75a7ebf56 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewSchemaTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewSchemaTest.java @@ -98,8 +98,8 @@ private void createView(String name, String query) throws Throwable private void updateView(String query, Object... params) throws Throwable { executeNet(protocolVersion, query, params); - while (!(((SEPExecutor) Stage.VIEW_MUTATION.executor()).getPendingTaskCount() == 0 - && ((SEPExecutor) Stage.VIEW_MUTATION.executor()).getActiveTaskCount() == 0)) + while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0 + && Stage.VIEW_MUTATION.getActiveTaskCount() == 0)) { Thread.sleep(1); } diff --git a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java index 3ef216950fa2..f950c1a75070 100644 --- a/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java +++ b/test/unit/org/apache/cassandra/schema/MigrationCoordinatorTest.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; @@ -138,7 +139,7 @@ private Wrapper(int maxOutstandingRequests) when(versions.knows(any())).thenReturn(true); when(versions.getRaw(any())).thenReturn(MessagingService.current_version); this.coordinator = new MigrationCoordinator(messagingService, - MoreExecutors.newDirectExecutorService(), + Stage.IMMEDIATE, oneTimeExecutor, maxOutstandingRequests, gossiper,