Skip to content

Commit

Permalink
Partially port CASSANDRA-13890
Browse files Browse the repository at this point in the history
Ported the compaction throughput metering, but not
the changes in nodetool.
  • Loading branch information
maoling authored and pkolaczk committed Nov 27, 2024
1 parent 9bdc785 commit e8cc355
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 6 deletions.
14 changes: 10 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
Expand Down Expand Up @@ -290,6 +290,11 @@ public void setRate(final double throughPutMbPerSec)
compactionRateLimiter.setRate(throughput);
}

public Meter getCompactionThroughput()
{
return metrics.bytesCompactedThroughput;
}

/**
* Call this whenever a compaction might be needed on the given column family store.
* It's okay to over-call (within reason) if a call is unnecessary, it will
Expand All @@ -314,7 +319,7 @@ public CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collec
{
return backgroundCompactionRunner.startCompactionTasks(cfs, tasks);
}

public int getOngoingBackgroundUpgradesCount()
{
return backgroundCompactionRunner.getOngoingUpgradesCount();
Expand Down Expand Up @@ -1340,7 +1345,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs,

}

static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
protected boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
{
if (DatabaseDescriptor.getCompactionThroughputMbPerSec() == 0)
return false;
Expand All @@ -1353,8 +1358,9 @@ static boolean compactionRateLimiterAcquire(RateLimiter limiter, long bytesScann
return actuallyAcquire(limiter, lengthRead);
}

private static boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
private boolean actuallyAcquire(RateLimiter limiter, long lengthRead)
{
metrics.bytesCompactedThroughput.mark(lengthRead);
while (lengthRead >= Integer.MAX_VALUE)
{
limiter.acquire(Integer.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@

import static org.apache.cassandra.config.CassandraRelevantProperties.COMPACTION_HISTORY_ENABLED;
import static org.apache.cassandra.config.CassandraRelevantProperties.CURSORS_ENABLED;
import static org.apache.cassandra.db.compaction.CompactionManager.compactionRateLimiterAcquire;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemoryPerSecond;

Expand Down Expand Up @@ -602,7 +601,7 @@ void execute0()
long bytesScanned = compactionIterator.getTotalBytesScanned();

// Rate limit the scanners, and account for compression
if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
if (CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
lastBytesScanned = bytesScanned;

maybeStopOrUpdateState();
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/metrics/CompactionMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class CompactionMetrics
public final Counter totalCompactionsFailed;
/** Total number of bytes processed by operations since server [re]start */
public final Counter bytesCompacted;
/** Recent/current throughput of compactions take */
public final Meter bytesCompactedThroughput;

/**
* The compaction strategy information for each table. Cached, because its computation might be fairly expensive.
Expand Down Expand Up @@ -190,6 +192,7 @@ public Long getValue()
totalCompactionsCompleted = Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
totalCompactionsFailed = Metrics.counter(factory.createMetricName("FailedCompactions"));
bytesCompacted = Metrics.counter(factory.createMetricName("BytesCompacted"));
bytesCompactedThroughput = Metrics.meter(factory.createMetricName("BytesCompactedThroughput"));

// compaction failure metrics
compactionsReduced = Metrics.counter(factory.createMetricName("CompactionsReduced"));
Expand Down
17 changes: 17 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.audit.AuditLogOptions;
import org.apache.cassandra.auth.AuthKeyspace;
Expand Down Expand Up @@ -233,6 +234,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACEMENT_ALLOW_EMPTY;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
import static org.apache.cassandra.net.NoPayload.noPayload;
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
Expand Down Expand Up @@ -1650,6 +1652,21 @@ public void setCompactionThroughputMbPerSec(int value)
CompactionManager.instance.setRate(value);
}

/**
* Get the Current Compaction Throughput
* key is 1/5/15minute time dimension for statistics
* value is the metric double string (unit is:mib/s)
*/
public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec()
{
HashMap<String, String> result = new LinkedHashMap<>();
Meter rate = CompactionManager.instance.getCompactionThroughput();
result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() / ONE_MB));
result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() / ONE_MB));
result.put("15minute", String.format("%.3f", rate.getFifteenMinuteRate() / ONE_MB));
return result;
}

public int getBatchlogReplayThrottleInKB()
{
return DatabaseDescriptor.getBatchlogReplayThrottleInKB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ public interface StorageServiceMBean extends NotificationEmitter

public int getCompactionThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();

public int getBatchlogReplayThrottleInKB();
public void setBatchlogReplayThrottleInKB(int value);
Expand Down

0 comments on commit e8cc355

Please sign in to comment.