Skip to content

Commit

Permalink
Fix typo in Zipkin and Report (#178)
Browse files Browse the repository at this point in the history
* fix typo in zipkin

* fix typo

* fix typo

* fix license
  • Loading branch information
Oseenix authored Jan 5, 2022
1 parent f2e68f7 commit e59e783
Show file tree
Hide file tree
Showing 43 changed files with 641 additions and 892 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void httpServer() throws Exception {

DatagramSocket s = new DatagramSocket(0);
int port = s.getLocalPort();

s.close();
String httpServer = "http://127.0.0.1:" + port;
System.out.println("run up http server : " + httpServer);
AgentHttpServer agentHttpServer = new AgentHttpServer(port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@

import com.megaease.easeagent.metrics.MetricRegistryService;
import com.megaease.easeagent.plugin.api.metric.*;
import org.awaitility.Duration;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

public class MetricRegistryImplTest {
String countName = "countName";
String meterName = "meterName";
Expand Down Expand Up @@ -98,7 +95,7 @@ public void timer() throws InterruptedException {
Timer timer = metricRegistry.timer(timerName);
timer.update(10, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, timer.getCount());
timer.update(100, TimeUnit.MILLISECONDS);
timer.update(200, TimeUnit.MILLISECONDS);
Assert.assertEquals(2, timer.getCount());
Timer.Context context = timer.time();
Thread.sleep(50);
Expand All @@ -107,9 +104,9 @@ public void timer() throws InterruptedException {
Snapshot snapshot = timer.getSnapshot();
Assert.assertEquals(3, snapshot.size());
Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(10), snapshot.getMin());
Assert.assertTrue(snapshot.getMedian() > TimeUnit.MILLISECONDS.toNanos(30));
Assert.assertTrue(snapshot.getMedian() < TimeUnit.MILLISECONDS.toNanos(70));
Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(100), snapshot.getMax());
Assert.assertTrue(snapshot.getMedian() > TimeUnit.MILLISECONDS.toNanos(20));
Assert.assertTrue(snapshot.getMedian() < TimeUnit.MILLISECONDS.toNanos(80));
Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(200), snapshot.getMax());
}

class TestGauge implements Gauge<String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.megaease.easeagent.mock.config.ConfigMock;
import com.megaease.easeagent.mock.report.ReportMock;
import com.megaease.easeagent.mock.utils.MockProvider;
import com.megaease.easeagent.zipkin.TracingProvider;
import com.megaease.easeagent.zipkin.TracingProviderImpl;

public class TracingProviderMock implements MockProvider {
private static final TracingProvider TRACING_PROVIDER = new TracingProvider();
private static final TracingProviderImpl TRACING_PROVIDER = new TracingProviderImpl();
private static final Tracing TRACING;

static {
Expand All @@ -34,7 +34,7 @@ public class TracingProviderMock implements MockProvider {
TRACING = TRACING_PROVIDER.tracing();
}

public static TracingProvider getTracingProvider() {
public static TracingProviderImpl getTracingProvider() {
return TRACING_PROVIDER;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package com.megaease.easeagent.plugin.api.trace;

import java.util.function.Function;
import com.megaease.easeagent.plugin.api.Context;

import java.util.function.Predicate;

/**
* a MessagingTracing
Expand Down Expand Up @@ -49,7 +51,7 @@ public interface MessagingTracing<R extends MessagingRequest> {
* yet been made. For example, if a trace is already in progress, this function is not called. You
* can implement this to skip channels that you never want to trace.
*/
Function<R, Boolean> consumerSampler();
Predicate<R> consumerSampler();

/**
* Returns an overriding sampling decision for a new trace. Defaults to ignore the request and use
Expand All @@ -59,7 +61,7 @@ public interface MessagingTracing<R extends MessagingRequest> {
* making an messaging request as a part of booting your application. You may want to opt-out of
* tracing producer requests that did not originate from a consumer request.
*/
Function<R, Boolean> producerSampler();
Predicate<R> producerSampler();

/**
* Returns an overriding sampling decision for a new trace.
Expand All @@ -75,4 +77,43 @@ public interface MessagingTracing<R extends MessagingRequest> {
* @see #consumerSampler()
*/
boolean producerSampler(R request);

/**
* Obtain key:value from the message request and create a Span, Examples: kafka consumer, rebbitmq consumer
* <p>
* It will set the Span's kind, name and cached scope through {@link Request#kind()}, {@link Request#name()}
* and {@link Request#cacheScope()}.
*
* <p>
* It will set the Span's tags "messaging.operation", "messaging.channel_kind", "messaging.channel_name" from request
* {@link MessagingRequest#operation()} {@link MessagingRequest#channelKind()} {@link MessagingRequest#channelName()}
*
* <p>
* It just only obtain the key:value required by Trace from the {@link Request#header(String)},
* If you need and get Span, generate result use {@link Context#consumerSpan(MessagingRequest)}.
*
* @param request {@link MessagingRequest}
* @return {@link Span}
* @see Context#consumerSpan(MessagingRequest)
*/
Span consumerSpan(R request);


/**
* Create a Span for message producer. Examples: kafka producer, rebbitmq producer
* <p>
* It will set the Span's tags "messaging.operation", "messaging.channel_kind", "messaging.channel_name" from request
* {@link MessagingRequest#operation()} {@link MessagingRequest#channelKind()} {@link MessagingRequest#channelName()}
*
* <p>
* It just only pass multiple key:value values required by Trace through
* {@link Request#setHeader(String, String)}, And set the Span's kind, name and
* cached scope through {@link Request#kind()}, {@link Request#name()} and {@link Request#cacheScope()}.
* If you need and get Span, generate result use {@link Context#producerSpan(MessagingRequest)}.
*
* @param request {@link MessagingRequest}
* @return {@link Span}
* @see Context#producerSpan(MessagingRequest)
*/
Span producerSpan(R request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public interface Tracing {
* @return {@link Span}
* @see Extractor#extract(MessagingRequest)
*/
Span nextSpan(Message message);
Span nextSpan(Message<?> message);

/**
* get MessagingTracing for message tracing
Expand All @@ -85,5 +85,5 @@ public interface Tracing {
*
* @return {@link MessagingRequest}
*/
MessagingTracing messagingTracing();
MessagingTracing<MessagingRequest> messagingTracing();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class NoOpTracer {
public static final ITracing NO_OP_TRACING = NoopTracing.INSTANCE;
Expand Down Expand Up @@ -233,7 +234,7 @@ public Span producerSpan(MessagingRequest request) {
}

@Override
public MessagingTracing messagingTracing() {
public MessagingTracing<MessagingRequest> messagingTracing() {
return EmptyMessagingTracing.INSTANCE;
}

Expand All @@ -255,7 +256,7 @@ public boolean hasCurrentSpan() {

public static class EmptyMessagingTracing implements MessagingTracing<MessagingRequest> {
private static final EmptyMessagingTracing INSTANCE = new EmptyMessagingTracing();
private static final Function<MessagingRequest, Boolean> NOOP_SAMPLER = r -> false;
private static final Predicate<MessagingRequest> NOOP_SAMPLER = r -> false;

@Override
public Extractor<MessagingRequest> extractor() {
Expand All @@ -273,12 +274,12 @@ public Injector<MessagingRequest> consumerInjector() {
}

@Override
public Function<MessagingRequest, Boolean> consumerSampler() {
public Predicate<MessagingRequest> consumerSampler() {
return NOOP_SAMPLER;
}

@Override
public Function<MessagingRequest, Boolean> producerSampler() {
public Predicate<MessagingRequest> producerSampler() {
return NOOP_SAMPLER;
}

Expand All @@ -291,6 +292,16 @@ public boolean consumerSampler(MessagingRequest request) {
public boolean producerSampler(MessagingRequest request) {
return false;
}

@Override
public Span consumerSpan(MessagingRequest request) {
return NoOpTracer.NO_OP_SPAN;
}

@Override
public Span producerSpan(MessagingRequest request) {
return NoOpTracer.NO_OP_SPAN;
}
}

public static class EmptyMessage implements Message {
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@
<version>${version.mockito}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@

public class MetricReporterImpl implements MetricReporter {
private final ConcurrentHashMap<String, Reporter> reporters;
private final Configs configs;
private final AppenderManager appenderManager;

public MetricReporterImpl(Configs configs) {
this.reporters = new ConcurrentHashMap<>();
OutputProperties outputProperties = Utils.extractOutputProperties(configs);
this.appenderManager = AppenderManager.create(outputProperties);
this.configs = configs;
configs.addChangeListener(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private DefaultKafkaAppenderManager(OutputProperties outputProperties) {
this.outputProperties = outputProperties;
ClassLoader initClassLoader = Thread.currentThread().getContextClassLoader();
LOGGER.info("bind classloader:{} to AppenderManager", initClassLoader);
this.provider = (topic) -> {
this.provider = topic -> {
ClassLoader old = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(initClassLoader);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import org.apache.logging.log4j.core.LoggerContext;

public class LoggerFactory {
private LoggerFactory() {}

// Independent logger context as an anchor for loggers in the metrics
private static LoggerContext loggerContext = new LoggerContext("ROOT");
private static final LoggerContext loggerContext = new LoggerContext("ROOT");

public static LoggerContext getLoggerContext() {
return loggerContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ private Appender getAppender() {
return getConsoleAppender();
case "mock":
return getMockAppender();
default:
return null;
}
return null;
}

private Appender getKafkaAppender(String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ static Builder builder() {

class DefaultRefreshableAppender implements RefreshableAppender {

private final static Logger LOGGER = com.megaease.easeagent.log4j2.LoggerFactory.getLogger(DefaultRefreshableAppender.class);
private static final Logger LOGGER = com.megaease.easeagent.log4j2.LoggerFactory.getLogger(DefaultRefreshableAppender.class);

private final String loggerName;
private final String appenderName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @param <S> always zipkin2.reporter
*/
public class RefreshableReporter<S> implements Reporter<S> {
private final static Logger LOGGER = LoggerFactory.getLogger(RefreshableReporter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(RefreshableReporter.class);
private final SDKAsyncReporter<S> asyncReporter;
private final TraceProps traceProperties;
private final OutputProperties agentOutputProperties;
Expand Down Expand Up @@ -67,6 +67,7 @@ public synchronized void refresh() {
asyncReporter.getSender().close();
asyncReporter.closeFlushThread();
} catch (Exception ignored) {
// ignored
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public TraceReport(Configs configs) {
}

private RefreshableReporter<Span> initSpanRefreshableReporter(Configs configs) {
final RefreshableReporter<Span> spanRefreshableReporter;
OutputProperties outputProperties = Utils.extractOutputProperties(configs);
Map<String, String> sslConfig = new HashMap<>();
if (SecurityProtocol.SSL.name.equals(outputProperties.getSecurityProtocol())) {
Expand All @@ -67,7 +66,7 @@ private RefreshableReporter<Span> initSpanRefreshableReporter(Configs configs) {
sslConfig.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, outputProperties.getEndpointAlgorithm());
}

Sender sender = new SimpleSender();
Sender sender;
TraceProps traceProperties = Utils.extractTraceProps(configs);
if (traceProperties.getOutput().isEnabled() && traceProperties.isEnabled()
&& StringUtils.isNotEmpty(outputProperties.getServers())) {
Expand All @@ -79,6 +78,8 @@ private RefreshableReporter<Span> initSpanRefreshableReporter(Configs configs) {
.encoding(Encoding.JSON)
.messageMaxBytes(traceProperties.getOutput().getMessageMaxBytes())
.build());
} else {
sender = new SimpleSender();
}

GlobalExtrasSupplier extrasSupplier = new GlobalExtrasSupplier() {
Expand All @@ -103,8 +104,7 @@ public String system() {
traceProperties,
extrasSupplier);
reporter.startFlushThread();
spanRefreshableReporter = new RefreshableReporter<>(reporter, traceProperties, outputProperties);
return spanRefreshableReporter;
return new RefreshableReporter<>(reporter, traceProperties, outputProperties);
}

public void report(Span span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import zipkin2.Span;

public class SpanUtils {
private SpanUtils() {}

public static boolean isValidSpan(Object next) {
if (!(next instanceof Span)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package com.megaease.easeagent.report.util;

public class TextUtils {
private TextUtils() {}

public static boolean hasText(String content) {
return content != null && content.trim().length() > 0;
}
Expand Down
Loading

0 comments on commit e59e783

Please sign in to comment.