From e8cc355cbc7f5462765d86f4c6903fe6a3302353 Mon Sep 17 00:00:00 2001 From: maoling Date: Wed, 12 Jun 2024 23:14:00 +0800 Subject: [PATCH] Partially port CASSANDRA-13890 Ported the compaction throughput metering, but not the changes in nodetool. --- .../db/compaction/CompactionManager.java | 14 ++++++++++---- .../cassandra/db/compaction/CompactionTask.java | 3 +-- .../cassandra/metrics/CompactionMetrics.java | 3 +++ .../cassandra/service/StorageService.java | 17 +++++++++++++++++ .../cassandra/service/StorageServiceMBean.java | 1 + 5 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 74f8982afad9..57dcc5d98b2c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,7 +65,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Meter; import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec) compactionRateLimiter.setRate(throughput); } + public Meter getCompactionThroughput() + { + return metrics.bytesCompactedThroughput; + } + /** * Call this whenever a compaction might be needed on the given column family store. * It's okay to over-call (within reason) if a call is unnecessary, it will @@ -314,7 +319,7 @@ public CompletableFuture[] startCompactionTasks(ColumnFamilyStore cfs, Collec { return backgroundCompactionRunner.startCompactionTasks(cfs, tasks); } - + public int getOngoingBackgroundUpgradesCount() { return backgroundCompactionRunner.getOngoingUpgradesCount(); @@ -1340,7 +1345,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, } - static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) + protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) { if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0) return false; @@ -1353,8 +1358,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann return actuallyAcquire(limiter, lengthRead); } - private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead) + private boolean actuallyAcquire(RateLimiter limiter, long lengthRead) { + metrics.bytesCompactedThroughput.mark(lengthRead); while (lengthRead >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index a70f940697e3..68f13ceb985e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -60,7 +60,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED; import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED; -import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond; @@ -602,7 +601,7 @@ void execute0() long bytesScanned = compactionIterator.getTotalBytesScanned(); // Rate limit the scanners, and account for compression - if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) + if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) lastBytesScanned = bytesScanned; maybeStopOrUpdateState(); diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index c11476122076..45552e594fe2 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -66,6 +66,8 @@ public class CompactionMetrics public final Counter totalCompactionsFailed; /** Total number of bytes processed by operations since server [re]start */ public final Counter bytesCompacted; + /** Recent/current throughput of compactions take */ + public final Meter bytesCompactedThroughput; /** * The compaction strategy information for each table. Cached, because its computation might be fairly expensive. @@ -190,6 +192,7 @@ public Long getValue() totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted")); totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions")); bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted")); + bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput")); // compaction failure metrics compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced")); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7698eb4a4f48..bfba9b864621 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -96,6 +96,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthKeyspace; @@ -233,6 +234,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; +import static org.apache.cassandra.io.util.FileUtils.ONE_MB; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value) CompactionManager.instance.setRate(value); } + /** + * Get the Current Compaction Throughput + * key is 1/5/15minute time dimension for statistics + * value is the metric double string (unit is:mib/s) + */ + public Map getCurrentCompactionThroughputMebibytesPerSec() + { + HashMap result = new LinkedHashMap<>(); + Meter rate = CompactionManager.instance.getCompactionThroughput(); + result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB)); + result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB)); + result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB)); + return result; + } + public int getBatchlogReplayThrottleInKB() { return DatabaseDescriptor.getBatchlogReplayThrottleInKB(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2b49cb273a90..87565f35dfc8 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -626,6 +626,7 @@ public interface StorageServiceMBean extends NotificationEmitter public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); + Map getCurrentCompactionThroughputMebibytesPerSec(); public int getBatchlogReplayThrottleInKB(); public void setBatchlogReplayThrottleInKB(int value);