Skip to content

Commit

Permalink
[server] Add metric for poll count only if it returns non-zero record…
Browse files Browse the repository at this point in the history
… count (#665)

* [server] Records poll count only if it returns non-zero record count

---------

Co-authored-by: Sourav Maji <[email protected]>
  • Loading branch information
majisourav99 and Sourav Maji authored Oct 10, 2023
1 parent 7f82927 commit 5a3b8ea
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void run() {
}
stats.recordConsumerRecordsProducingToWriterBufferLatency(
LatencyUtils.getElapsedTimeInMs(beforeProducingToWriteBufferTimestamp));
stats.recordNonZeroPollResultNum(polledPubSubMessagesCount);
bandwidthThrottler.accept(payloadBytesConsumedInOnePoll);
recordsThrottler.accept(polledPubSubMessagesCount);
cleaner.unsubscribe(topicPartitionsToUnsub);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public class KafkaConsumerServiceStats extends AbstractVeniceStats {
private final Sensor pollRequestSensor;
private final Sensor pollRequestLatencySensor;
private final Sensor pollResultNumSensor;
private final Sensor pollNonZeroResultNumSensor;

private final Sensor pollRequestError;
private final Sensor consumerRecordsProducingToWriterBufferLatencySensor;
private final Sensor detectedDeletedTopicNumSensor;
Expand Down Expand Up @@ -50,6 +52,7 @@ public KafkaConsumerServiceStats(
new Gauge(getMaxElapsedTimeSinceLastPollInConsumerPool.getAsLong()));
// consumer record number per second returned by Kafka consumer poll.
pollResultNumSensor = registerSensor("consumer_poll_result_num", new Avg(), new Total());
pollNonZeroResultNumSensor = registerSensor("consumer_poll_non_zero_result_num", new Avg(), new Total());
pollRequestError = registerSensor("consumer_poll_error", new OccurrenceRate());
// To measure 'put' latency of consumer records blocking queue
consumerRecordsProducingToWriterBufferLatencySensor =
Expand Down Expand Up @@ -92,6 +95,10 @@ public void recordPollResultNum(int count) {
pollResultNumSensor.record(count);
}

public void recordNonZeroPollResultNum(int count) {
pollNonZeroResultNumSensor.record(count);
}

public void recordConsumerRecordsProducingToWriterBufferLatency(double latency) {
consumerRecordsProducingToWriterBufferLatencySensor.record(latency);
}
Expand Down

0 comments on commit 5a3b8ea

Please sign in to comment.