From f7982dfb71b204b49147ff88c80578817eab1cd5 Mon Sep 17 00:00:00 2001 From: maoling Date: Wed, 12 Jun 2024 23:14:00 +0800 Subject: [PATCH] Port CASSANDRA-13890 from apache/cassandra --- CHANGES.txt | 2 + .../db/compaction/CompactionManager.java | 14 +- .../db/compaction/CompactionTask.java | 3 +- .../cassandra/metrics/CompactionMetrics.java | 3 + .../cassandra/service/StorageService.java | 17 +++ .../service/StorageServiceMBean.java | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../tools/nodetool/CompactionStats.java | 96 +++++++++----- .../nodetool/GetCompactionThroughput.java | 11 +- .../SetGetCompactionThroughputTest.java | 122 ++++++++++++++++++ 10 files changed, 236 insertions(+), 38 deletions(-) create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java diff --git a/CHANGES.txt b/CHANGES.txt index abd15011268f..59036cd5c32d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,7 @@ Future version (tbd) * Require only MODIFY permission on base when updating table with MV (STAR-564) +Merged from 5.1: + * Expose current compaction throughput in nodetool (CASSANDRA-13890) Merged from 5.0: * Disable chronicle analytics (CASSANDRA-19656) * Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 74f8982afad9..57dcc5d98b2c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,7 +65,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Meter; import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec) compactionRateLimiter.setRate(throughput); } + public Meter getCompactionThroughput() + { + return metrics.bytesCompactedThroughput; + } + /** * Call this whenever a compaction might be needed on the given column family store. * It's okay to over-call (within reason) if a call is unnecessary, it will @@ -314,7 +319,7 @@ public CompletableFuture[] startCompactionTasks(ColumnFamilyStore cfs, Collec { return backgroundCompactionRunner.startCompactionTasks(cfs, tasks); } - + public int getOngoingBackgroundUpgradesCount() { return backgroundCompactionRunner.getOngoingUpgradesCount(); @@ -1340,7 +1345,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, } - static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) + protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) { if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0) return false; @@ -1353,8 +1358,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann return actuallyAcquire(limiter, lengthRead); } - private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead) + private boolean actuallyAcquire(RateLimiter limiter, long lengthRead) { + metrics.bytesCompactedThroughput.mark(lengthRead); while (lengthRead >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index a70f940697e3..68f13ceb985e 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -60,7 +60,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED; import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED; -import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond; @@ -602,7 +601,7 @@ void execute0() long bytesScanned = compactionIterator.getTotalBytesScanned(); // Rate limit the scanners, and account for compression - if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) + if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) lastBytesScanned = bytesScanned; maybeStopOrUpdateState(); diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index c11476122076..45552e594fe2 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -66,6 +66,8 @@ public class CompactionMetrics public final Counter totalCompactionsFailed; /** Total number of bytes processed by operations since server [re]start */ public final Counter bytesCompacted; + /** Recent/current throughput of compactions take */ + public final Meter bytesCompactedThroughput; /** * The compaction strategy information for each table. Cached, because its computation might be fairly expensive. @@ -190,6 +192,7 @@ public Long getValue() totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted")); totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions")); bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted")); + bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput")); // compaction failure metrics compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced")); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7698eb4a4f48..bfba9b864621 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -96,6 +96,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthKeyspace; @@ -233,6 +234,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; +import static org.apache.cassandra.io.util.FileUtils.ONE_MB; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value) CompactionManager.instance.setRate(value); } + /** + * Get the Current Compaction Throughput + * key is 1/5/15minute time dimension for statistics + * value is the metric double string (unit is:mib/s) + */ + public Map getCurrentCompactionThroughputMebibytesPerSec() + { + HashMap result = new LinkedHashMap<>(); + Meter rate = CompactionManager.instance.getCompactionThroughput(); + result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB)); + result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB)); + result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB)); + return result; + } + public int getBatchlogReplayThrottleInKB() { return DatabaseDescriptor.getBatchlogReplayThrottleInKB(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 2b49cb273a90..87565f35dfc8 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -626,6 +626,7 @@ public interface StorageServiceMBean extends NotificationEmitter public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); + Map getCurrentCompactionThroughputMebibytesPerSec(); public int getBatchlogReplayThrottleInKB(); public void setBatchlogReplayThrottleInKB(int value); diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 0c44023cdf52..12bc819f7c2d 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1133,6 +1133,11 @@ public int getCompactionThroughput() return ssProxy.getCompactionThroughputMbPerSec(); } + public Map getCurrentCompactionThroughputMiBPerSec() + { + return ssProxy.getCurrentCompactionThroughputMebibytesPerSec(); + } + public void setBatchlogReplayThrottle(int value) { ssProxy.setBatchlogReplayThrottleInKB(value); diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java index 659fc27d5afa..ddd7b1d3e85f 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java @@ -19,6 +19,7 @@ import java.io.PrintStream; import java.text.DecimalFormat; +import java.text.NumberFormat; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -30,6 +31,7 @@ import org.apache.cassandra.db.compaction.CompactionStrategyStatistics; import org.apache.cassandra.db.compaction.TableOperation; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; @@ -57,7 +59,7 @@ public void execute(NodeProbe probe) Map> pendingTaskNumberByTable = (Map>) probe.getCompactionMetric("PendingTasksByTableName"); Map> writeAmplificationByTableName = - (Map>) probe.getCompactionMetric("WriteAmplificationByTableName"); + (Map>) probe.getCompactionMetric("WriteAmplificationByTableName"); int numTotalPendingTask = 0; double totWriteAmplification = 0; for (Entry> ksEntry : pendingTaskNumberByTable.entrySet()) @@ -88,48 +90,82 @@ public void execute(NodeProbe probe) } } out.println(); - reportCompactionTable(cm.getCompactions(), probe.getCompactionThroughput(), humanReadable, out); + TableBuilder tableBuilder = new TableBuilder(); + compactionsStats(probe, tableBuilder); + reportCompactionTable(cm.getCompactions(), probe.getCompactionThroughput(), humanReadable, out, tableBuilder); if (aggregate) { reportAggregateCompactions(probe); } } + private void compactionsStats(NodeProbe probe, TableBuilder tableBuilder) + { + CassandraMetricsRegistry.JmxMeterMBean totalCompactionsCompletedMetrics = + (CassandraMetricsRegistry.JmxMeterMBean) probe.getCompactionMetric("TotalCompactionsCompleted"); + tableBuilder.add("compactions completed", String.valueOf(totalCompactionsCompletedMetrics.getCount())); + + CassandraMetricsRegistry.JmxCounterMBean bytesCompacted = (CassandraMetricsRegistry.JmxCounterMBean) probe.getCompactionMetric("BytesCompacted"); + if (humanReadable) + tableBuilder.add("data compacted", FileUtils.stringifyFileSize(Double.parseDouble(Long.toString(bytesCompacted.getCount())))); + else + tableBuilder.add("data compacted", Long.toString(bytesCompacted.getCount())); + + NumberFormat formatter = new DecimalFormat("0.00"); + + tableBuilder.add("15 minute rate", String.format("%s/minute", formatter.format(totalCompactionsCompletedMetrics.getFifteenMinuteRate() * 60))); + tableBuilder.add("mean rate", String.format("%s/hour", formatter.format(totalCompactionsCompletedMetrics.getMeanRate() * 60 * 60))); + + double configured = probe.getStorageService().getCompactionThroughputMbPerSec(); + tableBuilder.add("compaction throughput (MiB/s)", configured == 0 ? "throttling disabled (0)" : Double.toString(configured)); + Map currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + tableBuilder.add("current compaction throughput (1 minute)", currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + tableBuilder.add("current compaction throughput (5 minute)", currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + tableBuilder.add("current compaction throughput (15 minute)", currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); + } + public static void reportCompactionTable(List> compactions, int compactionThroughput, boolean humanReadable, PrintStream out) { - if (!compactions.isEmpty()) - { - long remainingBytes = 0; - TableBuilder table = new TableBuilder(); + reportCompactionTable(compactions, compactionThroughput, humanReadable, out, new TableBuilder()); + } - table.add("id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress"); - for (Map c : compactions) - { - long total = Long.parseLong(c.get(TableOperation.Progress.TOTAL)); - long completed = Long.parseLong(c.get(TableOperation.Progress.COMPLETED)); - String taskType = c.get(TableOperation.Progress.OPERATION_TYPE); - String keyspace = c.get(TableOperation.Progress.KEYSPACE); - String columnFamily = c.get(TableOperation.Progress.COLUMNFAMILY); - String unit = c.get(TableOperation.Progress.UNIT); - boolean toFileSize = humanReadable && TableOperation.Unit.isFileSize(unit); - String completedStr = toFileSize ? FileUtils.stringifyFileSize(completed) : Long.toString(completed); - String totalStr = toFileSize ? FileUtils.stringifyFileSize(total) : Long.toString(total); - String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%"; - String id = c.get(TableOperation.Progress.OPERATION_ID); - table.add(id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete); - remainingBytes += total - completed; - } + public static void reportCompactionTable(List> compactions, int compactionThroughput, boolean humanReadable, PrintStream out, TableBuilder table) + { + if (compactions.isEmpty()) + { table.printTo(out); + return; + } - String remainingTime = "n/a"; - if (compactionThroughput != 0) - { - long remainingTimeInSecs = remainingBytes / (1024L * 1024L * compactionThroughput); - remainingTime = format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60)); - } - out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime); + long remainingBytes = 0; + + table.add("id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress"); + for (Map c : compactions) + { + long total = Long.parseLong(c.get(TableOperation.Progress.TOTAL)); + long completed = Long.parseLong(c.get(TableOperation.Progress.COMPLETED)); + String taskType = c.get(TableOperation.Progress.OPERATION_TYPE); + String keyspace = c.get(TableOperation.Progress.KEYSPACE); + String columnFamily = c.get(TableOperation.Progress.COLUMNFAMILY); + String unit = c.get(TableOperation.Progress.UNIT); + boolean toFileSize = humanReadable && TableOperation.Unit.isFileSize(unit); + String completedStr = toFileSize ? FileUtils.stringifyFileSize(completed) : Long.toString(completed); + String totalStr = toFileSize ? FileUtils.stringifyFileSize(total) : Long.toString(total); + String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%"; + String id = c.get(TableOperation.Progress.OPERATION_ID); + table.add(id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete); + remainingBytes += total - completed; + } + table.printTo(out); + + String remainingTime = "n/a"; + if (compactionThroughput != 0) + { + long remainingTimeInSecs = remainingBytes / (1024L * 1024L * compactionThroughput); + remainingTime = format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60)); } + out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime); } private static void reportAggregateCompactions(NodeProbe probe) diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java index 839c78d3195b..8c17ace1882c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java @@ -17,17 +17,24 @@ */ package org.apache.cassandra.tools.nodetool; +import java.util.Map; + import io.airlift.airline.Command; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system") +@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system") public class GetCompactionThroughput extends NodeToolCmd { @Override public void execute(NodeProbe probe) { - probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s"); + probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s"); + + Map currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java new file mode 100644 index 000000000000..76c75350121f --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; + +import static org.apache.cassandra.tools.ToolRunner.ToolResult; +import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@code nodetool setcompactionthroughput} and {@code nodetool getcompactionthroughput}. + */ +public class SetGetCompactionThroughputTest extends CQLTester +{ + private static final int MAX_INT_CONFIG_VALUE_IN_MBIT = Integer.MAX_VALUE - 1; + + @BeforeClass + public static void setup() throws Exception + { + requireNetwork(); + startJMXServer(); + } + + @Test + public void testNull() + { + assertSetInvalidThroughput(null, "Required parameters are missing: compaction_throughput"); + } + + @Test + public void testPositive() + { + assertSetGetValidThroughput(7); + } + + @Test + public void testMaxValue() + { + assertSetGetValidThroughput(MAX_INT_CONFIG_VALUE_IN_MBIT); + } + + @Test + public void testZero() + { + assertSetGetValidThroughput(0); + } + + @Test + public void testUnparseable() + { + assertSetInvalidThroughput("1.2", "compaction_throughput: can not convert \"1.2\" to a Integer"); + assertSetInvalidThroughput("value", "compaction_throughput: can not convert \"value\" to a Integer"); + } + + @Test + public void testCurrentCompactionThroughput() + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s"); + } + + private static void assertSetGetValidThroughput(int throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput)); + tool.assertOnCleanExit(); + assertThat(tool.getStdout()).isEmpty(); + + assertGetThroughput(throughput); + } + + private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) + { + ToolResult tool = throughput == null ? invokeNodetool("setcompactionthroughput") + : invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains(expectedErrorMessage); + } + + private static void assertSetInvalidThroughputMib(String throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains("compaction_throughput: 2147483647 is too large; it should be less than" + + " 2147483647 in MiB/s"); + } + + private static void assertGetThroughput(int expected) + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + if (expected > 0) + assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s"); + else + assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s"); + } +}