From 0f56d59643187ddc4d1c86edfecc1eed6863051c Mon Sep 17 00:00:00 2001 From: maoling Date: Wed, 12 Jun 2024 23:14:00 +0800 Subject: [PATCH] Partially port CASSANDRA-13890 from apache/cassandra What was ported: - current compaction throughput measurement by CompactionManager - exposing current compaction throughput in StorageService and CompactionMetrics - nodetool getcompactionthroughput, including tests Not ported: - changes to `nodetool compactionstats`, because that would require porting also the tests which are currently missing in CC and porting those tests turned out to be a complex task without porting the other changes in the CompactionManager API - Code for getting / setting compaction throughput as double --- CHANGES.txt | 2 + .../db/compaction/CompactionManager.java | 14 +- .../db/compaction/CompactionTask.java | 3 +- .../cassandra/metrics/CompactionMetrics.java | 3 + .../cassandra/service/StorageService.java | 17 +++ .../service/StorageServiceMBean.java | 1 + .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../nodetool/GetCompactionThroughput.java | 11 +- .../SetGetCompactionThroughputTest.java | 122 ++++++++++++++++++ 9 files changed, 170 insertions(+), 8 deletions(-) create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java diff --git a/CHANGES.txt b/CHANGES.txt index abd15011268f..59036cd5c32d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,7 @@ Future version (tbd) * Require only MODIFY permission on base when updating table with MV (STAR-564) +Merged from 5.1: + * Expose current compaction throughput in nodetool (CASSANDRA-13890) Merged from 5.0: * Disable chronicle analytics (CASSANDRA-19656) * Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 2adaea3eb168..0b2da3297d79 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -65,7 +65,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import com.codahale.metrics.Meter; import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec) compactionRateLimiter.setRate(throughput); } + public Meter getCompactionThroughput() + { + return metrics.bytesCompactedThroughput; + } + /** * Call this whenever a compaction might be needed on the given column family store. * It's okay to over-call (within reason) if a call is unnecessary, it will @@ -314,7 +319,7 @@ public CompletableFuture[] startCompactionTasks(ColumnFamilyStore cfs, Collec { return backgroundCompactionRunner.startCompactionTasks(cfs, tasks); } - + public int getOngoingBackgroundUpgradesCount() { return backgroundCompactionRunner.getOngoingUpgradesCount(); @@ -1373,7 +1378,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs, } - static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) + protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio) { if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0) return false; @@ -1386,8 +1391,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann return actuallyAcquire(limiter, lengthRead); } - private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead) + private boolean actuallyAcquire(RateLimiter limiter, long lengthRead) { + metrics.bytesCompactedThroughput.mark(lengthRead); while (lengthRead >= Integer.MAX_VALUE) { limiter.acquire(Integer.MAX_VALUE); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 9e07eeec5d19..ef7a51603cb8 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -63,7 +63,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED; import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED; -import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond; @@ -658,7 +657,7 @@ void execute0() long bytesScanned = compactionIterator.getTotalBytesScanned(); // Rate limit the scanners, and account for compression - if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) + if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) lastBytesScanned = bytesScanned; maybeStopOrUpdateState(); diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java index c11476122076..45552e594fe2 100644 --- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java +++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java @@ -66,6 +66,8 @@ public class CompactionMetrics public final Counter totalCompactionsFailed; /** Total number of bytes processed by operations since server [re]start */ public final Counter bytesCompacted; + /** Recent/current throughput of compactions take */ + public final Meter bytesCompactedThroughput; /** * The compaction strategy information for each table. Cached, because its computation might be fairly expensive. @@ -190,6 +192,7 @@ public Long getValue() totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted")); totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions")); bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted")); + bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput")); // compaction failure metrics compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced")); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ba57b3ad0db8..b92670198523 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -96,6 +96,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.codahale.metrics.Meter; import org.apache.cassandra.audit.AuditLogManager; import org.apache.cassandra.audit.AuditLogOptions; import org.apache.cassandra.auth.AuthKeyspace; @@ -233,6 +234,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY; import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName; import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily; +import static org.apache.cassandra.io.util.FileUtils.ONE_MB; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value) CompactionManager.instance.setRate(value); } + /** + * Get the Current Compaction Throughput + * key is 1/5/15minute time dimension for statistics + * value is the metric double string (unit is:mib/s) + */ + public Map getCurrentCompactionThroughputMebibytesPerSec() + { + HashMap result = new LinkedHashMap<>(); + Meter rate = CompactionManager.instance.getCompactionThroughput(); + result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB)); + result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB)); + result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB)); + return result; + } + public int getBatchlogReplayThrottleInKB() { return DatabaseDescriptor.getBatchlogReplayThrottleInKB(); diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 96d512e6284e..8dded652c07a 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -631,6 +631,7 @@ public interface StorageServiceMBean extends NotificationEmitter public int getCompactionThroughputMbPerSec(); public void setCompactionThroughputMbPerSec(int value); + Map getCurrentCompactionThroughputMebibytesPerSec(); public int getBatchlogReplayThrottleInKB(); public void setBatchlogReplayThrottleInKB(int value); diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 75b6a2775205..098124c6e4aa 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -1138,6 +1138,11 @@ public int getCompactionThroughput() return ssProxy.getCompactionThroughputMbPerSec(); } + public Map getCurrentCompactionThroughputMiBPerSec() + { + return ssProxy.getCurrentCompactionThroughputMebibytesPerSec(); + } + public void setBatchlogReplayThrottle(int value) { ssProxy.setBatchlogReplayThrottleInKB(value); diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java index 839c78d3195b..8c17ace1882c 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java +++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java @@ -17,17 +17,24 @@ */ package org.apache.cassandra.tools.nodetool; +import java.util.Map; + import io.airlift.airline.Command; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; -@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system") +@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system") public class GetCompactionThroughput extends NodeToolCmd { @Override public void execute(NodeProbe probe) { - probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s"); + probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s"); + + Map currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec(); + probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s"); + probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s"); } } diff --git a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java new file mode 100644 index 000000000000..76c75350121f --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.CQLTester; + +import static org.apache.cassandra.tools.ToolRunner.ToolResult; +import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@code nodetool setcompactionthroughput} and {@code nodetool getcompactionthroughput}. + */ +public class SetGetCompactionThroughputTest extends CQLTester +{ + private static final int MAX_INT_CONFIG_VALUE_IN_MBIT = Integer.MAX_VALUE - 1; + + @BeforeClass + public static void setup() throws Exception + { + requireNetwork(); + startJMXServer(); + } + + @Test + public void testNull() + { + assertSetInvalidThroughput(null, "Required parameters are missing: compaction_throughput"); + } + + @Test + public void testPositive() + { + assertSetGetValidThroughput(7); + } + + @Test + public void testMaxValue() + { + assertSetGetValidThroughput(MAX_INT_CONFIG_VALUE_IN_MBIT); + } + + @Test + public void testZero() + { + assertSetGetValidThroughput(0); + } + + @Test + public void testUnparseable() + { + assertSetInvalidThroughput("1.2", "compaction_throughput: can not convert \"1.2\" to a Integer"); + assertSetInvalidThroughput("value", "compaction_throughput: can not convert \"value\" to a Integer"); + } + + @Test + public void testCurrentCompactionThroughput() + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s"); + assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s"); + } + + private static void assertSetGetValidThroughput(int throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput)); + tool.assertOnCleanExit(); + assertThat(tool.getStdout()).isEmpty(); + + assertGetThroughput(throughput); + } + + private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage) + { + ToolResult tool = throughput == null ? invokeNodetool("setcompactionthroughput") + : invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains(expectedErrorMessage); + } + + private static void assertSetInvalidThroughputMib(String throughput) + { + ToolResult tool = invokeNodetool("setcompactionthroughput", throughput); + assertThat(tool.getExitCode()).isEqualTo(1); + assertThat(tool.getStdout()).contains("compaction_throughput: 2147483647 is too large; it should be less than" + + " 2147483647 in MiB/s"); + } + + private static void assertGetThroughput(int expected) + { + ToolResult tool = invokeNodetool("getcompactionthroughput"); + tool.assertOnCleanExit(); + + if (expected > 0) + assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s"); + else + assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s"); + } +}