Skip to content

Commit

Permalink
CNDB-8501 Propagate request sensors via native CQL custom payload
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Sep 21, 2024
1 parent 5ea584d commit 54b3b28
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 1 deletion.
33 changes: 33 additions & 0 deletions src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.math.IntMath;

import org.apache.cassandra.cql3.Ordering;
Expand All @@ -44,6 +45,7 @@
import org.apache.cassandra.db.marshal.MultiCellCapableType;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
Expand All @@ -69,6 +71,11 @@
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.ClientWarn;
Expand Down Expand Up @@ -568,6 +575,9 @@ else if (restrictions.keyIsInRelation())
msg = processResults(partitions, options, selectors, nowInSec, userLimit, userOffset);
}

// Propagate read sensor data
addReadSensorData(msg, options);

// Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
// shouldn't be moved inside the 'try' above.
if (!pager.isExhausted() && !pager.pager.isTopK())
Expand All @@ -576,6 +586,29 @@ else if (restrictions.keyIsInRelation())
return msg;
}

private void addReadSensorData(ResultMessage.Rows msg, QueryOptions options)
{
// Custom payload is not supported for protocol versions < 4
if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4))
{
return;
}

RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
{
return;
}

Context contex = Context.from(this.table);
Optional<Sensor> readRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.READ_BYTES);
readRequestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(SensorsCustomParams.READ_BYTES_REQUEST, bytes);
msg.setCustomPayload(sensorHeader);
});
}

private void warn(String msg)
{
logger.warn(msg);
Expand Down
49 changes: 49 additions & 0 deletions src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
*/
package org.apache.cassandra.cql3.statements;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
Expand All @@ -30,8 +35,17 @@
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -110,6 +124,17 @@ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateP
throw new UnsupportedOperationException();
}

@Override
public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
{
ResultMessage result = super.execute(state, options, queryStartNanoTime);

if (result == null) result = new ResultMessage.Void();
addWriteSensorData(result, options);

return result;
}

public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
Expand Down Expand Up @@ -343,4 +368,28 @@ public AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.UPDATE, keyspace(), columnFamily());
}

private void addWriteSensorData(ResultMessage msg, QueryOptions options)
{
// Custom payload is not supported for protocol versions < 4
if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4))
{
return;
}

RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
{
return;
}

Context contex = Context.from(this.metadata());
Optional<Sensor> writeRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.WRITE_BYTES);
writeRequestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(this.metadata().name);
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(headerName, bytes);
msg.setCustomPayload(sensorHeader);
});
}
}
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/net/SensorsCustomParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public static byte[] sensorValueAsBytes(double value)
return buffer.array();
}

public static ByteBuffer sensorValueAsByteBuffer(double value)
{
ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
buffer.putDouble(value);
buffer.flip();
return buffer;
}

public static double sensorValueFromBytes(byte[] bytes)
{
ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
Expand Down
16 changes: 16 additions & 0 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -120,6 +121,10 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.PrepareCallback;
Expand Down Expand Up @@ -1061,6 +1066,11 @@ public static void mutate(List<? extends IMutation> mutations,

QueryInfoTracker.WriteTracker writeTracker = queryTracker().onWrite(state, false, mutations, consistencyLevel);

// Request sensors are utilized to track usages from all replicas serving a write request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(state.getKeyspace());
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

long startTime = System.nanoTime();

List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
Expand Down Expand Up @@ -1935,6 +1945,12 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.metadata(),
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from all replicas serving a read request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(group.metadata().keyspace);
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);
PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker);
partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow);
return PartitionIterators.doOnClose(partitions, readTracker::onDone);
Expand Down
41 changes: 41 additions & 0 deletions src/java/org/apache/cassandra/service/WriteResponseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@
*/
package org.apache.cassandra.service;

import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.net.Message;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;

/**
* Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
Expand Down Expand Up @@ -54,7 +62,10 @@ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType wri
public void onResponse(Message<T> m)
{
if (responsesUpdater.decrementAndGet(this) == 0)
{
estimateAndIncrementWriteSensor(m);
signal();
}
//Must be last after all subclass processing
//The two current subclasses both assume logResponseToIdealCLDelegate is called
//here.
Expand All @@ -65,4 +76,34 @@ public int ackCount()
{
return blockFor() - responses;
}

private void estimateAndIncrementWriteSensor(Message<T> msg)
{
RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
return;

if (msg == null)
return;

Map<String, byte[]> customParams = msg.header.customParams();
if (customParams == null)
return;

if (!(msg instanceof IMutation))
return;

IMutation mutation = (IMutation) msg;
for (PartitionUpdate pu: mutation.getPartitionUpdates())
{
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(pu.metadata().name);
byte[] writeBytes = msg.header.customParams().get(headerName);
if (writeBytes == null)
continue;
double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(writeBytes) * candidateReplicaCount();
Context context = Context.from(pu.metadata());
requestSensors.registerSensor(context, Type.WRITE_BYTES);
requestSensors.incrementSensor(context, Type.WRITE_BYTES, adjustedSensorValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.cassandra.service.reads;

import java.util.Map;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -27,6 +29,7 @@
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.SinglePartitionReadCommand;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.transform.DuplicateRowChecker;
Expand All @@ -42,6 +45,11 @@
import org.apache.cassandra.metrics.ReadCoordinationMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.service.reads.repair.ReadRepair;
Expand Down Expand Up @@ -75,6 +83,7 @@ public abstract class AbstractReadExecutor
private final int initialDataRequestCount;
protected volatile PartitionIterator result = null;
protected final QueryInfoTracker.ReadTracker readTracker;
private final RequestSensors requestSensors;

static
{
Expand All @@ -99,7 +108,7 @@ public abstract class AbstractReadExecutor
this.traceState = Tracing.instance.get();
this.queryStartNanoTime = queryStartNanoTime;
this.readTracker = readTracker;

this.requestSensors = RequestTracker.instance.get();

// Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
// knows how to produce older digest but the reverse is not true.
Expand Down Expand Up @@ -401,6 +410,7 @@ public void awaitResponses() throws ReadTimeoutException
{
handler.awaitResults();
assert digestResolver.isDataPresent() : "awaitResults returned with no data present.";
estimateAndIncrementReadSensor();
}
catch (ReadTimeoutException e)
{
Expand Down Expand Up @@ -462,4 +472,33 @@ public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutExc
Preconditions.checkState(result != null, "Result must be set first");
return result;
}

/**
* Estiamtes the read byte returned from all replicas and increments the {@link RequestSensors} with the estiamted
* value. The estimation is performed by multiplying the actual read bytes as retunred by one replica by the number
* of candiadte replicas. This avoids the cost of waiting for all replicas to response, and at the same time
* provides a consistent estimation of the same query between different requests.
*/
private void estimateAndIncrementReadSensor()
{
if (this.requestSensors == null)
return;

if (!handler.resolver.isDataPresent())
return;

Message<ReadResponse> msg = handler.resolver.getMessages().get(0);
Map<String, byte[]> customParams = msg.header.customParams();
if (customParams == null)
return;

byte[] readBytes = msg.header.customParams().get(SensorsCustomParams.READ_BYTES_REQUEST);
if (readBytes == null)
return;

int numCandidates = replicaPlan().candidates().size();
double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(readBytes) * numCandidates;
Context context = Context.from(this.command);
this.requestSensors.incrementSensor(context, Type.READ_BYTES, adjustedSensorValue);
}
}
Loading

0 comments on commit 54b3b28

Please sign in to comment.