From 54b3b287f3ade6f54a24b2bc2b290e1ae4a50cdb Mon Sep 17 00:00:00 2001 From: Ayman Khalil <13960949+aymkhalil@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:01:35 -0700 Subject: [PATCH] CNDB-8501 Propagate request sensors via native CQL custom payload --- .../cql3/statements/SelectStatement.java | 33 ++++ .../cql3/statements/UpdateStatement.java | 49 +++++ .../cassandra/net/SensorsCustomParams.java | 8 + .../cassandra/service/StorageProxy.java | 16 ++ .../service/WriteResponseHandler.java | 41 ++++ .../service/reads/AbstractReadExecutor.java | 41 +++- .../distributed/test/sensors/SensorsTest.java | 177 ++++++++++++++++++ .../net/SensorsCustomParamsTest.java | 11 ++ .../service/reads/ReadExecutorTest.java | 95 ++++++++++ 9 files changed, 470 insertions(+), 1 deletion(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/sensors/SensorsTest.java diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 959d489cdef7..68949fe37387 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.math.IntMath; import org.apache.cassandra.cql3.Ordering; @@ -44,6 +45,7 @@ import org.apache.cassandra.db.marshal.MultiCellCapableType; import org.apache.cassandra.guardrails.Guardrails; import org.apache.cassandra.index.Index; +import org.apache.cassandra.net.SensorsCustomParams; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -69,6 +71,11 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.index.IndexRegistry; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.RequestTracker; +import org.apache.cassandra.sensors.Sensor; +import org.apache.cassandra.sensors.Type; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; @@ -568,6 +575,9 @@ else if (restrictions.keyIsInRelation()) msg = processResults(partitions, options, selectors, nowInSec, userLimit, userOffset); } + // Propagate read sensor data + addReadSensorData(msg, options); + // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this // shouldn't be moved inside the 'try' above. if (!pager.isExhausted() && !pager.pager.isTopK()) @@ -576,6 +586,29 @@ else if (restrictions.keyIsInRelation()) return msg; } + private void addReadSensorData(ResultMessage.Rows msg, QueryOptions options) + { + // Custom payload is not supported for protocol versions < 4 + if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4)) + { + return; + } + + RequestSensors requestSensors = RequestTracker.instance.get(); + if (requestSensors == null) + { + return; + } + + Context contex = Context.from(this.table); + Optional readRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.READ_BYTES); + readRequestSensor.ifPresent(sensor -> { + ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue()); + Map sensorHeader = ImmutableMap.of(SensorsCustomParams.READ_BYTES_REQUEST, bytes); + msg.setCustomPayload(sensorHeader); + }); + } + private void warn(String msg) { logger.warn(msg); diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index bccd812d2cb5..d1ac4d950d7b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -17,9 +17,14 @@ */ package org.apache.cassandra.cql3.statements; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.ImmutableMap; import org.apache.cassandra.audit.AuditLogContext; import org.apache.cassandra.audit.AuditLogEntryType; @@ -30,8 +35,17 @@ import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.net.SensorsCustomParams; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.RequestTracker; +import org.apache.cassandra.sensors.Sensor; +import org.apache.cassandra.sensors.Type; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -110,6 +124,17 @@ public void addUpdateForKey(PartitionUpdate.Builder update, Slice slice, UpdateP throw new UnsupportedOperationException(); } + @Override + public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) + { + ResultMessage result = super.execute(state, options, queryStartNanoTime); + + if (result == null) result = new ResultMessage.Void(); + addWriteSensorData(result, options); + + return result; + } + public static class ParsedInsert extends ModificationStatement.Parsed { private final List columnNames; @@ -343,4 +368,28 @@ public AuditLogContext getAuditLogContext() { return new AuditLogContext(AuditLogEntryType.UPDATE, keyspace(), columnFamily()); } + + private void addWriteSensorData(ResultMessage msg, QueryOptions options) + { + // Custom payload is not supported for protocol versions < 4 + if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4)) + { + return; + } + + RequestSensors requestSensors = RequestTracker.instance.get(); + if (requestSensors == null) + { + return; + } + + Context contex = Context.from(this.metadata()); + Optional writeRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.WRITE_BYTES); + writeRequestSensor.ifPresent(sensor -> { + ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue()); + String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(this.metadata().name); + Map sensorHeader = ImmutableMap.of(headerName, bytes); + msg.setCustomPayload(sensorHeader); + }); + } } diff --git a/src/java/org/apache/cassandra/net/SensorsCustomParams.java b/src/java/org/apache/cassandra/net/SensorsCustomParams.java index 86d089841913..6695a60e7359 100644 --- a/src/java/org/apache/cassandra/net/SensorsCustomParams.java +++ b/src/java/org/apache/cassandra/net/SensorsCustomParams.java @@ -95,6 +95,14 @@ public static byte[] sensorValueAsBytes(double value) return buffer.array(); } + public static ByteBuffer sensorValueAsByteBuffer(double value) + { + ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES); + buffer.putDouble(value); + buffer.flip(); + return buffer; + } + public static double sensorValueFromBytes(byte[] bytes) { ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES); diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 05c3e6dea455..938350ba99c0 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -51,6 +51,7 @@ import org.apache.cassandra.batchlog.Batch; import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; @@ -120,6 +121,10 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.RequestSensorsFactory; +import org.apache.cassandra.sensors.Type; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.service.paxos.PrepareCallback; @@ -1061,6 +1066,11 @@ public static void mutate(List mutations, QueryInfoTracker.WriteTracker writeTracker = queryTracker().onWrite(state, false, mutations, consistencyLevel); + // Request sensors are utilized to track usages from all replicas serving a write request + RequestSensors requestSensors = RequestSensorsFactory.instance.create(state.getKeyspace()); + ExecutorLocals locals = ExecutorLocals.create(requestSensors); + ExecutorLocals.set(locals); + long startTime = System.nanoTime(); List> responseHandlers = new ArrayList<>(mutations.size()); @@ -1935,6 +1945,12 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group, group.metadata(), group.queries, consistencyLevel); + // Request sensors are utilized to track usages from all replicas serving a read request + RequestSensors requestSensors = RequestSensorsFactory.instance.create(group.metadata().keyspace); + Context context = Context.from(group.metadata()); + requestSensors.registerSensor(context, Type.READ_BYTES); + ExecutorLocals locals = ExecutorLocals.create(requestSensors); + ExecutorLocals.set(locals); PartitionIterator partitions = read(group, consistencyLevel, queryState, queryStartNanoTime, readTracker); partitions = PartitionIterators.filteredRowTrackingIterator(partitions, readTracker::onFilteredPartition, readTracker::onFilteredRow, readTracker::onFilteredRow); return PartitionIterators.doOnClose(partitions, readTracker::onDone); diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 041fe839a43b..d97ada7ba366 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -17,14 +17,22 @@ */ package org.apache.cassandra.service; +import java.util.Map; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.locator.ReplicaPlan; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.net.Message; import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.net.SensorsCustomParams; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.RequestTracker; +import org.apache.cassandra.sensors.Type; /** * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. @@ -54,7 +62,10 @@ public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType wri public void onResponse(Message m) { if (responsesUpdater.decrementAndGet(this) == 0) + { + estimateAndIncrementWriteSensor(m); signal(); + } //Must be last after all subclass processing //The two current subclasses both assume logResponseToIdealCLDelegate is called //here. @@ -65,4 +76,34 @@ public int ackCount() { return blockFor() - responses; } + + private void estimateAndIncrementWriteSensor(Message msg) + { + RequestSensors requestSensors = RequestTracker.instance.get(); + if (requestSensors == null) + return; + + if (msg == null) + return; + + Map customParams = msg.header.customParams(); + if (customParams == null) + return; + + if (!(msg instanceof IMutation)) + return; + + IMutation mutation = (IMutation) msg; + for (PartitionUpdate pu: mutation.getPartitionUpdates()) + { + String headerName = SensorsCustomParams.encodeTableInWriteBytesRequestParam(pu.metadata().name); + byte[] writeBytes = msg.header.customParams().get(headerName); + if (writeBytes == null) + continue; + double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(writeBytes) * candidateReplicaCount(); + Context context = Context.from(pu.metadata()); + requestSensors.registerSensor(context, Type.WRITE_BYTES); + requestSensors.incrementSensor(context, Type.WRITE_BYTES, adjustedSensorValue); + } + } } diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 1d938f5fe772..eaac7745deca 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.service.reads; +import java.util.Map; + import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +29,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.transform.DuplicateRowChecker; @@ -42,6 +45,11 @@ import org.apache.cassandra.metrics.ReadCoordinationMetrics; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.SensorsCustomParams; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.RequestTracker; +import org.apache.cassandra.sensors.Type; import org.apache.cassandra.service.QueryInfoTracker; import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; import org.apache.cassandra.service.reads.repair.ReadRepair; @@ -75,6 +83,7 @@ public abstract class AbstractReadExecutor private final int initialDataRequestCount; protected volatile PartitionIterator result = null; protected final QueryInfoTracker.ReadTracker readTracker; + private final RequestSensors requestSensors; static { @@ -99,7 +108,7 @@ public abstract class AbstractReadExecutor this.traceState = Tracing.instance.get(); this.queryStartNanoTime = queryStartNanoTime; this.readTracker = readTracker; - + this.requestSensors = RequestTracker.instance.get(); // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes // knows how to produce older digest but the reverse is not true. @@ -401,6 +410,7 @@ public void awaitResponses() throws ReadTimeoutException { handler.awaitResults(); assert digestResolver.isDataPresent() : "awaitResults returned with no data present."; + estimateAndIncrementReadSensor(); } catch (ReadTimeoutException e) { @@ -462,4 +472,33 @@ public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutExc Preconditions.checkState(result != null, "Result must be set first"); return result; } + + /** + * Estiamtes the read byte returned from all replicas and increments the {@link RequestSensors} with the estiamted + * value. The estimation is performed by multiplying the actual read bytes as retunred by one replica by the number + * of candiadte replicas. This avoids the cost of waiting for all replicas to response, and at the same time + * provides a consistent estimation of the same query between different requests. + */ + private void estimateAndIncrementReadSensor() + { + if (this.requestSensors == null) + return; + + if (!handler.resolver.isDataPresent()) + return; + + Message msg = handler.resolver.getMessages().get(0); + Map customParams = msg.header.customParams(); + if (customParams == null) + return; + + byte[] readBytes = msg.header.customParams().get(SensorsCustomParams.READ_BYTES_REQUEST); + if (readBytes == null) + return; + + int numCandidates = replicaPlan().candidates().size(); + double adjustedSensorValue = SensorsCustomParams.sensorValueFromBytes(readBytes) * numCandidates; + Context context = Context.from(this.command); + this.requestSensors.incrementSensor(context, Type.READ_BYTES, adjustedSensorValue); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/sensors/SensorsTest.java b/test/distributed/org/apache/cassandra/distributed/test/sensors/SensorsTest.java new file mode 100644 index 000000000000..4d5aad967fdf --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/sensors/SensorsTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.sensors; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.PageSize; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.net.SensorsCustomParams; +import org.apache.cassandra.sensors.ActiveRequestSensorsFactory; +import org.apache.cassandra.sensors.Sensor; +import org.apache.cassandra.sensors.SensorsRegistry; +import org.apache.cassandra.sensors.SensorsRegistryListener; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.assertj.core.api.Assertions; + +public class SensorsTest extends TestBaseImpl +{ + @BeforeClass + public static void setup() + { + CassandraRelevantProperties.REQUEST_SENSORS_FACTORY.setString(ActiveRequestSensorsFactory.class.getName()); + } + @Test + public void testSensorsInResultMessage() throws Throwable + { + try (Cluster cluster = builder().withNodes(1).start()) + { + // resister a noop sensor listener before init(cluster) which creates the test keyspace to ensure that the registry singleton instance is subscribed to schema notifications + cluster.get(1).runsOnInstance(initSensorsRegistry()).run(); + init(cluster); + cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY KEY, v1 text)")); + String write = withKeyspace("INSERT INTO %s.tbl(pk, v1) VALUES (1, 'read me')"); + String query = withKeyspace("SELECT * FROM %s.tbl WHERE pk=1"); + + // Any methods used inside the runOnInstance() block should be static, otherwise java.io.NotSerializableException will be thrown + cluster.get(1).runOnInstance(() -> { + ResultMessage writeResult = executeWithResult(write); + Map customPayload = writeResult.getCustomPayload(); + + double writeBytesRequest = getWriteBytesRequest(customPayload, "tbl"); + Assertions.assertThat(writeBytesRequest).isGreaterThan(0D); + + ResultMessage.Rows readResult = executeWithPagingWithResultMessage(query); + customPayload = readResult.getCustomPayload(); + assertReadBytesHeadersExist(customPayload); + + double readBytesRequest = getReadBytesRequest(customPayload); + Assertions.assertThat(readBytesRequest).isGreaterThan(0D); + }); + } + } + + private static void assertReadBytesHeadersExist(Map customPayload) + { + Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST); + } + + private static double getReadBytesRequest(Map customPayload) + { + Assertions.assertThat(customPayload).containsKey(SensorsCustomParams.READ_BYTES_REQUEST); + return ByteBufferUtil.toDouble(customPayload.get(SensorsCustomParams.READ_BYTES_REQUEST)); + } + + private static double getWriteBytesRequest(Map customPayload, String table) + { + String headerName = String.format(SensorsCustomParams.WRITE_BYTES_REQUEST_TEMPLATE, table); + Assertions.assertThat(customPayload).containsKey(headerName); + return ByteBufferUtil.toDouble(customPayload.get(headerName)); + } + + /** + * Registers a noop listener to ensure that the registry singleton instance is subscribed to schema notifications + */ + private static IIsolatedExecutor.SerializableRunnable initSensorsRegistry() + { + return () -> + SensorsRegistry.instance.registerListener(new SensorsRegistryListener() + { + @Override + public void onSensorCreated(Sensor sensor) + { + } + + @Override + public void onSensorRemoved(Sensor sensor) + { + } + }); + } + + /** + * Adapted from org.apache.cassandra.distributed.impl.Coordinator#executeWithPagingWithResult(java.lang.String, org.apache.cassandra.distributed.api.ConsistencyLevel, int, java.lang.Object...) + * TODO: update the dtest-api project to expose this method and hide the implementation in Coordinator + */ + private static ResultMessage.Rows executeWithPagingWithResultMessage(String query) + { + QueryState state = new QueryState(ClientState.forExternalCalls(new InetSocketAddress(FBUtilities.getJustLocalAddress(), 9042))); + ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(ConsistencyLevel.ALL.name()); + CQLStatement prepared = QueryProcessor.getStatement(query, state.getClientState()); + final List boundBBValues = new ArrayList<>(); + + prepared.validate(state); + assert prepared instanceof SelectStatement : "Only SELECT statements can be executed with paging"; + + long nanoTime = System.nanoTime(); + SelectStatement selectStatement = (SelectStatement) prepared; + org.apache.cassandra.db.ConsistencyLevel cl = org.apache.cassandra.db.ConsistencyLevel.fromCode(consistencyLevel.ordinal()); + QueryOptions initialOptions = QueryOptions.create(cl, + boundBBValues, + false, + PageSize.inRows(512), + null, + null, + ProtocolVersion.CURRENT, + selectStatement.keyspace()); + + return selectStatement.execute(state, initialOptions, nanoTime); + } + + /** + * TODO: update SimpleQueryResult in the dtest-api project to expose custom payload and use Coordinator##executeWithResult instead + */ + private static ResultMessage executeWithResult(String query, Object... args) + { + long nanoTime = System.nanoTime(); + QueryHandler.Prepared prepared = QueryProcessor.prepareInternal(query); + ConsistencyLevel consistencyLevel = ConsistencyLevel.valueOf(ConsistencyLevel.ALL.name()); + org.apache.cassandra.db.ConsistencyLevel cl = org.apache.cassandra.db.ConsistencyLevel.fromCode(consistencyLevel.ordinal()); + QueryOptions initialOptions = QueryOptions.create(cl, + null, + false, + PageSize.inRows(512), + null, + null, + ProtocolVersion.CURRENT, + prepared.keyspace); + return prepared.statement.execute(QueryProcessor.internalQueryState(), initialOptions, nanoTime); + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/net/SensorsCustomParamsTest.java b/test/unit/org/apache/cassandra/net/SensorsCustomParamsTest.java index f251b5e873c5..68a35b764950 100644 --- a/test/unit/org/apache/cassandra/net/SensorsCustomParamsTest.java +++ b/test/unit/org/apache/cassandra/net/SensorsCustomParamsTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.sensors.RequestSensors; import org.apache.cassandra.sensors.SensorsRegistry; import org.apache.cassandra.sensors.Type; +import org.apache.cassandra.utils.ByteBufferUtil; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.junit.Assert.assertEquals; @@ -157,6 +158,16 @@ public void testAddReadSensorToResponse() ignored -> SensorsCustomParams.READ_BYTES_TABLE); } + @Test + public void testSensorValueAsByteBuffer() + { + double d = Double.MAX_VALUE; + ByteBuffer bb = SensorsCustomParams.sensorValueAsByteBuffer(d); + // bb should already be flipped + assertEquals(bb.position(), 0); + assertEquals(d, ByteBufferUtil.toDouble(bb), 0.0); + } + private void testAddSensorsToResponse(Type sensorType, Function requestParamSupplier, Function tableParamSupplier) { RequestSensors sensors = requestSensorsFactory.create("ks1"); diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index c2957007819d..22a8615481b0 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -18,10 +18,16 @@ package org.apache.cassandra.service.reads; +import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.cassandra.concurrent.ExecutorLocals; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.ReplicaPlan; @@ -42,9 +48,16 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.NoPayload; +import org.apache.cassandra.net.SensorsCustomParams; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.sensors.ActiveRequestSensors; +import org.apache.cassandra.sensors.Context; +import org.apache.cassandra.sensors.RequestSensors; +import org.apache.cassandra.sensors.Sensor; +import org.apache.cassandra.sensors.Type; import org.apache.cassandra.service.QueryInfoTracker; +import org.mockito.Mockito; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.cassandra.locator.ReplicaUtils.full; @@ -236,6 +249,39 @@ public void testRaceWithNonSpeculativeFailure() } } + @Test + public void testReadSensorsEstimatedAndIncremented() + { + MockSinglePartitionReadCommand command = new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)); + ReplicaPlan.ForTokenRead plan = plan(ConsistencyLevel.QUORUM, targets, targets.subList(0, 1)); + // register read sensor before initilizaing the read executor + RequestSensors requestSensors = new ActiveRequestSensors(); + Context context = Context.from(command); + requestSensors.registerSensor(context, Type.READ_BYTES); + ExecutorLocals locals = ExecutorLocals.create(requestSensors); + ExecutorLocals.set(locals); + AbstractReadExecutor executor = new AbstractReadExecutor(cfs, command, plan, 1, System.nanoTime(), noopReadTracker()) + { + @Override + public void maybeTryAdditionalReplicas() + { + } + }; + + double readSesnorValue = 19.0; + for (int target = 0; target < targets.size(); target++) + { + Message response = createMessageWithReadSensor(targets.get(target).endpoint(), readSesnorValue); + executor.handler.onResponse(response); // will triger signalAll after the second response + } + + executor.awaitResponses(); + + Optional readSensor = requestSensors.getSensor(context, Type.READ_BYTES); + assertTrue(readSensor.isPresent()); + assertEquals(readSesnorValue * targets.size(), readSensor.get().getValue(), 0.01); + } + public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand { private final long timeout; @@ -278,4 +324,53 @@ private QueryInfoTracker.ReadTracker noopReadTracker() { return QueryInfoTracker.ReadTracker.NOOP; } + + private Message createMessageWithReadSensor(InetAddressAndPort from, double readSensorValue) + { + ReadResponse response = new ReadResponse() + { + @Override + public UnfilteredPartitionIterator makeIterator(ReadCommand command) + { + UnfilteredPartitionIterator iterator = Mockito.mock(UnfilteredPartitionIterator.class); + Mockito.when(iterator.metadata()).thenReturn(command.metadata()); + return iterator; + } + + @Override + public ByteBuffer digest(ReadCommand command) + { + return null; + } + + @Override + public ByteBuffer repairedDataDigest() + { + return null; + } + + @Override + public boolean isRepairedDigestConclusive() + { + return false; + } + + @Override + public boolean mayIncludeRepairedDigest() + { + return false; + } + + @Override + public boolean isDigestResponse() + { + return false; + } + }; + + return Message.builder(Verb.READ_RSP, response) + .from(from) + .withCustomParam(SensorsCustomParams.READ_BYTES_REQUEST, SensorsCustomParams.sensorValueAsBytes(readSensorValue)) + .build(); + } }