From 831841e35615912782b8d516a704bdfdf31d2d9f Mon Sep 17 00:00:00 2001 From: jzw Date: Thu, 27 Jan 2022 22:44:37 +0800 Subject: [PATCH] Add HttpSender report plugin supporting http basic authorization (#203) * add HttpSender report plugin supporting HTTP basic authorization --- build/src/main/resources/agent.properties | 186 ++++++----- .../easeagent/config/ConfigUtils.java | 30 +- .../megaease/easeagent/config/Configs.java | 10 +- .../easeagent/config/GlobalConfigs.java | 9 +- .../config/report/ReportConfigConst.java | 24 +- .../config/report/ReporterConfigAdapter.java | 30 +- .../easeagent/plugin/api/config/Config.java | 2 + .../plugin/async/AgentThreadFactory.java | 5 +- .../plugin/bridge/NoOpConfigFactory.java | 5 + .../easeagent/plugin/report/Call.java | 12 +- .../easeagent/plugin/report/Callback.java | 51 +-- .../easeagent/plugin/report/Sender.java | 5 +- .../plugin/tools/trace/HttpUtils.java | 2 + .../plugin/utils/common/StringUtils.java | 20 ++ .../plugin/httpservlet/HttpServletPlugin.java | 2 +- .../httpservlet/advice/DoFilterAdvice.java | 8 - .../interceptor/DoFilterTraceInterceptor.java | 20 +- report/pom.xml | 9 + .../easeagent/report/plugin/NoOpCall.java | 44 +++ .../report/sender/AgentKafkaSender.java | 14 +- .../report/sender/AgentLoggerSender.java | 12 +- .../easeagent/report/sender/NoOpSender.java | 8 +- .../report/sender/SenderConfigDecorator.java | 31 +- .../report/sender/SenderWithEncoder.java | 4 +- .../report/sender/ZipkinCallWrapper.java | 60 ++++ .../easeagent/report/sender/ZipkinSender.java | 4 +- .../sender/metric/MetricKafkaSender.java | 10 +- .../report/sender/okhttp/ByteRequestBody.java | 54 +++ .../report/sender/okhttp/HttpCall.java | 102 ++++++ .../report/sender/okhttp/HttpSender.java | 315 ++++++++++++++++++ 30 files changed, 853 insertions(+), 235 deletions(-) rename report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCallback.java => plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Call.java (73%) create mode 100644 report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCall.java create mode 100644 report/src/main/java/com/megaease/easeagent/report/sender/ZipkinCallWrapper.java create mode 100644 report/src/main/java/com/megaease/easeagent/report/sender/okhttp/ByteRequestBody.java create mode 100644 report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpCall.java create mode 100644 report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpSender.java diff --git a/build/src/main/resources/agent.properties b/build/src/main/resources/agent.properties index 535fc6450..42050acd0 100644 --- a/build/src/main/resources/agent.properties +++ b/build/src/main/resources/agent.properties @@ -143,137 +143,143 @@ plugin.integrability.global.redirect.enabled=true plugin.integrability.global.forwarded.enabled=true plugin.hook.global.foundation.enabled=true + +# ---------------------------------------------- +# if the plugin configuration is consistent with the global namespace, +# do not add configuration items not commented out in this default configuration file. +# otherwise, they can not be overridden by Global configuration in user's configuration file. + # # -------------------- jvm --------------------- -plugin.observability.jvmGc.metric.enabled=true -plugin.observability.jvmGc.metric.interval=30 -plugin.observability.jvmGc.metric.topic=platform-meter -plugin.observability.jvmGc.metric.appendType=kafka -plugin.observability.jvmMemory.metric.enabled=true -plugin.observability.jvmMemory.metric.interval=30 -plugin.observability.jvmMemory.metric.topic=platform-meter -plugin.observability.jvmMemory.metric.appendType=kafka +# plugin.observability.jvmGc.metric.enabled=true +# plugin.observability.jvmGc.metric.interval=30 +# plugin.observability.jvmGc.metric.topic=platform-meter +# plugin.observability.jvmGc.metric.appendType=kafka +# plugin.observability.jvmMemory.metric.enabled=true +# plugin.observability.jvmMemory.metric.interval=30 +# plugin.observability.jvmMemory.metric.topic=platform-meter +# plugin.observability.jvmMemory.metric.appendType=kafka # # -------------------- async --------------------- -plugin.observability.async.tracing.enabled=true +# plugin.observability.async.tracing.enabled=true # # -------------------- elasticsearch redirect --------------------- -plugin.integrability.elasticsearch.redirect.enabled=true -plugin.observability.elasticsearch.tracing.enabled=true +# plugin.integrability.elasticsearch.redirect.enabled=true +# plugin.observability.elasticsearch.tracing.enabled=true # elasticsearch metric -plugin.observability.elasticsearch.metric.enabled=true -plugin.observability.elasticsearch.metric.interval=30 -plugin.observability.elasticsearch.metric.topic=application-meter -plugin.observability.elasticsearch.metric.appendType=kafka +# plugin.observability.elasticsearch.metric.enabled=true +# plugin.observability.elasticsearch.metric.interval=30 +# plugin.observability.elasticsearch.metric.topic=application-meter +# plugin.observability.elasticsearch.metric.appendType=kafka # # -------------------- httpServlet --------------------- -plugin.observability.httpServlet.tracing.enabled=true -plugin.observability.httpServlet.metric.enabled=true -plugin.observability.httpServlet.metric.interval=30 -plugin.observability.httpServlet.metric.topic=application-meter -plugin.observability.httpServlet.metric.appendType=kafka +# plugin.observability.httpServlet.tracing.enabled=true +# plugin.observability.httpServlet.metric.enabled=true +# plugin.observability.httpServlet.metric.interval=30 +# plugin.observability.httpServlet.metric.topic=application-meter +# plugin.observability.httpServlet.metric.appendType=kafka # # -------------------- jdbc --------------------- -# jdbc tracing -plugin.observability.jdbc.tracing.enabled=true +## jdbc tracing +# plugin.observability.jdbc.tracing.enabled=true # jdbcStatement metric -plugin.observability.jdbcStatement.metric.enabled=true -plugin.observability.jdbcStatement.metric.interval=30 -plugin.observability.jdbcStatement.metric.topic=application-meter -plugin.observability.jdbcStatement.metric.appendType=kafka -# jdbcConnection metric -plugin.observability.jdbcConnection.metric.enabled=true -plugin.observability.jdbcConnection.metric.interval=30 -plugin.observability.jdbcConnection.metric.topic=application-meter -plugin.observability.jdbcConnection.metric.appendType=kafka -# md5Dictionary metric -plugin.observability.md5Dictionary.metric.enabled=true -plugin.observability.md5Dictionary.metric.interval=30 -plugin.observability.md5Dictionary.metric.topic=application-meter -plugin.observability.md5Dictionary.metric.appendType=kafka -# jdbc redirect -plugin.integrability.jdbc.redirect.enabled=true +# plugin.observability.jdbcStatement.metric.enabled=true +# plugin.observability.jdbcStatement.metric.interval=30 +# plugin.observability.jdbcStatement.metric.topic=application-meter +# plugin.observability.jdbcStatement.metric.appendType=kafka +## jdbcConnection metric +# plugin.observability.jdbcConnection.metric.enabled=true +# plugin.observability.jdbcConnection.metric.interval=30 +# plugin.observability.jdbcConnection.metric.topic=application-meter +# plugin.observability.jdbcConnection.metric.appendType=kafka +## md5Dictionary metric +# plugin.observability.md5Dictionary.metric.enabled=true +# plugin.observability.md5Dictionary.metric.interval=30 +# plugin.observability.md5Dictionary.metric.topic=application-meter +# plugin.observability.md5Dictionary.metric.appendType=kafka +## jdbc redirect +# plugin.integrability.jdbc.redirect.enabled=true # # -------------------- kafka --------------------- # kafka tracing -plugin.observability.kafka.tracing.enabled=true +# plugin.observability.kafka.tracing.enabled=true # kafka metric -plugin.observability.kafka.metric.enabled=true -plugin.observability.kafka.metric.interval=30 -plugin.observability.kafka.metric.topic=application-meter -plugin.observability.kafka.metric.appendType=kafka +# plugin.observability.kafka.metric.enabled=true +# plugin.observability.kafka.metric.interval=30 +# plugin.observability.kafka.metric.topic=application-meter +# plugin.observability.kafka.metric.appendType=kafka # kafka redirect -plugin.integrability.kafka.redirect.enabled=true +# plugin.integrability.kafka.redirect.enabled=true # # -------------------- rabbitmq --------------------- # rabbitmq tracing -plugin.observability.rabbitmq.tracing.enabled=true +# plugin.observability.rabbitmq.tracing.enabled=true # rabbitmq metric -plugin.observability.rabbitmq.metric.enabled=true -plugin.observability.rabbitmq.metric.interval=30 -plugin.observability.rabbitmq.metric.topic=application-meter -plugin.observability.rabbitmq.metric.appendType=kafka +# plugin.observability.rabbitmq.metric.enabled=true +# plugin.observability.rabbitmq.metric.interval=30 +# plugin.observability.rabbitmq.metric.topic=application-meter +# plugin.observability.rabbitmq.metric.appendType=kafka # rabbitmq redirect -plugin.integrability.rabbitmq.redirect.enabled=true +# plugin.integrability.rabbitmq.redirect.enabled=true # # -------------------- redis --------------------- # redis tracing -plugin.observability.redis.tracing.enabled=true +# plugin.observability.redis.tracing.enabled=true # redis metric -plugin.observability.redis.metric.enabled=true -plugin.observability.redis.metric.interval=30 -plugin.observability.redis.metric.topic=application-meter -plugin.observability.redis.metric.appendType=kafka +# plugin.observability.redis.metric.enabled=true +# plugin.observability.redis.metric.interval=30 +# plugin.observability.redis.metric.topic=application-meter +# plugin.observability.redis.metric.appendType=kafka # redis redirect -plugin.integrability.redis.redirect.enabled=true +# plugin.integrability.redis.redirect.enabled=true # # -------------------- springGateway --------------------- # springGateway tracing -plugin.observability.springGateway.tracing.enabled=true +# plugin.observability.springGateway.tracing.enabled=true # springGateway metric -plugin.observability.springGateway.metric.enabled=true -plugin.observability.springGateway.metric.interval=30 -plugin.observability.springGateway.metric.topic=application-meter -plugin.observability.springGateway.metric.appendType=kafka +# plugin.observability.springGateway.metric.enabled=true +# plugin.observability.springGateway.metric.interval=30 +# plugin.observability.springGateway.metric.topic=application-meter +# plugin.observability.springGateway.metric.appendType=kafka # # -------------------- request --------------------- -# httpclient tracing:httpclient and httpclient5 -plugin.observability.httpclient.tracing.enabled=true -# okHttp tracing -plugin.observability.okHttp.tracing.enabled=true -# webclient tracing -plugin.observability.webclient.tracing.enabled=true -# feignClient tracing -plugin.observability.feignClient.tracing.enabled=true -# restTemplate tracing -plugin.observability.restTemplate.tracing.enabled=true +## httpclient tracing:httpclient and httpclient5 +# plugin.observability.httpclient.tracing.enabled=true +## okHttp tracing +# plugin.observability.okHttp.tracing.enabled=true +## webclient tracing +# plugin.observability.webclient.tracing.enabled=true +## feignClient tracing +# plugin.observability.feignClient.tracing.enabled=true +## restTemplate tracing +# plugin.observability.restTemplate.tracing.enabled=true # # -------------------- access --------------------- -# access: servlet and spring gateway -plugin.observability.access.metric.enabled=true -plugin.observability.access.metric.interval=30 -plugin.observability.access.metric.topic=application-log -plugin.observability.access.metric.appendType=kafka +## access: servlet and spring gateway +# plugin.observability.access.metric.enabled=true +# plugin.observability.access.metric.interval=30 +# plugin.observability.access.metric.topic=application-log +# plugin.observability.access.metric.appendType=kafka # # -------------------- service name --------------------- -# add service name to header by name for easemesh. default name: X-Mesh-RPC-Service -plugin.integrability.serviceName.addServiceNameHead.propagate.head=X-Mesh-RPC-Service +## add service name to header by name for easemesh. default name: X-Mesh-RPC-Service +# plugin.integrability.serviceName.addServiceNameHead.propagate.head=X-Mesh-RPC-Service # # -------------------- mongodb --------------------- -# mongodb tracing -plugin.observability.mongodb.tracing.enabled=true -# mongodb metric -plugin.observability.mongodb.metric.enabled=true -plugin.observability.mongodb.metric.interval=30 -plugin.observability.mongodb.metric.topic=application-meter -plugin.observability.mongodb.metric.appendType=kafka -# mongodb redirect -plugin.integrability.mongodb.redirect.enabled=true -# mongodb foundation -plugin.hook.mongodb.foundation.enabled=true +## mongodb tracing +# plugin.observability.mongodb.tracing.enabled=true +## mongodb metric +# plugin.observability.mongodb.metric.enabled=true +# plugin.observability.mongodb.metric.interval=30 +# plugin.observability.mongodb.metric.topic=application-meter +# plugin.observability.mongodb.metric.appendType=kafka +## mongodb redirect +# plugin.integrability.mongodb.redirect.enabled=true +## mongodb foundation +# plugin.hook.mongodb.foundation.enabled=true # -------------------------------- -#reporter.outputServer.bootstrapServer=127.0.0.1:9092 +# reporter.outputServer.bootstrapServer=127.0.0.1:9092 reporter.outputServer.timeout=1000 reporter.outputServer.enabled=true diff --git a/config/src/main/java/com/megaease/easeagent/config/ConfigUtils.java b/config/src/main/java/com/megaease/easeagent/config/ConfigUtils.java index fe661fa81..9f42405b5 100644 --- a/config/src/main/java/com/megaease/easeagent/config/ConfigUtils.java +++ b/config/src/main/java/com/megaease/easeagent/config/ConfigUtils.java @@ -27,15 +27,12 @@ import java.util.*; import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Predicate; import java.util.stream.Collectors; import static com.megaease.easeagent.plugin.api.config.ConfigConst.*; public class ConfigUtils { - public static String extractServiceName(Configs configs) { - return configs.getString(ConfigConst.SERVICE_NAME); - } + private ConfigUtils() {} public static void bindProp(String name, Config configs, BiFunction func, Consumer consumer, R def) { Runnable process = () -> { @@ -99,23 +96,15 @@ private static String join(String prefix, String current) { } public static boolean isGlobal(String namespace) { - return namespace != null && PLUGIN_GLOBAL.equals(namespace); + return PLUGIN_GLOBAL.equals(namespace); } public static boolean isPluginConfig(String key) { - boolean r = false; - if (key != null && key.startsWith(PLUGIN_PREFIX)) { - r = true; - } - return r; + return key != null && key.startsWith(PLUGIN_PREFIX); } public static boolean isPluginConfig(String key, String domain, String namespace, String id) { - boolean r = false; - if (key != null && key.startsWith(ConfigConst.join(PLUGIN, domain, namespace, id))) { - r = true; - } - return r; + return key != null && key.startsWith(ConfigConst.join(PLUGIN, domain, namespace, id)); } public static PluginProperty pluginProperty(String path) { @@ -148,28 +137,23 @@ public static String buildPluginProperty(String domain, String namespace, String /** * Convert config item with a fromPrefix to toPrefix for configuration Compatibility * - * @param config config + * @param cfg config source map * @param fromPrefix from * @param toPrefix to * @return Extracted and converted KV map */ - public static Map extractAndConvertPrefix( - Config config, String fromPrefix, String toPrefix) { - return extractAndConvertPrefix(config.getConfigs(), fromPrefix, toPrefix); - } - public static Map extractAndConvertPrefix(Map cfg, String fromPrefix, String toPrefix) { Map convert = new HashMap<>(); Set keys = new HashSet<>(); cfg.forEach((key, value) -> { if (key.startsWith(fromPrefix)) { + keys.add(key); key = toPrefix + key.substring(fromPrefix.length()); convert.put(key, value); - keys.add(key); } }); - keys.forEach(k -> cfg.remove(k)); + keys.forEach(cfg::remove); // override, new configuration KV override previous KV convert.putAll(extractByPrefix(cfg, toPrefix)); diff --git a/config/src/main/java/com/megaease/easeagent/config/Configs.java b/config/src/main/java/com/megaease/easeagent/config/Configs.java index 911a62c1b..b256bc414 100644 --- a/config/src/main/java/com/megaease/easeagent/config/Configs.java +++ b/config/src/main/java/com/megaease/easeagent/config/Configs.java @@ -35,7 +35,7 @@ public class Configs implements Config { protected Configs() {} public Configs(Map source) { - this.source = new HashMap<>(source); + this.source = new TreeMap<>(source); notifier = new ConfigNotifier(""); } @@ -94,6 +94,14 @@ public Integer getInt(String name) { } } + public Boolean getBooleanNullForUnset(String name) { + String value = this.source.get(name); + if (value == null) { + return null; + } + return value.equalsIgnoreCase("yes") || value.equalsIgnoreCase("true"); + } + public Boolean getBoolean(String name) { String value = this.source.get(name); if (value == null) { diff --git a/config/src/main/java/com/megaease/easeagent/config/GlobalConfigs.java b/config/src/main/java/com/megaease/easeagent/config/GlobalConfigs.java index fcf4aa031..10e153500 100644 --- a/config/src/main/java/com/megaease/easeagent/config/GlobalConfigs.java +++ b/config/src/main/java/com/megaease/easeagent/config/GlobalConfigs.java @@ -23,10 +23,7 @@ import com.megaease.easeagent.plugin.api.config.ConfigConst; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; public class GlobalConfigs extends Configs implements ConfigManagerMXBean { private static final Logger LOGGER = LoggerFactory.getLogger(GlobalConfigs.class); @@ -37,9 +34,9 @@ public class GlobalConfigs extends Configs implements ConfigManagerMXBean { public GlobalConfigs(Map source) { super(); // reporter adapter - Map map = new HashMap<>(source); + Map map = new TreeMap<>(source); ReporterConfigAdapter.convertConfig(map); - this.source = new HashMap<>(source); + this.source = new TreeMap<>(map); this.notifier = new ConfigNotifier(""); } diff --git a/config/src/main/java/com/megaease/easeagent/config/report/ReportConfigConst.java b/config/src/main/java/com/megaease/easeagent/config/report/ReportConfigConst.java index 8db2de448..d2f1f9d0d 100644 --- a/config/src/main/java/com/megaease/easeagent/config/report/ReportConfigConst.java +++ b/config/src/main/java/com/megaease/easeagent/config/report/ReportConfigConst.java @@ -32,6 +32,9 @@ private ReportConfigConst() {} static final String DELIMITER = "."; public static final String TOPIC_KEY = "topic"; public static final String ENABLED_KEY = "enabled"; + public static final String SENDER_KEY = "sender"; + public static final String ENCODER_KEY = "encoder"; + public static final String NAME_KEY = "name"; /** * Reporter v2 configuration @@ -42,6 +45,7 @@ private ReportConfigConst() {} public static final String OUTPUT_SERVER_V2 = join(REPORT, "outputServer"); public static final String TRACE_V2 = join(REPORT, "tracing"); public static final String METRIC_V2 = join(REPORT, "metric"); + public static final String GENERAL = join(REPORT, "general"); // ------ lv3 ------ public static final String BOOTSTRAP_SERVERS = join(OUTPUT_SERVER_V2, "bootstrapServer"); public static final String OUTPUT_SERVERS_ENABLE = join(OUTPUT_SERVER_V2, ENABLED_KEY); @@ -50,16 +54,22 @@ private ReportConfigConst() {} public static final String OUTPUT_SECURITY_PROTOCOL_V2 = join(OUTPUT_SERVER_V2, "security.protocol"); public static final String OUTPUT_SERVERS_SSL = join(OUTPUT_SERVER_V2, "ssl"); - public static final String TRACE_SENDER = join(TRACE_V2, "sender"); - public static final String TRACE_ENCODER = join(TRACE_V2, "encoder"); + public static final String GENERAL_SENDER = join(GENERAL, SENDER_KEY); + public static final String GENERAL_ENCODER = join(GENERAL, ENCODER_KEY); + + public static final String TRACE_SENDER = join(TRACE_V2, SENDER_KEY); + public static final String TRACE_ENCODER = join(TRACE_V2, ENCODER_KEY); public static final String TRACE_ASYNC = join(TRACE_V2, "async"); - public static final String METRIC_SENDER = join(METRIC_V2, "sender"); - public static final String METRIC_ENCODER = join(METRIC_V2, "encoder"); + public static final String METRIC_SENDER = join(METRIC_V2, SENDER_KEY); + public static final String METRIC_ENCODER = join(METRIC_V2, ENCODER_KEY); public static final String METRIC_ASYNC = join(METRIC_V2, "async"); // -------- lv4 -------- - public static final String TRACE_SENDER_NAME = join(TRACE_SENDER, "name"); + public static final String GENERAL_SENDER_NAME = join(GENERAL_SENDER, NAME_KEY); + public static final String GENERAL_SENDER_ENABLED = join(GENERAL_SENDER, ENABLED_KEY); + + public static final String TRACE_SENDER_NAME = join(TRACE_SENDER, NAME_KEY); public static final String TRACE_SENDER_ENABLED_V2 = join(TRACE_SENDER, ENABLED_KEY); public static final String TRACE_SENDER_TOPIC_V2 = join(TRACE_SENDER, TOPIC_KEY); @@ -69,7 +79,7 @@ private ReportConfigConst() {} public static final String TRACE_ASYNC_QUEUED_MAX_SPANS_V2 = join(TRACE_ASYNC, "queuedMaxSpans"); public static final String TRACE_ASYNC_QUEUED_MAX_SIZE_V2 = join(TRACE_ASYNC, "queuedMaxSize"); - public static final String METRIC_SENDER_NAME = join(METRIC_SENDER, "name"); + public static final String METRIC_SENDER_NAME = join(METRIC_SENDER, NAME_KEY); public static final String METRIC_SENDER_ENABLED = join(METRIC_SENDER, ENABLED_KEY); public static final String METRIC_SENDER_TOPIC = join(METRIC_SENDER, TOPIC_KEY); public static final String METRIC_SENDER_APPENDER = join(METRIC_SENDER, "appenderName"); @@ -107,6 +117,8 @@ private ReportConfigConst() {} public static final String GLOBAL_METRIC = "plugin.observability.global.metric"; public static final String GLOBAL_METRIC_ENABLED = join(GLOBAL_METRIC, ENABLED_KEY); + public static final String GLOBAL_METRIC_TOPIC = join(GLOBAL_METRIC, TOPIC_KEY); + public static final String GLOBAL_METRIC_APPENDER = join(GLOBAL_METRIC, "appendType"); public static String join(String... texts) { return String.join(DELIMITER, texts); diff --git a/config/src/main/java/com/megaease/easeagent/config/report/ReporterConfigAdapter.java b/config/src/main/java/com/megaease/easeagent/config/report/ReporterConfigAdapter.java index 70af46864..b602ce5b1 100644 --- a/config/src/main/java/com/megaease/easeagent/config/report/ReporterConfigAdapter.java +++ b/config/src/main/java/com/megaease/easeagent/config/report/ReporterConfigAdapter.java @@ -20,14 +20,14 @@ import com.megaease.easeagent.plugin.api.config.Config; import com.megaease.easeagent.plugin.utils.NoNull; import com.megaease.easeagent.plugin.utils.common.StringUtils; - import lombok.extern.slf4j.Slf4j; -import java.util.HashMap; import java.util.Map; +import java.util.TreeMap; +import static com.megaease.easeagent.config.ConfigUtils.extractAndConvertPrefix; +import static com.megaease.easeagent.config.ConfigUtils.extractByPrefix; import static com.megaease.easeagent.config.report.ReportConfigConst.*; -import static com.megaease.easeagent.config.ConfigUtils.*; @Slf4j public class ReporterConfigAdapter { @@ -53,7 +53,7 @@ public static Map extractReporterConfig(Config configs) { public static Map extractAndConvertReporterConfig(Map config) { // outputServer config Map extract = extractAndConvertPrefix(config, OUTPUT_SERVER_V1, OUTPUT_SERVER_V2); - Map cfg = new HashMap<>(extract); + Map cfg = new TreeMap<>(extract); // async config extract = extractAndConvertPrefix(config, TRACE_OUTPUT_V1, TRACE_ASYNC); @@ -87,35 +87,41 @@ public static Map extractAndConvertReporterConfig(Map config, Map cfg) { Map globalMetric = extractByPrefix(config, GLOBAL_METRIC); Map extract = extractAndConvertPrefix(globalMetric, GLOBAL_METRIC, METRIC_ASYNC); - - remove(join(METRIC_ASYNC, ENABLED_KEY), extract, config); + extract.putAll(extractByPrefix(config, METRIC_SENDER)); String appendType = remove(join(METRIC_ASYNC, "appendType"), extract, config); if ("kafka".equals(appendType)) { appendType = METRIC_KAFKA_SENDER_NAME; } - if (!StringUtils.isEmpty(appendType)) { - cfg.put(METRIC_SENDER_NAME, NoNull.of(appendType, CONSOLE_SENDER_NAME)); + // overridden by v2 configuration + cfg.put(METRIC_SENDER_NAME, NoNull.of(cfg.get(METRIC_SENDER_NAME), appendType)); + globalMetric.put(join(METRIC_ASYNC, "appendType"), NoNull.of(cfg.get(METRIC_SENDER_NAME), appendType)); } + String topic = remove(join(METRIC_ASYNC, TOPIC_KEY), extract, config); if (!StringUtils.isEmpty(topic)) { - cfg.put(METRIC_SENDER_TOPIC, topic); + // overridden by v2 configuration + cfg.put(METRIC_SENDER_TOPIC, NoNull.of(cfg.get(METRIC_SENDER_TOPIC), topic)); + globalMetric.put(join(METRIC_ASYNC, "topic"), NoNull.of(cfg.get(METRIC_SENDER_TOPIC), topic)); } cfg.putAll(extract); + // make plugin-global-metric has the same configuration with v2 global metric report configuration + config.putAll(extractAndConvertPrefix(globalMetric, METRIC_ASYNC, GLOBAL_METRIC)); } private static String remove(String key, Map extract, Map config) { diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/config/Config.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/config/Config.java index 4fca11151..83219c9d4 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/config/Config.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/api/config/Config.java @@ -31,6 +31,8 @@ public interface Config { Boolean getBoolean(String name); + Boolean getBooleanNullForUnset(String name); + Double getDouble(String name); Long getLong(String name); diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/async/AgentThreadFactory.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/async/AgentThreadFactory.java index 0cf2bd698..26718f5c3 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/async/AgentThreadFactory.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/async/AgentThreadFactory.java @@ -19,11 +19,14 @@ import javax.annotation.Nullable; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; public class AgentThreadFactory implements ThreadFactory { + protected static AtomicInteger createCount = new AtomicInteger(1); // Used internally to compute Thread names that comply with the Java specification + @Override public Thread newThread(@Nullable Runnable r) { - Thread thread = new Thread(r); + Thread thread = new Thread(r, "EaseAgent-" + createCount.getAndIncrement()); thread.setDaemon(true); return thread; } diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpConfigFactory.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpConfigFactory.java index 76f1eb6fe..33bc5461c 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpConfigFactory.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/bridge/NoOpConfigFactory.java @@ -64,6 +64,11 @@ public Boolean getBoolean(String name) { return false; } + @Override + public Boolean getBooleanNullForUnset(String name) { + return null; + } + @Override public Double getDouble(String name) { return null; diff --git a/report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCallback.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Call.java similarity index 73% rename from report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCallback.java rename to plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Call.java index daa5c26e6..c12d10329 100644 --- a/report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCallback.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Call.java @@ -15,15 +15,15 @@ * limitations under the License. * */ -package com.megaease.easeagent.report.plugin; -import com.megaease.easeagent.plugin.report.Callback; +package com.megaease.easeagent.plugin.report; import java.io.IOException; -public class NoOpCallback implements Callback { - @Override - public V execute() throws IOException { - return null; +public interface Call { + V execute() throws IOException; + + default void enqueue(Callback cb) { + return; } } diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Callback.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Callback.java index a9eef8927..a7bb1b628 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Callback.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Callback.java @@ -1,29 +1,40 @@ +package com.megaease.easeagent.plugin.report; + /* - * Copyright (c) 2021, MegaEase - * All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Copyright 2015-2019 The OpenZipkin Authors * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ -package com.megaease.easeagent.plugin.report; - -import java.io.IOException; -@SuppressWarnings("unused") +/** + * A callback of a single result or error. + * + *

This is a bridge to async libraries such as CompletableFuture complete, completeExceptionally. + * + *

Implementations will call either {@link #onSuccess} or {@link #onError}, but not both. + */ public interface Callback { - V execute() throws IOException; - default V enqueue() { - return null; - } + /** + * Invoked when computation produces its potentially null value successfully. + * + *

When this is called, {@link #onError} won't be. + */ + void onSuccess(V value); + + /** + * Invoked when computation produces a possibly null value successfully. + * + *

When this is called, {@link #onSuccess} won't be. + */ + void onError(Throwable t); } + diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Sender.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Sender.java index 3961bbb43..7295a56ac 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Sender.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/report/Sender.java @@ -18,11 +18,8 @@ package com.megaease.easeagent.plugin.report; import com.megaease.easeagent.plugin.api.config.Config; -import com.megaease.easeagent.plugin.api.config.ConfigChangeListener; import java.io.Closeable; -import java.util.Collections; -import java.util.List; import java.util.Map; /** @@ -46,7 +43,7 @@ public interface Sender extends Closeable { * @param encodedData encoded data, such as encoded spans. * @throws IllegalStateException if {@link #close() close} was called. */ - Callback send(byte[] encodedData); + Call send(byte[] encodedData); /** * If sender is available( not closed), return true, otherwise false. diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/tools/trace/HttpUtils.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/tools/trace/HttpUtils.java index 88b824c04..75e090b0d 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/tools/trace/HttpUtils.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/tools/trace/HttpUtils.java @@ -22,6 +22,8 @@ import static com.megaease.easeagent.plugin.tools.trace.TraceConst.HTTP_HEADER_X_FORWARDED_FOR; public class HttpUtils { + private HttpUtils() {} + public static void handleReceive(Span span, HttpRequest httpRequest) { span.name(httpRequest.name()); span.tag(TraceConst.HTTP_TAG_ROUTE, httpRequest.route()); diff --git a/plugin-api/src/main/java/com/megaease/easeagent/plugin/utils/common/StringUtils.java b/plugin-api/src/main/java/com/megaease/easeagent/plugin/utils/common/StringUtils.java index 40f09fac1..c768b49ac 100644 --- a/plugin-api/src/main/java/com/megaease/easeagent/plugin/utils/common/StringUtils.java +++ b/plugin-api/src/main/java/com/megaease/easeagent/plugin/utils/common/StringUtils.java @@ -25,6 +25,26 @@ public class StringUtils { private StringUtils() { } + /** + *

If the first one is empty, return the alternative value

+ * @param first first + * @param alternate alternative value + * @return String + */ + public static String noEmptyOf(String first, String alternate) { + if (isEmpty(first)) { + return alternate; + } + return first; + } + + public static String noEmptyOf(String first, String alternate, String defaultValue) { + if (isEmpty(first)) { + return noEmptyOf(alternate, defaultValue); + } + return first; + } + /** *

Checks if a CharSequence is empty ("") or null.

* diff --git a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/HttpServletPlugin.java b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/HttpServletPlugin.java index f6ef99346..6d31f987f 100644 --- a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/HttpServletPlugin.java +++ b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/HttpServletPlugin.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, MegaEase + * Copyright (c) 2021, MegaEase * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/advice/DoFilterAdvice.java b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/advice/DoFilterAdvice.java index 313519b1c..ae7287342 100644 --- a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/advice/DoFilterAdvice.java +++ b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/advice/DoFilterAdvice.java @@ -31,14 +31,6 @@ public class DoFilterAdvice implements Points { static final String SERVLET_REQUEST = "javax.servlet.ServletRequest"; static final String SERVLET_RESPONSE = "javax.servlet.ServletResponse"; - // return def.type( - // hasSuperType(namedOneOf(FILTER_NAME, HTTP_SERVLET_NAME))) - // .transform(doFilterOrService( - // namedOneOf("doFilter", "service") - // .and(takesArgument(0, named(SERVLET_REQUEST))) - // .and(takesArgument(1, named(SERVLET_RESPONSE))) - // ) - // ).end(); @Override public IClassMatcher getClassMatcher() { return ClassMatcher.builder() diff --git a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/interceptor/DoFilterTraceInterceptor.java b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/interceptor/DoFilterTraceInterceptor.java index b2cccdf7f..a4afbcbcf 100644 --- a/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/interceptor/DoFilterTraceInterceptor.java +++ b/plugins/httpservlet/src/main/java/com/megaease/easeagent/plugin/httpservlet/interceptor/DoFilterTraceInterceptor.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2017, MegaEase * All rights reserved. @@ -40,9 +39,10 @@ import javax.servlet.http.HttpServletResponse; import java.util.concurrent.atomic.AtomicBoolean; -@AdviceTo(value = DoFilterAdvice.class, qualifier = "default", plugin = HttpServletPlugin.class) +@AdviceTo(value = DoFilterAdvice.class, plugin = HttpServletPlugin.class) public class DoFilterTraceInterceptor implements NonReentrantInterceptor { private static final String AFTER_MARK = DoFilterTraceInterceptor.class.getName() + "$AfterMark"; + private static final String ERROR_KEY = "error"; @Override public void doBefore(MethodInfo methodInfo, Context context) { @@ -73,11 +73,9 @@ public void doAfter(MethodInfo methodInfo, Context context) { } else if (methodInfo.getThrowable() != null) { span.error(methodInfo.getThrowable()); span.finish(); - return; } else { httpServletRequest.getAsyncContext().addListener(new TracingAsyncListener(requestContext), httpServletRequest, httpServletResponse); } - return; } finally { requestContext.scope().close(); } @@ -128,7 +126,7 @@ public Throwable maybeError() { if (caught != null) { return caught; } - Object maybeError = httpServletRequest.getAttribute("error"); + Object maybeError = httpServletRequest.getAttribute(ERROR_KEY); if (maybeError instanceof Throwable) { return (Throwable) maybeError; } else { @@ -163,19 +161,14 @@ public void onComplete(AsyncEvent e) { } public void onTimeout(AsyncEvent e) { - ServletRequest request = e.getSuppliedRequest(); - if (request.getAttribute("error") == null) { - request.setAttribute("error", e.getThrowable()); - } - + onError(e); } public void onError(AsyncEvent e) { ServletRequest request = e.getSuppliedRequest(); - if (request.getAttribute("error") == null) { - request.setAttribute("error", e.getThrowable()); + if (request.getAttribute(ERROR_KEY) == null) { + request.setAttribute(ERROR_KEY, e.getThrowable()); } - } public void onStartAsync(AsyncEvent e) { @@ -183,7 +176,6 @@ public void onStartAsync(AsyncEvent e) { if (eventAsyncContext != null) { eventAsyncContext.addListener(this, e.getSuppliedRequest(), e.getSuppliedResponse()); } - } public String toString() { diff --git a/report/pom.xml b/report/pom.xml index 7cf12328a..185d77fb2 100644 --- a/report/pom.xml +++ b/report/pom.xml @@ -104,6 +104,11 @@ auto-service provided + + + com.squareup.okhttp3 + okhttp + @@ -119,6 +124,10 @@ org.apache.kafka com.megaease.easeagent.org.apache.kafka + + okhttp3 + easeagent.okhttp3 + diff --git a/report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCall.java b/report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCall.java new file mode 100644 index 000000000..1d3ae364e --- /dev/null +++ b/report/src/main/java/com/megaease/easeagent/report/plugin/NoOpCall.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.report.plugin; + +import com.megaease.easeagent.plugin.report.Call; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class NoOpCall implements Call { + private static final Map, NoOpCall> INSTANCE_MAP = new ConcurrentHashMap<>(); + + @SuppressWarnings("unchecked") + public static NoOpCall getInstance(Class clazz) { + NoOpCall b = INSTANCE_MAP.get(clazz); + if (b != null) { + return (NoOpCall)b; + } + b = new NoOpCall<>(); + INSTANCE_MAP.put(clazz, b); + return b; + } + + @Override + public V execute() throws IOException { + return null; + } +} diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/AgentKafkaSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/AgentKafkaSender.java index b8548112d..98a4b7164 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/AgentKafkaSender.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/AgentKafkaSender.java @@ -19,13 +19,11 @@ import com.google.auto.service.AutoService; import com.megaease.easeagent.config.ConfigUtils; -import com.megaease.easeagent.config.report.ReportConfigConst; import com.megaease.easeagent.plugin.api.config.Config; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Sender; import com.megaease.easeagent.plugin.utils.common.StringUtils; -import com.megaease.easeagent.report.plugin.NoOpCallback; -import zipkin2.Call; +import com.megaease.easeagent.report.plugin.NoOpCall; import zipkin2.codec.Encoding; import zipkin2.reporter.kafka11.KafkaSender; import zipkin2.reporter.kafka11.SDKKafkaSender; @@ -74,12 +72,12 @@ public void init(Config config) { } @Override - public Callback send(byte[] encodedData) { + public Call send(byte[] encodedData) { if (!enabled) { - return new NoOpCallback<>(); + return new NoOpCall<>(); } - Call call = this.sender.sendSpans(encodedData); - return call::execute; + zipkin2.Call call = this.sender.sendSpans(encodedData); + return new ZipkinCallWrapper<>(call); } @Override diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/AgentLoggerSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/AgentLoggerSender.java index 26d04c5bd..e035cfce0 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/AgentLoggerSender.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/AgentLoggerSender.java @@ -22,6 +22,7 @@ import com.megaease.easeagent.log4j2.Logger; import com.megaease.easeagent.log4j2.LoggerFactory; import com.megaease.easeagent.plugin.api.config.Config; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Callback; import com.megaease.easeagent.plugin.report.Sender; @@ -48,8 +49,8 @@ public void init(Config config) { } @Override - public Callback send(byte[] encodedData) { - return new ConsoleCallback(encodedData); + public Call send(byte[] encodedData) { + return new ConsoleCall(encodedData); } @Override @@ -67,10 +68,10 @@ public void close() throws IOException { // ignored } - static class ConsoleCallback implements Callback { + static class ConsoleCall implements Call { private final byte[] msg; - ConsoleCallback(byte[] msg) { + ConsoleCall(byte[] msg) { this.msg = msg; } @@ -81,9 +82,8 @@ public Void execute() throws IOException { } @Override - public Void enqueue() { + public void enqueue(Callback cb) { LOGGER.debug("{}", new String(msg)); - return null; } } } diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/NoOpSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/NoOpSender.java index 5e22e21bf..64e62a111 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/NoOpSender.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/NoOpSender.java @@ -18,9 +18,9 @@ package com.megaease.easeagent.report.sender; import com.megaease.easeagent.plugin.api.config.Config; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Sender; -import com.megaease.easeagent.report.plugin.NoOpCallback; +import com.megaease.easeagent.report.plugin.NoOpCall; import java.io.IOException; import java.util.Map; @@ -39,8 +39,8 @@ public void init(Config config) { } @Override - public Callback send(byte[] encodedData) { - return new NoOpCallback<>(); + public Call send(byte[] encodedData) { + return new NoOpCall<>(); } @Override diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/SenderConfigDecorator.java b/report/src/main/java/com/megaease/easeagent/report/sender/SenderConfigDecorator.java index 274958db6..c1ebb8ba7 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/SenderConfigDecorator.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/SenderConfigDecorator.java @@ -21,7 +21,7 @@ import com.megaease.easeagent.plugin.api.config.ChangeItem; import com.megaease.easeagent.plugin.api.config.Config; import com.megaease.easeagent.plugin.api.config.ConfigChangeListener; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Encoder; import com.megaease.easeagent.plugin.report.Sender; import com.megaease.easeagent.report.plugin.ReporterRegistry; @@ -32,11 +32,13 @@ import java.util.List; import java.util.Map; +import static com.megaease.easeagent.config.ConfigUtils.extractAndConvertPrefix; import static com.megaease.easeagent.config.ConfigUtils.extractByPrefix; import static com.megaease.easeagent.config.report.ReportConfigConst.*; @Slf4j public class SenderConfigDecorator implements SenderWithEncoder, ConfigChangeListener { + protected Sender sender; String prefix; Config config; @@ -59,17 +61,17 @@ public String name() { @Override public void init(Config config) { this.packer = ReporterRegistry.getEncoder(config.getString(this.encoderKey)); - this.packer.init(config); - this.sender.init(config); + this.packer.init(this.config); + this.sender.init(this.config); } @Override - public Callback send(byte[] encodedData) { + public Call send(byte[] encodedData) { return this.sender.send(encodedData); } @Override - public Callback send(List encodedData) { + public Call send(List encodedData) { byte[] data = this.packer.encodeList(encodedData); return sender.send(data); } @@ -81,7 +83,7 @@ public boolean isAvailable() { @Override public void updateConfigs(Map changes) { - String name = changes.get(join(prefix, "name")); + String name = changes.get(join(GENERAL_SENDER, "name")); if (name == null || name.equals(name())) { this.sender.updateConfigs(changes); } else { @@ -127,18 +129,15 @@ private static String getEncoderKey(String cfgPrefix) { } } - public static Map extractSenderConfig(String cfgPrefix, Config config) { - // outputServer config - Map extract = extractByPrefix(config, OUTPUT_SERVER_V2); + private static Map extractSenderConfig(String cfgPrefix, Config config) { + Map extract = extractByPrefix(config, cfgPrefix); Map cfg = new HashMap<>(extract); - // encoder config - String encoderKey = getEncoderKey(cfgPrefix); - cfg.put(encoderKey, config.getString(encoderKey)); + // convert to general cfg, then sender don't need to care about any special business. + cfg = extractAndConvertPrefix(cfg, cfgPrefix, GENERAL_SENDER); - // sender config - extract = extractByPrefix(config, cfgPrefix); - cfg.putAll(extract); + // outputServer config + cfg.putAll(extractByPrefix(config, OUTPUT_SERVER_V2)); return cfg; } @@ -153,7 +152,7 @@ private Map filterChanges(List list) { || name.startsWith(OUTPUT_SERVER_V2); }).forEach(one -> cfg.put(one.getFullName(), one.getNewValue())); - return cfg; + return extractAndConvertPrefix(cfg, prefix, GENERAL); } @Override diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/SenderWithEncoder.java b/report/src/main/java/com/megaease/easeagent/report/sender/SenderWithEncoder.java index feddb4f09..c2674ca93 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/SenderWithEncoder.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/SenderWithEncoder.java @@ -17,7 +17,7 @@ */ package com.megaease.easeagent.report.sender; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Encoder; import com.megaease.easeagent.plugin.report.Sender; @@ -32,5 +32,5 @@ public interface SenderWithEncoder extends Sender { * @param encodedData list of encoded data, such as encoded spans. * @throws IllegalStateException if {@link #close() close} was called. */ - Callback send(List encodedData); + Call send(List encodedData); } diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinCallWrapper.java b/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinCallWrapper.java new file mode 100644 index 000000000..a90117e77 --- /dev/null +++ b/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinCallWrapper.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.report.sender; + +import com.megaease.easeagent.plugin.report.Call; +import com.megaease.easeagent.plugin.report.Callback; + +import java.io.IOException; + +public class ZipkinCallWrapper implements Call { + private final zipkin2.Call call; + + public ZipkinCallWrapper(zipkin2.Call call) { + this.call = call; + } + + @Override + public V execute() throws IOException { + return call.execute(); + } + + @Override + public void enqueue(Callback cb) { + zipkin2.Callback zCb = new ZipkinCallbackWrapper<>(cb); + this.call.enqueue(zCb); + } + + static class ZipkinCallbackWrapper implements zipkin2.Callback { + final Callback delegate; + + ZipkinCallbackWrapper(Callback cb) { + this.delegate = cb; + } + + @Override + public void onSuccess(V value) { + this.delegate.onSuccess(value); + } + + @Override + public void onError(Throwable t) { + this.delegate.onError(t); + } + } +} diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinSender.java index a941c2de6..71f78fb7f 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinSender.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/ZipkinSender.java @@ -19,7 +19,7 @@ import com.megaease.easeagent.config.report.ReportConfigConst; import com.megaease.easeagent.plugin.api.config.Config; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Sender; import java.io.IOException; @@ -39,7 +39,7 @@ public void init(Config config) { } @Override - public Callback send(byte[] encodedData) { + public Call send(byte[] encodedData) { return null; } diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/metric/MetricKafkaSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/metric/MetricKafkaSender.java index cd00defc6..f6c67c04c 100644 --- a/report/src/main/java/com/megaease/easeagent/report/sender/metric/MetricKafkaSender.java +++ b/report/src/main/java/com/megaease/easeagent/report/sender/metric/MetricKafkaSender.java @@ -20,11 +20,11 @@ import com.google.auto.service.AutoService; import com.megaease.easeagent.config.report.ReportConfigConst; import com.megaease.easeagent.plugin.api.config.Config; -import com.megaease.easeagent.plugin.report.Callback; +import com.megaease.easeagent.plugin.report.Call; import com.megaease.easeagent.plugin.report.Sender; import com.megaease.easeagent.report.OutputProperties; import com.megaease.easeagent.report.metric.MetricProps; -import com.megaease.easeagent.report.plugin.NoOpCallback; +import com.megaease.easeagent.report.plugin.NoOpCall; import com.megaease.easeagent.report.sender.metric.log4j.AppenderManager; import com.megaease.easeagent.report.sender.metric.log4j.LoggerFactory; import com.megaease.easeagent.report.sender.metric.log4j.RefreshableAppender; @@ -36,7 +36,7 @@ @AutoService(Sender.class) public class MetricKafkaSender implements Sender { - public static final String SENDER_NAME = ReportConfigConst.METRIC_SENDER_NAME; + public static final String SENDER_NAME = ReportConfigConst.METRIC_KAFKA_SENDER_NAME; private static AppenderManager appenderManager; private OutputProperties outputProperties; @@ -56,11 +56,11 @@ public void init(Config config) { } @Override - public Callback send(byte[] encodedData) { + public Call send(byte[] encodedData) { lazyInitLogger(); String msg = new String(encodedData); logger.info(msg); - return new NoOpCallback<>(); + return new NoOpCall<>(); } @Override diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/ByteRequestBody.java b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/ByteRequestBody.java new file mode 100644 index 000000000..345c72fee --- /dev/null +++ b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/ByteRequestBody.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.report.sender.okhttp; + +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okio.BufferedSink; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; + +public class ByteRequestBody extends RequestBody { + // for improved: this should given by encoder + static final MediaType CONTENT_TYPE = MediaType.parse("application/json"); + + private final byte[] data; + private final int contentLength; + + public ByteRequestBody(byte[] data) { + this.data = data; + this.contentLength = data.length; + } + + @Nullable + @Override + public MediaType contentType() { + return CONTENT_TYPE; + } + + @Override + public void writeTo(@NotNull BufferedSink sink) throws IOException { + sink.write(data); + } + + @Override public long contentLength() { + return contentLength; + } +} diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpCall.java b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpCall.java new file mode 100644 index 000000000..c5033ca16 --- /dev/null +++ b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpCall.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.report.sender.okhttp; + +import com.megaease.easeagent.plugin.report.Call; +import com.megaease.easeagent.plugin.report.Callback; +import okhttp3.Response; +import okhttp3.ResponseBody; +import okio.BufferedSource; +import okio.GzipSource; +import okio.Okio; + +import javax.annotation.Nonnull; +import java.io.IOException; + +// from zipkin-reporter-java +final class HttpCall implements Call { + + final okhttp3.Call call; + + HttpCall(okhttp3.Call call) { + this.call = call; + } + + @Override + public Void execute() throws IOException { + parseResponse(call.execute()); + return null; + } + + @Override + public void enqueue(Callback delegate) { + call.enqueue(new V2CallbackAdapter<>(delegate)); + } + + static void parseResponse(Response response) throws IOException { + ResponseBody responseBody = response.body(); + if (responseBody == null) { + if (response.isSuccessful()) { + return; + } else { + throw new IOException("response failed: " + response); + } + } + BufferedSource content = null; + try { + if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) { + content = Okio.buffer(new GzipSource(responseBody.source())); + } else { + content = responseBody.source(); + } + if (!response.isSuccessful()) { + throw new IOException( + "response for " + response.request().tag() + " failed: " + content.readUtf8()); + } + } finally { + if (content != null) { + content.close(); + } + responseBody.close(); + } + } + + static class V2CallbackAdapter implements okhttp3.Callback { + final Callback delegate; + + V2CallbackAdapter(Callback delegate) { + this.delegate = delegate; + } + + @Override + public void onFailure(@Nonnull okhttp3.Call call, @Nonnull IOException e) { + delegate.onError(e); + } + + /** Note: this runs on the {@link okhttp3.OkHttpClient#dispatcher() dispatcher} thread! */ + @Override + public void onResponse(@Nonnull okhttp3.Call call, @Nonnull Response response) { + try { + parseResponse(response); + delegate.onSuccess(null); + } catch (Throwable e) { + delegate.onError(e); + } + } + } +} diff --git a/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpSender.java b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpSender.java new file mode 100644 index 000000000..b3d6b093f --- /dev/null +++ b/report/src/main/java/com/megaease/easeagent/report/sender/okhttp/HttpSender.java @@ -0,0 +1,315 @@ +/* + * Copyright (c) 2021, MegaEase + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package com.megaease.easeagent.report.sender.okhttp; + +import com.google.auto.service.AutoService; +import com.megaease.easeagent.plugin.api.config.Config; +import com.megaease.easeagent.plugin.async.AgentThreadFactory; +import com.megaease.easeagent.plugin.report.Call; +import com.megaease.easeagent.plugin.report.Sender; +import com.megaease.easeagent.plugin.utils.NoNull; +import com.megaease.easeagent.plugin.utils.common.StringUtils; +import com.megaease.easeagent.report.plugin.NoOpCall; +import lombok.extern.slf4j.Slf4j; +import okhttp3.*; +import okio.Buffer; +import okio.BufferedSink; +import okio.GzipSink; +import okio.Okio; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static com.megaease.easeagent.config.report.ReportConfigConst.*; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@Slf4j +@AutoService(Sender.class) +public class HttpSender implements Sender { + public static final String SENDER_NAME = "http"; + + private static final String AUTH_HEADER = "Authorization"; + + private static final String SERVER_USER_NAME_KEY = join(OUTPUT_SERVER_V2, "userName"); + private static final String SERVER_PASSWORD_KEY = join(OUTPUT_SERVER_V2, "password"); + private static final String SERVER_GZIP_KEY = join(OUTPUT_SERVER_V2, "compress"); + + private static final String URL_KEY = join(GENERAL_SENDER, "url"); + private static final String USER_NAME_KEY = join(GENERAL_SENDER, "userName"); + private static final String PASSWORD_KEY = join(GENERAL_SENDER, "password"); + private static final String GZIP_KEY = join(GENERAL_SENDER, "compress"); + private static final String MAX_REQUESTS_KEY = join(GENERAL_SENDER, "maxRequests"); + private static final int MIN_TIMEOUT = 30_000; + + private Config config; + + private String url; + private HttpUrl httpUrl; + private String userName; + private String password; + + private boolean enabled; + private boolean gzip; + private boolean isAuth; + + private int timeout; + private int maxRequests; + + private String credential; + private OkHttpClient client; + + // URL-USER-PASSWORD as unique key shared a client + static ConcurrentHashMap clientMap = new ConcurrentHashMap<>(); + + @Override + public String name() { + return SENDER_NAME; + } + + @Override + public void init(Config config) { + extractConfig(config); + this.config = config; + initClient(); + } + + private void extractConfig(Config config) { + this.url = getUrl(config); + this.userName = StringUtils.noEmptyOf(config.getString(USER_NAME_KEY), config.getString(SERVER_USER_NAME_KEY)); + this.password = StringUtils.noEmptyOf(config.getString(PASSWORD_KEY), config.getString(SERVER_PASSWORD_KEY)); + + this.gzip = NoNull.of(config.getBooleanNullForUnset(GZIP_KEY), + NoNull.of(config.getBooleanNullForUnset(SERVER_GZIP_KEY), true)); + + this.timeout = NoNull.of(config.getInt(OUTPUT_SERVERS_TIMEOUT), MIN_TIMEOUT); + if (this.timeout < MIN_TIMEOUT) { + this.timeout = MIN_TIMEOUT; + } + this.enabled = NoNull.of(config.getBooleanNullForUnset(GENERAL_SENDER_ENABLED), true); + this.maxRequests = NoNull.of(config.getInt(MAX_REQUESTS_KEY), 65); + + if (StringUtils.isEmpty(url) || Boolean.FALSE.equals(config.getBoolean(OUTPUT_SERVERS_ENABLE))) { + this.enabled = false; + } else { + this.httpUrl = HttpUrl.parse(this.url); + if (this.httpUrl == null) { + log.error("Invalid Url:{}", this.url); + this.enabled = false; + } + } + + this.isAuth = !StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password); + if (isAuth) { + this.credential = Credentials.basic(userName, password); + } + } + + private String getUrl(Config config) { + // url + String outputServer = config.getString(BOOTSTRAP_SERVERS); + String cUrl = NoNull.of(config.getString(URL_KEY), ""); + if (!StringUtils.isEmpty(outputServer) && !cUrl.startsWith("http")) { + cUrl = outputServer + cUrl; + } + return cUrl; + } + + @Override + public Call send(byte[] encodedData) { + if (!enabled) { + return NoOpCall.getInstance(Void.class); + } + Request request; + try { + request = newRequest(new ByteRequestBody(encodedData)); + } catch (IOException e) { + return NoOpCall.getInstance(Void.class); + } + return new HttpCall(client.newCall(request)); + } + + @Override + public boolean isAvailable() { + return this.enabled; + } + + @Override + public void updateConfigs(Map changes) { + this.config.updateConfigsNotNotify(changes); + + // check new client + boolean renewClient = !getUrl(this.config).equals(this.url) + || !this.config.getString(USER_NAME_KEY).equals(this.userName) + || !this.config.getString(PASSWORD_KEY).equals(this.password); + + if (renewClient) { + clearClient(); + extractConfig(this.config); + newClient(); + } + } + + @Override + public void close() throws IOException { + clearClient(); + } + + /** Waits up to a second for in-flight requests to finish before cancelling them */ + private void clearClient() { + OkHttpClient dClient = clientMap.remove(getClientKey()); + if (dClient == null) { + return; + } + Dispatcher dispatcher = dClient.dispatcher(); + dispatcher.executorService().shutdown(); + try { + if (!dispatcher.executorService().awaitTermination(1, TimeUnit.SECONDS)) { + dispatcher.cancelAll(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // different url for different business, so create separate clients with different dispatcher + private String getClientKey() { + return this.url + ":" + this.userName + ":" + this.password; + } + + private void newClient() { + String clientKey = getClientKey(); + OkHttpClient newClient = clientMap.get(clientKey); + if (newClient != null) { + client = newClient; + return; + } + OkHttpClient.Builder builder = new OkHttpClient.Builder(); + + // timeout + builder.connectTimeout(timeout, MILLISECONDS); + builder.readTimeout(timeout, MILLISECONDS); + builder.writeTimeout(timeout, MILLISECONDS); + + // auth + if (this.isAuth) { + builder.authenticator((route, response) -> { + if (response.request().header(AUTH_HEADER) != null) { + return null; + } + log.info("Authenticating for response: " + response); + log.info("Challenges: " + response.challenges()); + credential = Credentials.basic(userName, password); + return response.request().newBuilder() + .header(AUTH_HEADER, credential) + .build(); + }); + } + synchronized (HttpSender.class) { + if (clientMap.get(clientKey) != null) { + client = clientMap.get(clientKey); + } else { + builder.dispatcher(newDispatcher(maxRequests)); + newClient = builder.build(); + clientMap.putIfAbsent(clientKey, newClient); + client = newClient; + } + } + } + + private void initClient() { + if (client != null) { + return; + } + newClient(); + } + + // borrow form zipkin-reporter + private Request newRequest(RequestBody body) throws IOException { + Request.Builder request = new Request.Builder().url(httpUrl); + // Amplification can occur when the Zipkin endpoint is accessed through a proxy, and the proxy is instrumented. + // This prevents that in proxies, such as Envoy, that understand B3 single format, + request.addHeader("b3", "0"); + if (this.isAuth) { + request.header(AUTH_HEADER, credential); + } + if (this.gzip) { + request.addHeader("Content-Encoding", "gzip"); + Buffer gzipped = new Buffer(); + BufferedSink gzipSink = Okio.buffer(new GzipSink(gzipped)); + body.writeTo(gzipSink); + gzipSink.close(); + body = new BufferRequestBody(body.contentType(), gzipped); + } + request.post(body); + return request.build(); + } + + static Dispatcher newDispatcher(int maxRequests) { + // bound the executor so that we get consistent performance + ThreadPoolExecutor dispatchExecutor = + new ThreadPoolExecutor(0, maxRequests, 60, TimeUnit.SECONDS, + // Using a synchronous queue means messages will send immediately until we hit max + // in-flight requests. Once max requests are hit, send will block the caller, which is + // the AsyncReporter flush thread. This is ok, as the AsyncReporter has a buffer of + // unsent spans for this purpose. + new SynchronousQueue<>(), + OkHttpSenderThreadFactory.INSTANCE); + + Dispatcher dispatcher = new Dispatcher(dispatchExecutor); + dispatcher.setMaxRequests(maxRequests); + dispatcher.setMaxRequestsPerHost(maxRequests); + return dispatcher; + } + + static class OkHttpSenderThreadFactory extends AgentThreadFactory { + public static final OkHttpSenderThreadFactory INSTANCE = new OkHttpSenderThreadFactory(); + @Override public Thread newThread(Runnable r) { + return new Thread(r, "AgentHttpSenderDispatcher-" + createCount.getAndIncrement()); + } + } + + // from zipkin-reporter-java + static final class BufferRequestBody extends RequestBody { + final MediaType contentType; + final Buffer body; + + BufferRequestBody(MediaType contentType, Buffer body) { + this.contentType = contentType; + this.body = body; + } + + @Override + public long contentLength() { + return body.size(); + } + + @Override + public MediaType contentType() { + return contentType; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + sink.write(body, body.size()); + } + } +}