diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java index 26b48449c1c2ff..4b6e75fe390b12 100644 --- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java +++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java @@ -1,10 +1,15 @@ package com.linkedin.metadata.dao.producer; +import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_MCL_METRIC_NAME; +import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_MCP_METRIC_NAME; +import static com.linkedin.metadata.utils.metrics.MetricUtils.PRODUCE_PE_METRIC_NAME; + import com.datahub.util.exception.ModelConversionException; import com.linkedin.common.urn.Urn; import com.linkedin.metadata.EventUtils; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.utils.metrics.MetricUtils; import com.linkedin.mxe.DataHubUpgradeHistoryEvent; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; @@ -73,6 +78,7 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog); if (aspectSpec.isTimeseries()) { topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName(); } + MetricUtils.counter(this.getClass(), PRODUCE_MCL_METRIC_NAME).inc(); return _producer.send( new ProducerRecord(topic, urn.toString(), record), _kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString())); @@ -97,6 +103,7 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal); } String topic = _topicConvention.getMetadataChangeProposalTopicName(); + MetricUtils.counter(this.getClass(), PRODUCE_MCP_METRIC_NAME).inc(); return _producer.send( new ProducerRecord(topic, urn.toString(), record), _kafkaHealthChecker.getKafkaCallBack("MCP", 
urn.toString())); @@ -116,6 +123,7 @@ record = EventUtils.pegasusToAvroPE(event); } final String topic = _topicConvention.getPlatformEventTopicName(); + MetricUtils.counter(this.getClass(), PRODUCE_PE_METRIC_NAME).inc(); return _producer.send( new ProducerRecord(topic, key == null ? name : key, record), _kafkaHealthChecker.getKafkaCallBack("Platform Event", name)); diff --git a/metadata-io/build.gradle b/metadata-io/build.gradle index 516a77d59d50bd..4441da18b651f6 100644 --- a/metadata-io/build.gradle +++ b/metadata-io/build.gradle @@ -31,7 +31,7 @@ dependencies { implementation externalDependency.guava implementation externalDependency.reflections // https://mvnrepository.com/artifact/nl.basjes.parse.useragent/yauaa - implementation 'nl.basjes.parse.useragent:yauaa:7.27.0' + api 'nl.basjes.parse.useragent:yauaa:7.27.0' api(externalDependency.dgraph4j) { exclude group: 'com.google.guava', module: 'guava' diff --git a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java index 542eb5f3869c01..30bdf6aae88df4 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/dao/throttle/APIThrottle.java @@ -3,6 +3,7 @@ import static com.linkedin.metadata.dao.throttle.ThrottleType.MANUAL; import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_TIMESERIES_LAG; import static com.linkedin.metadata.dao.throttle.ThrottleType.MCL_VERSIONED_LAG; +import static com.linkedin.metadata.userAgent.UserAgentUtils.UAA; import io.datahubproject.metadata.context.OperationContext; import io.datahubproject.metadata.context.RequestContext; @@ -13,16 +14,9 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import nl.basjes.parse.useragent.UserAgent; -import nl.basjes.parse.useragent.UserAgentAnalyzer; public class APIThrottle { private static final Set AGENT_EXEMPTIONS = 
Set.of("Browser"); - private static final UserAgentAnalyzer UAA = - UserAgentAnalyzer.newBuilder() - .hideMatcherLoadStats() - .withField(UserAgent.AGENT_CLASS) - .withCache(1000) - .build(); private APIThrottle() {} diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index 8ae09111204cab..542376848b4815 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -12,6 +12,8 @@ import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName; import static com.linkedin.metadata.utils.SystemMetadataUtils.createDefaultSystemMetadata; import static com.linkedin.metadata.utils.metrics.ExceptionUtils.collectMetrics; +import static com.linkedin.metadata.utils.metrics.MetricUtils.INGEST_PROPOSAL_API_SOURCE_METRIC_NAME; +import static com.linkedin.metadata.utils.metrics.MetricUtils.PRE_PROCESS_MCL_METRIC_NAME; import com.codahale.metrics.Timer; import com.datahub.util.RecordUtils; @@ -1195,8 +1197,20 @@ public List ingestProposal( Stream nonTimeseriesIngestResults = async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(opContext, aspectsBatch); - return Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults) - .collect(Collectors.toList()); + List fullResults = + Stream.concat(nonTimeseriesIngestResults, timeseriesIngestResults) + .collect(Collectors.toList()); + String apiSource = + opContext.getRequestContext() != null + ? opContext.getRequestContext().getRequestAPI().name() + : "Unknown"; + String syncType = async ? 
"async" : "sync"; + MetricUtils.counter( + this.getClass(), + String.format(INGEST_PROPOSAL_API_SOURCE_METRIC_NAME, syncType, apiSource)) + .inc(fullResults.size()); + + return fullResults; } /** @@ -1427,6 +1441,7 @@ private boolean preprocessEvent( // Pre-process the update indices hook for UI updates to avoid perceived lag from Kafka if (updateIndicesService != null) { updateIndicesService.handleChangeEvent(opContext, metadataChangeLog); + MetricUtils.counter(this.getClass(), PRE_PROCESS_MCL_METRIC_NAME).inc(); } return true; } diff --git a/metadata-io/src/main/java/com/linkedin/metadata/userAgent/UserAgentUtils.java b/metadata-io/src/main/java/com/linkedin/metadata/userAgent/UserAgentUtils.java new file mode 100644 index 00000000000000..dcc5f2b1932581 --- /dev/null +++ b/metadata-io/src/main/java/com/linkedin/metadata/userAgent/UserAgentUtils.java @@ -0,0 +1,16 @@ +package com.linkedin.metadata.userAgent; + +import nl.basjes.parse.useragent.UserAgent; +import nl.basjes.parse.useragent.UserAgentAnalyzer; + +public class UserAgentUtils { + + private UserAgentUtils() {} + + public static final UserAgentAnalyzer UAA = + UserAgentAnalyzer.newBuilder() + .hideMatcherLoadStats() + .withField(UserAgent.AGENT_CLASS) + .withCache(1000) + .build(); +} diff --git a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java index 779c418a56142f..544bcd20b2029b 100644 --- a/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java +++ b/metadata-operation-context/src/main/java/io/datahubproject/metadata/context/RequestContext.java @@ -145,7 +145,7 @@ private static String buildRequestId( "%s(%s)", action, entityNames.stream().distinct().collect(Collectors.toList())); } - private static String extractUserAgent(@Nonnull HttpServletRequest request) { + public static String extractUserAgent(@Nonnull 
HttpServletRequest request) { return Optional.ofNullable(request.getHeader(HttpHeaders.USER_AGENT)).orElse(""); } diff --git a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java index 2f66b30f55844c..1b6ea0c970978f 100644 --- a/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java +++ b/metadata-service/graphql-servlet-impl/src/main/java/com/datahub/graphql/GraphQLController.java @@ -1,6 +1,7 @@ package com.datahub.graphql; import static com.linkedin.metadata.Constants.*; +import static com.linkedin.metadata.userAgent.UserAgentUtils.UAA; import com.codahale.metrics.MetricRegistry; import com.datahub.authentication.Authentication; @@ -11,6 +12,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; import com.google.inject.name.Named; import com.linkedin.datahub.graphql.GraphQLEngine; import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils; @@ -19,6 +21,7 @@ import com.linkedin.metadata.utils.metrics.MetricUtils; import graphql.ExecutionResult; import io.datahubproject.metadata.context.OperationContext; +import io.datahubproject.metadata.context.RequestContext; import io.opentelemetry.api.trace.Span; import jakarta.inject.Inject; import jakarta.servlet.http.HttpServletRequest; @@ -29,7 +32,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import nl.basjes.parse.useragent.UserAgent; import org.apache.commons.lang3.StringUtils; import org.springframework.http.HttpEntity; import org.springframework.http.HttpStatus; @@ -159,7 +164,8 @@ CompletableFuture> postGraphQL( * Format & Return 
Response */ try { - long totalDuration = submitMetrics(executionResult); + long totalDuration = + submitMetrics(executionResult, context.getOperationContext().getRequestContext()); String executionTook = totalDuration > 0 ? " in " + totalDuration + " ms" : ""; log.info("Executed operation {}" + executionTook, queryName); // Remove tracing from response to reduce bulk, not used by the frontend @@ -187,7 +193,8 @@ void getGraphQL(HttpServletRequest request, HttpServletResponse response) throw new HttpRequestMethodNotSupportedException("GET"); } - private void observeErrors(ExecutionResult executionResult) { + private void observeErrors( + ExecutionResult executionResult, @Nullable RequestContext requestContext) { executionResult .getErrors() .forEach( @@ -209,15 +216,28 @@ private void observeErrors(ExecutionResult executionResult) { } }); if (executionResult.getErrors().size() != 0) { + // Coarse grained MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "error")).inc(); + if (requestContext != null) { + // User agent driven + String userAgent = UAA.parse(requestContext.getUserAgent()).getValue(UserAgent.AGENT_CLASS); + MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "error", userAgent)).inc(); + } } } @SuppressWarnings("unchecked") - private long submitMetrics(ExecutionResult executionResult) { + private long submitMetrics( + ExecutionResult executionResult, @Nullable RequestContext requestContext) { try { - observeErrors(executionResult); + observeErrors(executionResult, requestContext); + // Coarse grained MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "call")).inc(); + if (requestContext != null) { + // User agent driven + String userAgent = UAA.parse(requestContext.getUserAgent()).getValue(UserAgent.AGENT_CLASS); + MetricUtils.get().counter(MetricRegistry.name(this.getClass(), "call", userAgent)).inc(); + } Object tracingInstrumentation = executionResult.getExtensions().get("tracing"); if (tracingInstrumentation 
instanceof Map) { Map tracingMap = (Map) tracingInstrumentation; @@ -229,7 +249,8 @@ private long submitMetrics(ExecutionResult executionResult) { String fieldName = resolvers.stream() .filter( - resolver -> List.of("Query", "Mutation").contains(resolver.get("parentType"))) + resolver -> + ImmutableSet.of("Query", "Mutation").contains(resolver.get("parentType"))) .findFirst() .map(parentResolver -> parentResolver.get("fieldName")) .map(Object::toString) diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java index 3a47c11f8d7489..e72e60f1b2f4d8 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/metrics/MetricUtils.java @@ -15,6 +15,14 @@ private MetricUtils() {} public static final String NAME = "default"; private static final MetricRegistry REGISTRY = SharedMetricRegistries.getOrCreate(NAME); + // Counter names; declared final so that no caller can reassign them at runtime. + public static final String PRODUCE_MCL_METRIC_NAME = "produceMCL"; + public static final String PRODUCE_MCP_METRIC_NAME = "produceMCP"; + public static final String PRODUCE_PE_METRIC_NAME = "producePE"; + // Format string with two %s placeholders, filled in by the caller via String.format. + public static final String INGEST_PROPOSAL_API_SOURCE_METRIC_NAME = "ingestProposal_%s_%s"; + public static final String PRE_PROCESS_MCL_METRIC_NAME = "preProcessChangeLog"; + static { final JmxReporter reporter = JmxReporter.forRegistry(REGISTRY).build(); reporter.start();