feat(metrics): add more fine grained metrics for distinguishing between request sources #12188

Open · wants to merge 2 commits into base: master
@@ -1,10 +1,15 @@
package com.linkedin.metadata.dao.producer;

import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_MCL_METRIC_NAME;
import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_MCP_METRIC_NAME;
import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_PE_METRIC_NAME;

import com.datahub.util.exception.ModelConversionException;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.DataHubUpgradeHistoryEvent;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataChangeProposal;
@@ -73,6 +78,7 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
if (aspectSpec.isTimeseries()) {
topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
}
MetricUtils.counter(this.getClass(), PRODUCE_MCL_METRIC_NAME).inc();
return _producer.send(
new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
@@ -97,6 +103,7 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
}

String topic = _topicConvention.getMetadataChangeProposalTopicName();
MetricUtils.counter(this.getClass(), PRODUCE_MCP_METRIC_NAME).inc();
return _producer.send(
new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
@@ -116,6 +123,7 @@ record = EventUtils.pegasusToAvroPE(event);
}

final String topic = _topicConvention.getPlatformEventTopicName();
MetricUtils.counter(this.getClass(), PRODUCE_PE_METRIC_NAME).inc();
return _producer.send(
new ProducerRecord(topic, key == null ? name : key, record),
_kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
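For context (not part of the diff): a minimal sketch of how the new produce counters surface, assuming MetricUtils.counter(Class, String) registers a Dropwizard counter named after the calling class plus the constant defined in MetricUtils further down, in the shared "default" registry. The producer class name used here is a stand-in.

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;

public class ProduceCounterSketch {
  public static void main(String[] args) {
    // "default" matches MetricUtils.NAME in the hunk below.
    MetricRegistry registry = SharedMetricRegistries.getOrCreate("default");
    // e.g. something like <producer class>.produceMCL once the producer calls
    // MetricUtils.counter(this.getClass(), PRODUCE_MCL_METRIC_NAME).inc();
    Counter producedMcl = registry.counter(MetricRegistry.name("KafkaEventProducer", "produceMCL"));
    producedMcl.inc();
    System.out.println(producedMcl.getCount()); // 1
  }
}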
metadata-io/build.gradle (2 changes: 1 addition & 1 deletion)
@@ -31,7 +31,7 @@ dependencies {
implementation externalDependency.guava
implementation externalDependency.reflections
// https://mvnrepository.com/artifact/nl.basjes.parse.useragent/yauaa
implementation 'nl.basjes.parse.useragent:yauaa:7.27.0'
api 'nl.basjes.parse.useragent:yauaa:7.27.0'

api(externalDependency.dgraph4j) {
exclude group: 'com.google.guava', module: 'guava'
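The switch from implementation to api is presumably needed because the new UserAgentUtils (added below) exposes the yauaa UserAgentAnalyzer type through a public field, so modules that depend on metadata-io now compile against yauaa directly. A hypothetical consumer in another module (class name illustrative):

import com.linkedin.metadata.userAgent.UserAgentUtils;

import nl.basjes.parse.useragent.UserAgentAnalyzer;

public class UsesSharedAnalyzer {
  // Referencing the yauaa type directly only compiles if yauaa is on this module's
  // compile classpath; the `api` scope above makes it a transitive compile dependency
  // of anything that depends on metadata-io.
  private static final UserAgentAnalyzer ANALYZER = UserAgentUtils.UAA;
}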
@@ -3,6 +3,7 @@
import static com.linkedin.metadata.dao.throttle.ThrottleType.MANUAL;
import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_TIMESERIES_LAG;
import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_VERSIONED_LAG;
import static com.linkedin.metadata.userAgent.UserAgentUtils.UAA;

import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
@@ -13,16 +14,9 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import nl.basjes.parse.useragent.UserAgent;
import nl.basjes.parse.useragent.UserAgentAnalyzer;

public class APIThrottle {
private static final Set<String> AGENT_EXEMPTIONS = Set.of("Browser");
private static final UserAgentAnalyzer UAA =
UserAgentAnalyzer.newBuilder()
.hideMatcherLoadStats()
.withField(UserAgent.AGENT_CLASS)
.withCache(1000)
.build();

private APIThrottle() {}

@@ -12,6 +12,8 @@
import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;
import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata;
import static com.linkedin.metadata.utils.metrics.ExceptionUtils.collectMetrics;
import static com.linkedin.metadata.utils.metrics.MetricUtils.INGEST_PROPOSAL_API_SOURCE_METRIC_NAME;
import static com.linkedin.metadata.utils.metrics.MetricUtils.PRE_PROCESS_MCL_METRIC_NAME;

import com.codahale.metrics.Timer;
import com.datahub.util.RecordUtils;
@@ -1195,8 +1197,20 @@ public List<IngestResult> ingestProposal(
Stream<IngestResult> nonTimeseriesIngestResults =
async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(opContext, aspectsBatch);

return Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults)
.collect(Collectors.toList());
List<IngestResult> fullResults =
Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults)
.collect(Collectors.toList());
String apiSource =
opContext.getRequestContext() != null
? opContext.getRequestContext().getRequestAPI().name()
: "Unknown";
String syncType = async ? "async" : "sync";
MetricUtils.counter(
this.getClass(),
String.format(INGEST_PROPOSAL_API_SOURCE_METRIC_NAME, syncType, apiSource))
.inc(fullResults.size());

return fullResults;
}

/**
@@ -1427,6 +1441,7 @@ private boolean preprocessEvent(
// Pre-process the update indices hook for UI updates to avoid perceived lag from Kafka
if (updateIndicesService != null) {
updateIndicesService.handleChangeEvent(opContext, metadataChangeLog);
MetricUtils.counter(this.getClass(), PRE_PROCESS_MCL_METRIC_NAME).inc();
}
return true;
}
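For illustration only (not part of the diff): how the new per-source ingestProposal counter names are assembled from the format constant. The RequestAPI names shown are examples, not the enum's actual constant list.

public class IngestProposalMetricNameDemo {
  private static final String INGEST_PROPOSAL_API_SOURCE_METRIC_NAME = "ingestProposal_%s_%s";

  static String metricName(boolean async, String apiSource) {
    String syncType = async ? "async" : "sync";
    return String.format(INGEST_PROPOSAL_API_SOURCE_METRIC_NAME, syncType, apiSource);
  }

  public static void main(String[] args) {
    System.out.println(metricName(true, "RESTLI"));   // ingestProposal_async_RESTLI
    System.out.println(metricName(false, "GRAPHQL")); // ingestProposal_sync_GRAPHQL
    System.out.println(metricName(false, "Unknown")); // "Unknown" is the fallback when no RequestContext is present
  }
}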
@@ -0,0 +1,16 @@
package com.linkedin.metadata.userAgent;

import nl.basjes.parse.useragent.UserAgent;
import nl.basjes.parse.useragent.UserAgentAnalyzer;

public class UserAgentUtils {

private UserAgentUtils() {}

public static final UserAgentAnalyzer UAA =
UserAgentAnalyzer.newBuilder()
.hideMatcherLoadStats()
.withField(UserAgent.AGENT_CLASS)
.withCache(1000)
.build();
}
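Rough usage sketch of the shared analyzer (not part of the diff). The user-agent string and the expected "Browser" classification are illustrative; the actual AGENT_CLASS value comes from the yauaa rule set and is what APIThrottle's AGENT_EXEMPTIONS and the GraphQL metrics below key on.

import static com.linkedin.metadata.userAgent.UserAgentUtils.UAA;

import nl.basjes.parse.useragent.UserAgent;

public class AgentClassDemo {
  public static void main(String[] args) {
    // A typical desktop Chrome user-agent header; expected to classify as "Browser".
    String ua = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
        + "Chrome/120.0.0.0 Safari/537.36";
    System.out.println(UAA.parse(ua).getValue(UserAgent.AGENT_CLASS));
  }
}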
@@ -145,7 +145,7 @@ private static String buildRequestId(
"%s(%s)", action, entityNames.stream().distinct().collect(Collectors.toList()));
}

private static String extractUserAgent(@Nonnull HttpServletRequest request) {
public static String extractUserAgent(@Nonnull HttpServletRequest request) {
return Optional.ofNullable(request.getHeader(HttpHeaders.USER_AGENT)).orElse("");
}

@@ -1,6 +1,7 @@
package com.datahub.graphql;

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.userAgent.UserAgentUtils.UAA;

import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
@@ -11,6 +12,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.inject.name.Named;
import com.linkedin.datahub.graphql.GraphQLEngine;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
@@ -19,6 +21,7 @@
import com.linkedin.metadata.utils.metrics.MetricUtils;
import graphql.ExecutionResult;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.RequestContext;
import io.opentelemetry.api.trace.Span;
import jakarta.inject.Inject;
import jakarta.servlet.http.HttpServletRequest;
@@ -29,7 +32,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import nl.basjes.parse.useragent.UserAgent;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpStatus;
@@ -159,7 +164,8 @@ CompletableFuture<ResponseEntity<String>> postGraphQL(
* Format & Return Response
*/
try {
long totalDuration = submitMetrics(executionResult);
long totalDuration =
submitMetrics(executionResult, context.getOperationContext().getRequestContext());
String executionTook = totalDuration > 0 ? " in " + totalDuration + " ms" : "";
log.info("Executed operation {}" + executionTook, queryName);
// Remove tracing from response to reduce bulk, not used by the frontend
@@ -187,7 +193,8 @@ void getGraphQL(HttpServletRequest request, HttpServletResponse response)
throw new HttpRequestMethodNotSupportedException("GET");
}

private void observeErrors(ExecutionResult executionResult) {
private void observeErrors(
ExecutionResult executionResult, @Nullable RequestContext requestContext) {
executionResult
.getErrors()
.forEach(
@@ -209,15 +216,28 @@ private void observeErrors(ExecutionResult executionResult) {
}
});
if (executionResult.getErrors().size() != 0) {
// Coarse grained
MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "error")).inc();
if (requestContext != null) {
// User agent driven
String userAgent = UAA.parse(requestContext.getUserAgent()).getValue(UserAgent.AGENT_CLASS);
MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "error", userAgent)).inc();
}
}
}

@SuppressWarnings("unchecked")
private long submitMetrics(ExecutionResult executionResult) {
private long submitMetrics(
ExecutionResult executionResult, @Nullable RequestContext requestContext) {
try {
observeErrors(executionResult);
observeErrors(executionResult, requestContext);
// Coarse grained
MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "call")).inc();
if (requestContext != null) {
// User agent driven
String userAgent = UAA.parse(requestContext.getUserAgent()).getValue(UserAgent.AGENT_CLASS);
MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "call", userAgent)).inc();
}
Object tracingInstrumentation = executionResult.getExtensions().get("tracing");
if (tracingInstrumentation instanceof Map) {
Map<String, Object> tracingMap = (Map<String, Object>) tracingInstrumentation;
@@ -229,7 +249,8 @@ private long submitMetrics(ExecutionResult executionResult) {
String fieldName =
resolvers.stream()
.filter(
resolver -> List.of("Query", "Mutation").contains(resolver.get("parentType")))
resolver ->
ImmutableSet.of("Query", "Mutation").contains(resolver.get("parentType")))
.findFirst()
.map(parentResolver -> parentResolver.get("fieldName"))
.map(Object::toString)
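For illustration (not part of the diff): assuming Dropwizard's MetricRegistry.name(...) dot-joins its parts, the change above keeps the coarse call/error counters and adds per-agent-class variants. "Browser" and "Robot" are examples of possible yauaa AGENT_CLASS values.

import com.codahale.metrics.MetricRegistry;

public class GraphQLMetricNamesDemo {
  public static void main(String[] args) {
    // Coarse-grained counter name, unchanged from before.
    System.out.println(MetricRegistry.name(GraphQLMetricNamesDemo.class, "call"));
    // New user-agent-driven variants appended with the parsed AGENT_CLASS.
    System.out.println(MetricRegistry.name(GraphQLMetricNamesDemo.class, "call", "Browser"));
    System.out.println(MetricRegistry.name(GraphQLMetricNamesDemo.class, "error", "Robot"));
  }
}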
@@ -15,6 +15,12 @@ private MetricUtils() {}
public static final String NAME = "default";
private static final MetricRegistry REGISTRY = SharedMetricRegistries.getOrCreate(NAME);

public static final String PRODUCE_MCL_METRIC_NAME = "produceMCL";
public static final String PRODUCE_MCP_METRIC_NAME = "produceMCP";
public static final String PRODUCE_PE_METRIC_NAME = "producePE";
public static final String INGEST_PROPOSAL_API_SOURCE_METRIC_NAME = "ingestProposal_%s_%s";
public static final String PRE_PROCESS_MCL_METRIC_NAME = "preProcessChangeLog";

static {
final JmxReporter reporter = JmxReporter.forRegistry(REGISTRY).build();
reporter.start();