Skip to content

Commit

Permalink
Propagate READ_BYTES only
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Oct 2, 2024
1 parent 54b3b28 commit 7613307
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,12 @@ public enum CassandraRelevantProperties
* Allows custom implementation of {@link org.apache.cassandra.sensors.RequestSensorsFactory} to optionally create
* and configure {@link org.apache.cassandra.sensors.RequestSensors} instances.
*/
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class");
REQUEST_SENSORS_FACTORY("cassandra.request_sensors_factory_class"),

/**
* If true, the coordinator will propagate request sensors via the native protocol custom payload flag.
*/
PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL("cassandra.propagate_request_sensors_via_native_protocal", "false");;

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
49 changes: 0 additions & 49 deletions src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@
*/
package org.apache.cassandra.cql3.statements;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.audit.AuditLogEntryType;
Expand All @@ -35,17 +30,8 @@
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -124,17 +110,6 @@ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateP
throw new UnsupportedOperationException();
}

@Override
public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
{
ResultMessage result = super.execute(state, options, queryStartNanoTime);

if (result == null) result = new ResultMessage.Void();
addWriteSensorData(result, options);

return result;
}

public static class ParsedInsert extends ModificationStatement.Parsed
{
private final List<ColumnIdentifier> columnNames;
Expand Down Expand Up @@ -368,28 +343,4 @@ public AuditLogContext getAuditLogContext()
{
return new AuditLogContext(AuditLogEntryType.UPDATE, keyspace(), columnFamily());
}

private void addWriteSensorData(ResultMessage msg, QueryOptions options)
{
// Custom payload is not supported for protocol versions < 4
if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4))
{
return;
}

RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
{
return;
}

Context contex = Context.from(this.metadata());
Optional<Sensor> writeRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.WRITE_BYTES);
writeRequestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(this.metadata().name);
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(headerName, bytes);
msg.setCustomPayload(sensorHeader);
});
}
}
13 changes: 6 additions & 7 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.cassandra.batchlog.BatchlogManager;
import org.apache.cassandra.concurrent.ExecutorLocals;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
Expand Down Expand Up @@ -121,7 +122,9 @@
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.ActiveRequestSensors;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.NoOpRequestSensors;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.Type;
Expand Down Expand Up @@ -1066,11 +1069,6 @@ public static void mutate(List<? extends IMutation> mutations,

QueryInfoTracker.WriteTracker writeTracker = queryTracker().onWrite(state, false, mutations, consistencyLevel);

// Request sensors are utilized to track usages from all replicas serving a write request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(state.getKeyspace());
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
ExecutorLocals.set(locals);

long startTime = System.nanoTime();

List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
Expand Down Expand Up @@ -1945,8 +1943,9 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.metadata(),
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from all replicas serving a read request
RequestSensors requestSensors = RequestSensorsFactory.instance.create(group.metadata().keyspace);
// Request sensors are utilized to track usages from replicas serving a read request
RequestSensors requestSensors = CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.getBoolean() ?
new ActiveRequestSensors() : NoOpRequestSensors.instance;
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
ExecutorLocals locals = ExecutorLocals.create(requestSensors);
Expand Down
41 changes: 0 additions & 41 deletions src/java/org/apache/cassandra/service/WriteResponseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,14 @@
*/
package org.apache.cassandra.service;

import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.locator.ReplicaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.net.Message;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Type;

/**
* Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
Expand Down Expand Up @@ -62,10 +54,7 @@ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType wri
public void onResponse(Message<T> m)
{
if (responsesUpdater.decrementAndGet(this) == 0)
{
estimateAndIncrementWriteSensor(m);
signal();
}
//Must be last after all subclass processing
//The two current subclasses both assume logResponseToIdealCLDelegate is called
//here.
Expand All @@ -76,34 +65,4 @@ public int ackCount()
{
return blockFor() - responses;
}

private void estimateAndIncrementWriteSensor(Message<T> msg)
{
RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
return;

if (msg == null)
return;

Map<String, byte[]> customParams = msg.header.customParams();
if (customParams == null)
return;

if (!(msg instanceof IMutation))
return;

IMutation mutation = (IMutation) msg;
for (PartitionUpdate pu: mutation.getPartitionUpdates())
{
String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(pu.metadata().name);
byte[] writeBytes = msg.header.customParams().get(headerName);
if (writeBytes == null)
continue;
double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(writeBytes) * candidateReplicaCount();
Context context = Context.from(pu.metadata());
requestSensors.registerSensor(context, Type.WRITE_BYTES);
requestSensors.incrementSensor(context, Type.WRITE_BYTES, adjustedSensorValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.cql3.PageSize;
import org.apache.cassandra.cql3.QueryHandler;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.statements.SelectStatement;
Expand Down Expand Up @@ -59,51 +58,66 @@ public static void setup()
CassandraRelevantProperties.REQUEST_SENSORS_FACTORY.setString(ActiveRequestSensorsFactory.class.getName());
}
@Test
public void testSensorsInResultMessage() throws Throwable
public void testSensorsInResultMessageEnabled() throws Throwable
{
CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.setBoolean(true);
try (Cluster cluster = builder().withNodes(1).start())
{
// resister a noop sensor listener before init(cluster) which creates the test keyspace to ensure that the registry singleton instance is subscribed to schema notifications
cluster.get(1).runsOnInstance(initSensorsRegistry()).run();
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)"));
String write = withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')");
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')"));

String query = withKeyspace("SELECT * FROM %s.tbl WHERE pk=1");

// Any methods used inside the runOnInstance() block should be static, otherwise java.io.NotSerializableException will be thrown
cluster.get(1).runOnInstance(() -> {
ResultMessage writeResult = executeWithResult(write);
Map<String, ByteBuffer> customPayload = writeResult.getCustomPayload();

double writeBytesRequest = getWriteBytesRequest(customPayload, "tbl");
Assertions.assertThat(writeBytesRequest).isGreaterThan(0D);

ResultMessage.Rows readResult = executeWithPagingWithResultMessage(query);
customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload);
Map<String, ByteBuffer> customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload, true);

double readBytesRequest = getReadBytesRequest(customPayload);
Assertions.assertThat(readBytesRequest).isGreaterThan(0D);
});
}
}

private static void assertReadBytesHeadersExist(Map<String, ByteBuffer> customPayload)
@Test
public void testSensorsInResultMessageDisabled() throws Throwable
{
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCAL.setBoolean(false);
try (Cluster cluster = builder().withNodes(1).start())
{
// resister a noop sensor listener before init(cluster) which creates the test keyspace to ensure that the registry singleton instance is subscribed to schema notifications
cluster.get(1).runsOnInstance(initSensorsRegistry()).run();
init(cluster);
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)"));
cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')"));

String query = withKeyspace("SELECT * FROM %s.tbl WHERE pk=1");

// Any methods used inside the runOnInstance() block should be static, otherwise java.io.NotSerializableException will be thrown
cluster.get(1).runOnInstance(() -> {
ResultMessage.Rows readResult = executeWithPagingWithResultMessage(query);
Map<String, ByteBuffer> customPayload = readResult.getCustomPayload();
assertReadBytesHeadersExist(customPayload, false);
});
}
}

private static double getReadBytesRequest(Map<String, ByteBuffer> customPayload)
private static void assertReadBytesHeadersExist(Map<String, ByteBuffer> customPayload, boolean exists)
{
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
return ByteBufferUtil.toDouble(customPayload.get(SensorsCustomParams.READ_BYTES_REQUEST));
if (exists)
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
else if (customPayload != null)
Assertions.assertThat(customPayload).doesNotContainKey(SensorsCustomParams.READ_BYTES_REQUEST);
}

private static double getWriteBytesRequest(Map<String, ByteBuffer> customPayload, String table)
private static double getReadBytesRequest(Map<String, ByteBuffer> customPayload)
{
String headerName = String.format(SensorsCustomParams.WRITE_BYTES_REQUEST_TEMPLATE, table);
Assertions.assertThat(customPayload).containsKey(headerName);
return ByteBufferUtil.toDouble(customPayload.get(headerName));
Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST);
return ByteBufferUtil.toDouble(customPayload.get(SensorsCustomParams.READ_BYTES_REQUEST));
}

/**
Expand Down Expand Up @@ -154,24 +168,4 @@ private static ResultMessage.Rows executeWithPagingWithResultMessage(String quer

return selectStatement.execute(state, initialOptions, nanoTime);
}

/**
* TODO: update SimpleQueryResult in the dtest-api project to expose custom payload and use Coordinator##executeWithResult instead
*/
private static ResultMessage executeWithResult(String query, Object... args)
{
long nanoTime = System.nanoTime();
QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query);
ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(ConsistencyLevel.ALL.name());
org.apache.cassandra.db.ConsistencyLevel cl = org.apache.cassandra.db.ConsistencyLevel.fromCode(consistencyLevel.ordinal());
QueryOptions initialOptions = QueryOptions.create(cl,
null,
false,
PageSize.inRows(512),
null,
null,
ProtocolVersion.CURRENT,
prepared.keyspace);
return prepared.statement.execute(QueryProcessor.internalQueryState(), initialOptions, nanoTime);
}
}

1 comment on commit 7613307

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 7 NEW test failure(s) in 2 builds., Build 1: ran 17696 tests with 15 failures and 128 skipped.
Butler analysis done on ds-cassandra-pr-gate/cndb-8501 vs last 16 runs of ds-cassandra-build-nightly/main.
org.apache.cassandra.index.sai.cql.TinySegmentQueryWriteLifecycleTest.testWriteLifecycle[aa_CompoundKeyDataModel{primaryKey=p, c}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.QueryWriteLifecycleTest.testWriteLifecycle[aa_CompoundKeyDataModel{primaryKey=p, c}]: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testNV_QA_4: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testOpenAiV3Small: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testOpenAiV3Large: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.disk.vector.VectorCompressionTest.testBert: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.VectorSiftSmallTest.testMultiSegmentBuild: test is constantly failing. No failures on upstream;
branch story: [F] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.