Skip to content

Commit

Permalink
Port CASSANDRA-13890 from apache/cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
maoling authored and pkolaczk committed Dec 2, 2024
1 parent 262785b commit f7982df
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 38 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
Future version (tbd)
* Require only MODIFY permission on base when updating table with MV (STAR-564)
Merged from 5.1:
* Expose current compaction throughput in nodetool (CASSANDRA-13890)
Merged from 5.0:
* Disable chronicle analytics (CASSANDRA-19656)
* Remove mocking in InternalNodeProbe spying on StorageServiceMBean (CASSANDRA-18152)
Expand Down
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
Expand Down Expand Up @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec)
compactionRateLimiter.setRate(throughput);
}

public Meter getCompactionThroughput()
{
return metrics.bytesCompactedThroughput;
}

/**
* Call this whenever a compaction might be needed on the given column family store.
* It's okay to over-call (within reason) if a call is unnecessary, it will
Expand All @@ -314,7 +319,7 @@ public CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collec
{
return backgroundCompactionRunner.startCompactionTasks(cfs, tasks);
}

public int getOngoingBackgroundUpgradesCount()
{
return backgroundCompactionRunner.getOngoingUpgradesCount();
Expand Down Expand Up @@ -1340,7 +1345,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs,

}

static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
{
if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0)
return false;
Expand All @@ -1353,8 +1358,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann
return actuallyAcquire(limiter, lengthRead);
}

private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
private boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
{
metrics.bytesCompactedThroughput.mark(lengthRead);
while (lengthRead >= Integer.MAX_VALUE)
{
limiter.acquire(Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@

import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED;
import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond;

Expand Down Expand Up @@ -602,7 +601,7 @@ void execute0()
long bytesScanned = compactionIterator.getTotalBytesScanned();

// Rate limit the scanners, and account for compression
if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
lastBytesScanned = bytesScanned;

maybeStopOrUpdateState();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/CompactionMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class CompactionMetrics
public final Counter totalCompactionsFailed;
/** Total number of bytes processed by operations since server [re]start */
public final Counter bytesCompacted;
/** Recent/current throughput of compactions take */
public final Meter bytesCompactedThroughput;

/**
* The compaction strategy information for each table. Cached, because its computation might be fairly expensive.
Expand Down Expand Up @@ -190,6 +192,7 @@ public Long getValue()
totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions"));
bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted"));
bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput"));

// compaction failure metrics
compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced"));
Expand Down
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.audit.AuditLogOptions;
import org.apache.cassandra.auth.AuthKeyspace;
Expand Down Expand Up @@ -233,6 +234,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
Expand Down Expand Up @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value)
CompactionManager.instance.setRate(value);
}

/**
* Get the Current Compaction Throughput
* key is 1/5/15minute time dimension for statistics
* value is the metric double string (unit is:mib/s)
*/
public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec()
{
HashMap<String, String> result = new LinkedHashMap<>();
Meter rate = CompactionManager.instance.getCompactionThroughput();
result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB));
result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB));
result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB));
return result;
}

public int getBatchlogReplayThrottleInKB()
{
return DatabaseDescriptor.getBatchlogReplayThrottleInKB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ public interface StorageServiceMBean extends NotificationEmitter

public int getCompactionThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();

public int getBatchlogReplayThrottleInKB();
public void setBatchlogReplayThrottleInKB(int value);
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,11 @@ public int getCompactionThroughput()
return ssProxy.getCompactionThroughputMbPerSec();
}

public Map<String, String> getCurrentCompactionThroughputMiBPerSec()
{
return ssProxy.getCurrentCompactionThroughputMebibytesPerSec();
}

public void setBatchlogReplayThrottle(int value)
{
ssProxy.setBatchlogReplayThrottleInKB(value);
Expand Down
96 changes: 66 additions & 30 deletions src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.PrintStream;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -30,6 +31,7 @@
import org.apache.cassandra.db.compaction.CompactionStrategyStatistics;
import org.apache.cassandra.db.compaction.TableOperation;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void execute(NodeProbe probe)
Map<String, Map<String, Integer>> pendingTaskNumberByTable =
(Map<String, Map<String, Integer>>) probe.getCompactionMetric("PendingTasksByTableName");
Map<String, Map<String, Double>> writeAmplificationByTableName =
(Map<String, Map<String, Double>>) probe.getCompactionMetric("WriteAmplificationByTableName");
(Map<String, Map<String, Double>>) probe.getCompactionMetric("WriteAmplificationByTableName");
int numTotalPendingTask = 0;
double totWriteAmplification = 0;
for (Entry<String, Map<String, Integer>> ksEntry : pendingTaskNumberByTable.entrySet())
Expand Down Expand Up @@ -88,48 +90,82 @@ public void execute(NodeProbe probe)
}
}
out.println();
reportCompactionTable(cm.getCompactions(), probe.getCompactionThroughput(), humanReadable, out);

TableBuilder tableBuilder = new TableBuilder();
compactionsStats(probe, tableBuilder);
reportCompactionTable(cm.getCompactions(), probe.getCompactionThroughput(), humanReadable, out, tableBuilder);
if (aggregate)
{
reportAggregateCompactions(probe);
}
}

private void compactionsStats(NodeProbe probe, TableBuilder tableBuilder)
{
CassandraMetricsRegistry.JmxMeterMBean totalCompactionsCompletedMetrics =
(CassandraMetricsRegistry.JmxMeterMBean) probe.getCompactionMetric("TotalCompactionsCompleted");
tableBuilder.add("compactions completed", String.valueOf(totalCompactionsCompletedMetrics.getCount()));

CassandraMetricsRegistry.JmxCounterMBean bytesCompacted = (CassandraMetricsRegistry.JmxCounterMBean) probe.getCompactionMetric("BytesCompacted");
if (humanReadable)
tableBuilder.add("data compacted", FileUtils.stringifyFileSize(Double.parseDouble(Long.toString(bytesCompacted.getCount()))));
else
tableBuilder.add("data compacted", Long.toString(bytesCompacted.getCount()));

NumberFormat formatter = new DecimalFormat("0.00");

tableBuilder.add("15 minute rate", String.format("%s/minute", formatter.format(totalCompactionsCompletedMetrics.getFifteenMinuteRate() * 60)));
tableBuilder.add("mean rate", String.format("%s/hour", formatter.format(totalCompactionsCompletedMetrics.getMeanRate() * 60 * 60)));

double configured = probe.getStorageService().getCompactionThroughputMbPerSec();
tableBuilder.add("compaction throughput (MiB/s)", configured == 0 ? "throttling disabled (0)" : Double.toString(configured));
Map<String, String> currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec();
tableBuilder.add("current compaction throughput (1 minute)", currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
tableBuilder.add("current compaction throughput (5 minute)", currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
tableBuilder.add("current compaction throughput (15 minute)", currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
}

public static void reportCompactionTable(List<Map<String,String>> compactions, int compactionThroughput, boolean humanReadable, PrintStream out)
{
if (!compactions.isEmpty())
{
long remainingBytes = 0;
TableBuilder table = new TableBuilder();
reportCompactionTable(compactions, compactionThroughput, humanReadable, out, new TableBuilder());
}

table.add("id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
for (Map<String, String> c : compactions)
{
long total = Long.parseLong(c.get(TableOperation.Progress.TOTAL));
long completed = Long.parseLong(c.get(TableOperation.Progress.COMPLETED));
String taskType = c.get(TableOperation.Progress.OPERATION_TYPE);
String keyspace = c.get(TableOperation.Progress.KEYSPACE);
String columnFamily = c.get(TableOperation.Progress.COLUMNFAMILY);
String unit = c.get(TableOperation.Progress.UNIT);
boolean toFileSize = humanReadable && TableOperation.Unit.isFileSize(unit);
String completedStr = toFileSize ? FileUtils.stringifyFileSize(completed) : Long.toString(completed);
String totalStr = toFileSize ? FileUtils.stringifyFileSize(total) : Long.toString(total);
String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
String id = c.get(TableOperation.Progress.OPERATION_ID);
table.add(id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
remainingBytes += total - completed;
}
public static void reportCompactionTable(List<Map<String,String>> compactions, int compactionThroughput, boolean humanReadable, PrintStream out, TableBuilder table)
{
if (compactions.isEmpty())
{
table.printTo(out);
return;
}

String remainingTime = "n/a";
if (compactionThroughput != 0)
{
long remainingTimeInSecs = remainingBytes / (1024L * 1024L * compactionThroughput);
remainingTime = format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
}
out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
long remainingBytes = 0;

table.add("id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
for (Map<String, String> c : compactions)
{
long total = Long.parseLong(c.get(TableOperation.Progress.TOTAL));
long completed = Long.parseLong(c.get(TableOperation.Progress.COMPLETED));
String taskType = c.get(TableOperation.Progress.OPERATION_TYPE);
String keyspace = c.get(TableOperation.Progress.KEYSPACE);
String columnFamily = c.get(TableOperation.Progress.COLUMNFAMILY);
String unit = c.get(TableOperation.Progress.UNIT);
boolean toFileSize = humanReadable && TableOperation.Unit.isFileSize(unit);
String completedStr = toFileSize ? FileUtils.stringifyFileSize(completed) : Long.toString(completed);
String totalStr = toFileSize ? FileUtils.stringifyFileSize(total) : Long.toString(total);
String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
String id = c.get(TableOperation.Progress.OPERATION_ID);
table.add(id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
remainingBytes += total - completed;
}
table.printTo(out);

String remainingTime = "n/a";
if (compactionThroughput != 0)
{
long remainingTimeInSecs = remainingBytes / (1024L * 1024L * compactionThroughput);
remainingTime = format("%dh%02dm%02ds", remainingTimeInSecs / 3600, (remainingTimeInSecs % 3600) / 60, (remainingTimeInSecs % 60));
}
out.printf("%25s%10s%n", "Active compaction remaining time : ", remainingTime);
}

private static void reportAggregateCompactions(NodeProbe probe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,24 @@
*/
package org.apache.cassandra.tools.nodetool;

import java.util.Map;

import io.airlift.airline.Command;

import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;

@Command(name = "getcompactionthroughput", description = "Print the MB/s throughput cap for compaction in the system")
@Command(name = "getcompactionthroughput", description = "Print the MiB/s throughput cap for compaction in the system")
public class GetCompactionThroughput extends NodeToolCmd
{
@Override
public void execute(NodeProbe probe)
{
probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MB/s");
probe.output().out.println("Current compaction throughput: " + probe.getCompactionThroughput() + " MiB/s");

Map<String, String> currentCompactionThroughputMetricsMap = probe.getCurrentCompactionThroughputMiBPerSec();
probe.output().out.println("Current compaction throughput (1 minute): " + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
probe.output().out.println("Current compaction throughput (5 minute): " + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
probe.output().out.println("Current compaction throughput (15 minute): " + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
}
}
Loading

0 comments on commit f7982df

Please sign in to comment.