From e59e783e0a1f33bdc10c6128a09ad5c01421296f Mon Sep 17 00:00:00 2001 From: jzw Date: Wed, 5 Jan 2022 21:41:06 +0800 Subject: [PATCH] Fix typo in Zipkin and Report (#178) * fix typo in zipkin * fix typo * fix typo * fix license --- .../easeagent/core/HttpServerTest.java | 2 +- .../metrics/impl/MetricRegistryImplTest.java | 11 +- .../mock/zipkin/TracingProviderMock.java | 6 +- .../plugin/api/trace/MessagingTracing.java | 47 ++++- .../easeagent/plugin/api/trace/Tracing.java | 4 +- .../easeagent/plugin/bridge/NoOpTracer.java | 19 +- pom.xml | 6 + .../report/metric/MetricReporterImpl.java | 2 - .../report/metric/log4j/AppenderManager.java | 2 +- .../report/metric/log4j/LoggerFactory.java | 3 +- .../log4j/MetricRefreshableAppender.java | 3 +- .../metric/log4j/RefreshableAppender.java | 2 +- .../report/trace/RefreshableReporter.java | 3 +- .../easeagent/report/trace/TraceReport.java | 8 +- .../easeagent/report/util/SpanUtils.java | 2 + .../easeagent/report/util/TextUtils.java | 2 + .../AbstractAgentV2SpanEndpointWriter.java | 32 +-- .../AgentV2SpanAnnotationsWriter.java | 30 +-- .../internal/AgentV2SpanBaseWriter.java | 54 ++--- .../internal/AgentV2SpanGlobalWriter.java | 23 +-- .../AgentV2SpanLocalEndpointWriter.java | 6 +- .../AgentV2SpanRemoteEndpointWriter.java | 6 +- .../internal/AgentV2SpanTagsWriter.java | 16 +- .../zipkin2/internal/AgentV2SpanWriter.java | 31 +-- .../zipkin2/reporter/SDKAsyncReporter.java | 97 +++++---- .../reporter/kafka11/SDKKafkaSender.java | 6 +- .../report/trace/TraceReportTest.java | 3 + .../zipkin/DebugReporterMetrics.java | 102 ---------- .../easeagent/zipkin/GatewaySender.java | 146 ------------- ...Provider.java => TracingProviderImpl.java} | 16 +- .../zipkin/impl/AsyncContextImpl.java | 8 +- .../zipkin/impl/MessagingTracingImpl.java | 191 ------------------ .../zipkin/impl/RequestContextImpl.java | 4 +- .../easeagent/zipkin/impl/SpanImpl.java | 57 +++++- .../easeagent/zipkin/impl/TracingImpl.java | 132 +++--------- .../impl/message/MessagingTracingImpl.java | 191 ++++++++++++++++++ .../impl/message/ZipkinConsumerRequest.java | 50 +++++ .../impl/message/ZipkinProducerRequest.java | 49 +++++ .../logging/AgentMDCScopeDecorator.java | 9 +- .../easeagent/zipkin/logging/LogUtils.java | 53 ++--- ...com.megaease.easeagent.plugin.BeanProvider | 2 +- .../easeagent/zipkin/BaseZipkinTest.java | 54 ----- .../easeagent/zipkin/GatewaySenderTest.java | 43 ---- 43 files changed, 641 insertions(+), 892 deletions(-) delete mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/DebugReporterMetrics.java delete mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/GatewaySender.java rename zipkin/src/main/java/com/megaease/easeagent/zipkin/{TracingProvider.java => TracingProviderImpl.java} (89%) delete mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/MessagingTracingImpl.java create mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/MessagingTracingImpl.java create mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinConsumerRequest.java create mode 100644 zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinProducerRequest.java delete mode 100644 zipkin/src/test/java/com/megaease/easeagent/zipkin/BaseZipkinTest.java delete mode 100644 zipkin/src/test/java/com/megaease/easeagent/zipkin/GatewaySenderTest.java diff --git a/core/src/test/java/com/megaease/easeagent/core/HttpServerTest.java b/core/src/test/java/com/megaease/easeagent/core/HttpServerTest.java index 63dd31200..fdddefc40 100644 --- a/core/src/test/java/com/megaease/easeagent/core/HttpServerTest.java +++ b/core/src/test/java/com/megaease/easeagent/core/HttpServerTest.java @@ -113,7 +113,7 @@ public void httpServer() throws Exception { DatagramSocket s = new DatagramSocket(0); int port = s.getLocalPort(); - + s.close(); String httpServer = "http://127.0.0.1:" + port; System.out.println("run up http server : " + httpServer); AgentHttpServer agentHttpServer = new AgentHttpServer(port); diff --git a/metrics/src/test/java/com/megaease/easeagent/metrics/impl/MetricRegistryImplTest.java b/metrics/src/test/java/com/megaease/easeagent/metrics/impl/MetricRegistryImplTest.java index ebd8f7817..214019a95 100644 --- a/metrics/src/test/java/com/megaease/easeagent/metrics/impl/MetricRegistryImplTest.java +++ b/metrics/src/test/java/com/megaease/easeagent/metrics/impl/MetricRegistryImplTest.java @@ -19,14 +19,11 @@ import com.megaease.easeagent.metrics.MetricRegistryService; import com.megaease.easeagent.plugin.api.metric.*; -import org.awaitility.Duration; import org.junit.Assert; import org.junit.Test; import java.util.concurrent.TimeUnit; -import static org.awaitility.Awaitility.await; - public class MetricRegistryImplTest { String countName = "countName"; String meterName = "meterName"; @@ -98,7 +95,7 @@ public void timer() throws InterruptedException { Timer timer = metricRegistry.timer(timerName); timer.update(10, TimeUnit.MILLISECONDS); Assert.assertEquals(1, timer.getCount()); - timer.update(100, TimeUnit.MILLISECONDS); + timer.update(200, TimeUnit.MILLISECONDS); Assert.assertEquals(2, timer.getCount()); Timer.Context context = timer.time(); Thread.sleep(50); @@ -107,9 +104,9 @@ public void timer() throws InterruptedException { Snapshot snapshot = timer.getSnapshot(); Assert.assertEquals(3, snapshot.size()); Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(10), snapshot.getMin()); - Assert.assertTrue(snapshot.getMedian() > TimeUnit.MILLISECONDS.toNanos(30)); - Assert.assertTrue(snapshot.getMedian() < TimeUnit.MILLISECONDS.toNanos(70)); - Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(100), snapshot.getMax()); + Assert.assertTrue(snapshot.getMedian() > TimeUnit.MILLISECONDS.toNanos(20)); + Assert.assertTrue(snapshot.getMedian() < TimeUnit.MILLISECONDS.toNanos(80)); + Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(200), snapshot.getMax()); } class TestGauge implements Gauge { diff --git a/mock/zipkin-mock/src/main/java/com/megaease/easeagent/mock/zipkin/TracingProviderMock.java b/mock/zipkin-mock/src/main/java/com/megaease/easeagent/mock/zipkin/TracingProviderMock.java index 0719697a5..7189f4477 100644 --- a/mock/zipkin-mock/src/main/java/com/megaease/easeagent/mock/zipkin/TracingProviderMock.java +++ b/mock/zipkin-mock/src/main/java/com/megaease/easeagent/mock/zipkin/TracingProviderMock.java @@ -21,10 +21,10 @@ import com.megaease.easeagent.mock.config.ConfigMock; import com.megaease.easeagent.mock.report.ReportMock; import com.megaease.easeagent.mock.utils.MockProvider; -import com.megaease.easeagent.zipkin.TracingProvider; +import com.megaease.easeagent.zipkin.TracingProviderImpl; public class TracingProviderMock implements MockProvider { - private static final TracingProvider TRACING_PROVIDER = new TracingProvider(); + private static final TracingProviderImpl TRACING_PROVIDER = new TracingProviderImpl(); private static final Tracing TRACING; static { @@ -34,7 +34,7 @@ public class TracingProviderMock implements MockProvider { TRACING = TRACING_PROVIDER.tracing(); } - public static TracingProvider getTracingProvider() { + public static TracingProviderImpl getTracingProvider() { return TRACING_PROVIDER; } diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/MessagingTracing.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/MessagingTracing.java index d1dff33fa..287fd2e27 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/MessagingTracing.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/MessagingTracing.java @@ -17,7 +17,9 @@ package com.megaease.easeagent.plugin.api.trace; -import java.util.function.Function; +import com.megaease.easeagent.plugin.api.Context; + +import java.util.function.Predicate; /** * a MessagingTracing @@ -49,7 +51,7 @@ public interface MessagingTracing { * yet been made. For example, if a trace is already in progress, this function is not called. You * can implement this to skip channels that you never want to trace. */ - Function consumerSampler(); + Predicate consumerSampler(); /** * Returns an overriding sampling decision for a new trace. Defaults to ignore the request and use @@ -59,7 +61,7 @@ public interface MessagingTracing { * making an messaging request as a part of booting your application. You may want to opt-out of * tracing producer requests that did not originate from a consumer request. */ - Function producerSampler(); + Predicate producerSampler(); /** * Returns an overriding sampling decision for a new trace. @@ -75,4 +77,43 @@ public interface MessagingTracing { * @see #consumerSampler() */ boolean producerSampler(R request); + + /** + * Obtain key:value from the message request and create a Span, Examples: kafka consumer, rebbitmq consumer + *

+ * It will set the Span's kind, name and cached scope through {@link Request#kind()}, {@link Request#name()} + * and {@link Request#cacheScope()}. + * + *

+ * It will set the Span's tags "messaging.operation", "messaging.channel_kind", "messaging.channel_name" from request + * {@link MessagingRequest#operation()} {@link MessagingRequest#channelKind()} {@link MessagingRequest#channelName()} + * + *

+ * It just only obtain the key:value required by Trace from the {@link Request#header(String)}, + * If you need and get Span, generate result use {@link Context#consumerSpan(MessagingRequest)}. + * + * @param request {@link MessagingRequest} + * @return {@link Span} + * @see Context#consumerSpan(MessagingRequest) + */ + Span consumerSpan(R request); + + + /** + * Create a Span for message producer. Examples: kafka producer, rebbitmq producer + *

+ * It will set the Span's tags "messaging.operation", "messaging.channel_kind", "messaging.channel_name" from request + * {@link MessagingRequest#operation()} {@link MessagingRequest#channelKind()} {@link MessagingRequest#channelName()} + * + *

+ * It just only pass multiple key:value values required by Trace through + * {@link Request#setHeader(String, String)}, And set the Span's kind, name and + * cached scope through {@link Request#kind()}, {@link Request#name()} and {@link Request#cacheScope()}. + * If you need and get Span, generate result use {@link Context#producerSpan(MessagingRequest)}. + * + * @param request {@link MessagingRequest} + * @return {@link Span} + * @see Context#producerSpan(MessagingRequest) + */ + Span producerSpan(R request); } diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/Tracing.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/Tracing.java index a764fef75..d9c3657e0 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/Tracing.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/trace/Tracing.java @@ -75,7 +75,7 @@ public interface Tracing { * @return {@link Span} * @see Extractor#extract(MessagingRequest) */ - Span nextSpan(Message message); + Span nextSpan(Message message); /** * get MessagingTracing for message tracing @@ -85,5 +85,5 @@ public interface Tracing { * * @return {@link MessagingRequest} */ - MessagingTracing messagingTracing(); + MessagingTracing messagingTracing(); } diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpTracer.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpTracer.java index 206e2c9f1..98c5d6fab 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpTracer.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpTracer.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.function.Function; +import java.util.function.Predicate; public class NoOpTracer { public static final ITracing NO_OP_TRACING = NoopTracing.INSTANCE; @@ -233,7 +234,7 @@ public Span producerSpan(MessagingRequest request) { } @Override - public MessagingTracing messagingTracing() { + public MessagingTracing messagingTracing() { return EmptyMessagingTracing.INSTANCE; } @@ -255,7 +256,7 @@ public boolean hasCurrentSpan() { public static class EmptyMessagingTracing implements MessagingTracing { private static final EmptyMessagingTracing INSTANCE = new EmptyMessagingTracing(); - private static final Function NOOP_SAMPLER = r -> false; + private static final Predicate NOOP_SAMPLER = r -> false; @Override public Extractor extractor() { @@ -273,12 +274,12 @@ public Injector consumerInjector() { } @Override - public Function consumerSampler() { + public Predicate consumerSampler() { return NOOP_SAMPLER; } @Override - public Function producerSampler() { + public Predicate producerSampler() { return NOOP_SAMPLER; } @@ -291,6 +292,16 @@ public boolean consumerSampler(MessagingRequest request) { public boolean producerSampler(MessagingRequest request) { return false; } + + @Override + public Span consumerSpan(MessagingRequest request) { + return NoOpTracer.NO_OP_SPAN; + } + + @Override + public Span producerSpan(MessagingRequest request) { + return NoOpTracer.NO_OP_SPAN; + } } public static class EmptyMessage implements Message { diff --git a/pom.xml b/pom.xml index c8e7a59bd..4ce4b3dff 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,12 @@ ${version.mockito} test + + org.awaitility + awaitility + 4.1.1 + test + diff --git a/report/src/main/java/com/megaease/easeagent/report/metric/MetricReporterImpl.java b/report/src/main/java/com/megaease/easeagent/report/metric/MetricReporterImpl.java index d5ed28b7d..e3820b602 100644 --- a/report/src/main/java/com/megaease/easeagent/report/metric/MetricReporterImpl.java +++ b/report/src/main/java/com/megaease/easeagent/report/metric/MetricReporterImpl.java @@ -30,14 +30,12 @@ public class MetricReporterImpl implements MetricReporter { private final ConcurrentHashMap reporters; - private final Configs configs; private final AppenderManager appenderManager; public MetricReporterImpl(Configs configs) { this.reporters = new ConcurrentHashMap<>(); OutputProperties outputProperties = Utils.extractOutputProperties(configs); this.appenderManager = AppenderManager.create(outputProperties); - this.configs = configs; configs.addChangeListener(this); } diff --git a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/AppenderManager.java b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/AppenderManager.java index 2582cee0b..29dc3adff 100644 --- a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/AppenderManager.java +++ b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/AppenderManager.java @@ -72,7 +72,7 @@ private DefaultKafkaAppenderManager(OutputProperties outputProperties) { this.outputProperties = outputProperties; ClassLoader initClassLoader = Thread.currentThread().getContextClassLoader(); LOGGER.info("bind classloader:{} to AppenderManager", initClassLoader); - this.provider = (topic) -> { + this.provider = topic -> { ClassLoader old = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(initClassLoader); try { diff --git a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/LoggerFactory.java b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/LoggerFactory.java index 9246bc5f2..39bea9f3f 100644 --- a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/LoggerFactory.java +++ b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/LoggerFactory.java @@ -20,9 +20,10 @@ import org.apache.logging.log4j.core.LoggerContext; public class LoggerFactory { + private LoggerFactory() {} // Independent logger context as an anchor for loggers in the metrics - private static LoggerContext loggerContext = new LoggerContext("ROOT"); + private static final LoggerContext loggerContext = new LoggerContext("ROOT"); public static LoggerContext getLoggerContext() { return loggerContext; diff --git a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/MetricRefreshableAppender.java b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/MetricRefreshableAppender.java index 21012511f..7b8970ecd 100644 --- a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/MetricRefreshableAppender.java +++ b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/MetricRefreshableAppender.java @@ -82,8 +82,9 @@ private Appender getAppender() { return getConsoleAppender(); case "mock": return getMockAppender(); + default: + return null; } - return null; } private Appender getKafkaAppender(String topic) { diff --git a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/RefreshableAppender.java b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/RefreshableAppender.java index 158841803..024714817 100644 --- a/report/src/main/java/com/megaease/easeagent/report/metric/log4j/RefreshableAppender.java +++ b/report/src/main/java/com/megaease/easeagent/report/metric/log4j/RefreshableAppender.java @@ -60,7 +60,7 @@ static Builder builder() { class DefaultRefreshableAppender implements RefreshableAppender { - private final static Logger LOGGER = com.megaease.easeagent.log4j2.LoggerFactory.getLogger(DefaultRefreshableAppender.class); + private static final Logger LOGGER = com.megaease.easeagent.log4j2.LoggerFactory.getLogger(DefaultRefreshableAppender.class); private final String loggerName; private final String appenderName; diff --git a/report/src/main/java/com/megaease/easeagent/report/trace/RefreshableReporter.java b/report/src/main/java/com/megaease/easeagent/report/trace/RefreshableReporter.java index a05f50fa3..cb8a10419 100644 --- a/report/src/main/java/com/megaease/easeagent/report/trace/RefreshableReporter.java +++ b/report/src/main/java/com/megaease/easeagent/report/trace/RefreshableReporter.java @@ -36,7 +36,7 @@ * @param always zipkin2.reporter */ public class RefreshableReporter implements Reporter { - private final static Logger LOGGER = LoggerFactory.getLogger(RefreshableReporter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshableReporter.class); private final SDKAsyncReporter asyncReporter; private final TraceProps traceProperties; private final OutputProperties agentOutputProperties; @@ -67,6 +67,7 @@ public synchronized void refresh() { asyncReporter.getSender().close(); asyncReporter.closeFlushThread(); } catch (Exception ignored) { + // ignored } } diff --git a/report/src/main/java/com/megaease/easeagent/report/trace/TraceReport.java b/report/src/main/java/com/megaease/easeagent/report/trace/TraceReport.java index b07ecfaa2..9801f2833 100644 --- a/report/src/main/java/com/megaease/easeagent/report/trace/TraceReport.java +++ b/report/src/main/java/com/megaease/easeagent/report/trace/TraceReport.java @@ -54,7 +54,6 @@ public TraceReport(Configs configs) { } private RefreshableReporter initSpanRefreshableReporter(Configs configs) { - final RefreshableReporter spanRefreshableReporter; OutputProperties outputProperties = Utils.extractOutputProperties(configs); Map sslConfig = new HashMap<>(); if (SecurityProtocol.SSL.name.equals(outputProperties.getSecurityProtocol())) { @@ -67,7 +66,7 @@ private RefreshableReporter initSpanRefreshableReporter(Configs configs) { sslConfig.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, outputProperties.getEndpointAlgorithm()); } - Sender sender = new SimpleSender(); + Sender sender; TraceProps traceProperties = Utils.extractTraceProps(configs); if (traceProperties.getOutput().isEnabled() && traceProperties.isEnabled() && StringUtils.isNotEmpty(outputProperties.getServers())) { @@ -79,6 +78,8 @@ private RefreshableReporter initSpanRefreshableReporter(Configs configs) { .encoding(Encoding.JSON) .messageMaxBytes(traceProperties.getOutput().getMessageMaxBytes()) .build()); + } else { + sender = new SimpleSender(); } GlobalExtrasSupplier extrasSupplier = new GlobalExtrasSupplier() { @@ -103,8 +104,7 @@ public String system() { traceProperties, extrasSupplier); reporter.startFlushThread(); - spanRefreshableReporter = new RefreshableReporter<>(reporter, traceProperties, outputProperties); - return spanRefreshableReporter; + return new RefreshableReporter<>(reporter, traceProperties, outputProperties); } public void report(Span span) { diff --git a/report/src/main/java/com/megaease/easeagent/report/util/SpanUtils.java b/report/src/main/java/com/megaease/easeagent/report/util/SpanUtils.java index 3e9ab9e0b..cea64c467 100644 --- a/report/src/main/java/com/megaease/easeagent/report/util/SpanUtils.java +++ b/report/src/main/java/com/megaease/easeagent/report/util/SpanUtils.java @@ -20,6 +20,8 @@ import zipkin2.Span; public class SpanUtils { + private SpanUtils() {} + public static boolean isValidSpan(Object next) { if (!(next instanceof Span)) { return false; diff --git a/report/src/main/java/com/megaease/easeagent/report/util/TextUtils.java b/report/src/main/java/com/megaease/easeagent/report/util/TextUtils.java index 2cd621947..f9e4585bd 100644 --- a/report/src/main/java/com/megaease/easeagent/report/util/TextUtils.java +++ b/report/src/main/java/com/megaease/easeagent/report/util/TextUtils.java @@ -18,6 +18,8 @@ package com.megaease.easeagent.report.util; public class TextUtils { + private TextUtils() {} + public static boolean hasText(String content) { return content != null && content.trim().length() > 0; } diff --git a/report/src/main/java/zipkin2/internal/AbstractAgentV2SpanEndpointWriter.java b/report/src/main/java/zipkin2/internal/AbstractAgentV2SpanEndpointWriter.java index a971bb9ac..35e19f0d5 100644 --- a/report/src/main/java/zipkin2/internal/AbstractAgentV2SpanEndpointWriter.java +++ b/report/src/main/java/zipkin2/internal/AbstractAgentV2SpanEndpointWriter.java @@ -22,20 +22,20 @@ public abstract class AbstractAgentV2SpanEndpointWriter implements WriteBuffer.Writer { - final String serviceNameFieldName = "\"serviceName\":\""; - final String ipv4FieldName = "\"ipv4\":\""; - final String ipv6FieldName = "\"ipv6\":\""; - final String portFieldName = "\"port\":"; + static final String SERVICE_NAME_FIELD_NAME = "\"serviceName\":\""; + static final String IPV4_FIELD_NAME = "\"ipv4\":\""; + static final String IPV6_FIELD_NAME = "\"ipv6\":\""; + static final String PORT_FIELD_NAME = "\"port\":"; protected int endpointSizeInBytes(Endpoint value, boolean writeEmptyServiceName) { - int sizeInBytes = 1; // one byte for { + int sizeInBytes = 1; String serviceName = value.serviceName(); if (serviceName == null && writeEmptyServiceName) { serviceName = ""; } if (serviceName != null) { - sizeInBytes += serviceNameFieldName.length() + 1; + sizeInBytes += SERVICE_NAME_FIELD_NAME.length() + 1; sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes(serviceName); } @@ -44,7 +44,7 @@ protected int endpointSizeInBytes(Endpoint value, boolean writeEmptyServiceName) ++sizeInBytes; } - sizeInBytes += ipv4FieldName.length() + 1; + sizeInBytes += IPV4_FIELD_NAME.length() + 1; sizeInBytes += value.ipv4().length(); } @@ -53,7 +53,7 @@ protected int endpointSizeInBytes(Endpoint value, boolean writeEmptyServiceName) ++sizeInBytes; } - sizeInBytes += ipv6FieldName.length() + 1; + sizeInBytes += IPV6_FIELD_NAME.length() + 1; sizeInBytes += value.ipv6().length(); } @@ -63,11 +63,11 @@ protected int endpointSizeInBytes(Endpoint value, boolean writeEmptyServiceName) ++sizeInBytes; } - sizeInBytes += portFieldName.length(); - sizeInBytes += WriteBuffer.asciiSizeInBytes((long) port); + sizeInBytes += PORT_FIELD_NAME.length(); + sizeInBytes += WriteBuffer.asciiSizeInBytes(port); } - sizeInBytes += 1; // one byte for } + sizeInBytes += 1; return sizeInBytes; } @@ -81,7 +81,7 @@ protected void writeEndpoint(Endpoint value, WriteBuffer b, boolean writeEmptySe } if (serviceName != null) { - b.writeAscii(serviceNameFieldName); + b.writeAscii(SERVICE_NAME_FIELD_NAME); b.writeUtf8(JsonEscaper.jsonEscape(serviceName)); b.writeByte('\"'); wroteField = true; @@ -91,7 +91,7 @@ protected void writeEndpoint(Endpoint value, WriteBuffer b, boolean writeEmptySe if (wroteField) { b.writeByte(','); } - b.writeAscii(ipv4FieldName); + b.writeAscii(IPV4_FIELD_NAME); b.writeAscii(value.ipv4()); b.writeByte('\"'); wroteField = true; @@ -101,7 +101,7 @@ protected void writeEndpoint(Endpoint value, WriteBuffer b, boolean writeEmptySe if (wroteField) { b.writeByte(','); } - b.writeAscii(ipv6FieldName); + b.writeAscii(IPV6_FIELD_NAME); b.writeAscii(value.ipv6()); b.writeByte('\"'); wroteField = true; @@ -112,8 +112,8 @@ protected void writeEndpoint(Endpoint value, WriteBuffer b, boolean writeEmptySe if (wroteField) { b.writeByte(','); } - b.writeAscii(portFieldName); - b.writeAscii((long) port); + b.writeAscii(PORT_FIELD_NAME); + b.writeAscii(port); } b.writeByte('}'); diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanAnnotationsWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanAnnotationsWriter.java index 74252d4df..37aa30b19 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanAnnotationsWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanAnnotationsWriter.java @@ -21,34 +21,34 @@ import zipkin2.Span; public class AgentV2SpanAnnotationsWriter implements WriteBuffer.Writer { - final String annotationFieldName = ",\"annotations\":["; - final String timestampFieldName = "{\"timestamp\":"; - final String valueFieldName = ",\"value\":\""; - final String endpointFieldName = ",\"endpoint\":"; + static final String ANNOTATION_FIELD_NAME = ",\"annotations\":["; + static final String TIMESTAMP_FIELD_NAME = "{\"timestamp\":"; + static final String VALUE_FIELD_NAME = ",\"value\":\""; + static final String ENDPOINT_FIELD_NAME = ",\"endpoint\":"; int annotationSizeInBytes(long timestamp, String value, int endpointSizeInBytes) { int sizeInBytes = 0; - sizeInBytes += timestampFieldName.length(); + sizeInBytes += TIMESTAMP_FIELD_NAME.length(); sizeInBytes = sizeInBytes + WriteBuffer.asciiSizeInBytes(timestamp); - sizeInBytes += valueFieldName.length() + 1; + sizeInBytes += VALUE_FIELD_NAME.length() + 1; sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes(value); if (endpointSizeInBytes != 0) { - sizeInBytes += endpointFieldName.length() + 1; + sizeInBytes += ENDPOINT_FIELD_NAME.length() + 1; sizeInBytes += endpointSizeInBytes; } - sizeInBytes++; // } + sizeInBytes++; return sizeInBytes; } void writeAnnotation(long timestamp, String value, @Nullable byte[] endpoint, WriteBuffer b) { - b.writeAscii(timestampFieldName); + b.writeAscii(TIMESTAMP_FIELD_NAME); b.writeAscii(timestamp); - b.writeAscii(valueFieldName); + b.writeAscii(VALUE_FIELD_NAME); b.writeUtf8(JsonEscaper.jsonEscape(value)); b.writeByte(34); // " for value field if (endpoint != null) { - b.writeAscii(endpointFieldName); + b.writeAscii(ENDPOINT_FIELD_NAME); b.write(endpoint); b.writeByte(34); // " for value field } @@ -61,14 +61,14 @@ public int sizeInBytes(Span value) { int tagCount; int sizeInBytes = 0; if (!value.annotations().isEmpty()) { - sizeInBytes += annotationFieldName.length() + 1; + sizeInBytes += ANNOTATION_FIELD_NAME.length() + 1; tagCount = value.annotations().size(); if (tagCount > 1) { sizeInBytes += tagCount - 1; // , for array item } for (int i = 0; i < tagCount; ++i) { - Annotation a = (Annotation) value.annotations().get(i); + Annotation a = value.annotations().get(i); sizeInBytes += annotationSizeInBytes(a.timestamp(), a.value(), 0); } } @@ -78,12 +78,12 @@ public int sizeInBytes(Span value) { @Override public void write(Span value, WriteBuffer b) { if (!value.annotations().isEmpty()) { - b.writeAscii(annotationFieldName); + b.writeAscii(ANNOTATION_FIELD_NAME); int i = 0; int length = value.annotations().size(); while (i < length) { - Annotation a = (Annotation) value.annotations().get(i++); + Annotation a = value.annotations().get(i++); writeAnnotation(a.timestamp(), a.value(), (byte[]) null, b); if (i < length) { b.writeByte(44); //, for array item diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanBaseWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanBaseWriter.java index 3e031d1ac..ecdc2fff5 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanBaseWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanBaseWriter.java @@ -21,112 +21,112 @@ public class AgentV2SpanBaseWriter implements WriteBuffer.Writer { - final String traceIDFieldName = "\"traceId\":\""; - final String parentIDFieldName = ",\"parentId\":\""; - final String spanIDFieldName = ",\"id\":\""; - final String kindFieldName = ",\"kind\":\""; - final String nameFieldName = ",\"name\":\""; - final String timestampFieldName = ",\"timestamp\":"; - final String durationFieldName = ",\"duration\":"; - final String debugFieldValue = ",\"debug\":true"; - final String sharedFieldValue = ",\"shared\":true"; + static final String TRACE_ID_FIELD_NAME = "\"traceId\":\""; + static final String PARENT_ID_FIELD_NAME = ",\"parentId\":\""; + static final String SPAN_ID_FIELD_NAME = ",\"id\":\""; + static final String KIND_FIELD_NAME = ",\"kind\":\""; + static final String NAME_FIELD_NAME = ",\"name\":\""; + static final String TIMESTAMP_FIELD_NAME = ",\"timestamp\":"; + static final String DURATION_FIELD_NAME = ",\"duration\":"; + static final String DEBUG_FIELD_VALUE = ",\"debug\":true"; + static final String SHARED_FIELD_VALUE = ",\"shared\":true"; @Override public int sizeInBytes(Span value) { int sizeInBytes = 0; //traceId - sizeInBytes += traceIDFieldName.length() + 1; // 1 represent the last quote sign + sizeInBytes += TRACE_ID_FIELD_NAME.length() + 1; // 1 represent the last quote sign sizeInBytes += value.traceId().length(); //parentId if (value.parentId() != null) { - sizeInBytes += parentIDFieldName.length() + 1; + sizeInBytes += PARENT_ID_FIELD_NAME.length() + 1; sizeInBytes += value.parentId().length(); } // spanId - sizeInBytes += spanIDFieldName.length() + 1; + sizeInBytes += SPAN_ID_FIELD_NAME.length() + 1; sizeInBytes += value.id().length(); // kind if (value.kind() != null) { - sizeInBytes += kindFieldName.length() + 1; + sizeInBytes += KIND_FIELD_NAME.length() + 1; sizeInBytes += value.kind().name().length(); } // name if (value.name() != null) { - sizeInBytes += nameFieldName.length() + 1; + sizeInBytes += NAME_FIELD_NAME.length() + 1; sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes(value.name()); } // timestamp if (value.timestampAsLong() != 0L) { - sizeInBytes += timestampFieldName.length(); + sizeInBytes += TIMESTAMP_FIELD_NAME.length(); sizeInBytes += WriteBuffer.asciiSizeInBytes(value.timestampAsLong()); } //duration if (value.durationAsLong() != 0L) { - sizeInBytes += durationFieldName.length(); + sizeInBytes += DURATION_FIELD_NAME.length(); sizeInBytes += WriteBuffer.asciiSizeInBytes(value.durationAsLong()); } if (Boolean.TRUE.equals(value.debug())) { - sizeInBytes += debugFieldValue.length(); + sizeInBytes += DEBUG_FIELD_VALUE.length(); } if (Boolean.TRUE.equals(value.shared())) { - sizeInBytes += sharedFieldValue.length(); + sizeInBytes += SHARED_FIELD_VALUE.length(); } return sizeInBytes; } @Override public void write(Span value, WriteBuffer b) { - b.writeAscii(traceIDFieldName); + b.writeAscii(TRACE_ID_FIELD_NAME); b.writeAscii(value.traceId()); b.writeByte('\"'); if (value.parentId() != null) { - b.writeAscii(parentIDFieldName); + b.writeAscii(PARENT_ID_FIELD_NAME); b.writeAscii(value.parentId()); b.writeByte('\"'); } - b.writeAscii(spanIDFieldName); + b.writeAscii(SPAN_ID_FIELD_NAME); b.writeAscii(value.id()); b.writeByte(34); if (value.kind() != null) { - b.writeAscii(kindFieldName); + b.writeAscii(KIND_FIELD_NAME); b.writeAscii(value.kind().toString()); b.writeByte('\"'); } if (value.name() != null) { - b.writeAscii(nameFieldName); + b.writeAscii(NAME_FIELD_NAME); b.writeUtf8(JsonEscaper.jsonEscape(value.name())); b.writeByte('\"'); } if (value.timestampAsLong() != 0L) { - b.writeAscii(timestampFieldName); + b.writeAscii(TIMESTAMP_FIELD_NAME); b.writeAscii(value.timestampAsLong()); } if (value.durationAsLong() != 0L) { - b.writeAscii(durationFieldName); + b.writeAscii(DURATION_FIELD_NAME); b.writeAscii(value.durationAsLong()); } if (Boolean.TRUE.equals(value.debug())) { - b.writeAscii(debugFieldValue); + b.writeAscii(DEBUG_FIELD_VALUE); } if (Boolean.TRUE.equals(value.shared())) { - b.writeAscii(sharedFieldValue); + b.writeAscii(SHARED_FIELD_VALUE); } } } diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanGlobalWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanGlobalWriter.java index 5565644c2..61902556f 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanGlobalWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanGlobalWriter.java @@ -27,13 +27,12 @@ public class AgentV2SpanGlobalWriter implements WriteBuffer.Writer { final String type; - final GlobalExtrasSupplier extras;//= ApplicationUtils.getBean(Environment.class).getProperty(MetricNameBuilder - // .SPRING_APPLICATION_NAME, ""); - final TraceProps traceProperties;//= ApplicationUtils.getBean(TraceProperties.class); + final GlobalExtrasSupplier extras; + final TraceProps traceProperties; - final String typeFieldName = ",\"type\":\""; - final String serviceFieldName = ",\"service\":\""; - final String systemFieldName = ",\"system\":\""; + static final String TYPE_FIELD_NAME = ",\"type\":\""; + static final String SERVICE_FIELD_NAME = ",\"service\":\""; + static final String SYSTEM_FIELD_NAME = ",\"system\":\""; public AgentV2SpanGlobalWriter(String type, GlobalExtrasSupplier extras, TraceProps tp) { this.type = type; @@ -46,19 +45,19 @@ public int sizeInBytes(Span value) { final MutableInt mutableInt = new MutableInt(0); Optional.ofNullable(traceProperties).ifPresent(t -> { if (TextUtils.hasText(type)) { - mutableInt.add(typeFieldName.length() + 1); + mutableInt.add(TYPE_FIELD_NAME.length() + 1); mutableInt.add(JsonEscaper.jsonEscapedSizeInBytes(type)); } String tmpService = this.extras.service(); if (TextUtils.hasText(tmpService)) { - mutableInt.add(serviceFieldName.length() + 1); + mutableInt.add(SERVICE_FIELD_NAME.length() + 1); mutableInt.add(JsonEscaper.jsonEscapedSizeInBytes(tmpService)); } String tmpSystem = this.extras.system(); if (TextUtils.hasText(tmpSystem)) { - mutableInt.add(systemFieldName.length() + 1); + mutableInt.add(SYSTEM_FIELD_NAME.length() + 1); mutableInt.add(JsonEscaper.jsonEscapedSizeInBytes(tmpSystem)); } }); @@ -69,19 +68,19 @@ public int sizeInBytes(Span value) { public void write(Span value, WriteBuffer buffer) { Optional.ofNullable(traceProperties).ifPresent(t -> { if (TextUtils.hasText(type)) { - buffer.writeAscii(typeFieldName); + buffer.writeAscii(TYPE_FIELD_NAME); buffer.writeUtf8(JsonEscaper.jsonEscape(type)); buffer.writeByte(34); } String tmpService = this.extras.service(); if (TextUtils.hasText(tmpService)) { - buffer.writeAscii(serviceFieldName); + buffer.writeAscii(SERVICE_FIELD_NAME); buffer.writeUtf8(JsonEscaper.jsonEscape(tmpService)); buffer.writeByte(34); } String tmpSystem = this.extras.system(); if (TextUtils.hasText(tmpSystem)) { - buffer.writeAscii(systemFieldName); + buffer.writeAscii(SYSTEM_FIELD_NAME); buffer.writeUtf8(JsonEscaper.jsonEscape(tmpSystem)); buffer.writeByte(34); } diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanLocalEndpointWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanLocalEndpointWriter.java index f06946d35..4cb9ea880 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanLocalEndpointWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanLocalEndpointWriter.java @@ -20,14 +20,14 @@ import zipkin2.Span; public class AgentV2SpanLocalEndpointWriter extends AbstractAgentV2SpanEndpointWriter implements WriteBuffer.Writer { - final String localEndpointFieldName = ",\"localEndpoint\":"; + static final String LOCAL_ENDPOINT_FIELD_NAME = ",\"localEndpoint\":"; @Override public int sizeInBytes(Span value) { if (value.localEndpoint() == null) { return 0; } - int size = localEndpointFieldName.length(); + int size = LOCAL_ENDPOINT_FIELD_NAME.length(); size += this.endpointSizeInBytes(value.localEndpoint(), true); return size; } @@ -37,7 +37,7 @@ public void write(Span value, WriteBuffer buffer) { if (value.localEndpoint() == null) { return; } - buffer.writeAscii(localEndpointFieldName); + buffer.writeAscii(LOCAL_ENDPOINT_FIELD_NAME); this.writeEndpoint(value.localEndpoint(), buffer, true); } } diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanRemoteEndpointWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanRemoteEndpointWriter.java index 7b4c863e6..9064fa6e8 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanRemoteEndpointWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanRemoteEndpointWriter.java @@ -20,14 +20,14 @@ import zipkin2.Span; public class AgentV2SpanRemoteEndpointWriter extends AbstractAgentV2SpanEndpointWriter implements WriteBuffer.Writer { - final String remoteEndpointFieldName = ",\"remoteEndpoint\":"; + static final String REMOTE_ENDPOINT_FIELD_NAME = ",\"remoteEndpoint\":"; @Override public int sizeInBytes(Span value) { if (value.remoteEndpoint() == null) { return 0; } - int size = remoteEndpointFieldName.length(); + int size = REMOTE_ENDPOINT_FIELD_NAME.length(); size += this.endpointSizeInBytes(value.remoteEndpoint(), false); return size; } @@ -37,7 +37,7 @@ public void write(Span value, WriteBuffer buffer) { if (value.remoteEndpoint() == null) { return; } - buffer.writeAscii(remoteEndpointFieldName); + buffer.writeAscii(REMOTE_ENDPOINT_FIELD_NAME); this.writeEndpoint(value.remoteEndpoint(), buffer, false); } } diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanTagsWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanTagsWriter.java index a99f339b8..9da99b091 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanTagsWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanTagsWriter.java @@ -28,12 +28,12 @@ public int sizeInBytes(Span value) { int sizeInBytes = 0; if (!value.tags().isEmpty()) { sizeInBytes += 10; - Iterator i = value.tags().entrySet().iterator(); + Iterator> i = value.tags().entrySet().iterator(); while (i.hasNext()) { - Map.Entry entry = (Map.Entry) i.next(); + Map.Entry entry = i.next(); sizeInBytes += 5; - sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes((CharSequence) entry.getKey()); - sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes((CharSequence) entry.getValue()); + sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes(entry.getKey()); + sizeInBytes += JsonEscaper.jsonEscapedSizeInBytes(entry.getValue()); if (i.hasNext()) { sizeInBytes += 1; } @@ -46,14 +46,14 @@ public int sizeInBytes(Span value) { public void write(Span value, WriteBuffer b) { if (!value.tags().isEmpty()) { b.writeAscii(",\"tags\":{"); - Iterator i = value.tags().entrySet().iterator(); + Iterator> i = value.tags().entrySet().iterator(); while (i.hasNext()) { - Map.Entry entry = (Map.Entry) i.next(); + Map.Entry entry = i.next(); b.writeByte('\"'); - b.writeUtf8(JsonEscaper.jsonEscape((CharSequence) entry.getKey())); + b.writeUtf8(JsonEscaper.jsonEscape(entry.getKey())); b.writeAscii("\":\""); - b.writeUtf8(JsonEscaper.jsonEscape((CharSequence) entry.getValue())); + b.writeUtf8(JsonEscaper.jsonEscape(entry.getValue())); b.writeByte('\"'); if (i.hasNext()) { b.writeByte(','); diff --git a/report/src/main/java/zipkin2/internal/AgentV2SpanWriter.java b/report/src/main/java/zipkin2/internal/AgentV2SpanWriter.java index 75e9f93be..d95b73817 100644 --- a/report/src/main/java/zipkin2/internal/AgentV2SpanWriter.java +++ b/report/src/main/java/zipkin2/internal/AgentV2SpanWriter.java @@ -28,21 +28,6 @@ public class AgentV2SpanWriter implements WriteBuffer.Writer { public final Collection> writerList; - @Deprecated - public AgentV2SpanWriter() { - this(new GlobalExtrasSupplier() { - @Override - public String service() { - return ""; - } - - @Override - public String system() { - return ""; - } - }, null); - } - public AgentV2SpanWriter(GlobalExtrasSupplier extrasSupplier, TraceProps properties) { writerList = ImmutableList.>builder() .add(new AgentV2SpanBaseWriter()) @@ -56,21 +41,17 @@ public AgentV2SpanWriter(GlobalExtrasSupplier extrasSupplier, TraceProps propert public int sizeInBytes(Span value) { - final MutableInt size = new MutableInt(1); // 1 byte for first { - writerList.forEach(w -> { - size.add(w.sizeInBytes(value)); - }); - size.add(1); // 1 byte for last } + final MutableInt size = new MutableInt(1); + writerList.forEach(w -> size.add(w.sizeInBytes(value))); + size.add(1); return size.intValue(); } @Override public void write(Span value, WriteBuffer buffer) { - buffer.writeByte(123); //write '{' - writerList.forEach(w -> { - w.write(value, buffer); - }); - buffer.writeByte(125); // write last '}' + buffer.writeByte(123); + writerList.forEach(w -> w.write(value, buffer)); + buffer.writeByte(125); } public String toString() { diff --git a/report/src/main/java/zipkin2/reporter/SDKAsyncReporter.java b/report/src/main/java/zipkin2/reporter/SDKAsyncReporter.java index f5bc36713..6795840c0 100644 --- a/report/src/main/java/zipkin2/reporter/SDKAsyncReporter.java +++ b/report/src/main/java/zipkin2/reporter/SDKAsyncReporter.java @@ -47,7 +47,10 @@ import static java.util.logging.Level.WARNING; public class SDKAsyncReporter extends AsyncReporter { - static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName()); + static final Logger logger = Logger.getLogger(SDKAsyncReporter.class.getName()); + + private static final String NAME_PREFIX = "AsyncReporter"; + final AtomicBoolean closed = new AtomicBoolean(false); final BytesEncoder encoder; ByteBoundedQueue pending; @@ -189,24 +192,20 @@ void flush(BufferNextMessage bundler, ByteBoundedQueue pending) { // Create the next message. Since we are outside the lock shared with writers, we can encode ArrayList nextMessage = new ArrayList<>(bundler.count()); - bundler.drain(new SpanWithSizeConsumer() { - @Override - public boolean offer(S next, int nextSizeInBytes) { - nextMessage.add(encoder.encode(next)); // speculatively add to the pending message - if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) { - // if we overran the message size, remove the encoded message. - nextMessage.remove(nextMessage.size() - 1); + bundler.drain((next, nextSizeInBytes) -> { + nextMessage.add(encoder.encode(next)); // speculatively add to the pending message + if (sender.messageSizeInBytes(nextMessage) > messageMaxBytes) { + // if we overran the message size, remove the encoded message. + nextMessage.remove(nextMessage.size() - 1); - return false; - } - return true; + return false; } + return true; }); try { - sender.sendSpans(nextMessage).execute(); - } catch (IOException | RuntimeException | Error t) { + } catch (IOException | RuntimeException t) { // In failure case, we increment messages and spans dropped. int count = nextMessage.size(); Call.propagateIfFatal(t); @@ -253,13 +252,13 @@ public void close() { int count = pending.clear(); if (count > 0) { metrics.incrementSpansDropped(count); - logger.warning("Dropped " + count + " spans due to AsyncReporter.close()"); + logger.log(WARNING, "Dropped {0} spans due to AsyncReporter.close()", count); } } @Override public String toString() { - return "AsyncReporter{" + sender + "}"; + return NAME_PREFIX + "{" + sender + "}"; } public void setThreadFactory(ThreadFactory threadFactory) { @@ -268,16 +267,16 @@ public void setThreadFactory(ThreadFactory threadFactory) { public void startFlushThread() { if (this.messageTimeoutNanos > 0) { - List flushThreads = new CopyOnWriteArrayList<>(); + List threads = new CopyOnWriteArrayList<>(); for (int i = 0; i < traceProperties.getOutput().getReportThread(); i++) { // Multiple consumer consumption final BufferNextMessage consumer = BufferNextMessage.create(encoder.encoding(), this.messageMaxBytes, this.messageTimeoutNanos); - Thread flushThread = this.threadFactory.newThread(new Flusher(this, consumer, this.sender, traceProperties)); - flushThread.setName("AsyncReporter{" + this.sender + "}"); + Thread flushThread = this.threadFactory.newThread(new Flusher<>(this, consumer, this.sender, traceProperties)); + flushThread.setName(NAME_PREFIX + "{" + this.sender + "}"); flushThread.setDaemon(true); flushThread.start(); } - this.setFlushThreads(flushThreads); + this.setFlushThreads(threads); } } @@ -289,34 +288,35 @@ public void closeFlushThread() { } + @SuppressWarnings("unused") public static final class Builder { - private final AsyncReporter.Builder builder; + private final AsyncReporter.Builder asyncBuilder; TracerConverter tracerConverter; private TraceProps traceProperties; - public Builder(AsyncReporter.Builder builder) { - this.builder = builder; + public Builder(AsyncReporter.Builder asyncBuilder) { + this.asyncBuilder = asyncBuilder; } public AsyncReporter.Builder getBuilder() { - return builder; + return asyncBuilder; } public AsyncReporter.Builder threadFactory(ThreadFactory threadFactory) { - return builder.threadFactory(threadFactory); + return asyncBuilder.threadFactory(threadFactory); } public AsyncReporter.Builder traceProperties(TraceProps traceProperties) { this.traceProperties = traceProperties; - return builder; + return asyncBuilder; } public AsyncReporter.Builder metrics(ReporterMetrics metrics) { - return builder.metrics(metrics); + return asyncBuilder.metrics(metrics); } /** @@ -325,7 +325,7 @@ public AsyncReporter.Builder metrics(ReporterMetrics metrics) { */ public AsyncReporter.Builder messageMaxBytes(int messageMaxBytes) { - return builder.messageMaxBytes(messageMaxBytes); + return asyncBuilder.messageMaxBytes(messageMaxBytes); } /** @@ -339,7 +339,7 @@ public AsyncReporter.Builder messageMaxBytes(int messageMaxBytes) { */ public AsyncReporter.Builder messageTimeout(long timeout, TimeUnit unit) { - return builder.messageTimeout(timeout, unit); + return asyncBuilder.messageTimeout(timeout, unit); } /** @@ -347,7 +347,7 @@ public AsyncReporter.Builder messageTimeout(long timeout, TimeUnit unit) { */ public AsyncReporter.Builder closeTimeout(long timeout, TimeUnit unit) { - return builder.closeTimeout(timeout, unit); + return asyncBuilder.closeTimeout(timeout, unit); } /** @@ -365,7 +365,7 @@ public Builder tracerConverter(TracerConverter tracerConverter) { */ public AsyncReporter.Builder queuedMaxBytes(int queuedMaxBytes) { - return builder.queuedMaxBytes(queuedMaxBytes); + return asyncBuilder.queuedMaxBytes(queuedMaxBytes); } /** @@ -373,15 +373,13 @@ public AsyncReporter.Builder queuedMaxBytes(int queuedMaxBytes) { */ public SDKAsyncReporter build(TraceProps traceProperties, GlobalExtrasSupplier extrasSupplier) { this.traceProperties = traceProperties; - switch (builder.sender.encoding()) { + switch (asyncBuilder.sender.encoding()) { case JSON: return build(getAgentEncoder(traceProperties, extrasSupplier)); case PROTO3: return build(SpanBytesEncoder.PROTO3); - case THRIFT: - return build(SpanBytesEncoder.THRIFT); default: - throw new UnsupportedOperationException(builder.sender.encoding().name()); + throw new UnsupportedOperationException(asyncBuilder.sender.encoding().name()); } } @@ -394,7 +392,7 @@ private BytesEncoder getAgentEncoder(TraceProps tp, GlobalExtrasSupplier e */ public AsyncReporter.Builder queuedMaxSpans(int queuedMaxSpans) { - return builder.queuedMaxSpans(queuedMaxSpans); + return asyncBuilder.queuedMaxSpans(queuedMaxSpans); } /** @@ -403,27 +401,29 @@ public AsyncReporter.Builder queuedMaxSpans(int queuedMaxSpans) { private SDKAsyncReporter build(BytesEncoder encoder) { if (encoder == null) throw new NullPointerException("encoder == null"); - if (encoder.encoding() != builder.sender.encoding()) { + if (encoder.encoding() != asyncBuilder.sender.encoding()) { throw new IllegalArgumentException(String.format( - "Encoder doesn't match Sender: %s %s", encoder.encoding(), builder.sender.encoding())); + "Encoder doesn't match Sender: %s %s", encoder.encoding(), asyncBuilder.sender.encoding())); } - final SDKAsyncReporter result = new SDKAsyncReporter(this, encoder, traceProperties); + final SDKAsyncReporter result = new SDKAsyncReporter<>(this, encoder, traceProperties); - if (builder.messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop. + if (asyncBuilder.messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop. List flushThreads = new CopyOnWriteArrayList<>(); for (int i = 0; i < traceProperties.getOutput().getReportThread(); i++) { // Multiple consumer consumption final BufferNextMessage consumer = - BufferNextMessage.create(encoder.encoding(), builder.messageMaxBytes, builder.messageTimeoutNanos); - Thread flushThread = builder.threadFactory.newThread(new Flusher(result, consumer, builder.sender, traceProperties)); - flushThread.setName("AsyncReporter{" + builder.sender + "}"); + BufferNextMessage.create(encoder.encoding(), asyncBuilder.messageMaxBytes, asyncBuilder.messageTimeoutNanos); + + Thread flushThread = asyncBuilder.threadFactory + .newThread(new Flusher<>(result, consumer, asyncBuilder.sender, traceProperties)); + flushThread.setName(NAME_PREFIX + "{" + asyncBuilder.sender + "}"); flushThread.setDaemon(true); flushThread.start(); flushThreads.add(flushThread); } result.setFlushThreads(flushThreads); - result.setThreadFactory(builder.threadFactory); - result.setSender(builder.sender); + result.setThreadFactory(asyncBuilder.threadFactory); + result.setSender(asyncBuilder.sender); } return result; @@ -490,14 +490,11 @@ public void run() { // otherwise the cpu will spin. result.flush(consumer, result.pending); } - } catch (RuntimeException | Error e) { - logger.log(Level.WARNING, "Unexpected error flushing spans", e); - throw e; } finally { int count = consumer.count(); if (count > 0) { result.metrics.incrementSpansDropped(count); - logger.warning("Dropped " + count + " spans due to AsyncReporter.close()"); + logger.log(WARNING,"Dropped {0} spans due to AsyncReporter.close()", count); } result.close.countDown(); } @@ -505,9 +502,7 @@ public void run() { @Override public String toString() { - return "AsyncReporter{" + result.sender + "}"; + return NAME_PREFIX + "{" + result.sender + "}"; } } - - } diff --git a/report/src/main/java/zipkin2/reporter/kafka11/SDKKafkaSender.java b/report/src/main/java/zipkin2/reporter/kafka11/SDKKafkaSender.java index d7dc20984..203da7edc 100644 --- a/report/src/main/java/zipkin2/reporter/kafka11/SDKKafkaSender.java +++ b/report/src/main/java/zipkin2/reporter/kafka11/SDKKafkaSender.java @@ -40,10 +40,12 @@ public static SDKKafkaSender wrap(TraceProps properties, KafkaSender sender) { return new SDKKafkaSender(sender, properties); } + @Override public boolean isClose() { return kafkaSender.closeCalled; } + @Override public void close() throws IOException { kafkaSender.close(); } @@ -75,15 +77,13 @@ public int messageSizeInBytes(List encodedSpans) { return kafkaSender.messageSizeInBytes(encodedSpans); } + @Override public CheckResult check() { return kafkaSender.check(); } - @Override public int messageSizeInBytes(int encodedSizeInBytes) { return kafkaSender.messageSizeInBytes(encodedSizeInBytes); } - - } diff --git a/report/src/test/java/com/megaease/easeagent/report/trace/TraceReportTest.java b/report/src/test/java/com/megaease/easeagent/report/trace/TraceReportTest.java index 7f644c016..4854b3dea 100644 --- a/report/src/test/java/com/megaease/easeagent/report/trace/TraceReportTest.java +++ b/report/src/test/java/com/megaease/easeagent/report/trace/TraceReportTest.java @@ -19,6 +19,7 @@ import com.megaease.easeagent.config.Configs; import com.megaease.easeagent.plugin.api.config.ConfigConst; +import org.junit.Assert; import org.junit.Test; import zipkin2.Span; @@ -46,6 +47,7 @@ public void test1() throws InterruptedException { .build(); report.report(build); TimeUnit.SECONDS.sleep(3); + Assert.assertTrue(true); } @Test @@ -70,5 +72,6 @@ public void test2() throws InterruptedException { .build(); report.report(build); TimeUnit.SECONDS.sleep(3); + Assert.assertTrue(true); } } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/DebugReporterMetrics.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/DebugReporterMetrics.java deleted file mode 100644 index f38459cd0..000000000 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/DebugReporterMetrics.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.megaease.easeagent.zipkin; - -import com.megaease.easeagent.log4j2.Logger; -import com.megaease.easeagent.log4j2.LoggerFactory; -import zipkin2.reporter.ReporterMetrics; - -public class DebugReporterMetrics implements ReporterMetrics { - - static final Logger LOGGER = LoggerFactory.getLogger(DebugReporterMetrics.class); - - private volatile long send = 0; - private volatile long sendBytes = 0; - private volatile long span = 0; - private volatile long spanBytes = 0; - private volatile int pending = 0; - private volatile int pendingBytes = 0; - private volatile long dropped = 0; - private volatile long droppedSpan = 0; - - @Override - public void incrementMessages() { - if (LOGGER.isDebugEnabled()) { - send += 1; - LOGGER.debug("Try to send messages {}", send); - } - } - - @Override - public void incrementMessagesDropped(Throwable cause) { - if (LOGGER.isDebugEnabled()) { - dropped += 1; - LOGGER.debug("Drop messages {}, {}", dropped, cause); - } - } - - @Override - public void incrementSpans(int quantity) { - if (LOGGER.isDebugEnabled()) { - span += quantity; - LOGGER.debug("Collect spans {}", span); - } - } - - @Override - public void incrementSpanBytes(int quantity) { - if (LOGGER.isDebugEnabled()) { - spanBytes += quantity; - LOGGER.debug("Collect span bytes {}", spanBytes); - } - } - - @Override - public void incrementMessageBytes(int quantity) { - if (LOGGER.isDebugEnabled()) { - sendBytes += quantity; - LOGGER.debug("Try to send message bytes {}", sendBytes); - } - - } - - @Override - public void incrementSpansDropped(int quantity) { - if (LOGGER.isDebugEnabled()) { - droppedSpan += quantity; - LOGGER.debug("Drop spans {}", droppedSpan); - } - } - - @Override - public void updateQueuedSpans(int update) { - if (LOGGER.isDebugEnabled()) { - pending = update; - LOGGER.debug("Current pending {}", pending); - } - - } - - @Override - public void updateQueuedBytes(int update) { - if (LOGGER.isDebugEnabled()) { - pendingBytes = update; - LOGGER.debug("Current pending bytes {}", pendingBytes); - } - } -} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/GatewaySender.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/GatewaySender.java deleted file mode 100644 index 3d94f85a7..000000000 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/GatewaySender.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - package com.megaease.easeagent.zipkin; - -import zipkin2.Call; -import zipkin2.CheckResult; -import zipkin2.codec.Encoding; -import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Sender; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.List; -import java.util.zip.GZIPOutputStream; - -class GatewaySender extends Sender { - - private final Encoding encoding; - private final int messageMaxBytes; - private final String sendEndpoint; - private final int connectTimeout; - private final int readTimeout; - private final boolean sendCompression; - private final String userAgent; - - /** - * close is typically called from a different thread - */ - private volatile boolean closeCalled; - - - GatewaySender(int messageMaxBytes, String sendEndpoint, int connectTimeout, int readTimeout, - boolean sendCompression, String userAgent) { - this.messageMaxBytes = messageMaxBytes; - this.sendEndpoint = sendEndpoint; - this.connectTimeout = connectTimeout; - this.readTimeout = readTimeout; - this.sendCompression = sendCompression; - this.userAgent = userAgent; - encoding = Encoding.JSON; - } - - @Override - public Encoding encoding() { - return encoding; - } - - @Override - public int messageMaxBytes() { - return messageMaxBytes; - } - - @Override - public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override - public Call sendSpans(List encodedSpans) { - if (closeCalled) throw new IllegalStateException("close"); - try { - byte[] message = BytesMessageEncoder.JSON.encode(encodedSpans); - send(message, "application/json"); - } catch (Throwable e) { - if (e instanceof Error) throw (Error) e; - } - return Call.create(null); - } - - @Override - public CheckResult check() { - try { - send(new byte[]{'[', ']'}, "application/json"); - return CheckResult.OK; - } catch (Exception e) { - return CheckResult.failed(e); - } - - } - - public void close() throws IOException { - closeCalled = true; - } - - private void send(byte[] body, String mediaType) throws IOException { - // intentionally not closing the connection, so as to use keep-alives - HttpURLConnection connection = (HttpURLConnection) new URL(sendEndpoint).openConnection(); - connection.setConnectTimeout(connectTimeout); - connection.setReadTimeout(readTimeout); - connection.setRequestMethod("POST"); - connection.addRequestProperty("Content-Type", mediaType); - connection.addRequestProperty("User-Agent", userAgent); - if (sendCompression) { - connection.addRequestProperty("Content-Encoding", "gzip"); - ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); - GZIPOutputStream compressor = new GZIPOutputStream(gzipped); - try { - compressor.write(body); - } finally { - compressor.close(); - } - body = gzipped.toByteArray(); - } - connection.setDoOutput(true); - connection.setFixedLengthStreamingMode(body.length); - connection.getOutputStream().write(body); - - try { - discard(connection.getInputStream()); - final int code = connection.getResponseCode(); - if (code >= 400) throw new IOException(connection.getResponseMessage()); - } finally { - InputStream err = connection.getErrorStream(); - if (err != null) { // possible, if the connection was dropped - discard(err); - } - } - } - - private void discard(InputStream in) throws IOException { - try { - while (in.read() != -1) ; // skip - } finally { - in.close(); - } - } - -} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProvider.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProviderImpl.java similarity index 89% rename from zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProvider.java rename to zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProviderImpl.java index 759631a37..17938c0ae 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProvider.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/TracingProviderImpl.java @@ -28,6 +28,7 @@ import com.megaease.easeagent.plugin.annotation.Injection; import com.megaease.easeagent.plugin.api.config.ConfigConst; import com.megaease.easeagent.plugin.api.trace.ITracing; +import com.megaease.easeagent.plugin.api.trace.TracingProvider; import com.megaease.easeagent.plugin.api.trace.TracingSupplier; import com.megaease.easeagent.plugin.utils.AdditionalAttributes; import com.megaease.easeagent.report.AgentReport; @@ -41,7 +42,7 @@ import zipkin2.reporter.brave.AsyncZipkinSpanHandler; import zipkin2.reporter.urlconnection.URLConnectionSender; -public class TracingProvider implements BeanProvider, AgentReportAware, ConfigAware, IProvider, com.megaease.easeagent.plugin.api.trace.TracingProvider { +public class TracingProviderImpl implements BeanProvider, AgentReportAware, ConfigAware, IProvider, TracingProvider { private static final String ENV_ZIPKIN_SERVER_URL = "ZIPKIN_SERVER_URL"; private Tracing tracing; private ITracing iTracing; @@ -60,7 +61,6 @@ public void setAgentReport(AgentReport report) { this.agentReport = report; } - @Override public void afterPropertiesSet() { ThreadLocalCurrentTraceContext traceContext = ThreadLocalCurrentTraceContext.newBuilder() @@ -91,10 +91,10 @@ public void afterPropertiesSet() { reporter = span -> agentReport.report(span); } this.tracing = Tracing.newBuilder() - .localServiceName(serviceName.getValue()) + .localServiceName(getServiceName()) .traceId128Bit(false) .sampler(CountingSampler.create(1)) - .addSpanHandler(new CustomTagsSpanHandler(serviceName::getValue, AdditionalAttributes.getHostName())) + .addSpanHandler(new CustomTagsSpanHandler(this::getServiceName, AdditionalAttributes.getHostName())) .addSpanHandler(AsyncZipkinSpanHandler .newBuilder(reporter) .alwaysReportSpans(true) @@ -111,11 +111,11 @@ public Tracing tracing() { @Override public TracingSupplier tracingSupplier() { - return (supplier) -> { + return supplier -> { if (iTracing != null) { return iTracing; } - synchronized (TracingProvider.class) { + synchronized (TracingProviderImpl.class) { if (iTracing != null) { return iTracing; } @@ -124,4 +124,8 @@ public TracingSupplier tracingSupplier() { return iTracing; }; } + + private String getServiceName() { + return this.serviceName.getValue(); + } } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/AsyncContextImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/AsyncContextImpl.java index 9eedfa635..ec33af082 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/AsyncContextImpl.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/AsyncContextImpl.java @@ -34,7 +34,9 @@ public class AsyncContextImpl implements AsyncContext { private final Map context; private final Supplier supplier; - private AsyncContextImpl(Tracing tracing, TraceContext traceContext, Supplier supplier, Map context) { + private AsyncContextImpl(Tracing tracing, TraceContext traceContext, + Supplier supplier, + Map context) { this.tracing = tracing; this.traceContext = traceContext; this.supplier = supplier; @@ -45,7 +47,9 @@ public static AsyncContextImpl build(Tracing tracing, TraceContext traceContext, return build(tracing, traceContext, supplier, null); } - public static AsyncContextImpl build(Tracing tracing, TraceContext traceContext, Supplier supplier, Map context) { + public static AsyncContextImpl build(Tracing tracing, TraceContext traceContext, + Supplier supplier, + Map context) { Map contextMap = context == null ? new HashMap<>() : new HashMap<>(context); return new AsyncContextImpl(tracing, traceContext, supplier, contextMap); } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/MessagingTracingImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/MessagingTracingImpl.java deleted file mode 100644 index dfe2846ef..000000000 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/MessagingTracingImpl.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.megaease.easeagent.zipkin.impl; - -import brave.messaging.ConsumerRequest; -import brave.messaging.MessagingRequest; -import brave.messaging.ProducerRequest; -import brave.propagation.TraceContext; -import brave.propagation.TraceContextOrSamplingFlags; -import com.megaease.easeagent.plugin.api.trace.*; -import com.megaease.easeagent.plugin.bridge.NoOpTracer; - -import javax.annotation.Nonnull; -import java.util.function.Function; - -public class MessagingTracingImpl implements MessagingTracing { - private final brave.messaging.MessagingTracing messagingTracing; - private final Extractor extractor; - private final Injector producerInjector; - private final Injector consumerInjector; - private final Function consumerSampler; - private final Function producerSampler; - - private MessagingTracingImpl(brave.messaging.MessagingTracing messagingTracing) { - this.messagingTracing = messagingTracing; - this.extractor = new ExtractorImpl(messagingTracing.propagation().extractor(com.megaease.easeagent.plugin.api.trace.MessagingRequest::header)); - this.producerInjector = new InjectorImpl(messagingTracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.PRODUCER))); - this.consumerInjector = new InjectorImpl(messagingTracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.CONSUMER))); - this.consumerSampler = new SamplerFunction(ZipkinConsumerRequest::new, messagingTracing.consumerSampler()); - this.producerSampler = new SamplerFunction(ZipkinProducerRequest::new, messagingTracing.producerSampler()); - } - - public static MessagingTracing build(brave.Tracing tracing) { - if (tracing == null) { - return NoOpTracer.NO_OP_MESSAGING_TRACING; - } - brave.messaging.MessagingTracing messagingTracing = brave.messaging.MessagingTracing.newBuilder(tracing).build(); - return new MessagingTracingImpl(messagingTracing); - } - - @Override - public Extractor extractor() { - return extractor; - } - - @Override - public Injector producerInjector() { - return producerInjector; - } - - @Override - public Injector consumerInjector() { - return consumerInjector; - } - - @Override - public Function consumerSampler() { - return consumerSampler; - } - - @Override - public Function producerSampler() { - return producerSampler; - } - - @Override - public boolean consumerSampler(R request) { - return messagingTracing.consumerSampler().trySample(new ZipkinConsumerRequest(request)); - } - - @Override - public boolean producerSampler(R request) { - return messagingTracing.producerSampler().trySample(new ZipkinProducerRequest(request)); - } - - - public class ExtractorImpl implements Extractor { - private final TraceContext.Extractor extractor; - - public ExtractorImpl(TraceContext.Extractor extractor) { - this.extractor = extractor; - } - - @Override - public Message extract(com.megaease.easeagent.plugin.api.trace.MessagingRequest request) { - return new MessageImpl(extractor.extract(request)); - } - } - - public class InjectorImpl implements Injector { - private final TraceContext.Injector injector; - - public InjectorImpl(TraceContext.Injector injector) { - this.injector = injector; - } - - @Override - public void inject(Span span, com.megaease.easeagent.plugin.api.trace.MessagingRequest request) { - if (span instanceof SpanImpl) { - this.injector.inject(((SpanImpl) span).getSpan().context(), request); - } - } - } - - public class SamplerFunction implements Function { - private final Function builder; - private final brave.sampler.SamplerFunction function; - - public SamplerFunction(@Nonnull Function builder, @Nonnull brave.sampler.SamplerFunction function) { - this.builder = builder; - this.function = function; - } - - @Override - public Boolean apply(R request) { - return function.trySample(builder.apply(request)); - } - } - - public class ZipkinProducerRequest extends ProducerRequest { - private final R request; - - public ZipkinProducerRequest(R request) { - this.request = request; - } - - @Override - public String operation() { - return request.operation(); - } - - @Override - public String channelKind() { - return request.channelKind(); - } - - @Override - public String channelName() { - return request.channelName(); - } - - @Override - public Object unwrap() { - return request.unwrap(); - } - } - - public class ZipkinConsumerRequest extends ConsumerRequest { - - private final R request; - - public ZipkinConsumerRequest(R request) { - this.request = request; - } - - @Override - public String operation() { - return request.operation(); - } - - @Override - public String channelKind() { - return request.channelKind(); - } - - @Override - public String channelName() { - return request.channelName(); - } - - @Override - public Object unwrap() { - return request.unwrap(); - } - } -} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/RequestContextImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/RequestContextImpl.java index fe05de8d5..a49a612fc 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/RequestContextImpl.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/RequestContextImpl.java @@ -27,6 +27,7 @@ import com.megaease.easeagent.plugin.api.trace.Span; import com.megaease.easeagent.plugin.api.trace.Tracing; +import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; @@ -74,8 +75,7 @@ public Map getHeaders() { @Override public AsyncContext async() { - @SuppressWarnings("unchecked") - Map headers = (Map) asyncRequest.getHeaders(); + Map headers = new HashMap<>(asyncRequest.getHeaders()); return AsyncContextImpl.build(tracing, braveSpan.context(), supplier, headers); } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/SpanImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/SpanImpl.java index 2e9fc30f2..3a6b1f8aa 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/SpanImpl.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/SpanImpl.java @@ -20,6 +20,7 @@ import brave.Tracing; import brave.propagation.CurrentTraceContext; import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; import com.megaease.easeagent.plugin.api.trace.Request; import com.megaease.easeagent.plugin.api.trace.Scope; import com.megaease.easeagent.plugin.api.trace.Span; @@ -28,14 +29,14 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Collections; -import java.util.HashMap; +import java.util.EnumMap; import java.util.Map; public class SpanImpl implements Span { private static final Map KINDS; static { - Map kinds = new HashMap<>(); + Map kinds = new EnumMap<>(Kind.class); kinds.put(Kind.CLIENT, brave.Span.Kind.CLIENT); kinds.put(Kind.SERVER, brave.Span.Kind.SERVER); kinds.put(Kind.PRODUCER, brave.Span.Kind.PRODUCER); @@ -48,24 +49,68 @@ public class SpanImpl implements Span { private CurrentTraceContext.Scope scope; private final TraceContext.Injector injector; - private SpanImpl(@Nonnull Tracing tracing, @Nonnull brave.Span span, @Nonnull TraceContext.Injector injector) { + private SpanImpl(@Nonnull Tracing tracing, @Nonnull brave.Span span, + @Nonnull TraceContext.Injector injector) { this.tracing = tracing; this.span = span; this.injector = injector; } - public static Span build(Tracing tracing, brave.Span span, TraceContext.Injector injector) { + public static Span build(Tracing tracing, + brave.Span span, + boolean cachedScope, + TraceContext.Injector injector) { if (span == null) { return NoOpTracer.NO_OP_SPAN; } - return new SpanImpl(tracing, span, injector); + + TraceContext.Injector ci = (TraceContext.Injector) injector; + SpanImpl eSpan = new SpanImpl(tracing, span, ci); + + if (cachedScope) { + eSpan.cacheScope(); + } + return eSpan; + } + + public static Span build(Tracing tracing, brave.Span span, TraceContext.Injector injector) { + return build(tracing, span, false, injector); } public static brave.Span.Kind braveKind(Kind kind) { return KINDS.get(kind); } - protected brave.Span getSpan() { + public static brave.Span nextBraveSpan(Tracing tracing, + TraceContext.Extractor extractor, Request request) { + TraceContext maybeParent = tracing.currentTraceContext().get(); + // Unlike message consumers, we try current span before trying extraction. This is the proper + // order because the span in scope should take precedence over a potentially stale header entry. + // + brave.Span span; + if (maybeParent == null) { + TraceContext.Extractor rExtractor = (TraceContext.Extractor)extractor; + TraceContextOrSamplingFlags extracted = rExtractor.extract(request); + span = tracing.tracer().nextSpan(extracted); + } else { // If we have a span in scope assume headers were cleared before + span = tracing.tracer().newChild(maybeParent); + } + if (span.isNoop()) { + return span; + } + setInfo(span, request); + return span; + } + + private static void setInfo(brave.Span span, Request request) { + Span.Kind kind = request.kind(); + if (kind != null) { + span.kind(SpanImpl.braveKind(kind)); + } + span.name(request.name()); + } + + public brave.Span getSpan() { return span; } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/TracingImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/TracingImpl.java index eb7fb84e3..691931aeb 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/TracingImpl.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/TracingImpl.java @@ -17,8 +17,8 @@ package com.megaease.easeagent.zipkin.impl; -import brave.Tracer; import brave.propagation.CurrentTraceContext; +import brave.propagation.Propagation; import brave.propagation.TraceContext; import brave.propagation.TraceContextOrSamplingFlags; import com.megaease.easeagent.log4j2.Logger; @@ -29,6 +29,7 @@ import com.megaease.easeagent.plugin.api.trace.*; import com.megaease.easeagent.plugin.bridge.NoOpContext; import com.megaease.easeagent.plugin.bridge.NoOpTracer; +import com.megaease.easeagent.zipkin.impl.message.MessagingTracingImpl; import javax.annotation.Nonnull; import java.util.List; @@ -39,54 +40,34 @@ public class TracingImpl implements ITracing { private final Supplier supplier; private final brave.Tracing tracing; private final brave.Tracer tracer; - private final TraceContext.Injector defaultInjector; - private final TraceContext.Injector clientInjector; - private final TraceContext.Injector consumerInjector; - private final TraceContext.Injector producerInjector; - private final TraceContext.Extractor defaultExtractor; - private final TraceContext.Extractor producerExtractor; - private final TraceContext.Extractor consumerExtractor; - private final MessagingTracing messagingTracing; + + private final TraceContext.Injector defaultZipkinInjector; + private final TraceContext.Injector clientZipkinInjector; + private final TraceContext.Extractor defaultZipkinExtractor; + + private final MessagingTracing messagingTracing; private final List propagationKeys; private TracingImpl(@Nonnull Supplier supplier, - @Nonnull brave.Tracing tracing, - @Nonnull Tracer tracer, - @Nonnull TraceContext.Injector defaultInjector, - @Nonnull TraceContext.Injector clientInjector, - @Nonnull TraceContext.Injector producerInjector, - @Nonnull TraceContext.Injector consumerInjector, - @Nonnull TraceContext.Extractor defaultExtractor, - TraceContext.Extractor producerExtractor, - TraceContext.Extractor consumerExtractor, - @Nonnull MessagingTracing messagingTracing, List propagationKeys) { + @Nonnull brave.Tracing tracing) { this.supplier = supplier; this.tracing = tracing; - this.tracer = tracer; - this.defaultInjector = defaultInjector; - this.clientInjector = clientInjector; - this.consumerInjector = consumerInjector; - this.producerInjector = producerInjector; - this.defaultExtractor = defaultExtractor; - this.consumerExtractor = consumerExtractor; - this.producerExtractor = producerExtractor; - this.messagingTracing = messagingTracing; - this.propagationKeys = propagationKeys; + this.tracer = tracing.tracer(); + this.propagationKeys = tracing.propagation().keys(); + Propagation propagation = tracing.propagation(); + + this.defaultZipkinInjector = propagation.injector(Request::setHeader); + this.clientZipkinInjector = propagation.injector(new RemoteSetterImpl<>(brave.Span.Kind.CLIENT)); + this.defaultZipkinExtractor = propagation.extractor(Request::header); + this.messagingTracing = MessagingTracingImpl.build(tracing); } public static ITracing build(Supplier supplier, brave.Tracing tracing) { - return tracing == null ? NoOpTracer.NO_OP_TRACING : - new TracingImpl(supplier, tracing, - tracing.tracer(), - tracing.propagation().injector(Request::setHeader), - tracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.CLIENT)), - tracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.PRODUCER)), - tracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.CONSUMER)), - tracing.propagation().extractor(Request::header), - tracing.propagation().extractor(new RemoteGetterImpl<>(brave.Span.Kind.PRODUCER)), - tracing.propagation().extractor(new RemoteGetterImpl<>(brave.Span.Kind.CONSUMER)), - MessagingTracingImpl.build(tracing), - tracing.propagation().keys()); + if (tracing == null) { + return NoOpTracer.NO_OP_TRACING; + } + + return new TracingImpl(supplier, tracing); } @Override @@ -110,7 +91,6 @@ private brave.Tracing tracing() { @Override public Span currentSpan() { - brave.Tracer tracer = tracer(); Span span = NoOpTracer.NO_OP_SPAN; if (tracer != null) { span = build(tracer.currentSpan()); @@ -123,11 +103,7 @@ private Span build(brave.Span bSpan) { } private Span build(brave.Span bSpan, boolean cacheScope) { - Span span = SpanImpl.build(tracing(), bSpan, defaultInjector); - if (cacheScope) { - span.cacheScope(); - } - return span; + return SpanImpl.build(tracing(), bSpan, cacheScope, defaultZipkinInjector); } private void setInfo(brave.Span span, Request request) { @@ -139,7 +115,6 @@ private void setInfo(brave.Span span, Request request) { } private TraceContext currentTraceContext() { - Tracer tracer = tracer(); if (tracer == null) { LOGGER.debug("tracer was null."); return null; @@ -172,35 +147,16 @@ public Scope importAsync(AsyncContext snapshot) { @Override public RequestContext nextServer(Request request) { - brave.Span span = nextBraveSpan(defaultExtractor, request); + brave.Span span = SpanImpl.nextBraveSpan(tracing, defaultZipkinExtractor, request); AsyncRequest asyncRequest = new AsyncRequest(request); - clientInjector.inject(span.context(), asyncRequest); + clientZipkinInjector.inject(span.context(), asyncRequest); Span newSpan = build(span, request.cacheScope()); return new RequestContextImpl(this, span, newSpan, newSpan.maybeScope(), asyncRequest, supplier); } - private brave.Span nextBraveSpan(TraceContext.Extractor extractor, Request request) { - TraceContext maybeParent = tracing.currentTraceContext().get(); - // Unlike message consumers, we try current span before trying extraction. This is the proper - // order because the span in scope should take precedence over a potentially stale header entry. - // - brave.Span span; - if (maybeParent == null) { - TraceContextOrSamplingFlags extracted = extractor.extract(request); - span = tracer().nextSpan(extracted); - } else { // If we have a span in scope assume headers were cleared before - span = tracer.newChild(maybeParent); - } - if (span.isNoop()) { - return span; - } - setInfo(span, request); - return span; - } - @Override public RequestContext serverImport(Request request) { - TraceContextOrSamplingFlags extracted = defaultExtractor.extract(request); + TraceContextOrSamplingFlags extracted = defaultZipkinExtractor.extract(request); brave.Span span = extracted.context() != null ? tracer().joinSpan(extracted.context()) : tracer().nextSpan(extracted); @@ -209,7 +165,7 @@ public RequestContext serverImport(Request request) { } setInfo(span, request); AsyncRequest asyncRequest = new AsyncRequest(request); - defaultInjector.inject(span.context(), asyncRequest); + defaultZipkinInjector.inject(span.context(), asyncRequest); Span newSpan = build(span, request.cacheScope()); return new RequestContextImpl(this, span, newSpan, newSpan.maybeScope(), asyncRequest, supplier); } @@ -224,9 +180,8 @@ public Span nextSpan() { return nextSpan(null); } - @Override - public Span nextSpan(Message message) { + public Span nextSpan(Message message) { Object msg = message == null ? null : message.get(); Span span = null; if (msg == null) { @@ -238,42 +193,17 @@ public Span nextSpan(Message message) { } @Override - public MessagingTracing messagingTracing() { + public MessagingTracing messagingTracing() { return messagingTracing; } - private void setMessageInfo(brave.Span span, MessagingRequest request) { - if (request.operation() != null) { - span.tag("messaging.operation", request.operation()); - } - if (request.channelKind() != null) { - span.tag("messaging.channel_kind", request.channelKind()); - } - if (request.channelName() != null) { - span.tag("messaging.channel_name", request.channelName()); - } - } - @Override public Span consumerSpan(MessagingRequest request) { - brave.Span span = nextBraveSpan(consumerExtractor, request); - if (span.isNoop()) { - return NoOpTracer.NO_OP_SPAN; - } - setMessageInfo(span, request); - return NoOpTracer.noNullSpan(build(span, request.cacheScope())); + return this.messagingTracing.consumerSpan(request); } @Override public Span producerSpan(MessagingRequest request) { - brave.Span span = nextBraveSpan(producerExtractor, request); - if (span.isNoop()) { - return NoOpTracer.NO_OP_SPAN; - } - setMessageInfo(span, request); - producerInjector.inject(span.context(), request); - return NoOpTracer.noNullSpan(build(span, request.cacheScope())); + return this.messagingTracing.producerSpan(request); } - - } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/MessagingTracingImpl.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/MessagingTracingImpl.java new file mode 100644 index 000000000..3f36be9b0 --- /dev/null +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/MessagingTracingImpl.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2017, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.megaease.easeagent.zipkin.impl.message; + +import brave.propagation.TraceContext; +import brave.propagation.TraceContextOrSamplingFlags; +import com.megaease.easeagent.plugin.api.trace.*; +import com.megaease.easeagent.plugin.bridge.NoOpTracer; +import com.megaease.easeagent.zipkin.impl.MessageImpl; +import com.megaease.easeagent.zipkin.impl.RemoteSetterImpl; +import com.megaease.easeagent.zipkin.impl.SpanImpl; + +import javax.annotation.Nonnull; +import java.util.function.Function; +import java.util.function.Predicate; + +public class MessagingTracingImpl implements MessagingTracing { + private final brave.messaging.MessagingTracing messagingTracing; + private final Extractor extractor; + private final Injector producerInjector; + private final Injector consumerInjector; + + private final Predicate consumerSampler; + private final Predicate producerSampler; + + private final TraceContext.Injector zipkinProducerInjector; + private final TraceContext.Injector zipkinConsumerInjector; + private final TraceContext.Extractor zipkinMessageExtractor; + + private MessagingTracingImpl(brave.messaging.MessagingTracing messagingTracing) { + this.messagingTracing = messagingTracing; + this.zipkinMessageExtractor = messagingTracing.propagation().extractor(MessagingRequest::header); + this.zipkinProducerInjector = messagingTracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.PRODUCER)); + this.zipkinConsumerInjector = messagingTracing.propagation().injector(new RemoteSetterImpl<>(brave.Span.Kind.CONSUMER)); + + this.extractor = new ExtractorImpl(messagingTracing.propagation().extractor(MessagingRequest::header)); + this.producerInjector = new InjectorImpl(this.zipkinProducerInjector); + this.consumerInjector = new InjectorImpl(this.zipkinConsumerInjector); + + this.consumerSampler = new SamplerFunction(ZipkinConsumerRequest::new, messagingTracing.consumerSampler()); + this.producerSampler = new SamplerFunction(ZipkinProducerRequest::new, messagingTracing.producerSampler()); + } + + public static MessagingTracing build(brave.Tracing tracing) { + if (tracing == null) { + return NoOpTracer.NO_OP_MESSAGING_TRACING; + } + brave.messaging.MessagingTracing messagingTracing = brave.messaging.MessagingTracing + .newBuilder(tracing).build(); + + return new MessagingTracingImpl<>(messagingTracing); + } + + @Override + public Span consumerSpan(MessagingRequest request) { + brave.Tracing tracing = messagingTracing.tracing(); + brave.Span span = SpanImpl.nextBraveSpan(tracing, this.zipkinMessageExtractor, request); + if (span.isNoop()) { + return NoOpTracer.NO_OP_SPAN; + } + setMessageInfo(span, request); + Span eSpan = SpanImpl.build(messagingTracing.tracing(),span, + request.cacheScope(), this.zipkinConsumerInjector); + + return NoOpTracer.noNullSpan(eSpan); + } + + @Override + public Span producerSpan(MessagingRequest request) { + brave.Tracing tracing = messagingTracing.tracing(); + brave.Span span = SpanImpl.nextBraveSpan(tracing, this.zipkinMessageExtractor, request); + if (span.isNoop()) { + return NoOpTracer.NO_OP_SPAN; + } + setMessageInfo(span, request); + Span eSpan = SpanImpl.build(messagingTracing.tracing(), span, true, zipkinProducerInjector); + producerInjector.inject(eSpan, (R)request); + return NoOpTracer.noNullSpan(eSpan); + } + + @Override + public Extractor extractor() { + return extractor; + } + + @Override + public Injector producerInjector() { + return producerInjector; + } + + @Override + public Injector consumerInjector() { + return consumerInjector; + } + + @Override + public Predicate consumerSampler() { + return consumerSampler; + } + + @Override + public Predicate producerSampler() { + return producerSampler; + } + + @Override + public boolean consumerSampler(R request) { + return messagingTracing.consumerSampler().trySample(new ZipkinConsumerRequest<>(request)); + } + + @Override + public boolean producerSampler(R request) { + return messagingTracing.producerSampler().trySample(new ZipkinProducerRequest<>(request)); + } + + private void setMessageInfo(brave.Span span, MessagingRequest request) { + if (request.operation() != null) { + span.tag("messaging.operation", request.operation()); + } + if (request.channelKind() != null) { + span.tag("messaging.channel_kind", request.channelKind()); + } + if (request.channelName() != null) { + span.tag("messaging.channel_name", request.channelName()); + } + } + + public class ExtractorImpl implements Extractor { + private final TraceContext.Extractor extractor; + + public ExtractorImpl(TraceContext.Extractor extractor) { + this.extractor = extractor; + } + + @Override + public Message extract(MessagingRequest request) { + return new MessageImpl(extractor.extract(request)); + } + } + + public class InjectorImpl implements Injector { + private final TraceContext.Injector injector; + + public InjectorImpl(TraceContext.Injector injector) { + this.injector = injector; + } + + @Override + public void inject(Span span, R request) { + if (span instanceof SpanImpl) { + this.injector.inject(((SpanImpl) span).getSpan().context(), request); + } + } + + public TraceContext.Injector getInjector() { + return this.injector; + } + } + + public class SamplerFunction implements Predicate { + private final Function builder; + private final brave.sampler.SamplerFunction function; + + public SamplerFunction(@Nonnull Function builder, + @Nonnull brave.sampler.SamplerFunction function) { + this.builder = builder; + this.function = function; + } + + @Override + public boolean test(R request) { + return function.trySample(builder.apply(request)); + } + } + +} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinConsumerRequest.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinConsumerRequest.java new file mode 100644 index 000000000..da809f27b --- /dev/null +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinConsumerRequest.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.zipkin.impl.message; + +import brave.messaging.ConsumerRequest; +import com.megaease.easeagent.plugin.api.trace.MessagingRequest; + +public class ZipkinConsumerRequest extends ConsumerRequest { + + private final R request; + + public ZipkinConsumerRequest(R request) { + this.request = request; + } + + @Override + public String operation() { + return request.operation(); + } + + @Override + public String channelKind() { + return request.channelKind(); + } + + @Override + public String channelName() { + return request.channelName(); + } + + @Override + public Object unwrap() { + return request.unwrap(); + } +} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinProducerRequest.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinProducerRequest.java new file mode 100644 index 000000000..fbc6245f4 --- /dev/null +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/impl/message/ZipkinProducerRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.zipkin.impl.message; + +import brave.messaging.ProducerRequest; +import com.megaease.easeagent.plugin.api.trace.MessagingRequest; + +public class ZipkinProducerRequest extends ProducerRequest { + private final R request; + + public ZipkinProducerRequest(R request) { + this.request = request; + } + + @Override + public String operation() { + return request.operation(); + } + + @Override + public String channelKind() { + return request.channelKind(); + } + + @Override + public String channelName() { + return request.channelName(); + } + + @Override + public Object unwrap() { + return request.unwrap(); + } +} diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/AgentMDCScopeDecorator.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/AgentMDCScopeDecorator.java index 339c5d932..467a97044 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/AgentMDCScopeDecorator.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/AgentMDCScopeDecorator.java @@ -35,10 +35,6 @@ public static CurrentTraceContext.ScopeDecorator getV2() { return INSTANCE_V2; } - public static CorrelationScopeDecorator.Builder newBuilder() { - return new AgentMDCScopeDecorator.Builder(); - } - static final class Builder extends CorrelationScopeDecorator.Builder { Builder() { super(AgentMDCScopeDecorator.MDCContext.INSTANCE); @@ -80,10 +76,7 @@ public boolean update(String name, @Nullable String value) { } private ClassLoader getUserClassLoader() { - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - return classLoader; -// if (classLoader==null){ -// } + return Thread.currentThread().getContextClassLoader(); } } diff --git a/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/LogUtils.java b/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/LogUtils.java index 510060040..c61559934 100644 --- a/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/LogUtils.java +++ b/zipkin/src/main/java/com/megaease/easeagent/zipkin/logging/LogUtils.java @@ -45,32 +45,34 @@ public class LogUtils { private static final String LOG4J_CHECK_CLASS_NAME = "org.apache.logging.log4j.core.Appender"; private static final String LOGBACK_CHECK_CLASS_NAME = "ch.qos.logback.core.Appender"; - private static Boolean LOG4J_LOADED; - private static Boolean LOGBACK_LOADED; + private static Boolean log4jLoaded; + private static Boolean logbackLoaded; - public static Class LOG4J_MDC_CLASS; - public static Class LOGBACK_MDC_CLASS; + private static Class log4jMdcClass; + private static Class logbackMdcClass; + + private LogUtils() {} public static Class checkLog4JMDC(ClassLoader classLoader) { - if (LOG4J_LOADED != null) { - return LOG4J_MDC_CLASS; + if (log4jLoaded != null) { + return log4jMdcClass; } if (loadClass(classLoader, LOG4J_CHECK_CLASS_NAME) != null) { - LOG4J_MDC_CLASS = loadClass(classLoader, LOG4J_MDC_CLASS_NAME); + log4jMdcClass = loadClass(classLoader, LOG4J_MDC_CLASS_NAME); } - LOG4J_LOADED = true; - return LOG4J_MDC_CLASS; + log4jLoaded = true; + return log4jMdcClass; } public static Class checkLogBackMDC(ClassLoader classLoader) { - if (LOGBACK_LOADED != null) { - return LOGBACK_MDC_CLASS; + if (logbackLoaded != null) { + return logbackMdcClass; } if (loadClass(classLoader, LOGBACK_CHECK_CLASS_NAME) != null) { - LOGBACK_MDC_CLASS = loadClass(classLoader, LOGBACK_MDC_CLASS_NAME); + logbackMdcClass = loadClass(classLoader, LOGBACK_MDC_CLASS_NAME); } - LOGBACK_LOADED = true; - return LOGBACK_MDC_CLASS; + logbackLoaded = true; + return logbackMdcClass; } public static Class loadClass(ClassLoader classLoader, String className) { @@ -81,8 +83,6 @@ public static Class loadClass(ClassLoader classLoader, String className) { } } - // Method handling - /** * Attempt to find a {@link Method} on the supplied class with the supplied name * and no parameters. Searches all superclasses up to {@code Object}. @@ -142,7 +142,7 @@ private static Method[] getDeclaredMethods(Class clazz, boolean defensive) { result = declaredMethods; } declaredMethodsCache.put(clazz, (result.length == 0 ? EMPTY_METHOD_ARRAY : result)); - } catch (Throwable ex) { + } catch (Exception ex) { throw new IllegalStateException("Failed to introspect Class [" + clazz.getName() + "] from ClassLoader [" + clazz.getClassLoader() + "]", ex); } @@ -269,23 +269,4 @@ public static void rethrowRuntimeException(Throwable ex) { } throw new UndeclaredThrowableException(ex); } - -// -// public static Class loadLog4JMDC(ClassLoader classLoader) { -// if (LOG4J_LOADED != null) { -// return LOG4J_MDC_CLASS; -// } -// LOG4J_MDC_CLASS = loadClass(classLoader, LOG4J_MDC_CLASS_NAME); -// LOG4J_LOADED = true; -// return LOG4J_MDC_CLASS; -// } -// -// public static Class loadLogBackMDC(ClassLoader classLoader) { -// if (LOGBACK_LOADED != null) { -// return LOGBACK_MDC_CLASS; -// } -// LOGBACK_MDC_CLASS = loadClass(classLoader, LOGBACK_MDC_CLASS_NAME); -// LOGBACK_LOADED = true; -// return LOGBACK_MDC_CLASS; -// } } diff --git a/zipkin/src/main/resources/META-INF/services/com.megaease.easeagent.plugin.BeanProvider b/zipkin/src/main/resources/META-INF/services/com.megaease.easeagent.plugin.BeanProvider index d2ec268df..1bc59f66d 100644 --- a/zipkin/src/main/resources/META-INF/services/com.megaease.easeagent.plugin.BeanProvider +++ b/zipkin/src/main/resources/META-INF/services/com.megaease.easeagent.plugin.BeanProvider @@ -1 +1 @@ -com.megaease.easeagent.zipkin.TracingProvider +com.megaease.easeagent.zipkin.TracingProviderImpl diff --git a/zipkin/src/test/java/com/megaease/easeagent/zipkin/BaseZipkinTest.java b/zipkin/src/test/java/com/megaease/easeagent/zipkin/BaseZipkinTest.java deleted file mode 100644 index fe92861ca..000000000 --- a/zipkin/src/test/java/com/megaease/easeagent/zipkin/BaseZipkinTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.megaease.easeagent.zipkin; - -import brave.Tracer; -import brave.Tracing; -import brave.propagation.StrictCurrentTraceContext; -import org.junit.After; -import zipkin2.Span; -import zipkin2.reporter.Reporter; - -public class BaseZipkinTest { - StrictCurrentTraceContext currentTraceContext = StrictCurrentTraceContext.create(); - - @After - public void close() { - Tracing current = Tracing.current(); - if (current != null) current.close(); - currentTraceContext.close(); - } - - protected Tracer tracer(Reporter reporter) { - return Tracing.newBuilder() - .currentTraceContext(currentTraceContext) - .spanReporter(reporter).build().tracer(); - } - -// protected AgentInterceptorChain mockChain() { -// return mock(AgentInterceptorChain.class); -// } -// -// protected Config createConfig(String key, String value) { -// Map map = new HashMap<>(); -// map.put(key, value); -// map.put(SwitchUtil.GLOBAL_METRICS_ENABLE_KEY, "true"); -// map.put(SwitchUtil.GLOBAL_TRACING_ENABLE_KEY, "true"); -// return new Configs(map); -// } -} diff --git a/zipkin/src/test/java/com/megaease/easeagent/zipkin/GatewaySenderTest.java b/zipkin/src/test/java/com/megaease/easeagent/zipkin/GatewaySenderTest.java deleted file mode 100644 index 278f4b75a..000000000 --- a/zipkin/src/test/java/com/megaease/easeagent/zipkin/GatewaySenderTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2017, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.megaease.easeagent.zipkin; - -import com.github.dreamhead.moco.HttpServer; -import com.github.dreamhead.moco.Runnable; -import org.junit.Test; - -import java.util.Collections; - -import static com.github.dreamhead.moco.Moco.*; -import static com.github.dreamhead.moco.Runner.running; - -public class GatewaySenderTest { - @Test - public void should_work() throws Exception { - final HttpServer server = httpServer(log()); - server.request(eq(header("User-Agent"), "easeagent/0.1.0")).response(status(200)); - - running(server, new Runnable() { - @Override - public void run() throws Exception { - new GatewaySender(1024, "http://localhost:" + server.port(), 1000, 1000, false, "easeagent/0.1.0") - .sendSpans(Collections.singletonList(new byte[0])); - } - }); - } -} \ No newline at end of file