Skip to content

Commit

Permalink
Instrument Reader and Writer methods
Browse files Browse the repository at this point in the history
  • Loading branch information
sydney-munro committed Nov 27, 2024
1 parent bd2fd5e commit 3e41d6f
Show file tree
Hide file tree
Showing 5 changed files with 336 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -800,17 +800,26 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption..

@Override
public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
ReadObjectRequest request = getReadObjectRequest(blob, opts);
GrpcCallContext grpcCallContext = Retrying.newCallContext();
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
ReadObjectRequest request = getReadObjectRequest(blob, opts);
GrpcCallContext grpcCallContext = Retrying.newCallContext();

return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
getOptions(),
retryAlgorithmManager.getFor(request),
responseContentLifecycleManager,
request,
!opts.autoGzipDecompression());
return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
getOptions(),
retryAlgorithmManager.getFor(request),
responseContentLifecycleManager,
request,
!opts.autoGzipDecompression());
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

@Override
Expand Down Expand Up @@ -853,25 +862,35 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.

@Override
public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
getOptions(),
retryAlgorithmManager.idempotent(),
() -> wrapped,
hasher);
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
ApiFuture<ResumableWrite> startResumableWrite =
startResumableWrite(grpcCallContext, req, opts);
// 2. await the result of the future
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
// 3. wrap the result in another future container before constructing the BlobWriteChannel
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
return new GrpcBlobWriteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
getOptions(),
retryAlgorithmManager.idempotent(),
() -> wrapped,
hasher);
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw StorageException.coalesce(e);
} finally {
otelSpan.end();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope;
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest;
import com.google.common.base.CharMatcher;
Expand Down Expand Up @@ -743,10 +745,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption...

@Override
public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
StorageObject storageObject = Conversions.json().blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
StorageObject storageObject = Conversions.json().blobId().encode(blob);
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw e;
} finally {
otelSpan.end();
}
}

@Override
Expand Down Expand Up @@ -777,40 +788,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.

@Override
public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();

StorageObject encode = codecs.blobInfo().encode(updated);
// open the resumable session outside the write channel
// the exception behavior of open is different from #write(ByteBuffer)
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
StorageObject encode = codecs.blobInfo().encode(updated);
// open the resumable session outside the write channel
// the exception behavior of open is different from #write(ByteBuffer)
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForBlobInfo(
getOptions(),
updated,
optionsMap,
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw e;
} finally {
otelSpan.end();
}
}

@Override
public StorageWriteChannel writer(URL signedURL) {
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
// it idempotent?
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
// open the resumable session outside the write channel
// the exception behavior of open is different from #write(ByteBuffer)
String signedUrlString = signedURL.toString();
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForSignedUrl(
getOptions(), signedURL, forResumableUploadSessionCreate);
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
try (Scope unused = otelSpan.makeCurrent()) {
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
// it idempotent?
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
// open the resumable session outside the write channel
// the exception behavior of open is different from #write(ByteBuffer)
String signedUrlString = signedURL.toString();
Supplier<String> uploadIdSupplier =
ResumableMedia.startUploadForSignedUrl(
getOptions(), signedURL, forResumableUploadSessionCreate);
JsonResumableWrite jsonResumableWrite =
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
} catch (Exception e) {
otelSpan.recordException(e);
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
throw e;
} finally {
otelSpan.end();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.NoCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.CopyRequest;
Expand Down Expand Up @@ -170,6 +172,31 @@ public void runCopy() {
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("copy")));
}

@Test
public void runWriter() throws IOException {
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
try (WriteChannel writer = storage.writer(info)) {
// Do nothing
}
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
}

@Test
public void runReader() throws IOException {
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storage.create(blobInfo, helloWorldTextBytes);
try (ReadChannel reader = storage.reader(blobId)) {
// Do nothing
}
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.cloud.NoCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.CopyRequest;
Expand Down Expand Up @@ -174,6 +176,31 @@ public void runCreateFromInputStream() throws IOException {
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
}

@Test
public void runWriter() throws IOException {
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
try (WriteChannel writer = storage.writer(info)) {
// Do nothing
}
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
}

@Test
public void runReader() throws IOException {
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
storage.create(blobInfo, helloWorldTextBytes);
try (ReadChannel reader = storage.reader(blobId)) {
// Do nothing
}
TestExporter testExported = (TestExporter) exporter;
List<SpanData> spanData = testExported.getExportedSpans();
checkCommonAttributes(spanData);
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
}

private void checkCommonAttributes(List<SpanData> spanData) {
for (SpanData span : spanData) {
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
Expand Down
Loading

0 comments on commit 3e41d6f

Please sign in to comment.