Skip to content

Commit

Permalink
CNDB-11581: Stage metrics improvements
Browse files Browse the repository at this point in the history
With this change an external actor can now track task queue and execution times on stages.
The Stage encapsulation was improved. Executor-related particulars can no longer leak out.
  • Loading branch information
jakubzytka committed Nov 26, 2024
1 parent 76037f0 commit 880c49a
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 33 deletions.
89 changes: 82 additions & 7 deletions src/java/org/apache/cassandra/concurrent/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,26 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ThreadPoolMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.utils.ExecutorUtils;
Expand Down Expand Up @@ -141,26 +146,27 @@ public static Stage fromPoolName(String stageName)
}

// Convenience functions to execute on this stage
public void execute(Runnable command) { executor().execute(command); }
public void execute(Runnable command, ExecutorLocals locals) { executor().execute(command, locals); }
public void maybeExecuteImmediately(Runnable command) { executor().maybeExecuteImmediately(command); }
// The variants below pass the command through withTimeMeasurement(...) on the calling thread before
// handing it to the executor; the wrapper takes its queue-start timestamp when it is created, so the
// queue time reported to TaskExecutionCallback is counted from this call.
public void execute(Runnable command) { executor().execute(withTimeMeasurement(command)); }

public void execute(Runnable command, ExecutorLocals locals) { executor().execute(withTimeMeasurement(command), locals); }
public void maybeExecuteImmediately(Runnable command) { executor().maybeExecuteImmediately(withTimeMeasurement(command)); }
/**
 * Runs {@code task} asynchronously on this stage's executor and exposes its outcome as a future.
 *
 * @param task the computation to run
 * @return a future completed with the task's result; an exception thrown by the task is rethrown
 *         through {@code Throwables.unchecked} inside the async computation
 */
public <T> CompletableFuture<T> submit(Callable<T> task)
{
    // Wrap on the submitting thread, not inside the async lambda: any timestamp the wrapper takes
    // when it is created must reflect submission time rather than the moment execution starts.
    Callable<T> timedTask = withTimeMeasurement(task);
    return CompletableFuture.supplyAsync(() -> {
        try
        {
            return timedTask.call();
        }
        catch (Exception e)
        {
            throw Throwables.unchecked(e);
        }
    }, executor());
}
public CompletableFuture<Void> submit(Runnable task) { return CompletableFuture.runAsync(task, executor()); }
/**
 * Runs {@code task} asynchronously on this stage's executor.
 * The task is wrapped by withTimeMeasurement(...) before being handed to runAsync, i.e. on the calling thread.
 *
 * @param task the work to run
 * @return the future produced by {@code CompletableFuture.runAsync} for the wrapped task
 */
public CompletableFuture<Void> submit(Runnable task) { return CompletableFuture.runAsync(withTimeMeasurement(task), executor()); }
/**
 * Runs {@code task} asynchronously on this stage's executor and completes the returned future
 * with the supplied {@code result} once the task has finished.
 *
 * @param task   the work to run
 * @param result the value the future is completed with after {@code task} returns
 * @return a future yielding {@code result} after the task has run
 */
public <T> CompletableFuture<T> submit(Runnable task, T result)
{
    // The timing wrapper records its queue-start timestamp when it is created, so it has to be
    // created here, at submission time. Creating it inside the lambda would start the clock only
    // once the task is already executing and the reported queue time would always be ~0.
    Runnable timedTask = withTimeMeasurement(task);
    return CompletableFuture.supplyAsync(() -> {
        timedTask.run();
        return result;
    }, executor());
}

public LocalAwareExecutorService executor()
private LocalAwareExecutorService executor()
{
if (executor == null)
{
Expand Down Expand Up @@ -266,6 +272,42 @@ static LocalAwareExecutorService customExecutor(String jmxName, String jmxType,
return customStageExecutorFactory.init(jmxName, jmxType, numThreads, onSetMaximumPoolSize);
}

/**
 * Reports the pending task count of this stage's executor.
 *
 * @return whatever {@code getPendingTaskCount()} returns on the underlying executor
 */
public int getPendingTaskCount()
{
    final LocalAwareExecutorService stageExecutor = executor();
    return stageExecutor.getPendingTaskCount();
}

/**
 * Reports the active task count of this stage's executor.
 *
 * @return whatever {@code getActiveTaskCount()} returns on the underlying executor
 */
public int getActiveTaskCount()
{
    final LocalAwareExecutorService stageExecutor = executor();
    return stageExecutor.getActiveTaskCount();
}

/**
 * Obtains additional, executor-related {@link ThreadPoolMetrics} for this stage by applying
 * the given extractor to the stage's executor.
 *
 * @param metricsExtractor function that receives the stage's executor and extracts its metrics
 * @return the extractor's result, or {@code null} if the extractor was not able to extract metrics
 */
public @Nullable ThreadPoolMetrics getExecutorMetrics(Function<Executor, ThreadPoolMetrics> metricsExtractor)
{
    final Executor stageExecutor = executor();
    return metricsExtractor.apply(stageExecutor);
}

/**
 * Tells whether this stage's executor has been shut down.
 *
 * @return the result of {@code isShutdown()} on the underlying executor
 */
public boolean isShutdown()
{
    final LocalAwareExecutorService stageExecutor = executor();
    return stageExecutor.isShutdown();
}

/**
 * Tells whether this stage is backed by a {@code JMXEnabledSingleThreadExecutor} whose
 * {@code isExecutedBy} check accepts the given thread.
 *
 * @param thread the thread to test
 * @return {@code false} if the stage's executor is not a {@code JMXEnabledSingleThreadExecutor};
 *         otherwise the result of {@code isExecutedBy(thread)} on that executor
 */
public boolean runsInSingleThread(Thread thread)
{
    // Read the executor once so the type check and the cast are guaranteed to see the same object.
    final LocalAwareExecutorService stageExecutor = executor();
    if (!(stageExecutor instanceof JMXEnabledSingleThreadExecutor))
        return false;

    return ((JMXEnabledSingleThreadExecutor) stageExecutor).isExecutedBy(thread);
}

/**
 * Tells whether this stage's executor has terminated.
 *
 * @return the result of {@code isTerminated()} on the underlying executor
 */
public boolean isTerminated()
{
    final LocalAwareExecutorService stageExecutor = executor();
    return stageExecutor.isTerminated();
}

@FunctionalInterface
public interface ExecutorServiceInitialiser
{
Expand Down Expand Up @@ -337,4 +379,37 @@ public int getPendingTaskCount()
return getQueue().size();
}
}

/**
 * Decorates {@code command} so that its queue and execution durations are reported to
 * {@link TaskExecutionCallback#instance}.
 * <p>
 * The queue clock starts when this method is called; the execution clock starts when the
 * returned runnable begins to run. Both durations are {@link System#nanoTime()} differences.
 *
 * @param command the work to decorate
 * @return a runnable that reports {@code onDequeue}, runs {@code command}, and always reports
 *         {@code onCompleted} afterwards, even if {@code command} throws
 */
private Runnable withTimeMeasurement(Runnable command)
{
    final long enqueuedAtNanos = System.nanoTime();
    return () -> {
        final long startedAtNanos = System.nanoTime();
        final long waitedNanos = startedAtNanos - enqueuedAtNanos;
        try
        {
            TaskExecutionCallback.instance.onDequeue(this, waitedNanos);
            command.run();
        }
        finally
        {
            // measured from the start of execution, so it also covers the onDequeue notification
            final long ranNanos = System.nanoTime() - startedAtNanos;
            TaskExecutionCallback.instance.onCompleted(this, ranNanos);
        }
    };
}

/**
 * Decorates {@code command} so that its queue and execution durations are reported to
 * {@link TaskExecutionCallback#instance}, mirroring the {@code Runnable} overload.
 * <p>
 * The queue clock starts when this method is called, so the wrapper has to be created at
 * submission time for the reported queue duration to be meaningful. The execution clock starts
 * when the returned callable begins to run. Both durations are {@link System#nanoTime()} differences.
 *
 * @param command the computation to decorate
 * @return a callable that reports {@code onDequeue}, delegates to {@code command}, and always
 *         reports {@code onCompleted} afterwards, even if {@code command} throws
 */
private <T> Callable<T> withTimeMeasurement(Callable<T> command)
{
    long queueStartTime = System.nanoTime();
    return () -> {
        long executionStartTime = System.nanoTime();
        try
        {
            // Previously only onCompleted was reported for callables, leaving completion events
            // without a matching dequeue event.
            TaskExecutionCallback.instance.onDequeue(this, executionStartTime - queueStartTime);
            return command.call();
        }
        finally
        {
            TaskExecutionCallback.instance.onCompleted(this, System.nanoTime() - executionStartTime);
        }
    };
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.concurrent;

import org.apache.cassandra.utils.FBUtilities;

import static org.apache.cassandra.config.CassandraRelevantProperties.CUSTOM_TASK_EXECUTION_CALLBACK_CLASS;

/**
 * Hook through which {@link Stage} reports how long a task waited before running and how long it
 * took to run. Durations are passed in nanoseconds.
 */
public interface TaskExecutionCallback
{
    /**
     * Shared callback instance. When the {@code CUSTOM_TASK_EXECUTION_CALLBACK_CLASS} property is
     * present, its string value is handed to {@code FBUtilities.construct} to obtain the instance;
     * otherwise a {@link NoopTaskExecutionCallback} is used.
     */
    TaskExecutionCallback instance = !CUSTOM_TASK_EXECUTION_CALLBACK_CLASS.isPresent()
                                     ? new NoopTaskExecutionCallback()
                                     : FBUtilities.construct(CUSTOM_TASK_EXECUTION_CALLBACK_CLASS.getString(), "Task execution callback");

    /**
     * Notified when a task starts executing.
     *
     * @param stage                 the stage the task was submitted to
     * @param enqueuedDurationNanos nanoseconds between the task being wrapped for submission and its execution start
     */
    void onDequeue(Stage stage, long enqueuedDurationNanos);

    /**
     * Notified when a task has finished executing, whether it returned normally or threw.
     *
     * @param stage                  the stage the task ran on
     * @param executionDurationNanos nanoseconds the task spent executing
     */
    void onCompleted(Stage stage, long executionDurationNanos);

    /**
     * Default callback that ignores every notification.
     */
    class NoopTaskExecutionCallback implements TaskExecutionCallback
    {
        @Override
        public void onDequeue(Stage stage, long enqueuedDurationNanos)
        {
            // intentionally empty
        }

        @Override
        public void onCompleted(Stage stage, long executionDurationNanos)
        {
            // intentionally empty
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ public enum CassandraRelevantProperties
*/
CUSTOM_GUARDRAILS_FACTORY_PROPERTY("cassandra.custom_guardrails_factory_class"),

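/**
 * Name of the TaskExecutionCallback implementation class to notify about the queue and execution
 * times of stage tasks. When the property is not set, a no-op callback is used.
 */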
CUSTOM_TASK_EXECUTION_CALLBACK_CLASS("cassandra.custom_task_execution_callback_class"),

/**
* Used to support directory creation for different file system and remote/local conversion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.cassandra.cache.IRowCacheEntry;
import org.apache.cassandra.cache.RowCacheKey;
import org.apache.cassandra.cache.RowCacheSentinel;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.lifecycle.*;
Expand Down Expand Up @@ -574,7 +575,7 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, ReadExe
{
assert executionController != null && executionController.validForReadOn(cfs);
if (Tracing.traceSinglePartitions())
Tracing.trace("Executing single-partition query on {}", cfs.name);
Tracing.trace("Executing single-partition query on {}; stage READ pending: {}, active: {}", cfs.name, Stage.READ.getPendingTaskCount(), Stage.READ.getActiveTaskCount());

return queryMemtableAndDiskInternal(cfs, executionController, System.nanoTime());
}
Expand All @@ -586,7 +587,7 @@ public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs,
{
assert executionController != null && executionController.validForReadOn(cfs);
if (Tracing.traceSinglePartitions())
Tracing.trace("Executing single-partition query on {}", cfs.name);
Tracing.trace("Executing single-partition query on {}; stage READ pending: {}, active: {}", cfs.name, Stage.READ.getPendingTaskCount(), Stage.READ.getActiveTaskCount());

return queryMemtableAndDiskInternal(cfs, view, rowTransformer, executionController, System.nanoTime());
}
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ private static long getVeryLongTime()

/**
 * Reports whether the calling thread is the one that executes tasks of the GOSSIP stage.
 *
 * @return the result of the stage's "executed by" check for {@code Thread.currentThread()}
 */
private static boolean isInGossipStage()
{
return ((JMXEnabledSingleThreadExecutor) Stage.GOSSIP.executor()).isExecutedBy(Thread.currentThread());
return Stage.GOSSIP.runsInSingleThread(Thread.currentThread());
}

private static void checkProperThreadForStateMutation()
Expand Down Expand Up @@ -1048,7 +1048,7 @@ void doStatusCheck()
long now = System.currentTimeMillis();
long nowNano = System.nanoTime();

long pending = ((JMXEnabledThreadPoolExecutor) Stage.GOSSIP.executor()).metrics.pendingTasks.getValue();
long pending = Stage.GOSSIP.getPendingTaskCount();
if (pending > 0 && lastProcessedMessageAt < now - 1000)
{
// if some new messages just arrived, give the executor some time to work on them
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public void shutdown(long timeout, TimeUnit units, boolean shutdownGracefully, b
isShuttingDown = true;
logger.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !MUTATION.executor().isShutdown();
assert !MUTATION.isShutdown();

if (shutdownGracefully)
{
Expand Down Expand Up @@ -567,7 +567,7 @@ public void shutdownAbrubtly()
isShuttingDown = true;
logger.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !MUTATION.executor().isShutdown();
assert !MUTATION.isShutdown();

callbacks.shutdownNow(false);
inboundSockets.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class DefaultSchemaUpdateHandler implements SchemaUpdateHandler, IEndpoin
private MigrationCoordinator createMigrationCoordinator(MessagingService messagingService)
{
return new MigrationCoordinator(messagingService,
Stage.MIGRATION.executor(),
Stage.MIGRATION,
ScheduledExecutors.scheduledTasks,
MAX_OUTSTANDING_VERSION_REQUESTS,
Gossiper.instance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
Expand Down Expand Up @@ -223,7 +224,7 @@ public String toString()
private final Supplier<UUID> schemaVersion;
private final BiConsumer<InetAddressAndPort, Collection<Mutation>> schemaUpdateCallback;

final ExecutorService executor;
final Stage executor;

/**
* Creates but does not start migration coordinator instance.
Expand All @@ -232,7 +233,7 @@ public String toString()
* @param periodicCheckExecutor executor on which the periodic checks are scheduled
*/
MigrationCoordinator(MessagingService messagingService,
ExecutorService executor,
Stage executor,
ScheduledExecutorService periodicCheckExecutor,
int maxOutstandingVersionRequests,
Gossiper gossiper,
Expand Down Expand Up @@ -619,7 +620,7 @@ private CompletableFuture<Void> submitToMigrationIfNotShutdown(Runnable task)
}
else
{
return CompletableFuture.runAsync(task, executor);
return executor.submit(task);
}
}

Expand Down
3 changes: 1 addition & 2 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,7 @@ private StorageProxy()
{
EndpointsForToken selected = targets.contacts().withoutSelf();
Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
Stage.COUNTER_MUTATION.executor()
.execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
Stage.COUNTER_MUTATION.execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
};

ReadRepairMetrics.init();
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/tracing/TraceStateImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ protected void waitForPendingEvents()

void executeMutation(final Mutation mutation)
{
CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable()
CompletableFuture<Void> fut = Stage.TRACING.submit(new WrappedRunnable()
{
protected void runMayThrow()
{
mutateWithCatch(clientState, mutation);
}
}, Stage.TRACING.executor());
});

boolean ret = pendingFutures.add(fut);
if (!ret)
Expand Down
4 changes: 2 additions & 2 deletions test/long/org/apache/cassandra/cql3/ViewLongTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ private void updateView(String query, Object... params) throws Throwable
private void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable
{
executeNet(version, query, params);
while (!(((SEPExecutor) Stage.VIEW_MUTATION.executor()).getPendingTaskCount() == 0
&& ((SEPExecutor) Stage.VIEW_MUTATION.executor()).getActiveTaskCount() == 0))
while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.getActiveTaskCount() == 0))
{
Thread.sleep(1);
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/org/apache/cassandra/cql3/CQLTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ protected void waitForTracingEvents()
{
try
{
Stage.TRACING.executor().submit(() -> {}).get();
Stage.TRACING.submit(() -> {}).get();
}
catch (Throwable t)
{
Expand Down
4 changes: 2 additions & 2 deletions test/unit/org/apache/cassandra/cql3/ViewAbstractTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected static void waitForViewMutations()
{
Awaitility.await()
.atMost(5, TimeUnit.MINUTES)
.until(() -> Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0);
.until(() -> Stage.VIEW_MUTATION.getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.getActiveTaskCount() == 0);
}
}
4 changes: 2 additions & 2 deletions test/unit/org/apache/cassandra/cql3/ViewComplexTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ protected void updateView(String query, Object... params) throws Throwable
protected void updateViewWithFlush(String query, boolean flush, Object... params) throws Throwable
{
executeNet(version, query, params);
while (!(Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0))
while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.getActiveTaskCount() == 0))
{
Thread.sleep(1);
}
Expand Down
4 changes: 2 additions & 2 deletions test/unit/org/apache/cassandra/cql3/ViewFilteringTester.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ protected void createView(String name, String query) throws Throwable
protected void updateView(String query, Object... params) throws Throwable
{
executeNet(version, query, params);
while (!(Stage.VIEW_MUTATION.executor().getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.executor().getActiveTaskCount() == 0))
while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.getActiveTaskCount() == 0))
{
Thread.sleep(1);
}
Expand Down
4 changes: 2 additions & 2 deletions test/unit/org/apache/cassandra/cql3/ViewSchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ private void createView(String name, String query) throws Throwable
private void updateView(String query, Object... params) throws Throwable
{
executeNet(protocolVersion, query, params);
while (!(((SEPExecutor) Stage.VIEW_MUTATION.executor()).getPendingTaskCount() == 0
&& ((SEPExecutor) Stage.VIEW_MUTATION.executor()).getActiveTaskCount() == 0))
while (!(Stage.VIEW_MUTATION.getPendingTaskCount() == 0
&& Stage.VIEW_MUTATION.getActiveTaskCount() == 0))
{
Thread.sleep(1);
}
Expand Down
Loading

0 comments on commit 880c49a

Please sign in to comment.