Commit 0f56d59

Partially port CASSANDRA-13890 from apache/cassandra
What was ported:
- current compaction throughput measurement by CompactionManager
- exposing current compaction throughput in StorageService
  and CompactionMetrics
- nodetool getcompactionthroughput, including tests

Not ported:
- changes to `nodetool compactionstats`, because that would also require
  porting the tests, which are currently missing in CC; porting those tests
  turned out to be a complex task without also porting the other changes to
  the CompactionManager API
- code for getting / setting the compaction throughput as a double
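The mechanism behind the ported change is a Dropwizard Metrics Meter: the compaction path marks it with the number of bytes it processes, and the exposed throughput is simply the meter's 1/5/15-minute exponentially weighted rates converted from bytes/s to MiB/s. The following is a minimal, self-contained sketch of that idea; the class and method names are illustrative, not the ported API.

import java.util.LinkedHashMap;
import java.util.Map;

import com.codahale.metrics.Meter;

public class CompactionThroughputSketch
{
    private static final double ONE_MIB = 1024 * 1024;

    // Marked from the compaction hot path with the number of on-disk bytes processed.
    private final Meter bytesCompactedThroughput = new Meter();

    public void onBytesCompacted(long bytes)
    {
        bytesCompactedThroughput.mark(bytes);
    }

    // Mirrors the shape of the map exposed over JMX: time window -> rate formatted in MiB/s.
    public Map<String, String> currentThroughputMiBPerSec()
    {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("1minute", String.format("%.3f", bytesCompactedThroughput.getOneMinuteRate() / ONE_MIB));
        result.put("5minute", String.format("%.3f", bytesCompactedThroughput.getFiveMinuteRate() / ONE_MIB));
        result.put("15minute", String.format("%.3f", bytesCompactedThroughput.getFifteenMinuteRate() / ONE_MIB));
        return result;
    }
}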
maoling authored and pkolaczk committed Dec 16, 2024
1 parent 5c830dc commit 0f56d59
Showing 9 changed files with 170 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -1,5 +1,7 @@
Future version (tbd)
* Require only MODIFY permission on base when updating table with MV (STAR-564)
Merged from 5.1:
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
Merged from 5.0:
* Disable chronicle analytics (CASSANDRA-19656)
* Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152)
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -65,7 +65,7 @@
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
@@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec)
compactionRateLimiter.setRate(throughput);
}

public Meter getCompactionThroughput()
{
return metrics.bytesCompactedThroughput;
}

/**
* Call this whenever a compaction might be needed on the given column family store.
* It's okay to over-call (within reason) if a call is unnecessary, it will
@@ -314,7 +319,7 @@ public CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collec
{
return backgroundCompactionRunner.startCompactionTasks(cfs, tasks);
}

public int getOngoingBackgroundUpgradesCount()
{
return backgroundCompactionRunner.getOngoingUpgradesCount();
@@ -1373,7 +1378,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs,

}

static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
{
if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0)
return false;
@@ -1386,8 +1391,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann
return actuallyAcquire(limiter, lengthRead);
}

private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
private boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
{
metrics.bytesCompactedThroughput.mark(lengthRead);
while (lengthRead >= Integer.MAX_VALUE)
{
limiter.acquire(Integer.MAX_VALUE);
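The CompactionManager changes above tie the new meter into the existing rate limiting: the scanner's byte position advances over uncompressed data, so the delta since the last checkpoint is scaled by the compression ratio to approximate on-disk bytes before it is marked on the meter and charged against the Guava RateLimiter. A hedged sketch of that arithmetic follows, assuming the same Guava and Dropwizard types; the class and field names are illustrative.

import com.codahale.metrics.Meter;
import com.google.common.util.concurrent.RateLimiter;

public class CompactionThrottleSketch
{
    private final Meter bytesCompactedThroughput = new Meter();

    // Returns true when bytes were charged, so the caller can advance its lastBytesScanned checkpoint.
    public boolean acquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
    {
        long delta = bytesScanned - lastBytesScanned;
        if (delta <= 0)
            return false;

        // Scanned bytes are uncompressed; scale by the compression ratio to estimate bytes read from disk.
        long lengthRead = (long) (delta * compressionRatio) + 1;

        bytesCompactedThroughput.mark(lengthRead); // feeds the 1/5/15-minute throughput rates

        // RateLimiter.acquire takes an int, so very large reads are charged in chunks.
        while (lengthRead >= Integer.MAX_VALUE)
        {
            limiter.acquire(Integer.MAX_VALUE);
            lengthRead -= Integer.MAX_VALUE;
        }
        if (lengthRead > 0)
            limiter.acquire((int) lengthRead);
        return true;
    }
}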
@@ -63,7 +63,6 @@

import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED;
import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond;

@@ -658,7 +657,7 @@ void execute0()
long bytesScanned = compactionIterator.getTotalBytesScanned();

// Rate limit the scanners, and account for compression
if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
lastBytesScanned = bytesScanned;

maybeStopOrUpdateState();
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -66,6 +66,8 @@ public class CompactionMetrics
public final Counter totalCompactionsFailed;
/** Total number of bytes processed by operations since server [re]start */
public final Counter bytesCompacted;
/** Recent/current throughput of compactions */
public final Meter bytesCompactedThroughput;

/**
* The compaction strategy information for each table. Cached, because its computation might be fairly expensive.
@@ -190,6 +192,7 @@ public Long getValue()
totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions"));
bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted"));
bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput"));

// compaction failure metrics
compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced"));
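The new BytesCompactedThroughput meter is registered through the standard metric factory, so it should also surface as a JMX MBean alongside the other compaction metrics. Below is a hedged sketch of reading it remotely; the object name follows the usual org.apache.cassandra.metrics naming convention and the attribute name is the standard Dropwizard JmxReporter one, both assumptions worth verifying against the actual build.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ReadCompactionThroughputMetric
{
    public static void main(String[] args) throws Exception
    {
        // Default local JMX endpoint; adjust host/port for your deployment.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Assumed metric MBean name; verify with an MBean browser such as jconsole.
            ObjectName meter = new ObjectName("org.apache.cassandra.metrics:type=Compaction,name=BytesCompactedThroughput");
            double oneMinuteBytesPerSec = (Double) mbs.getAttribute(meter, "OneMinuteRate");
            System.out.printf("1-minute compaction throughput: %.3f MiB/s%n", oneMinuteBytesPerSec / (1024.0 * 1024.0));
        }
    }
}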
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
@@ -96,6 +96,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.audit.AuditLogOptions;
import org.apache.cassandra.auth.AuthKeyspace;
@@ -233,6 +234,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
@@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value)
CompactionManager.instance.setRate(value);
}

/**
* Get the current compaction throughput.
* The key is the 1/5/15-minute time window of the statistic;
* the value is the rate formatted as a double string (unit: MiB/s).
*/
public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec()
{
HashMap<String, String> result = new LinkedHashMap<>();
Meter rate = CompactionManager.instance.getCompactionThroughput();
result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB));
result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB));
result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB));
return result;
}

public int getBatchlogReplayThrottleInKB()
{
return DatabaseDescriptor.getBatchlogReplayThrottleInKB();
@@ -631,6 +631,7 @@ public interface StorageServiceMBean extends NotificationEmitter

public int getCompactionThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();

public int getBatchlogReplayThrottleInKB();
public void setBatchlogReplayThrottleInKB(int value);
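With the getter declared on StorageServiceMBean, the map is reachable through any JMX client, which is how NodeProbe consumes it in the next file. A hedged sketch using a plain MBean proxy; the org.apache.cassandra.db:type=StorageService object name is the conventional one and should be double-checked, and the connection boilerplate mirrors the previous example.

import java.util.Map;

import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.cassandra.service.StorageServiceMBean;

public class ReadCurrentCompactionThroughput
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // Assumed MBean name for StorageService; same pattern NodeProbe uses.
            StorageServiceMBean ss = JMX.newMBeanProxy(mbs, new ObjectName("org.apache.cassandra.db:type=StorageService"), StorageServiceMBean.class);

            // Keys are "1minute", "5minute", "15minute"; values are MiB/s formatted with three decimals.
            Map<String, String> rates = ss.getCurrentCompactionThroughputMebibytesPerSec();
            rates.forEach((window, rate) -> System.out.println(window + ": " + rate + " MiB/s"));
        }
    }
}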
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1138,6 +1138,11 @@ public int getCompactionThroughput()
return ssProxy.getCompactionThroughputMbPerSec();
}

public Map<String, String> getCurrentCompactionThroughputMiBPerSec()
{
return ssProxy.getCurrentCompactionThroughputMebibytesPerSec();
}

public void setBatchlogReplayThrottle(int value)
{
ssProxy.setBatchlogReplayThrottleInKB(value);
@@ -17,17 +17,24 @@
*/
package org.apache.cassandra.tools.nodetool;

import java.util.Map;

import io.airlift.airline.Command;

import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;

@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system")
public class GetCompactionThroughput extends NodeToolCmd
{
@Override
public void execute(NodeProbe probe)
{
probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s");

Map<String, String> currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec();
probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
}
}
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tools.nodetool;

import org.junit.BeforeClass;
import org.junit.Test;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;

import static org.apache.cassandra.tools.ToolRunner.ToolResult;
import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for {@code nodetool setcompactionthroughput} and {@code nodetool getcompactionthroughput}.
*/
public class SetGetCompactionThroughputTest extends CQLTester
{
private static final int MAX_INT_CONFIG_VALUE_IN_MBIT = Integer.MAX_VALUE - 1;

@BeforeClass
public static void setup() throws Exception
{
requireNetwork();
startJMXServer();
}

@Test
public void testNull()
{
assertSetInvalidThroughput(null, "Required parameters are missing: compaction_throughput");
}

@Test
public void testPositive()
{
assertSetGetValidThroughput(7);
}

@Test
public void testMaxValue()
{
assertSetGetValidThroughput(MAX_INT_CONFIG_VALUE_IN_MBIT);
}

@Test
public void testZero()
{
assertSetGetValidThroughput(0);
}

@Test
public void testUnparseable()
{
assertSetInvalidThroughput("1.2", "compaction_throughput: can not convert \"1.2\" to a Integer");
assertSetInvalidThroughput("value", "compaction_throughput: can not convert \"value\" to a Integer");
}

@Test
public void testCurrentCompactionThroughput()
{
ToolResult tool = invokeNodetool("getcompactionthroughput");
tool.assertOnCleanExit();

assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s");
assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s");
assertThat(tool.getStdout()).containsPattern("Current compaction throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s");
}

private static void assertSetGetValidThroughput(int throughput)
{
ToolResult tool = invokeNodetool("setcompactionthroughput", String.valueOf(throughput));
tool.assertOnCleanExit();
assertThat(tool.getStdout()).isEmpty();

assertGetThroughput(throughput);
}

private static void assertSetInvalidThroughput(String throughput, String expectedErrorMessage)
{
ToolResult tool = throughput == null ? invokeNodetool("setcompactionthroughput")
: invokeNodetool("setcompactionthroughput", throughput);
assertThat(tool.getExitCode()).isEqualTo(1);
assertThat(tool.getStdout()).contains(expectedErrorMessage);
}

private static void assertSetInvalidThroughputMib(String throughput)
{
ToolResult tool = invokeNodetool("setcompactionthroughput", throughput);
assertThat(tool.getExitCode()).isEqualTo(1);
assertThat(tool.getStdout()).contains("compaction_throughput: 2147483647 is too large; it should be less than" +
" 2147483647 in MiB/s");
}

private static void assertGetThroughput(int expected)
{
ToolResult tool = invokeNodetool("getcompactionthroughput");
tool.assertOnCleanExit();

if (expected > 0)
assertThat(tool.getStdout()).contains("Current compaction throughput: " + expected + " MiB/s");
else
assertThat(tool.getStdout()).contains("Current compaction throughput: 0 MiB/s");
}
}
