Skip to content

Commit

Permalink
Address generic feedback (javadocs, code style, leftovers, class pack…
Browse files Browse the repository at this point in the history
…ages)
  • Loading branch information
aymkhalil committed Oct 23, 2024
1 parent 78ef0b5 commit 42a41f2
Show file tree
Hide file tree
Showing 20 changed files with 63 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ public enum CassandraRelevantProperties
/**
* If true, the coordinator will propagate request sensors via the native protocol custom payload bytes map.
*/
PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCOL("cassandra.propagate_request_sensors_via_native_protocol", "false");;
REQUEST_SENSORS_VIA_NATIVE_PROTOCOL("cassandra.request_sensors_via_native_protocol", "false");

CassandraRelevantProperties(String key, String defaultVal)
{
Expand Down
29 changes: 2 additions & 27 deletions src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.math.IntMath;

import org.apache.cassandra.cql3.Ordering;
Expand All @@ -44,7 +43,7 @@
import org.apache.cassandra.cql3.selection.SortedRowsBuilder;
import org.apache.cassandra.guardrails.Guardrails;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -72,7 +71,6 @@
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestTracker;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
Expand Down Expand Up @@ -576,7 +574,7 @@ else if (restrictions.keyIsInRelation())
RequestSensors sensors = RequestTracker.instance.get();
Context context = Context.from(this.table);
Type sensorType = Type.READ_BYTES;
SensorsCustomParams.addSensorToMessageResponse(msg, options.getProtocolVersion(), sensors, context, sensorType);
SensorsCustomParams.addSensorToCQLResponse(msg, options.getProtocolVersion(), sensors, context, sensorType);

// Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this
// shouldn't be moved inside the 'try' above.
Expand All @@ -586,29 +584,6 @@ else if (restrictions.keyIsInRelation())
return msg;
}

private void addReadSensorData(ResultMessage.Rows msg, QueryOptions options)
{
// Custom payload is not supported for protocol versions < 4
if (options.getProtocolVersion().isSmallerThan(ProtocolVersion.V4))
{
return;
}

RequestSensors requestSensors = RequestTracker.instance.get();
if (requestSensors == null)
{
return;
}

Context contex = Context.from(this.table);
Optional<Sensor> readRequestSensor = RequestTracker.instance.get().getSensor(contex, Type.READ_BYTES);
readRequestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(SensorsCustomParams.requestParamForSensor(sensor), bytes);
msg.setCustomPayload(sensorHeader);
});
}

private void warn(String msg)
{
logger.warn(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.Slice;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
Expand Down Expand Up @@ -127,7 +127,7 @@ public ResultMessage execute(QueryState state, QueryOptions options, long queryS
RequestSensors sensors = RequestTracker.instance.get();
Context context = Context.from(this.metadata());
Type sensorType = Type.WRITE_BYTES;
SensorsCustomParams.addSensorToMessageResponse(result, options.getProtocolVersion(), sensors, context, sensorType);
SensorsCustomParams.addSensorToCQLResponse(result, options.getProtocolVersion(), sensors, context, sensorType);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import java.util.Collection;
import java.util.Optional;
import java.util.function.Function;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/MutationVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
Expand All @@ -48,7 +48,7 @@ private void respond(RequestSensors requestSensors, Message<Mutation> respondToM

Message.Builder<NoPayload> response = respondToMessage.emptyResponseBuilder();
// no need to calculate outbound internode bytes because the response is NoPayload
SensorsCustomParams.addSensorsToResponse(requestSensors, response);
SensorsCustomParams.addSensorsToInternodeResponse(requestSensors, response);
MessagingService.instance().send(response.build(), respondToAddress);
}

Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
Expand Down Expand Up @@ -87,7 +87,7 @@ public void doVerb(Message<ReadCommand> message)
int size = reply.currentPayloadSize(MessagingService.current_version);
requestSensors.incrementSensor(context, Type.INTERNODE_BYTES, size);
requestSensors.syncAllSensors();
SensorsCustomParams.addSensorsToResponse(requestSensors, reply);
SensorsCustomParams.addSensorsToInternodeResponse(requestSensors, reply);

Tracing.trace("Enqueuing response to {}", message.from());
MessagingService.instance().send(reply.build(), message.from());
Expand Down
8 changes: 6 additions & 2 deletions src/java/org/apache/cassandra/net/RequestCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.cassandra.net;

import javax.annotation.Nullable;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.sensors.NoOpRequestSensors;
Expand Down Expand Up @@ -60,10 +62,12 @@ default boolean trackLatencyForSnitch()
}

/**
* @return the {@link RequestSensors} associated with the request. Used to track sensors as reported by response replicas.
* @return the {@link RequestSensors} associated with the request to track sensors as reported by response replicas.
* If null, sensor tracking will be disabled for this request.
*/
@Nullable
default RequestSensors getRequestSensors()
{
return NoOpRequestSensors.instance;
return null;
}
}
5 changes: 2 additions & 3 deletions src/java/org/apache/cassandra/net/ResponseVerbHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.reads.ReadCallback;
import org.apache.cassandra.tracing.Tracing;
Expand Down Expand Up @@ -72,8 +73,6 @@ public void doVerb(Message message)
private void trackReplicaSensors(RequestCallbacks.CallbackInfo callbackInfo, Message message)
{
RequestSensors sensors = callbackInfo.callback.getRequestSensors();
// Eventhough RequestCallback.getRequestSensors() returns a NoOpRequestSensors instance by default, callbacks
// that override this method may have been initialized from a code path where sensors are not set.
if (sensors == null)
return;

Expand Down Expand Up @@ -107,7 +106,7 @@ private void incrementSensor(RequestSensors sensors, Context context, Type type,
Optional<Sensor> sensor = sensors.getSensor(context, type);
sensor.ifPresent(s -> {
String customParam = SensorsCustomParams.requestParamForSensor(s);
double sensorValue = SensorsCustomParams.sensorValueFromCustomParam(message, customParam);
double sensorValue = SensorsCustomParams.sensorValueFromInternodeResponse(message, customParam);
sensors.incrementSensor(context, type, sensorValue);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,26 @@
* limitations under the License.
*/

package org.apache.cassandra.net;
package org.apache.cassandra.sensors;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.SensorsRegistry;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.transport.ProtocolVersion;

/**
* A utility class that contains the definition of custom params added to the {@link Message} header to propagate {@link Sensor} values from
* writer to coordinator and necessary methods to encode sensor values as appropriate for the internode message format.
* A utility class that contains the groups methods to facilitate encoding sensors in native or internode protocol message
* responses as follows:
* <p>Sensors in internode messages: Use to communicate sensors values from replicas to coordinators in the internode
* message response {@link Message.Header#customParams()} bytes map. See {@link SensorsCustomParams#addSensorsToInternodeResponse(RequestSensors, Message.Builder)} and
* {@link SensorsCustomParams#sensorValueFromInternodeResponse(Message, String)}.
* <p>Sensors in native protocol messages: Use to communicate sensors values from coordinator to upstream callers native
* protocol response {@link org.apache.cassandra.transport.Message#getCustomPayload()} bytes map. See {@link SensorsCustomParams#addSensorsToInternodeResponse(RequestSensors, Message.Builder)}.
*/
public final class SensorsCustomParams
{
Expand Down Expand Up @@ -80,42 +78,41 @@ public static double sensorValueFromBytes(byte[] bytes)
* @param response the response message builder to add the sensors to
* @param <T> the response message builder type
*/
public static <T> void addSensorsToResponse(RequestSensors sensors, Message.Builder<T> response)
public static <T> void addSensorsToInternodeResponse(RequestSensors sensors, Message.Builder<T> response)
{
Preconditions.checkNotNull(sensors);
Preconditions.checkNotNull(response);

for (Sensor sensor : sensors.getSensors(ignored -> true))
{
addSensorToResponse(response, sensor);
addSensorToInternodeResponse(response, sensor);
}
}

/**
* Reads the sensor value encoded in the response message header as {@link Message.Header#customParams()} bytes map.
*
* @param message the message to read the sensor value from
* @param param the name of the header in custom params to read the sensor value from
* @param customParam the name of the header in custom params to read the sensor value from
* @param <T> the message type
* @return the sensor value
*/
public static <T> double sensorValueFromCustomParam(Message<T> message, String param)
public static <T> double sensorValueFromInternodeResponse(Message<T> message, String customParam)
{
if (param == null)
if (customParam == null)
return 0.0;

Map<String, byte[]> customParams = message.header.customParams();
if (customParams == null)
return 0.0;

byte[] readBytes = message.header.customParams().get(param);
byte[] readBytes = message.header.customParams().get(customParam);
if (readBytes == null)
return 0.0;

return sensorValueFromBytes(readBytes);
}


/**
* Adds a sensor of a given type and context to the native protocol response message encoded in the custom payload bytes map
*
Expand All @@ -125,11 +122,11 @@ public static <T> double sensorValueFromCustomParam(Message<T> message, String p
* @param context the context of the sensor to add to the response
* @param type the type of the sensor to add to the response
*/
public static void addSensorToMessageResponse(org.apache.cassandra.transport.Message.Response response,
ProtocolVersion protocolVersion,
RequestSensors sensors,
Context context,
Type type)
public static void addSensorToCQLResponse(org.apache.cassandra.transport.Message.Response response,
ProtocolVersion protocolVersion,
RequestSensors sensors,
Context context,
Type type)
{
// Custom payload is not supported for protocol versions < 4
if (protocolVersion.isSmallerThan(ProtocolVersion.V4))
Expand All @@ -142,16 +139,16 @@ public static void addSensorToMessageResponse(org.apache.cassandra.transport.Mes
return;
}

Optional<Sensor> writeRequestSensor = sensors.getSensor(context, type);
writeRequestSensor.ifPresent(sensor -> {
Optional<Sensor> requestSensor = sensors.getSensor(context, type);
requestSensor.ifPresent(sensor -> {
ByteBuffer bytes = SensorsCustomParams.sensorValueAsByteBuffer(sensor.getValue());
String headerName = RequestSensorsFactory.instance.requestSensorEncoder().apply(sensor);
Map<String, ByteBuffer> sensorHeader = ImmutableMap.of(headerName, bytes);
response.setCustomPayload(sensorHeader);
});
}

private static <T> void addSensorToResponse(Message.Builder<T> response, Sensor sensor)
private static <T> void addSensorToInternodeResponse(Message.Builder<T> response, Sensor sensor)
{
byte[] requestBytes = SensorsCustomParams.sensorValueAsBytes(sensor.getValue());
String requestParam = RequestSensorsFactory.instance.requestSensorEncoder().apply(sensor);
Expand Down
7 changes: 2 additions & 5 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
Expand All @@ -127,7 +126,6 @@
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.NoOpRequestSensors;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.Sensor;
import org.apache.cassandra.sensors.Type;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
Expand All @@ -145,7 +143,6 @@
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;

import static com.google.common.collect.Iterables.transform;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.net.NoPayload.noPayload;
Expand Down Expand Up @@ -1072,7 +1069,7 @@ public static void mutate(List<? extends IMutation> mutations,
QueryInfoTracker.WriteTracker writeTracker = queryTracker().onWrite(state, false, mutations, consistencyLevel);

// Request sensors are utilized to track usages from replicas serving a write request
RequestSensors sensors = CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCOL.getBoolean() ?
RequestSensors sensors = CassandraRelevantProperties.REQUEST_SENSORS_VIA_NATIVE_PROTOCOL.getBoolean() ?
new ActiveRequestSensors() : NoOpRequestSensors.instance;
ExecutorLocals locals = ExecutorLocals.create(sensors);
ExecutorLocals.set(locals);
Expand Down Expand Up @@ -1958,7 +1955,7 @@ public static PartitionIterator read(SinglePartitionReadCommand.Group group,
group.queries,
consistencyLevel);
// Request sensors are utilized to track usages from replicas serving a read request
RequestSensors requestSensors = CassandraRelevantProperties.PROPAGATE_REQUEST_SENSORS_VIA_NATIVE_PROTOCOL.getBoolean() ?
RequestSensors requestSensors = CassandraRelevantProperties.REQUEST_SENSORS_VIA_NATIVE_PROTOCOL.getBoolean() ?
new ActiveRequestSensors() : NoOpRequestSensors.instance;
Context context = Context.from(group.metadata());
requestSensors.registerSensor(context, Type.READ_BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
Expand Down Expand Up @@ -54,7 +54,7 @@ public void doVerb(Message<Commit> message)
Message.Builder<NoPayload> reply = message.emptyResponseBuilder();

// no need to calculate outbound internode bytes because the response is NoPayload
SensorsCustomParams.addSensorsToResponse(sensors, reply);
SensorsCustomParams.addSensorsToInternodeResponse(sensors, reply);
MessagingService.instance().send(reply.build(), message.from());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SensorsCustomParams;
import org.apache.cassandra.sensors.SensorsCustomParams;
import org.apache.cassandra.sensors.Context;
import org.apache.cassandra.sensors.RequestSensors;
import org.apache.cassandra.sensors.RequestSensorsFactory;
Expand Down Expand Up @@ -58,7 +58,7 @@ public void doVerb(Message<Commit> message)
int size = reply.currentPayloadSize(MessagingService.current_version);
sensors.incrementSensor(context, Type.INTERNODE_BYTES, size);
sensors.syncAllSensors();
SensorsCustomParams.addSensorsToResponse(sensors, reply);
SensorsCustomParams.addSensorsToInternodeResponse(sensors, reply);
MessagingService.instance().send(reply.build(), message.from());
}
}
Loading

1 comment on commit 42a41f2

@cassci-bot
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build rejected: 4 NEW test failure(s) in 9 builds., Build ds-cassandra-pr-gate > cndb-8501 > #13: ran 18073 tests with 24 failures and 133 skipped.
Butler analysis done on ds-cassandra-pr-gate/cndb-8501 vs last 16 runs of ds-cassandra-build-nightly/main.
org.apache.cassandra.index.sai.cql.TinySegmentQueryWriteLifecycleTest.testWriteLifecycle[aa_BaseDataModel{primaryKey=p}]: test is constantly failing. No failures on upstream;
branch story: [FFFFFF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
counter_test.TestCounters.test_counter_node_down: test is constantly failing. No failures on upstream;
branch story: [FFFF] vs upstream: [++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.QueryWriteLifecycleTest.testWriteLifecycle[aa_BaseDataModel{primaryKey=p}]: test is constantly failing. No failures on upstream;
branch story: [FFFFF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
org.apache.cassandra.index.sai.cql.VectorSiftSmallTest.testMultiSegmentBuild: test looks flaky. No failures on upstream;
branch story: [F+FFF+FFF] vs upstream: [++++++++++++++++++++++++++++++++++++++++++++++++++++++++]; [NEW]
butler comparison

Please sign in to comment.