From 01e0aecba144dc733de40d3413b7d97fcce6624e Mon Sep 17 00:00:00 2001 From: Crypt Keeper <64215+codefromthecrypt@users.noreply.github.com> Date: Mon, 15 Jan 2024 07:59:37 +0800 Subject: [PATCH] migrates to zipkin-reporter 3.2 BytesMessageSender (#214) Signed-off-by: Adrian Cole --- benchmarks/pom.xml | 2 +- collector-pubsub/pom.xml | 2 +- encoder-stackdriver-brave/pom.xml | 2 +- encoder-stackdriver-zipkin/pom.xml | 2 +- module/pom.xml | 2 +- pom.xml | 4 +- propagation-stackdriver/pom.xml | 2 +- sender-pubsub/pom.xml | 2 +- .../zipkin2/reporter/pubsub/PubSubSender.java | 148 +++---------- .../reporter/pubsub/PubSubSenderTest.java | 67 +++--- sender-stackdriver/pom.xml | 2 +- .../AwaitableUnaryClientCallListener.java | 4 +- .../stackdriver/StackdriverSender.java | 114 +++------- .../CallbackToUnaryClientCallListener.java | 61 ------ .../stackdriver/internal/UnaryClientCall.java | 81 -------- .../AsyncReporterStackdriverSenderTest.java | 14 +- .../stackdriver/ITStackdriverSender.java | 6 +- .../stackdriver/StackdriverSenderTest.java | 33 ++- .../internal/UnaryClientCallTest.java | 194 ------------------ storage-stackdriver/pom.xml | 2 +- translation-stackdriver/pom.xml | 2 +- 21 files changed, 127 insertions(+), 619 deletions(-) rename sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/{internal => }/AwaitableUnaryClientCallListener.java (97%) delete mode 100644 sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java delete mode 100644 sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java delete mode 100644 sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index f8f33e4..2fd73fd 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -20,7 +20,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT benchmarks diff --git a/collector-pubsub/pom.xml b/collector-pubsub/pom.xml index c337823..b8bfbd4 100644 --- a/collector-pubsub/pom.xml +++ b/collector-pubsub/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/encoder-stackdriver-brave/pom.xml b/encoder-stackdriver-brave/pom.xml index 9b014de..11d9ed5 100644 --- a/encoder-stackdriver-brave/pom.xml +++ b/encoder-stackdriver-brave/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/encoder-stackdriver-zipkin/pom.xml b/encoder-stackdriver-zipkin/pom.xml index 11889e4..0b74560 100644 --- a/encoder-stackdriver-zipkin/pom.xml +++ b/encoder-stackdriver-zipkin/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/module/pom.xml b/module/pom.xml index 82dbadb..f45bf2c 100644 --- a/module/pom.xml +++ b/module/pom.xml @@ -19,7 +19,7 @@ io.zipkin.gcp zipkin-gcp-parent - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT zipkin-module-gcp diff --git a/pom.xml b/pom.xml index db94524..6a10a7b 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.zipkin.gcp zipkin-gcp-parent - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT pom @@ -75,7 +75,7 @@ io.zipkin.zipkin2 3.0.2 - 3.1.1 + 3.2.1 3.2.1 com.linecorp.armeria diff --git a/propagation-stackdriver/pom.xml b/propagation-stackdriver/pom.xml index b9c8570..0f768d9 100644 --- a/propagation-stackdriver/pom.xml +++ b/propagation-stackdriver/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/sender-pubsub/pom.xml b/sender-pubsub/pom.xml index a9d4252..0daeba6 100644 --- a/sender-pubsub/pom.xml +++ b/sender-pubsub/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java index f194f44..0a49a4c 100644 --- a/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java +++ b/sender-pubsub/src/main/java/zipkin2/reporter/pubsub/PubSubSender.java @@ -13,30 +13,22 @@ */ package zipkin2.reporter.pubsub; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; -import com.google.api.core.ApiFutures; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.Topic; -import com.google.pubsub.v1.TopicName; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -public class PubSubSender extends Sender { +public class PubSubSender extends BytesMessageSender.Base { public static PubSubSender create(String topic) { return newBuilder().topic(topic).build(); @@ -109,7 +101,6 @@ public PubSubSender build() { if (topic == null) throw new NullPointerException("topic == null"); if (executorProvider == null) executorProvider = defaultExecutorProvider(); - ; if (publisher == null) { try { @@ -146,7 +137,6 @@ public Builder toBuilder() { final String topic; final int messageMaxBytes; - final Encoding encoding; final Publisher publisher; final ExecutorProvider executorProvider; final TopicAdminClient topicAdminClient; @@ -154,61 +144,43 @@ public Builder toBuilder() { volatile boolean closeCalled; PubSubSender(Builder builder) { - this.topic = builder.topic; - this.messageMaxBytes = builder.messageMaxBytes; - this.encoding = builder.encoding; - this.publisher = builder.publisher; - this.executorProvider = builder.executorProvider; - this.topicAdminClient = builder.topicAdminClient; + super(builder.encoding); + topic = builder.topic; + messageMaxBytes = builder.messageMaxBytes; + publisher = builder.publisher; + executorProvider = builder.executorProvider; + topicAdminClient = builder.topicAdminClient; } - /** - * If no permissions given sent back ok, f permissions and topic exist ok, if topic does not exist error - * - * @return - */ - @Override - public CheckResult check() { - try { - Topic topic = topicAdminClient.getTopic(TopicName.parse(this.topic)); - return CheckResult.OK; - } catch (ApiException e) { - return CheckResult.failed(e); - } - } - - @Override public Encoding encoding() { - return encoding; - } - - @Override - public int messageMaxBytes() { + @Override public int messageMaxBytes() { return messageMaxBytes; } - @Override - public int messageSizeInBytes(List bytes) { - return encoding().listSizeInBytes(bytes); - } - - @Override - public Call sendSpans(List byteList) { - if (closeCalled) throw new IllegalStateException("closed"); + @Override public void send(List byteList) throws IOException { + if (closeCalled) throw new ClosedSenderException(); byte[] messageBytes = BytesMessageEncoder.forEncoding(encoding()).encode(byteList); - PubsubMessage pubsubMessage = + PubsubMessage message = PubsubMessage.newBuilder().setData(ByteString.copyFrom(messageBytes)).build(); - return new PubSubCall(pubsubMessage); + try { + publisher.publish(message).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(e.getMessage()); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) throw (RuntimeException) cause; + if (cause instanceof Error) throw (Error) cause; + throw new RuntimeException(cause); + } } /** - * Shutdown on Publisher is not async thus moving the synchronized block to another function in order not to block until the shutdown is over - * - * @throws IOException + * Shutdown on Publisher is not async thus moving the synchronized block to another function in + * order not to block until the shutdown is over. */ - @Override - public void close() throws IOException { + @Override public void close() { if (!setClosed()) { return; } @@ -227,68 +199,4 @@ private synchronized boolean setClosed() { @Override public final String toString() { return "PubSubSender{topic=" + topic + "}"; } - - class PubSubCall extends Call.Base { - private final PubsubMessage message; - volatile ApiFuture future; - - public PubSubCall(PubsubMessage message) { - this.message = message; - } - - @Override - protected Void doExecute() throws IOException { - try { - publisher.publish(message).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - return null; - } - - @Override - protected void doEnqueue(Callback callback) { - future = publisher.publish(message); - ApiFutures.addCallback(future, new ApiFutureCallbackAdapter(callback), - executorProvider.getExecutor()); - if (future.isCancelled()) throw new IllegalStateException("cancelled sending spans"); - } - - @Override - protected void doCancel() { - Future maybeFuture = future; - if (maybeFuture != null) maybeFuture.cancel(true); - } - - @Override - protected boolean doIsCanceled() { - Future maybeFuture = future; - return maybeFuture != null && maybeFuture.isCancelled(); - } - - @Override - public Call clone() { - PubsubMessage clone = PubsubMessage.newBuilder(message).build(); - return new PubSubCall(clone); - } - } - - static final class ApiFutureCallbackAdapter implements ApiFutureCallback { - - final Callback callback; - - public ApiFutureCallbackAdapter(Callback callback) { - this.callback = callback; - } - - @Override - public void onFailure(Throwable t) { - callback.onError(t); - } - - @Override - public void onSuccess(String result) { - callback.onSuccess(null); - } - } } diff --git a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java index 6c80592..d09c80b 100644 --- a/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java +++ b/sender-pubsub/src/test/java/zipkin2/reporter/pubsub/PubSubSenderTest.java @@ -54,13 +54,12 @@ import org.mockito.junit.jupiter.MockitoExtension; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.Encoding; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static zipkin2.TestObjects.CLIENT_SPAN; @@ -116,9 +115,8 @@ private InstantiatingExecutorProvider testExecutorProvider() { .build(); } - @Test void sendsSpans() throws Exception { - ArgumentCaptor requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send() throws Exception { + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver responseObserver = invocationOnMock.getArgument(1); @@ -128,15 +126,14 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(extractSpans(requestCaptor.getValue())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { - ArgumentCaptor requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send_empty() throws Exception { + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver responseObserver = invocationOnMock.getArgument(1); @@ -146,17 +143,14 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(); assertThat(extractSpans(requestCaptor.getValue())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + .isEmpty(); } - @Test void sendsSpans_json_unicode() throws Exception { - ArgumentCaptor requestCaptor = - ArgumentCaptor.forClass(PublishRequest.class); + @Test void send_PROTO3() throws Exception { + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver responseObserver = invocationOnMock.getArgument(1); @@ -166,39 +160,42 @@ private InstantiatingExecutorProvider testExecutorProvider() { return null; }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); - send(unicode).execute(); + sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); + + assertThat(extractSpans(requestCaptor.getValue())) + .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void checkPasses() throws Exception { - ArgumentCaptor captor = - ArgumentCaptor.forClass(GetTopicRequest.class); + @Test void send_json_unicode() throws Exception { + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { - StreamObserver responseObserver = invocationOnMock.getArgument(1); - responseObserver.onNext(Topic.newBuilder().setName("topic-name").build()); + StreamObserver responseObserver = invocationOnMock.getArgument(1); + responseObserver.onNext( + PublishResponse.newBuilder().addMessageIds(UUID.randomUUID().toString()).build()); responseObserver.onCompleted(); return null; - }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - CheckResult result = sender.check(); - assertThat(result.ok()).isTrue(); + Span unicode = CLIENT_SPAN.toBuilder().putTag("error", "\uD83D\uDCA9").build(); + sendSpans(unicode); + + assertThat(extractSpans(requestCaptor.getValue())).containsExactly(unicode); } - @Test void checkFailsWithStreamNotActive() throws Exception { - ArgumentCaptor captor = - ArgumentCaptor.forClass(GetTopicRequest.class); + @Test void sendFailsWithStreamNotActive() { + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishRequest.class); doAnswer(invocationOnMock -> { StreamObserver responseObserver = invocationOnMock.getArgument(1); responseObserver.onError(new io.grpc.StatusRuntimeException(Status.NOT_FOUND)); return null; - }).when(publisherImplBase).getTopic(captor.capture(), any(StreamObserver.class)); + }).when(publisherImplBase).publish(requestCaptor.capture(), any(StreamObserver.class)); - CheckResult result = sender.check(); - assertThat(result.error()).isInstanceOf(ApiException.class); + assertThatThrownBy(this::sendSpans) + .isInstanceOf(ApiException.class); } private List extractSpans(PublishRequest publishRequest) { @@ -217,9 +214,9 @@ Stream extractSpans(PubsubMessage pubsubMessage) { return SpanBytesDecoder.PROTO3.decodeList(messageBytes).stream(); } - Call send(zipkin2.Span... spans) { + void sendSpans(zipkin2.Span... spans) throws Exception { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } } diff --git a/sender-stackdriver/pom.xml b/sender-stackdriver/pom.xml index ff6d2e4..bbf2be3 100644 --- a/sender-stackdriver/pom.xml +++ b/sender-stackdriver/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java similarity index 97% rename from sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java rename to sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java index 0ff93db..852d66f 100644 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/AwaitableUnaryClientCallListener.java +++ b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/AwaitableUnaryClientCallListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.stackdriver.internal; +package zipkin2.reporter.stackdriver; import io.grpc.ClientCall; import io.grpc.Metadata; diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java index 16c3ba8..53fbb0d 100644 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java +++ b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/StackdriverSender.java @@ -22,23 +22,22 @@ import com.google.protobuf.UnsafeByteOperations; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; +import io.grpc.Metadata; import java.io.IOException; import java.util.List; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; +import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -import zipkin2.reporter.stackdriver.internal.UnaryClientCall; import static com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag; import static io.grpc.CallOptions.DEFAULT; -import static zipkin2.reporter.stackdriver.internal.UnaryClientCall.DEFAULT_SERVER_TIMEOUT_MS; -public final class StackdriverSender extends Sender { +public final class StackdriverSender extends BytesMessageSender.Base { + static final int DEFAULT_SERVER_TIMEOUT_MS = 5000; + public static Builder newBuilder() { ManagedChannel channel = ManagedChannelBuilder.forTarget("cloudtrace.googleapis.com").build(); Builder result = newBuilder(channel); @@ -100,9 +99,8 @@ public StackdriverSender build() { final int spanNameFieldSize; final long serverResponseTimeoutMs; - final BatchWriteSpansCall healthcheckCall; - StackdriverSender(Builder builder) { + super(Encoding.PROTO3); channel = builder.channel; callOptions = builder.callOptions; projectName = ByteString.copyFromUtf8("projects/" + builder.projectId); @@ -117,26 +115,13 @@ public StackdriverSender build() { spanNameFieldSize = CodedOutputStream.computeTagSize(1) + CodedOutputStream.computeUInt32SizeNoTag(spanNameSize) + spanNameSize; - - BatchWriteSpansRequest healthcheckRequest = BatchWriteSpansRequest.newBuilder() - .setNameBytes(projectName) - .addSpans(Span.newBuilder().build()) - .build(); - healthcheckCall = new BatchWriteSpansCall(healthcheckRequest); } - @Override - public Encoding encoding() { - return Encoding.PROTO3; - } - - @Override - public int messageMaxBytes() { + @Override public int messageMaxBytes() { return 1024 * 1024; // 1 MiB for now } - @Override - public int messageSizeInBytes(List traceIdPrefixedSpans) { + @Override public int messageSizeInBytes(List traceIdPrefixedSpans) { int length = traceIdPrefixedSpans.size(); if (length == 0) return 0; if (length == 1) return messageSizeInBytes(traceIdPrefixedSpans.get(0).length); @@ -149,19 +134,15 @@ public int messageSizeInBytes(List traceIdPrefixedSpans) { return size; } - @Override - public int messageSizeInBytes(int traceIdPrefixedSpanSize) { + @Override public int messageSizeInBytes(int traceIdPrefixedSpanSize) { return projectNameFieldSize + spanFieldSize(traceIdPrefixedSpanSize); } /** close is typically called from a different thread */ volatile boolean closeCalled; - @Override - public Call sendSpans(List traceIdPrefixedSpans) { - if (closeCalled) throw new IllegalStateException("closed"); - int length = traceIdPrefixedSpans.size(); - if (length == 0) return Call.create(null); + @Override public void send(List traceIdPrefixedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); BatchWriteSpansRequest.Builder request = BatchWriteSpansRequest.newBuilder() .setNameBytes(projectName); @@ -169,42 +150,28 @@ public Call sendSpans(List traceIdPrefixedSpans) { request.addSpans(parseTraceIdPrefixedSpan(traceIdPrefixedSpan, spanNameSize, traceIdPrefix)); } - return new BatchWriteSpansCall(request.build()).map(EmptyToVoid.INSTANCE); - } + ClientCall call = + channel.newCall(TraceServiceGrpc.getBatchWriteSpansMethod(), callOptions); - /** - * Sends a malformed call to Stackdriver Trace to validate service health. - * - * @return successful status if Stackdriver Trace API responds with expected validation - * error (or happens to respond as success -- unexpected but okay); otherwise returns error status - * wrapping the underlying exception. - */ - @Override - public CheckResult check() { + AwaitableUnaryClientCallListener listener = + new AwaitableUnaryClientCallListener<>(serverResponseTimeoutMs); try { - healthcheckCall.clone().execute(); - } catch (StatusRuntimeException sre) { - if (sre.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) { - return CheckResult.OK; - } - return CheckResult.failed(sre); - } catch (Exception e) { - return CheckResult.failed(e); + call.start(listener, new Metadata()); + call.request(1); + call.sendMessage(request.build()); + call.halfClose(); + } catch (RuntimeException | Error t) { + call.cancel(null, t); + throw t; } - - // Currently the rpc throws a validation exception on malformed input, which we handle above. - // If we get here despite the known malformed input, the implementation changed and we need to - // update this check. It's unlikely enough that we can wait and see. - return CheckResult.OK; + listener.await(); } - @Override - public final String toString() { + @Override public String toString() { return "StackdriverSender{" + projectName.toStringUtf8() + "}"; } - @Override - public void close() { + @Override public void close() { if (!shutdownChannelOnClose) return; if (closeCalled) return; closeCalled = true; @@ -244,31 +211,4 @@ int spanFieldSize(int traceIdPrefixedSpanSize) { return CodedOutputStream.computeTagSize(2) + computeUInt32SizeNoTag(sizeOfSpanMessage) + sizeOfSpanMessage; } - - final class BatchWriteSpansCall extends UnaryClientCall { - - BatchWriteSpansCall(BatchWriteSpansRequest request) { - super(channel, TraceServiceGrpc.getBatchWriteSpansMethod(), callOptions, request, - serverResponseTimeoutMs); - } - - @Override - public String toString() { - return "BatchWriteSpansCall{" + request() + "}"; - } - - @Override - public BatchWriteSpansCall clone() { - return new BatchWriteSpansCall(request()); - } - } - - enum EmptyToVoid implements Call.Mapper { - INSTANCE { - @Override - public Void map(Empty empty) { - return null; - } - } - } } diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java deleted file mode 100644 index ab8e9d0..0000000 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/CallbackToUnaryClientCallListener.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.Status; -import zipkin2.reporter.Callback; - -final class CallbackToUnaryClientCallListener extends ClientCall.Listener { - private final Callback callback; - /** this differentiates between not yet set and null */ - boolean valueSet; // guarded by this - - RespT value; // guarded by this - - CallbackToUnaryClientCallListener(Callback callback) { - this.callback = callback; - } - - @Override - public void onHeaders(Metadata headers) { - } - - @Override - public synchronized void onMessage(RespT value) { - if (valueSet) { - throw Status.INTERNAL - .withDescription("More than one value received for unary call") - .asRuntimeException(); - } - valueSet = true; - this.value = value; - } - - @Override - public synchronized void onClose(Status status, Metadata trailers) { - if (status.isOk()) { - if (!valueSet) { - callback.onError( - Status.INTERNAL - .withDescription("No value received for unary call") - .asRuntimeException(trailers)); - } - callback.onSuccess(value); - } else { - callback.onError(status.asRuntimeException(trailers)); - } - } -} diff --git a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java b/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java deleted file mode 100644 index 91f5972..0000000 --- a/sender-stackdriver/src/main/java/zipkin2/reporter/stackdriver/internal/UnaryClientCall.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import java.io.IOException; -import zipkin2.reporter.Call; -import zipkin2.reporter.Callback; - -public abstract class UnaryClientCall extends Call.Base { - public static final int DEFAULT_SERVER_TIMEOUT_MS = 5000; - final ClientCall call; - final ReqT request; - final long serverTimeoutMs; - - protected UnaryClientCall( - Channel channel, - MethodDescriptor descriptor, - CallOptions callOptions, - ReqT request, - long serverTimeoutMs) { - this.call = channel.newCall(descriptor, callOptions); - this.request = request; - this.serverTimeoutMs = serverTimeoutMs; - } - - protected final ReqT request() { - return request; - } - - @Override - protected final RespT doExecute() throws IOException { - AwaitableUnaryClientCallListener listener = - new AwaitableUnaryClientCallListener<>(this.serverTimeoutMs); - beginUnaryCall(listener); - return listener.await(); - } - - @Override - protected final void doEnqueue(Callback callback) { - ClientCall.Listener listener = new CallbackToUnaryClientCallListener<>(callback); - try { - beginUnaryCall(listener); - } catch (RuntimeException | Error t) { - callback.onError(t); - throw t; - } - } - - void beginUnaryCall(ClientCall.Listener listener) { - try { - call.start(listener, new Metadata()); - call.request(1); - call.sendMessage(request); - call.halfClose(); - } catch (RuntimeException | Error t) { - call.cancel(null, t); - throw t; - } - } - - @Override - protected final void doCancel() { - call.cancel(null, null); - } -} diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java index 2a24481..20fd481 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/AsyncReporterStackdriverSenderTest.java @@ -71,13 +71,21 @@ class AsyncReporterStackdriverSenderTest { .build(StackdriverEncoder.V2); } - @Test void sendSpans_empty() { + @Test void send_empty() { reporter.flush(); - verify(traceService, never()).batchWriteSpans(any(), any()); + ArgumentCaptor requestCaptor = + ArgumentCaptor.forClass(BatchWriteSpansRequest.class); + + verify(traceService).batchWriteSpans(requestCaptor.capture(), any()); + + BatchWriteSpansRequest request = requestCaptor.getValue(); + assertThat(request.getName()).isEqualTo("projects/" + projectId); + + assertThat(request.getSpansList()).isEmpty(); } - @Test void sendSpans() { + @Test void send() { onClientCall( observer -> { observer.onNext(Empty.getDefaultInstance()); diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java index 287c8f7..e3a8730 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/ITStackdriverSender.java @@ -52,8 +52,7 @@ public class ITStackdriverSender { AsyncReporter reporterNoPermission; TraceServiceGrpc.TraceServiceBlockingStub traceServiceGrpcV1; - @BeforeEach - public void setUp() throws IOException { + @BeforeEach void setUp() throws IOException { // Application Default credential is configured using the GOOGLE_APPLICATION_CREDENTIALS env var // See: https://cloud.google.com/docs/authentication/production#providing_credentials_to_your_application @@ -90,8 +89,7 @@ public void setUp() throws IOException { .build(StackdriverEncoder.V2); } - @AfterEach - public void tearDown() { + @AfterEach void tearDown() { if (reporter != null) { reporter.close(); } diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java index 8d9396d..721d41a 100644 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java +++ b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/StackdriverSenderTest.java @@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.time.Duration; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -37,11 +38,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.stubbing.Answer; import zipkin2.Span; -import zipkin2.reporter.CheckResult; import zipkin2.reporter.stackdriver.zipkin.StackdriverEncoder; import zipkin2.translation.stackdriver.SpanTranslator; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -82,7 +83,7 @@ class StackdriverSenderTest { observer.onCompleted(); }); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); // verify our estimate is correct int actualSize = takeRequest().getSerializedSize(); @@ -124,7 +125,7 @@ void verifyRequestSent(List spans) throws IOException { List encodedSpans = spans.stream().map(StackdriverEncoder.V2::encode).collect(Collectors.toList()); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); BatchWriteSpansRequest request = takeRequest(); @@ -139,41 +140,33 @@ void verifyRequestSent(List spans) throws IOException { assertThat(sender.messageSizeInBytes(encodedSpans)).isEqualTo(actualSize); } - @Test void verifyCheckReturnsFailureWhenServiceFailsWithKnownGrpcFailure() { + @Test void sendFailureWhenServiceFailsWithKnownGrpcFailure() { onClientCall(observer -> { observer.onError(new StatusRuntimeException(Status.RESOURCE_EXHAUSTED)); }); - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()) + + assertThatThrownBy(() -> sender.send(Collections.emptyList())) .isInstanceOf(StatusRuntimeException.class) .hasMessageContaining("RESOURCE_EXHAUSTED"); } - @Test void verifyCheckReturnsFailureWhenServiceFailsForUnknownReason() { + @Test void sendFailureWhenServiceFailsForUnknownReason() { onClientCall(observer -> { observer.onError(new RuntimeException("oh no")); }); - CheckResult result = sender.check(); - assertThat(result.ok()).isFalse(); - assertThat(result.error()) + + assertThatThrownBy(() -> sender.send(Collections.emptyList())) .isInstanceOf(RuntimeException.class) .hasMessageContaining("UNKNOWN"); } - @Test void verifyCheckReturnsOkWhenExpectedValidationFailure() { - onClientCall(observer -> { - observer.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT)); - }); - assertThat(sender.check()).isSameAs(CheckResult.OK); - } - - @Test void verifyCheckReturnsOkWhenServiceSucceeds() { + @Test void send_empty() throws IOException { onClientCall(observer -> { observer.onNext(Empty.getDefaultInstance()); observer.onCompleted(); }); - assertThat(sender.check()).isSameAs(CheckResult.OK); + + sender.send(Collections.emptyList()); } void onClientCall(Consumer> onClientCall) { diff --git a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java b/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java deleted file mode 100644 index cdf416d..0000000 --- a/sender-stackdriver/src/test/java/zipkin2/reporter/stackdriver/internal/UnaryClientCallTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright 2016-2024 The OpenZipkin Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package zipkin2.reporter.stackdriver.internal; - -import com.asarkar.grpc.test.GrpcCleanupExtension; -import com.asarkar.grpc.test.Resources; -import com.google.devtools.cloudtrace.v2.BatchWriteSpansRequest; -import com.google.devtools.cloudtrace.v2.TraceServiceGrpc; -import com.google.protobuf.Empty; -import io.grpc.Channel; -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.StatusRuntimeException; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; -import io.grpc.stub.StreamObserver; -import java.time.Duration; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; -import org.mockito.stubbing.Answer; -import zipkin2.reporter.Callback; - -import static io.grpc.CallOptions.DEFAULT; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static zipkin2.reporter.stackdriver.internal.UnaryClientCall.DEFAULT_SERVER_TIMEOUT_MS; - -@ExtendWith(GrpcCleanupExtension.class) -class UnaryClientCallTest { - final TestTraceService traceService = spy(new TestTraceService()); - - static class BatchWriteSpansCall extends UnaryClientCall { - final Channel channel; - - BatchWriteSpansCall(Channel channel, BatchWriteSpansRequest request, - long serverResponseTimeout) { - super(channel, TraceServiceGrpc.getBatchWriteSpansMethod(), DEFAULT, request, - serverResponseTimeout); - this.channel = channel; - } - - @Override - public BatchWriteSpansCall clone() { - return new BatchWriteSpansCall(channel, request(), DEFAULT_SERVER_TIMEOUT_MS); - } - } - - BatchWriteSpansCall call; - - @BeforeEach void setUp(Resources resources) throws Exception { - String serverName = InProcessServerBuilder.generateName(); - - Server server = InProcessServerBuilder - .forName(serverName) - .directExecutor() - .addService(traceService) - .build().start(); - resources.register(server, Duration.ofSeconds(10)); // shutdown deadline - - ManagedChannel channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - resources.register(channel, Duration.ofSeconds(10));// close deadline - - call = new BatchWriteSpansCall(channel, BatchWriteSpansRequest.newBuilder().build(), - DEFAULT_SERVER_TIMEOUT_MS); - } - - @Test void execute_success() throws Throwable { - onClientCall( - observer -> { - observer.onNext(Empty.getDefaultInstance()); - observer.onCompleted(); - }); - - call.execute(); - - verifyPatchRequestSent(); - } - - @Test void enqueue_success() throws Throwable { - onClientCall( - observer -> { - observer.onNext(Empty.getDefaultInstance()); - observer.onCompleted(); - }); - - awaitCallbackResult(); - - verifyPatchRequestSent(); - } - - void verifyPatchRequestSent() { - ArgumentCaptor requestCaptor = - ArgumentCaptor.forClass(BatchWriteSpansRequest.class); - - verify(traceService).batchWriteSpans(requestCaptor.capture(), any()); - - BatchWriteSpansRequest request = requestCaptor.getValue(); - assertThat(request).isEqualTo(BatchWriteSpansRequest.getDefaultInstance()); - } - - @Test void accept_execute_serverError() throws Throwable { - assertThrows(StatusRuntimeException.class, () -> { - onClientCall(observer -> observer.onError(new IllegalStateException())); - - call.execute(); - }); - } - - @Test void accept_enqueue_serverError() throws Throwable { - assertThrows(StatusRuntimeException.class, () -> { - onClientCall(observer -> observer.onError(new IllegalStateException())); - - awaitCallbackResult(); - }); - } - - @Test void execute_timeout() throws Throwable { - assertThrows(IllegalStateException.class, () -> { - long overriddenTimeout = 50; - call = new BatchWriteSpansCall(call.channel, BatchWriteSpansRequest.newBuilder().build(), - overriddenTimeout); - onClientCall( - observer -> - Executors.newSingleThreadExecutor().submit(() -> - { - try { - Thread.sleep(overriddenTimeout + 10); - } catch (InterruptedException e) { - } - observer.onCompleted(); - })); - - call.execute(); - }); - } - - static class TestTraceService extends TraceServiceGrpc.TraceServiceImplBase { - } - - void awaitCallbackResult() throws Throwable { - AtomicReference ref = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - call.enqueue( - new Callback() { - @Override - public void onSuccess(Empty empty) { - latch.countDown(); - } - - @Override - public void onError(Throwable throwable) { - ref.set(throwable); - latch.countDown(); - } - }); - latch.await(10, TimeUnit.MILLISECONDS); - if (ref.get() != null) throw ref.get(); - } - - void onClientCall(Consumer> onClientCall) { - doAnswer( - (Answer) - invocationOnMock -> { - StreamObserver observer = - ((StreamObserver) invocationOnMock.getArguments()[1]); - onClientCall.accept(observer); - return null; - }) - .when(traceService) - .batchWriteSpans(any(BatchWriteSpansRequest.class), any(StreamObserver.class)); - } -} diff --git a/storage-stackdriver/pom.xml b/storage-stackdriver/pom.xml index 5cde510..3b257eb 100644 --- a/storage-stackdriver/pom.xml +++ b/storage-stackdriver/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0 diff --git a/translation-stackdriver/pom.xml b/translation-stackdriver/pom.xml index c1b98c5..76a74c9 100644 --- a/translation-stackdriver/pom.xml +++ b/translation-stackdriver/pom.xml @@ -18,7 +18,7 @@ zipkin-gcp-parent io.zipkin.gcp - 2.1.2-SNAPSHOT + 2.2.0-SNAPSHOT 4.0.0