Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Instrument HTTP Reads and Rewrites #2808

Merged
merged 4 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -848,9 +848,10 @@ private Get createReadRequest(StorageObject from, Map<Option, ?> options) throws
@Override
public long read(
StorageObject from, Map<Option, ?> options, long position, OutputStream outputStream) {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("read");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I misunderstood the span name; Does startSpan provide context on class for this method?

Pattern: $package.$service/$method
Example using C++: storage::Client::WriteObject/CreateResumableUpload

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I was thinking it would be

storage.spi.v1/read in this case so it's clear where this span exists.

@cojenco could you take a look at this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to providing context on the class/module for this method, especially considering there are many call paths in java

The OTel semantic conventions recommends the pattern of $package.$service/$method where $method MUST NOT contain slashes. I'm not very familiar with how the classes are structured here, but I agree that it would be a good idea to include some context of spi/v1/HttpStorageRpc in the span name for better debuggability

Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null) {
Expand All @@ -867,13 +868,16 @@ public long read(
req.executeMedia().download(outputStream);
return mediaHttpDownloader.getNumBytesDownloaded();
} catch (IOException ex) {
otelSpan.recordException(ex);
otelSpan.setStatus(StatusCode.ERROR, ex.getClass().getSimpleName());
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = translate(ex);
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) {
return 0;
}
throw serviceException;
} finally {
otelSpan.end();
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
Expand All @@ -882,9 +886,10 @@ public long read(
@Override
public Tuple<String, byte[]> read(
StorageObject from, Map<Option, ?> options, long position, int bytes) {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("read");
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_READ);
Scope scope = tracer.withSpan(span);
try {
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
checkArgument(position >= 0, "Position should be non-negative, is " + position);
Get req = createReadRequest(from, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
Expand All @@ -902,13 +907,16 @@ public Tuple<String, byte[]> read(
String etag = req.getLastResponseHeaders().getETag();
return Tuple.of(etag, output.toByteArray());
} catch (IOException ex) {
otelSpan.recordException(ex);
otelSpan.setStatus(StatusCode.ERROR, ex.getClass().getSimpleName());
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
StorageException serviceException = StorageException.translate(ex);
if (serviceException.getCode() == SC_REQUESTED_RANGE_NOT_SATISFIABLE) {
return Tuple.of(null, new byte[0]);
}
throw serviceException;
} finally {
otelSpan.end();
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
Expand Down Expand Up @@ -1158,30 +1166,39 @@ public String open(String signedURL) {

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("openRewrite");
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN_REWRITE);
Scope scope = tracer.withSpan(span);
try {
return rewrite(rewriteRequest, null);
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
return rewrite(rewriteRequest, null, openTelemetryTraceUtil.currentContext());
} finally {
otelSpan.end();
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

@Override
public RewriteResponse continueRewrite(RewriteResponse previousResponse) {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("continueRewrite");
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_CONTINUE_REWRITE);
Scope scope = tracer.withSpan(span);
try {
return rewrite(previousResponse.rewriteRequest, previousResponse.rewriteToken);
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
return rewrite(
previousResponse.rewriteRequest,
previousResponse.rewriteToken,
openTelemetryTraceUtil.currentContext());
} finally {
otelSpan.end();
scope.close();
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
}
}

private RewriteResponse rewrite(RewriteRequest req, String token) {
try {
private RewriteResponse rewrite(
RewriteRequest req, String token, OpenTelemetryTraceUtil.Context ctx) {
OpenTelemetryTraceUtil.Span otelSpan = openTelemetryTraceUtil.startSpan("rewrite", ctx);
try (OpenTelemetryTraceUtil.Scope unused = otelSpan.makeCurrent()) {
String userProject = Option.USER_PROJECT.getString(req.sourceOptions);
if (userProject == null) {
userProject = Option.USER_PROJECT.getString(req.targetOptions);
Expand Down Expand Up @@ -1232,8 +1249,12 @@ private RewriteResponse rewrite(RewriteRequest req, String token) {
rewriteResponse.getRewriteToken(),
rewriteResponse.getTotalBytesRewritten().longValue());
} catch (IOException ex) {
otelSpan.recordException(ex);
otelSpan.setStatus(StatusCode.ERROR, ex.getClass().getSimpleName());
tracer.getCurrentSpan().setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
otelSpan.end();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.NoCredentials;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.Inject;
Expand All @@ -31,6 +34,10 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -43,7 +50,10 @@ public class ITHttpOpenTelemetryTest {
@Inject public TestBench testBench;
private StorageOptions options;
private SpanExporter exporter;
private BlobId blobId;
private Storage storage;
private static final byte[] helloWorldTextBytes = "hello world".getBytes();
private static final byte[] helloWorldGzipBytes = TestUtils.gzipBytes(helloWorldTextBytes);
@Inject public Generator generator;
@Inject public BucketInfo testBucket;

Expand All @@ -65,20 +75,17 @@ public void setUp() {
.setOpenTelemetrySdk(openTelemetrySdk)
.build();
storage = options.getService();
String objectString = generator.randomObjectName();
blobId = BlobId.of(testBucket.getName(), objectString);
}

@Test
public void runCreateBucket() {
String bucket = "random-bucket";
storage.create(BucketInfo.of(bucket));
TestExporter testExported = (TestExporter) exporter;
SpanData spanData = testExported.getExportedSpans().get(0);
Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
Assert.assertEquals(
"com.google.cloud.google-cloud-storage",
getAttributeValue(spanData, "gcp.client.artifact"));
Assert.assertEquals("http", getAttributeValue(spanData, "rpc.system"));
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
}

@Test
Expand All @@ -88,6 +95,53 @@ public void runCreateBlob() {
storage.create(BlobInfo.newBuilder(toCreate).build(), content);
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
}

@Test
public void runRead() throws IOException {
BlobInfo blobInfo =
BlobInfo.newBuilder(blobId).setContentEncoding("gzip").setContentType("text/plain").build();
storage.create(blobInfo, helloWorldGzipBytes);
Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.downloadTo(
blobId, helloWorldTxtGz, Storage.BlobSourceOption.shouldReturnRawInputStream(true));
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("read")));
}

@Test
public void runCopy() {

byte[] expected = "Hello, World!".getBytes(StandardCharsets.UTF_8);

BlobInfo info =
BlobInfo.newBuilder(testBucket.getName(), generator.randomObjectName() + "copy/src")
.build();
Blob cpySrc = storage.create(info, expected, BlobTargetOption.doesNotExist());

BlobInfo dst =
BlobInfo.newBuilder(testBucket.getName(), generator.randomObjectName() + "copy/dst")
.build();

CopyRequest copyRequest =
CopyRequest.newBuilder()
.setSource(cpySrc.getBlobId())
.setSourceOptions(BlobSourceOption.generationMatch(cpySrc.getGeneration()))
.setTarget(dst, BlobTargetOption.doesNotExist())
.build();
CopyWriter copyWriter = storage.copy(copyRequest);
copyWriter.getResult();
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("openRewrite")));
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("rewrite")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo"));
Expand Down
Loading