Wip fix for trace 1.0 #70

Merged
merged 10 commits into from
Jan 25, 2021
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -7,3 +7,13 @@ Provides OpenTelemetry Exporters for Google Cloud Operations.
[circleci-image]: https://circleci.com/gh/GoogleCloudPlatform/opentelemetry-operations-java.svg?style=shield
[circleci-url]: https://circleci.com/gh/GoogleCloudPlatform/opentelemetry-operations-java



## Building

This project requires a mock server for Google Cloud APIs. To build and test, do the following:

```
$ source get_mock_server.sh
$ ./gradlew test -Dmock.server.path=$MOCKSERVER
```
20 changes: 16 additions & 4 deletions build.gradle
@@ -17,14 +17,20 @@ subprojects {
maven { url 'https://dl.bintray.com/open-telemetry/maven' }
}

test {
testLogging {
exceptionFormat = 'full'
}
}

ext {
autoServiceVersion = '1.0-rc7'
autoValueVersion = '1.7.4'
slf4jVersion = '1.7.30'
googleCloudVersion = '1.0.2'
cloudMonitoringVersion = '2.0.1'
openTelemetryVersion = '0.13.1'
openTelemetryInstrumentationVersion = '0.13.1'
openTelemetryVersion = '0.14.1'
openTelemetryInstrumentationVersion = '0.14.0'
junitVersion = '4.13'
mockitoVersion = '3.5.10'

@@ -41,6 +47,8 @@ subprojects {
slf4j : "org.slf4j:slf4j-api:${slf4jVersion}",
opentelemetry_api : "io.opentelemetry:opentelemetry-api:${openTelemetryVersion}",
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}",
opentelemetry_api_metrics : "io.opentelemetry:opentelemetry-api-metrics:${openTelemetryVersion}-alpha",
opentelemetry_sdk_metrics : "io.opentelemetry:opentelemetry-sdk-metrics:${openTelemetryVersion}-alpha",
opentelemetry_auto : "io.opentelemetry.javaagent:opentelemetry-javaagent-spi:${openTelemetryInstrumentationVersion}",
]
testLibraries = [
@@ -75,6 +83,10 @@ subprojects {

signing {
required false
// Allow using the GPG agent on linux instead of passwords in a .properties file.
if (rootProject.hasProperty('signingUseGpgCmd')) {
useGpgCmd()
}
sign configurations.archives
}

@@ -100,8 +112,8 @@ subprojects {
url 'https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'

scm {
connection 'scm:svn:https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'
developerConnection 'scm:svn:https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'
connection 'scm:git:https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'
developerConnection 'scm:git:https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'
url 'https://github.com/GoogleCloudPlatform/opentelemetry-operations-java'
}

4 changes: 2 additions & 2 deletions examples/metrics/build.gradle
@@ -26,8 +26,8 @@ task fatJar(type: Jar) {
description = 'Examples for Cloud Monitoring Exporter'

dependencies {
compile(libraries.opentelemetry_api)
compile(libraries.opentelemetry_sdk)
compile(libraries.opentelemetry_api_metrics)
compile(libraries.opentelemetry_sdk_metrics)
compile(libraries.google_cloud_monitoring)
compile project(':exporter-metrics')
}
@@ -6,14 +6,15 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.LongUpDownCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.api.metrics.GlobalMetricsProvider;
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import java.io.IOException;
import java.util.Random;

public class MetricsExporterExample {
private static final Meter METER =
OpenTelemetry.getGlobalMeter("instrumentation-library-name", "semver:1.0.0");
GlobalMetricsProvider.getMeter("instrumentation-library-name", "semver:1.0.0");
private static final Random RANDOM = new Random();
private static io.opentelemetry.sdk.metrics.export.MetricExporter metricExporter;
private static IntervalMetricReader intervalMetricReader;
@@ -29,7 +30,7 @@ private static void setupMetricExporter() {
.setExportIntervalMillis(20000)
.setMetricExporter(metricExporter)
.setMetricProducers(
singleton(OpenTelemetrySdk.getGlobalMeterProvider().getMetricProducer()))
singleton(SdkMeterProvider.builder().buildAndRegisterGlobal()))
.build();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -2,7 +2,7 @@

import com.google.cloud.opentelemetry.trace.TraceConfiguration;
import com.google.cloud.opentelemetry.trace.TraceExporter;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
@@ -13,7 +13,8 @@
import java.util.Random;

public class TraceExporterExample {
private static final Tracer tracer = OpenTelemetry.getGlobalTracer("io.opentelemetry.example.TraceExporterExample");
// TODO: Update this to NON-global OpenTelemetry. Only APIs/Frameworks should be using global.
private static final Tracer tracer = GlobalOpenTelemetry.getTracer("io.opentelemetry.example.TraceExporterExample");
private static final Random random = new Random();

private static void setupTraceExporter() {
4 changes: 2 additions & 2 deletions exporters/metrics/build.gradle
@@ -4,8 +4,8 @@ dependencies {
api(libraries.auto_value_annotations)
api(libraries.slf4j)
annotationProcessor(libraries.auto_value)
api(libraries.opentelemetry_api)
api(libraries.opentelemetry_sdk)
api(libraries.opentelemetry_api_metrics)
api(libraries.opentelemetry_sdk_metrics)
api(libraries.google_cloud_core)
api(libraries.google_cloud_monitoring)
testImplementation(testLibraries.junit)
@@ -0,0 +1,91 @@
package com.google.cloud.opentelemetry.metric;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;

import com.google.api.MetricDescriptor;
import com.google.monitoring.v3.TimeSeries;
import com.google.monitoring.v3.TypedValue;

import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.metrics.data.DoublePoint;
import io.opentelemetry.sdk.metrics.data.LongPoint;
import io.opentelemetry.sdk.metrics.data.MetricData;

import com.google.cloud.opentelemetry.metric.MetricExporter.MetricWithLabels;

import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapMetricDescriptor;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapInterval;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapMetric;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapResource;

public class AggregateByLabelMetricTimeSeriesBuilder implements MetricTimeSeriesBuilder {

private final Map<String, MetricDescriptor> descriptors = new HashMap<>();
private final Map<MetricWithLabels, TimeSeries.Builder> pendingTimeSeries = new HashMap<>();
private final String projectId;


public AggregateByLabelMetricTimeSeriesBuilder(String projectId) {
this.projectId = projectId;
}

@Override
public void recordPoint(MetricData metric, LongPoint point) {
MetricDescriptor descriptor = mapMetricDescriptor(metric, point);
if (descriptor == null) {
return;
}
// TODO: Use actual unique key for descriptors, and deal with conflicts (or log)
descriptors.putIfAbsent(descriptor.getName(), descriptor);
MetricWithLabels key = new MetricWithLabels(descriptor.getType(), point.getLabels());
// TODO: Check lastExportTime and ensure we don't send too often...
pendingTimeSeries
.computeIfAbsent(key, k -> makeTimeSeriesHeader(metric, point.getLabels(), descriptor))
.addPoints(
com.google.monitoring.v3.Point.newBuilder()
.setValue(
TypedValue.newBuilder().setInt64Value(point.getValue())
).setInterval(mapInterval(point, metric.getType())).build());
}

@Override
public void recordPoint(MetricData metric, DoublePoint point) {
MetricDescriptor descriptor = mapMetricDescriptor(metric, point);
if (descriptor == null) {
return;
}
// TODO: Use actual unique key for descriptors, and deal with conflicts (or log)
descriptors.putIfAbsent(descriptor.getName(), descriptor);
MetricWithLabels key = new MetricWithLabels(descriptor.getType(), point.getLabels());
// TODO: Check lastExportTime and ensure we don't send too often...
pendingTimeSeries
.computeIfAbsent(key, k -> makeTimeSeriesHeader(metric, point.getLabels(), descriptor))
.addPoints(
com.google.monitoring.v3.Point.newBuilder()
.setValue(
TypedValue.newBuilder().setDoubleValue(point.getValue())
).setInterval(mapInterval(point, metric.getType())));
}

private TimeSeries.Builder makeTimeSeriesHeader(MetricData metric, Labels labels, MetricDescriptor descriptor) {
return TimeSeries.newBuilder()
.setMetric(mapMetric(labels, descriptor.getType()))
.setMetricKind(descriptor.getMetricKind())
.setResource(mapResource(projectId));
}

@Override
public Collection<MetricDescriptor> getDescriptors() {
return descriptors.values();
}

@Override
public List<TimeSeries> getTimeSeries() {
return pendingTimeSeries.values().stream().map(b -> b.build()).collect(Collectors.toList());
}

}
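
The grouping performed by `AggregateByLabelMetricTimeSeriesBuilder` can be illustrated in isolation. The sketch below is a simplified stand-in, not the exporter's code: `SeriesKey` and `LabelAggregationSketch` are hypothetical names, points are plain `long` values, and labels are a `Map<String, String>`. It shows only the `computeIfAbsent` pattern of creating one series per (metric type, labels) pair and appending every later point to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for MetricWithLabels: a metric type plus its label set.
final class SeriesKey {
  private final String metricType;
  private final Map<String, String> labels;

  SeriesKey(String metricType, Map<String, String> labels) {
    this.metricType = metricType;
    this.labels = new HashMap<>(labels); // defensive copy so the key stays stable
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SeriesKey)) {
      return false;
    }
    SeriesKey other = (SeriesKey) o;
    return metricType.equals(other.metricType) && labels.equals(other.labels);
  }

  @Override
  public int hashCode() {
    return Objects.hash(metricType, labels);
  }
}

// Hypothetical sketch of label-based aggregation; not the exporter's class.
final class LabelAggregationSketch {
  private final Map<SeriesKey, List<Long>> pending = new HashMap<>();

  // The first point for a key creates the series; later points are appended to it.
  void recordPoint(String metricType, Map<String, String> labels, long value) {
    pending
        .computeIfAbsent(new SeriesKey(metricType, labels), k -> new ArrayList<>())
        .add(value);
  }

  // Number of distinct (metric type, labels) pairs seen so far.
  int seriesCount() {
    return pending.size();
  }

  // Points recorded for one key, or an empty list if the key was never seen.
  List<Long> pointsFor(String metricType, Map<String, String> labels) {
    return pending.getOrDefault(new SeriesKey(metricType, labels), new ArrayList<>());
  }
}
```

With this grouping, the number of series depends on the distinct (type, labels) pairs seen, so it need not equal the number of metrics that were recorded.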
@@ -1,12 +1,7 @@
package com.google.cloud.opentelemetry.metric;

import static com.google.api.client.util.Preconditions.checkNotNull;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapMetric;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapMetricDescriptor;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapPoint;
import static com.google.cloud.opentelemetry.metric.MetricTranslator.mapResource;

import com.google.api.Metric;
import com.google.api.MetricDescriptor;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
@@ -17,12 +12,14 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.monitoring.v3.CreateMetricDescriptorRequest;
import com.google.monitoring.v3.Point;
import com.google.monitoring.v3.ProjectName;
import com.google.monitoring.v3.TimeSeries;
import io.opentelemetry.api.common.Labels;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.LongPoint;
import io.opentelemetry.sdk.metrics.data.DoublePoint;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -96,62 +93,73 @@ private static MetricExporter createWithCredentials(

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
List<TimeSeries> allTimesSeries = new ArrayList<>();

for (MetricData metricData : metrics) {

// We are expecting one point per MetricData
if (metricData.getPoints().size() != 1) {
logger.error(
"There should be exactly one point in each metricData, found {}",
metricData.getPoints().size());
continue;
// General Algorithm for export:
// 1. Iterate over all points in the set of metrics to export
// 2. Attempt to register MetricDescriptors if not already registered.
// 3. Fire the set of time series off.
MetricTimeSeriesBuilder builder = new AggregateByLabelMetricTimeSeriesBuilder(projectId);
for (final MetricData metricData : metrics) {
// Extract all the underlying points.
switch(metricData.getType()) {
case LONG_GAUGE:
for(LongPoint point : metricData.getLongGaugeData().getPoints()) {
Contributor

I'm not too familiar with the Java SDK – in practice, is there always just one point?

Collaborator Author

No, that's one of the issues we're fixing here.

builder.recordPoint(metricData, point);
}
break;
case LONG_SUM:
for(LongPoint point : metricData.getLongSumData().getPoints()) {
builder.recordPoint(metricData, point);
}
break;
case DOUBLE_GAUGE:
for(DoublePoint point : metricData.getDoubleGaugeData().getPoints()) {
builder.recordPoint(metricData, point);
}
break;
case DOUBLE_SUM:
for(DoublePoint point : metricData.getDoubleSumData().getPoints()) {
builder.recordPoint(metricData, point);
}
break;
default:
logger.error(
"Metric type {} not supported. Only gauge and cumulative types are supported.",
metricData.getType());
Contributor

nit inconsistent indent

Collaborator Author

fixed

continue;
}
MetricData.Point metricPoint = metricData.getPoints().iterator().next();

MetricDescriptor descriptor = mapMetricDescriptor(metricData, metricPoint);
if (descriptor == null) {
continue;
}
metricServiceClient.createMetricDescriptor(
CreateMetricDescriptorRequest.newBuilder()
.setName(PROJECT_NAME_PREFIX + projectId)
.setMetricDescriptor(descriptor)
.build());

MetricWithLabels updateKey =
new MetricWithLabels(descriptor.getType(), metricPoint.getLabels());

// Cloud Monitoring API allows, for any combination of labels and
// metric name, one update per WRITE_INTERVAL seconds
long pointCollectionTime = metricPoint.getEpochNanos();
if (lastUpdatedTime.containsKey(updateKey)
&& pointCollectionTime
<= lastUpdatedTime.get(updateKey) / NANO_PER_SECOND + WRITE_INTERVAL_SECOND) {
continue;
for (final MetricDescriptor descriptor: builder.getDescriptors()) {
// TODO: limit this ONCE per JVM.
metricServiceClient.createMetricDescriptor(
CreateMetricDescriptorRequest.newBuilder()
.setName(PROJECT_NAME_PREFIX + projectId)
.setMetricDescriptor(descriptor)
.build());
}

Metric metric = mapMetric(metricPoint, descriptor.getType());
Point point = mapPoint(metricData, metricPoint, updateKey, lastUpdatedTime);
if (point == null) {
continue;
}

allTimesSeries.add(
TimeSeries.newBuilder()
.setMetric(metric)
.addPoints(point)
.setResource(mapResource(projectId))
.setMetricKind(descriptor.getMetricKind())
.build());
}
createTimeSeriesBatch(metricServiceClient, ProjectName.of(projectId), allTimesSeries);
if (allTimesSeries.size() < metrics.size()) {
// TODO: Filter metrics by last updated time....
// MetricWithLabels updateKey =
// new MetricWithLabels(descriptor.getType(), metricPoint.getLabels());

// // Cloud Monitoring API allows, for any combination of labels and
// // metric name, one update per WRITE_INTERVAL seconds
// long pointCollectionTime = metricPoint.getEpochNanos();
// if (lastUpdatedTime.containsKey(updateKey)
// && pointCollectionTime
// <= lastUpdatedTime.get(updateKey) / NANO_PER_SECOND + WRITE_INTERVAL_SECOND) {
// continue;
// }
}
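
The commented-out check above appears to mix units: `getEpochNanos()` yields nanoseconds, while the right-hand side is divided by `NANO_PER_SECOND` before `WRITE_INTERVAL_SECOND` is added. A minimal sketch of a per-key throttle that keeps every quantity in nanoseconds might look like the following. `WriteThrottle` and `tryAcquire` are hypothetical names, the key is a plain string standing in for `MetricWithLabels`, and the interval is passed in rather than read from the exporter's constant.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical per-key write throttle; not part of the exporter.
final class WriteThrottle {
  private final long intervalNanos;
  private final Map<String, Long> lastWriteNanos = new HashMap<>();

  WriteThrottle(long intervalSeconds) {
    // Convert once so that all later comparisons use the same unit.
    this.intervalNanos = TimeUnit.SECONDS.toNanos(intervalSeconds);
  }

  // Returns true and remembers the timestamp when the point may be written.
  // Returns false when the last accepted write for this key is less than one
  // interval old. Accepting a point exactly one interval later is a choice
  // made by this sketch.
  boolean tryAcquire(String key, long pointEpochNanos) {
    Long last = lastWriteNanos.get(key);
    if (last != null && pointEpochNanos - last < intervalNanos) {
      return false;
    }
    lastWriteNanos.put(key, pointEpochNanos);
    return true;
  }
}
```

A caller would skip any point for which `tryAcquire` returns false, which corresponds to the `continue` in the commented-out code.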
List<TimeSeries> series = builder.getTimeSeries();
createTimeSeriesBatch(metricServiceClient, ProjectName.of(projectId), series);
Contributor

Where do GCM API errors get surfaced? It doesn't look like createTimeSeriesBatch() throws. API errors seem like the most reasonable case to return CompletableResultCode.ofFailure()

Collaborator Author

I need to refactor all of that code. I opened up an issue to track it, but yeah... it's not clear what happens right now.

Even more, we may want to use an asynchronous client to avoid blocking, as we return a completable result (i.e., an async observable). We'll fix this up more post-1.0 as it won't break existing usage, just make it better.
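
One way to surface API errors, along the lines suggested in this thread, is to catch the exception from each blocking call and translate it into a failed result. The sketch below uses only the JDK and is an illustration of the idea, not the planned refactoring: `ExportResult` is a hypothetical stand-in for `CompletableResultCode`, `BatchSender` is a hypothetical stand-in for the Cloud Monitoring client call, and series are represented as plain strings.

```java
import java.util.List;

// Hypothetical sketch: report a failure when any batch send throws.
final class ExportErrorSketch {
  // Stand-in for CompletableResultCode.ofSuccess() / ofFailure().
  enum ExportResult { SUCCESS, FAILURE }

  // Stand-in for the blocking client call that writes one batch.
  interface BatchSender {
    void send(List<String> batch) throws Exception;
  }

  static ExportResult exportInBatches(List<String> series, int batchSize, BatchSender sender) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    ExportResult result = ExportResult.SUCCESS;
    for (int start = 0; start < series.size(); start += batchSize) {
      int end = Math.min(start + batchSize, series.size());
      try {
        sender.send(series.subList(start, end));
      } catch (Exception e) {
        // Keep sending the remaining batches, but remember that one failed.
        result = ExportResult.FAILURE;
      }
    }
    return result;
  }
}
```

Continuing after a failed batch is a design choice of this sketch; stopping at the first error would be equally simple to express.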

// TODO: better error reporting.
if (series.size() < metrics.size()) {
Contributor

Re above comment about each MetricData having a list of points; if that's not the case, this would report failure. Should we just remove the check for now if it's not adding anything?

Collaborator Author

I'm trying to not change the behavior right before 1.0. I agree this needs to get fixed, but I'd rather spend some time on it.

Given metrics is alpha, I think the TODO and a bug to fix is the right way to go for now.

return CompletableResultCode.ofFailure();
}
return CompletableResultCode.ofSuccess();
}

// Fragment metrics into batches and send to GCM.
private static void createTimeSeriesBatch(
CloudMetricClient metricServiceClient,
ProjectName projectName,
@@ -179,6 +187,7 @@ public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}

// TODO: Move this to its own class.
static class MetricWithLabels {

private final String metricType;